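Collect Pipedream docs, web pages, blog posts, and app/component metadata; split them into chunks, embed them with OpenAI, and save them to Supabase.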

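Create a `.env` file and add your `SUPABASE_URL`, `SUPABASE_KEY`, and `OPENAI_API_KEY`, plus the Snowflake credentials `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, `SNOWFLAKE_PASSWORD`, `SNOWFLAKE_WAREHOUSE`, `SNOWFLAKE_DATABASE`, and `SNOWFLAKE_SCHEMA`.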

In [2]:
from datetime import datetime, timedelta
from dotenv import load_dotenv
from functools import partial
import json
import logging
import mistune
import numpy as np
import os
import pickle
import re
import requests
import sys
from textwrap import dedent
import time


from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter, MarkdownTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from pathlib import Path
import snowflake.connector
from supabase import create_client, Client

from typing import (
    Dict,
    Union,
)

# Load variables from a local .env file into os.environ so the os.environ lookups
# in later cells (Supabase, Snowflake) can find the credentials.
load_dotenv()

# Captured notebook output of the call above: load_dotenv() returned False in the
# recorded run. NOTE(review): python-dotenv presumably returns False when nothing was
# loaded (e.g. no .env file found) — confirm the .env file is in place before running.
False

In [None]:
# Supabase
# Both variables are required. os.environ[...] raises a KeyError naming the missing
# variable (consistent with the Snowflake cell) instead of handing None to create_client.
url: str = os.environ["SUPABASE_URL"]
key: str = os.environ["SUPABASE_KEY"]

supabase: Client = create_client(url, key)

In [None]:
# These are the core lists we use to store content and "source" (e.g. the URL to the doc).
# docs[k] is a text chunk and metadatas[k] is that chunk's metadata dict. Every cell
# below appends to both lists in lockstep, so the indexes stay aligned. The chunks are
# embedded with OpenAI later, and the Q&A bot searches those embeddings for sources
# that match a query.
metadatas = []
docs = []

def split_and_index_source(splitter, text, key, metadata=None):
    """Split ``text`` into chunks and append them to the global ``docs``/``metadatas`` lists.

    Args:
        splitter: Object with a ``split_text(text) -> list[str]`` method
            (e.g. one of the LangChain text splitters defined below).
        text: Raw text to split.
        key: Source identifier (URL, file path, ...) stored as ``metadata['source']``
            on every chunk.
        metadata: Optional extra metadata copied onto every chunk. Not mutated.
    """
    # None instead of a mutable ``{}`` default, so no dict is shared across calls.
    base_metadata = {} if metadata is None else metadata
    splits = splitter.split_text(text)

    docs.extend(splits)
    # One independent dict per chunk; 'source' overrides any caller-supplied value.
    metadatas.extend({**base_metadata, "source": key} for _ in splits)

        
## TEXT SPLITTERS ##
# Chunk sizes are a starting point, not tuned values. Pinecone reports that embeddings
# from the latest OpenAI models work best with chunk sizes that are multiples of 256,
# and ChatGPT suggested a 10-25% overlap. Test and optimize these.
# NOTE(review): LangChain splitters measure chunk_size/chunk_overlap with their
# length_function, presumably the default len() (characters, not tokens) here —
# confirm that matches the sizing rationale above.
MAX_CHUNK_LENGTH = 512 * 4
CHUNK_OVERLAP = 512
_chunk_settings = {"chunk_size": MAX_CHUNK_LENGTH, "chunk_overlap": CHUNK_OVERLAP}

# Plain-text splitter. It prefers blank-line (paragraph) boundaries, and "" is the
# last-resort fallback. NOTE(review): with no "\n" or " " separators, an over-long
# paragraph is presumably cut mid-word — confirm that's intended.
text_splitter = RecursiveCharacterTextSplitter(separators=["\n\n", ""], **_chunk_settings)

# Markdown-aware splitter: splits on Markdown headings and other blocks.
markdown_splitter = MarkdownTextSplitter(**_chunk_settings)

In [None]:
def parse_html_and_get_content(content):
    """Extract readable text and the page title from an HTML document.

    Args:
        content: HTML as ``str`` or ``bytes`` (anything ``BeautifulSoup`` accepts).

    Returns:
        dict with ``"text"`` (newline-separated page text) and ``"title"`` (the
        ``<title>`` string, or ``"Untitled"`` when the page has no usable title).
    """
    soup = BeautifulSoup(content, 'html.parser')
    # Turn <br> and <p> into explicit newlines so the text splitter can use
    # paragraph boundaries.
    for br in soup.find_all('br'):
        br.replace_with('\n')
    for p in soup.find_all('p'):
        p.replace_with('\n' + p.get_text() + '\n')

    # soup.title is None when the page has no <title> tag (previously an
    # AttributeError), and .string is None when the tag is empty or has nested
    # children. Both cases fall back to "Untitled".
    title_tag = soup.title
    title = title_tag.string if title_tag is not None else None

    return {
        "text": soup.get_text(separator='\n'),
        "title": title or "Untitled",
    }

    
def _parse_published_date(published_time):
    """Return the calendar date of an ISO-8601 timestamp string, or None if unparseable."""
    # The first format is the one this notebook originally required. The others accept
    # timestamps without fractional seconds or with an explicit UTC offset.
    formats = (
        "%Y-%m-%dT%H:%M:%S.%fZ",
        "%Y-%m-%dT%H:%M:%SZ",
        "%Y-%m-%dT%H:%M:%S.%f%z",
        "%Y-%m-%dT%H:%M:%S%z",
    )
    for fmt in formats:
        try:
            return datetime.strptime(published_time, fmt).date()
        except ValueError:
            continue
    return None


def fetch_url_and_index_text(url, timeout=30):
    """Fetch ``url`` and append its text chunks to the global ``docs``/``metadatas`` lists.

    Pages whose ``article:published_time`` meta tag is before 2022-05-01 (v1 builder
    content) are skipped. Network errors and non-2xx responses are printed and the
    page is skipped. Returns None.

    Args:
        url: Page to fetch.
        timeout: Seconds to wait for the server before giving up on this page.
    """
    print(f"Fetching {url}")
    try:
        # The timeout keeps one unresponsive host from stalling the whole run.
        # raise_for_status() keeps 404/500 error pages out of the index.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
    except requests.RequestException as err:
        print(f"Failed to index {url}: {err}")
        return

    # Find the meta tag with the property 'article:published_time'.
    # Specific to the blog, but can extend to other sources if needed.
    soup = BeautifulSoup(response.content, 'html.parser')
    meta_tag = soup.find('meta', {'property': 'article:published_time'})
    published_time = meta_tag.get('content') if meta_tag else None

    # We don't want any content from the v1 version of the builder.
    if published_time:
        cutoff_date = datetime(2022, 5, 1).date()
        published_date = _parse_published_date(published_time)
        if published_date is None:
            # An unexpected format shouldn't abort the whole indexing run.
            print(f"Unrecognized published_time {published_time!r} for {url}, indexing anyway")
        elif published_date < cutoff_date:
            print("v1 content, skipping")
            return

    content = parse_html_and_get_content(response.content)
    metadata = {"url": url, "title": content["title"]}
    # Pass metadata through so every chunk carries the page URL and title.
    split_and_index_source(text_splitter, content["text"], url, metadata)


def index_sources(sources, sourceFn):
    """Call ``sourceFn`` once for each entry of ``sources``, in order.

    sources: Iterable of source keys (URLs, file paths, etc).
    sourceFn: Callable that takes a single source key and indexes it (fetching from
        the web, reading from disk, ...). Its return value is ignored.
    """
    for source_key in sources:
        sourceFn(source_key)

In [None]:
# Index the public https://pipedream.com pages listed below, one URL per page slug.
_site_pages = ("support", "terms", "privacy", "sla", "dpa", "pricing")
urls = [f"https://pipedream.com/{page}" for page in _site_pages]

index_sources(urls, fetch_url_and_index_text)

In [None]:
# List public docs from the pipedream public repo (change to your path).
# Markdown files contain all relevant content.

# REPLACE THIS with the location of the pipedream public repo
root_dir = "/Users/dylburger/pipedream/docs/docs"
publicDocs = list(Path(root_dir).glob("**/*.md"))

# Old content, or content that keeps yielding false positives.
# Each entry is matched as a prefix of the file's path relative to root_dir, written
# with a leading "/". So "/apps" also excludes e.g. "/apps.md".
# NOTE(review): an earlier note said component examples are excluded because a
# code-specific tool handles them, but no examples path is listed here — confirm.
blacklist = [
    "/components/migrating",
    "/apps",
    "/migrate-from-v1"
]

for p in publicDocs:
    # Match on the POSIX-style relative path, so the check works on Windows and
    # regardless of a trailing slash in root_dir.
    relative_file = p.relative_to(root_dir).as_posix()
    if any(f"/{relative_file}".startswith(prefix) for prefix in blacklist):
        continue

    # The docs are read as UTF-8 instead of depending on the platform default encoding.
    with open(p, encoding="utf-8") as f:
        content = f.read()

    # Use the first <h1> of the rendered Markdown as the page title.
    html = mistune.markdown(content)
    match = re.search(r'<h1>(.+?)<\/h1>', html)
    title = match.group(1) if match else "Untitled"

    splits = markdown_splitter.split_text(content)
    # A file's URL is its parent directory. as_posix() keeps "/" separators in the URL.
    relative_path = p.relative_to(root_dir).parent.as_posix()
    docs_url = f"https://pipedream.com/docs/{'' if relative_path == '.' else relative_path}"

    docs.extend(splits)
    # One metadata dict per chunk, keeping docs and metadatas aligned. The title is
    # recorded alongside the source URL.
    metadatas.extend({"source": docs_url, "title": title} for _ in splits)

In [None]:
# Pipedream blog
# Walk the paginated index (/blog/page/1, /blog/page/2, ...) collecting post links.
# Stop at the first non-200 page, a page with no post cards, or too many consecutive
# network failures.
from urllib.parse import urljoin

MAX_CONSECUTIVE_FETCH_FAILURES = 3

i = 1
post_urls = []
consecutive_failures = 0

while True:
    url = f"https://pipedream.com/blog/page/{i}"
    try:
        response = requests.get(url, timeout=30)
    except requests.RequestException:
        print(f"Failed to fetch {url}")
        consecutive_failures += 1
        # Without a cap, a network outage would make this loop spin forever.
        if consecutive_failures >= MAX_CONSECUTIVE_FETCH_FAILURES:
            print("Too many consecutive failures, stopping pagination")
            break
        i += 1
        continue
    consecutive_failures = 0

    # A non-200 status (presumably 404 past the last page) ends pagination.
    if response.status_code != 200:
        break

    soup = BeautifulSoup(response.content, 'html.parser')
    links = [a['href'] for a in soup.find_all('a', class_='post-card-image-link') if a.get('href')]
    # A page with no post cards also ends pagination, in case the site answers 200
    # for out-of-range pages.
    if not links:
        break
    post_urls.extend(links)
    i += 1


# urljoin handles both root-relative ("/blog/post/") and absolute hrefs.
# dict.fromkeys drops duplicate links while keeping page order.
full_urls = list(dict.fromkeys(urljoin("https://pipedream.com", post_url) for post_url in post_urls))
index_sources(full_urls, fetch_url_and_index_text)

In [None]:
# snowflake.connector is imported in the first cell.

# Set up connection details. os.environ[...] raises a KeyError immediately if any
# variable is missing from the environment / .env file.
account = os.environ["SNOWFLAKE_ACCOUNT"]
user = os.environ["SNOWFLAKE_USER"]
password = os.environ["SNOWFLAKE_PASSWORD"]
warehouse = os.environ["SNOWFLAKE_WAREHOUSE"]
database = os.environ["SNOWFLAKE_DATABASE"]
schema = os.environ["SNOWFLAKE_SCHEMA"]

# Create a connection. It stays open on purpose: the components query cell reuses it
# and closes it.
connection = snowflake.connector.connect(
    account=account,
    user=user,
    password=password,
    warehouse=warehouse,
    database=database,
    schema=schema
)

# Execute a query and fetch the results.
# NOTE(review): the meaning of app status values 3 and 4 isn't visible here — confirm.
query = """
SELECT 
  name, 
  description,
  name_slug,
  oauth_scopes_csv,
  auth_type,
  custom_fields_json,
  test_request_json,
  website_url,
  custom_fields_description_md,
  component_code_scaffold_raw
FROM apps
WHERE status IN (3, 4);
"""
cursor = connection.cursor()
try:
    cursor.execute(query)
    # Each row is a tuple in the SELECT column order; the next cell unpacks it positionally.
    results = cursor.fetchall()
finally:
    # This cursor is not reused, so release it even if the query fails.
    cursor.close()

In [None]:
# Build one Markdown document per app (description, OAuth scopes, auth instructions,
# example component code) and index its chunks under the app's pipedream.com page.
for item in results:
    app, description, name_slug, oauth_scopes_csv, auth_type, custom_fields_json, test_request_json, website_url, custom_fields_description_md, component_code_scaffold_raw = item

    # Nullable JSON columns: a NULL (or JSON null) means "no test request" instead of
    # crashing json.loads.
    test_request = (json.loads(test_request_json) if test_request_json else None) or {}
    test_request_url = test_request.get('url')
    test_request_verb = test_request.get('http_method')

    # Avoid rendering the literal text "None" into the Markdown.
    if custom_fields_description_md is None:
        custom_fields_description_md = ''
    if description is None:
        description = ''

    auth_text = f"Within the run method, the user's {app} credentials are exposed in the object `this.{name_slug}.$auth`. For integrations where users provide static API keys / tokens, the $auth object contains properties for each key / token the user enters. For OAuth integrations, this object exposes the OAuth access token in the oauth_access_token property of the $auth object. "
    test_request_code = ''
    if component_code_scaffold_raw:
        test_request_code = f"Here's an example Pipedream component that makes a test request against the {app} API:\n\n{component_code_scaffold_raw}"

    if test_request_url and test_request_verb:
        auth_text += f"The test request below makes a {test_request_verb} request to {test_request_url}. You should use the same base URL in other API requests, and based on the documentation provided / other code on the internet. "

    if auth_type == "keys":
        # NULL custom fields -> empty list, so key-based apps without fields still index.
        custom_fields = (json.loads(custom_fields_json) if custom_fields_json else None) or []
        custom_fields_text = ', '.join(o['name'] for o in custom_fields)
        auth_text += f"{app} is a key-based app. For integrations where users provide static API keys / tokens, `this.{name_slug}.$auth` contains properties for each key / token the user enters. Users are asked to enter the following custom fields: {custom_fields_text}. These are each exposed as properties in the object `this.{name_slug}.$auth`. When you make the API request, use the format from the {app} docs. Different apps pass credentials in different places in the HTTP request, e.g. headers, url params, etc. Consult the docs"
    elif auth_type == "oauth":
        auth_text += f"{app} is an OAuth app. For OAuth integrations, this object exposes the OAuth access token in the variable `this.{name_slug}.$auth.oauth_access_token`. When you make the API request, make sure to use the format from the {app} docs, e.g. you may need to pass the OAuth access token as a Bearer token in the Authorization header. Consult the docs"

    # One scope per line.
    formatted_oauth_scopes = ''
    if oauth_scopes_csv:
        formatted_oauth_scopes = '\n'.join(oauth_scopes_csv.split(','))

    markdown_content = f"""# {app}

{description}

{app}'s website is {website_url}

## OAuth scopes for {app}

These are the scopes Pipedream has configured for {app} by default, which Pipedream sends in the OAuth authorization request.

{formatted_oauth_scopes}

## Auth instructions

{custom_fields_description_md}

{auth_text}

{test_request_code}
"""

    splits = markdown_splitter.split_text(markdown_content)
    # App page URLs use hyphens where slugs use underscores.
    docs_url = f"https://pipedream.com/apps/{name_slug.replace('_', '-')}"

    docs.extend(splits)
    # One metadata dict per chunk, keeping docs and metadatas aligned.
    metadatas.extend({'source': docs_url, 'app_slug': name_slug} for _ in splits)
    

In [None]:
query = """
SELECT
  c.type,
  c.key,
  c.name,
  c.description,
  c.version,
  sc.configurable_props_json,
  a.name AS app_name,
  a.name_slug AS app_name_slug
FROM components c
JOIN saved_components sc
ON c.hid = sc.hid
LEFT OUTER JOIN app_saved_components apps_sc
ON sc.id = apps_sc.saved_component_id
LEFT OUTER JOIN apps a
ON apps_sc.app_id = a.id
WHERE c.LATEST_VERSION_AND_PUBLISHED_TO_ALL_USERS = TRUE
"""

cursor = connection.cursor()
cursor.execute(query)
registry_components = cursor.fetchall()

# Close the connection
cursor.close()
connection.close()

In [None]:
# Prop type -> phrase used in the generated "pass ... from a previous step" instructions.
_HUMAN_READABLE_PROP_TYPES = {
    "boolean": "a boolean",
    "boolean[]": "an array of booleans",
    "string": "a string",
    "string[]": "an array of strings",
    "integer": "an integer",
    "integer[]": "an array of integers",
    "object": "an object",
}


def map_component_type_to_human_readable(prop_type):
    """Return an English phrase for a component prop type.

    For example ``"string[]"`` -> ``"an array of strings"``. Types without a phrase
    are returned unchanged.
    """
    if prop_type in _HUMAN_READABLE_PROP_TYPES:
        return _HUMAN_READABLE_PROP_TYPES[prop_type]
    return prop_type

# Prop types that get a "Select ... or pass ... from a previous step" instruction line.
_INSTRUCTION_PROP_TYPES = (
    "boolean",
    "boolean[]",
    "string",
    "string[]",
    "integer",
    "integer[]",
    "object",
    "any",
)

# Build numbered "how to use this trigger/action" Markdown for each registry component
# and index it under the component's pipedream.com page.
for component in registry_components:
    component_type, key, name, description, version, configurable_props_json, app_name, app_name_slug = component
    # Without a component key or an app slug there is no docs URL to build, so skip the row.
    if key is None or app_name_slug is None:
        continue

    is_trigger = component_type == "Source"
    is_action = component_type == "Component Action"

    markdown = f"# {app_name} — {name}\n\n"
    if is_trigger:
        markdown += f"## Using the {name} trigger\n\n"
        markdown += dedent(f"""1. Search for the {app_name} app from the trigger menu
2. Select the {name} trigger
3. Connect your {app_name} account\n""")

    if is_action:
        markdown += f"## Using the {name} action\n\n"
        markdown += dedent(f"""1. Search for the {app_name} app from the step menu
2. Select the {name} action
3. Connect your {app_name} account\n""")

    # A NULL (or JSON null) props column means the component has no configurable props.
    configurable_props = (json.loads(configurable_props_json) if configurable_props_json else None) or []

    # Steps 1-3 are the fixed preamble above. Components without one number from 1.
    step = 4 if is_trigger or is_action else 1
    for prop in configurable_props:
        if 'label' not in prop or 'type' not in prop:
            continue

        # We don't need to include optional props in the default instructions
        if prop.get("optional", False):
            continue

        prop_type = prop.get("type")
        # Other prop types produce no instruction line, so they must not consume a
        # step number (that left gaps in the numbering).
        if prop_type not in _INSTRUCTION_PROP_TYPES:
            continue

        selector = "one or more" if prop_type.endswith("[]") else "a"
        human_readable_type = map_component_type_to_human_readable(prop_type)
        if human_readable_type == prop_type:
            # No dedicated phrase for this type (e.g. "any"), so describe it generically.
            human_readable_type = "a step export"
        markdown += f"{step}. Select {selector} {prop['label']} or pass {human_readable_type} from a previous step.\n"
        step += 1

    if is_trigger:
        markdown += f"{step}. Follow the instructions on the trigger configuration screen. You may need to generate a test event from {app_name} to get real data to test your workflow."

    if is_action:
        markdown += f"{step}. At the bottom of the step, click Test to run the action and test the configuration."

    # Component keys are expected to be "<app>-<component>". The docs URL uses the part
    # after the first '-', with underscores turned into hyphens. A key with no '-' is
    # used whole rather than raising IndexError.
    url_key = key.split('-', 1)[-1].replace('_', '-')
    app_url_slug = app_name_slug.replace('_', '-')

    # Stored verbatim as the chunk's source URL, so no trailing whitespace/newlines.
    docs_url = ''
    if is_trigger:
        docs_url = f"https://pipedream.com/apps/{app_url_slug}/triggers/{url_key}"
    elif is_action:
        docs_url = f"https://pipedream.com/apps/{app_url_slug}/actions/{url_key}"

    splits = markdown_splitter.split_text(markdown)
    docs.extend(splits)
    # One metadata dict per chunk, keeping docs and metadatas aligned.
    metadatas.extend({'source': docs_url, 'app_slug': app_name_slug} for _ in splits)

In [None]:
# Embed every collected chunk. embedded_docs is presumably one vector per chunk, in the
# same order as docs; the insert cell relies on that positional alignment.
embeddings = OpenAIEmbeddings()
chunks_to_embed = docs
embedded_docs = embeddings.embed_documents(chunks_to_embed)

In [None]:
# Apply a custom embedding transform: multiply the raw embeddings (one row per chunk)
# by a learned matrix loaded from disk.
# See https://github.com/openai/openai-cookbook/blob/main/examples/Customizing_embeddings.ipynb
# NOTE(review): this cell was labelled OPTIONAL, but as written it is required. The
# insert cell below reads modified_embeddings_list. Query embeddings presumably must be
# transformed by the same matrix at search time — confirm before skipping it.
custom_embeddings_matrix = np.load("custom_embeddings_matrix.npy")
raw_embeddings = np.asarray(embedded_docs)
modified_embeddings = np.dot(raw_embeddings, custom_embeddings_matrix)
modified_embeddings_list = modified_embeddings.tolist()

In [None]:
# Submit to Postgres (Supabase table 'docs_stg'), batch_size rows per request.
# Row k pairs docs[k] with metadatas[k] and modified_embeddings_list[k], so the three
# lists must have the same length.
if not (len(docs) == len(metadatas) == len(modified_embeddings_list)):
    # A mismatch means the lists fell out of step (rows would be mis-paired), and
    # zip() below would silently drop the tail.
    raise ValueError(
        f"Misaligned inputs: {len(docs)} docs, {len(metadatas)} metadatas, "
        f"{len(modified_embeddings_list)} embeddings"
    )

batch_size = 50
for i in range(0, len(docs), batch_size):
    print(f"Inserting batch {i}")
    i_end = min(i + batch_size, len(docs))
    to_insert = [
        {
            # Key on metadata source
            "url": metadata.get("source"),
            "body": body,
            "embedding": embedding,
            "app_slug": metadata.get("app_slug"),
        }
        for body, metadata, embedding in zip(
            docs[i:i_end], metadatas[i:i_end], modified_embeddings_list[i:i_end]
        )
    ]
    supabase.table('docs_stg').insert(to_insert).execute()
