In [1]:
import sys
import os

# Make the parent directory importable; presumably this is where the project
# packages imported in later cells (e.g. `embed_docsite`) live.
parent_dir = os.path.abspath("../")
print(parent_dir)
# Guard the append so re-running this cell does not pile up duplicate sys.path entries.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)


/Users/hanna/openfn/ai_experiments/apollo/services


In [3]:
from langchain.output_parsers import StructuredOutputParser, ResponseSchema
import os
import json
from dotenv import load_dotenv
import anthropic

# Load variables from a local .env file (if one exists) into the environment, then
# build the Anthropic client shared by the prompt helpers further down.
load_dotenv()
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY")
client = anthropic.Anthropic(api_key=ANTHROPIC_API_KEY)

In [6]:
from embed_docsite.github_utils import get_docs
import re


In [7]:
# Fetch the adaptor documentation through the project helper. The log output recorded
# below reports 59 files downloaded from the `adaptors` folder of the OpenFn/docs repo.
# Entries appear to be dicts with at least `name` and `docs` keys (see the logged
# sample and the `docs[0]["docs"]` access in a later cell).
docs = get_docs("adaptor_docs")

INFO:GitHubUtils:Fetched 59 URLs from GitHub for https://api.github.com/repos/OpenFn/docs/contents/adaptors
INFO:GitHubUtils:Downloaded and processed 59 files from GitHub
INFO:GitHubUtils:{'name': 'asana.md', 'docs': '---\ntitle: Asana Adaptor\n---\n\n## About Asana\n\n[Asana](https://app.asana.com/) is a web-based project management tool that helps teams organize, plan, collaborate, and execute tasks. \n\n## Integration Options\n\nAsana supports 2 primary integration options:\n\n1. Rest API: Asana has an available REST API that enables external services like OpenFn to pull data from Asana, or push data from external apps to Asana. This option is suited for scheduled, bulk syncs or workflows that must update data in Asana with external information. See [functions](/adaptors/packages/asana-docs) for more on how to use this adaptor to work with the API.\n\n2. Webhook: Asana also has a [Webhook or Data Forwarding](https://developers.asana.com/docs/webhooks-guide) to push data from Asana t

In [11]:
import pickle
# Store the fetched docs on disk (presumably so later sessions can skip the download).
# The location is derived from `parent_dir` (two levels up, then `data/`) instead of a
# hard-coded home directory, so the notebook is not tied to one machine. When run from
# the same working directory as the recorded session it resolves to the same file.
adaptor_docs_pickle = os.path.abspath(
    os.path.join(parent_dir, "..", "..", "data", "adaptor_docs.pkl")
)
with open(adaptor_docs_pickle, 'wb') as file:
    pickle.dump(docs, file)

In [13]:
# Try the cleaning and splitting helpers on the first fetched document.
# NOTE(review): `clean_html` and `split_by_headers` are defined in the cell after this
# one, so a top-to-bottom run on a fresh kernel reaches this cell before they exist.
d = split_by_headers(clean_html(docs[0]["docs"]))
d

['---\ntitle: Asana Adaptor\n---',
 '## About Asana\n\n[Asana](https://app.asana.com/) is a web-based project management tool that helps teams organize, plan, collaborate, and execute tasks.',
 '## Integration Options\n\nAsana supports 2 primary integration options:\n\n1. Rest API: Asana has an available REST API that enables external services like OpenFn to pull data from Asana, or push data from external apps to Asana. This option is suited for scheduled, bulk syncs or workflows that must update data in Asana with external information. See [functions](/adaptors/packages/asana-docs) for more on how to use this adaptor to work with the API.\n\n2. Webhook: Asana also has a [Webhook or Data Forwarding](https://developers.asana.com/docs/webhooks-guide) to push data from Asana to external systems. This option is suited for real-time, event-based data integration. Check out the Asana [developer documentation](/adaptors/packages/asana-docs) to learn how to set up a webhook to push data to Op

In [12]:

def clean_html(text):
    """Remove HTML tags while preserving essential formatting.

    ``<p>`` becomes a newline, ``<code>`` a backtick and ``<strong>`` becomes ``**``.
    These three are matched case-insensitively and may carry attributes. Every other
    HTML tag, closing tag, comment or ``<!...>`` declaration is removed. Text that
    merely contains angle brackets (for example ``a < b and c > d``) is left as is.

    Args:
        text: Markdown/HTML string to clean.

    Returns:
        The cleaned string with leading and trailing whitespace stripped.
    """
    # Optional attribute section shared by the three converted tags, e.g. ` class="x"`.
    attrs = r'(?:\s[^<>]*)?'
    text = re.sub(r'<\/?p' + attrs + '>', '\n', text, flags=re.IGNORECASE)  # Convert <p> to newlines
    text = re.sub(r'<\/?code' + attrs + '>', '`', text, flags=re.IGNORECASE)  # Convert <code> to backticks
    text = re.sub(r'<\/?strong' + attrs + '>', '**', text, flags=re.IGNORECASE)  # Convert <strong> to bold

    # Remove other HTML tags. Only tag-like tokens are matched (a letter, "/" + letter,
    # "!" + letter, or a comment); the previous `<[^>]+>` also swallowed ordinary
    # text sitting between a "<" and a later ">".
    text = re.sub(r'<(?:!--[\s\S]*?--|[\/!]?[A-Za-z][^<>]*)>', '', text)

    return text.strip()

def split_by_headers(text):
    """Split text into chunks based on Markdown headers (# and ##) and code blocks.

    A new chunk starts at every header line (one or more ``#`` followed by whitespace
    or end of line) and at every opening code fence (a line starting with three
    backticks). Text that follows a closing fence stays in the code block's chunk
    until the next header or opening fence.

    Lines inside a fenced block are never treated as headers, so ``# comment`` lines
    in code samples do not split the block. Fences are tracked as open/close pairs,
    so a closing fence is never mistaken for the start of a new block. An unclosed
    fence runs to the end of the text.

    Args:
        text: Markdown string to split.

    Returns:
        List of stripped, non-empty chunks in document order.
    """
    chunks = []
    current = []
    in_code_block = False

    for line in text.split("\n"):
        is_fence = line.startswith("```")
        starts_chunk = False

        if in_code_block:
            # Inside a fence only the closing fence matters; "#" lines are code.
            if is_fence:
                in_code_block = False
        elif is_fence:
            in_code_block = True
            starts_chunk = True
        elif re.match(r'#+(?:\s|$)', line):
            starts_chunk = True

        if starts_chunk and current:
            chunks.append("\n".join(current))
            current = []
        current.append(line)

    if current:
        chunks.append("\n".join(current))

    stripped = (chunk.strip() for chunk in chunks)
    return [chunk for chunk in stripped if chunk]

def get_overview(json_data):
    """Clean every adaptor doc and split it into header/code-block sections.

    Items that are not dicts, or that lack a ``"docs"`` or ``"name"`` key, are skipped.
    For each remaining item the docs text is JSON-decoded when it is a JSON-encoded
    string, passed through ``clean_html``, and written back to ``item["docs"]`` (the
    input item is mutated). The whole item is then recorded in the module-level
    ``metadata_dict`` under its name.

    NOTE(review): the original computed the sections and then discarded them, returning
    ``None``. Returning them keyed by name is the smallest change that makes the result
    usable; confirm whether only a specific "overview" section was meant to be kept.

    Args:
        json_data: Iterable of items; the ones processed are dicts with ``"name"`` and
            ``"docs"`` keys.

    Returns:
        dict mapping each processed item's name to the list produced by
        ``split_by_headers`` for its cleaned docs.
    """
    # `metadata_dict` is not created anywhere in the visible notebook, so the first
    # valid item used to raise NameError. Reuse the global when it exists, otherwise
    # create it in the same namespace the original assignment targeted.
    metadata = globals().setdefault("metadata_dict", {})
    sections_by_name = {}

    for item in json_data:
        if not (isinstance(item, dict) and "docs" in item and "name" in item):
            continue

        docs = item["docs"]
        name = item["name"]

        # Decode JSON string. Plain (non-JSON) text is kept as is, and so is text that
        # decodes to a non-string value (e.g. "123"), which `clean_html` cannot handle.
        try:
            decoded = json.loads(docs)
        except json.JSONDecodeError:
            decoded = docs
        if isinstance(decoded, str):
            docs = decoded

        docs = clean_html(docs)

        # Save all fields for adding to metadata later
        item["docs"] = docs  # replace docs with cleaned text
        metadata[name] = item

        # Split by headers and code blocks
        sections_by_name[name] = split_by_headers(docs)

    return sections_by_name

In [4]:
# compile list of adaptors with descriptions (offline)

# System prompt for the description call: asks for the adaptor name, a colon and a
# 2-3 sentence description, and nothing else.
# NOTE(review): "desriptions" in the prompt text is a typo for "descriptions".
describe_adaptor_system_prompt = """
You are an assistant for writing brief descriptions of adaptors offered by OpenFn, a workflow generation platform.
The summary will be used to help select appropriate adaptors for clients' desriptions of their automation tasks.
Relevant information might therefore include e.g. the purpose of the adaptor and the data formats it uses.
You will be given the name of an adaptor and the overview section from its documentation.
Answer with nothing but the name of the adaptor followed by a colon and a 2-3 sentence description.
"""

# User-message template. It has two `str.format` placeholders, `{adaptor_name}` and
# `{documentation}`; both must be supplied by keyword when formatting.
describe_adaptor_user_prompt = """The adaptor to describe: "{adaptor_name}" \n The adaptor documentation: {documentation} """

def describe_adaptor(user_question, documentation=""):
    """Ask the model for a short description of one adaptor.

    The previous version formatted ``describe_adaptor_user_prompt`` with a
    ``user_question`` keyword, but the template's placeholders are ``{adaptor_name}``
    and ``{documentation}``, so every call raised ``KeyError``.

    Args:
        user_question: Name of the adaptor to describe. The parameter keeps its
            original name for backward compatibility; it fills the ``{adaptor_name}``
            placeholder.
        documentation: Overview text from the adaptor's documentation; fills the
            ``{documentation}`` placeholder. Defaults to an empty string.

    Returns:
        The ``text`` of the first content block in the model's reply.
    """
    user_prompt = describe_adaptor_user_prompt.format(
        adaptor_name=user_question,
        documentation=documentation,
    )
    message = client.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=1000,
        temperature=0,
        system=describe_adaptor_system_prompt,
        messages=[
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": user_prompt
                    }
                ]
            }
        ]
    )
    return message.content[0].text

In [None]:
# Text substituted into the YAML-generation system prompt further down.
# Currently a placeholder containing only blank lines; presumably meant to hold the
# adaptor descriptions compiled offline (TODO confirm and fill in).
adaptor_summaries = """

"""

In [None]:
# System prompt template for the workflow YAML generator.
# It contains exactly one `str.format` placeholder, `{adaptor_list}` (under
# "Adaptor Knowledge"); the template has no other curly braces, so it can be formatted
# without escaping. The "#" lines inside the string are Markdown headings in the
# prompt, not Python comments.
gen_yaml_system_prompt = """
# OpenFn Workflow Generator Prompt

You are an expert assistant for the OpenFn workflow automation platform. Your task is to convert natural language descriptions of workflows into properly structured YAML files that define workflow jobs, triggers, and connections.

## Your Task

Given a text description of a workflow process, you will:
1. Identify distinct jobs/steps in the workflow
2. Determine appropriate adaptors for each job
3. Set up proper trigger mechanisms (webhook or cron)
4. Create the connections (edges) between jobs
5. Generate a valid project.yaml file that follows OpenFn's structure

## OpenFn Project.yaml Structure

A valid project.yaml must follow this structure:
```yaml
workflow-1:
  name: [Workflow Name]
  jobs:
    [job-id]:
      name: [Job Name]
      adaptor: [Adaptor Reference]
      body: "| // Add operations here"
  triggers:
    [trigger-id]:
      type: [webhook or cron]
      cron: [cron expression] # Only if type is cron
      enabled: true
  edges:
    - source_trigger: [trigger-id]
      target_job: [job-id]
      condition_type: always
      enabled: true
    - source_job: [source-job-id]
      target_job: [target-job-id]
      condition_type: on_job_success
      enabled: true
name: open-project
description: Auto-generated workflow based on provided steps.
```

## Adaptor Knowledge

Here is a list of available OpenFn adaptors:
{adaptor_list}
- `@openfn/language-common@latest`: For basic data transformation operations
- `@openfn/language-http@latest`: For making HTTP requests
- `@openfn/language-commcare@latest`: For interacting with CommCare
- `@openfn/language-fhir@latest`: For FHIR healthcare standards
- `@openfn/language-satusehat@latest`: For Satusehat system (health records)
- `@openfn/language-redis@latest`: For Redis operations
- `@openfn/language-openfn@latest`: For triggering other OpenFn workflows

## Trigger Types

- **Webhook**: Use for event-based triggers (default if not specified)
- **Cron**: Use for time-based schedules

## Rules for Job Identification

1. Each distinct action should become its own job
2. Jobs should have clear, descriptive names
3. Jobs should be connected in a logical sequence
4. Choose the most specific adaptor available for each operation
5. When in doubt about an adaptor, use `@openfn/language-common@latest`
6. Job IDs should be derived from their names, replacing spaces with hyphens

## Rules for Edge Creation

1. The first job should always connect to the trigger
2. Each subsequent job should connect to the previous job with condition_type: on_job_success
3. For branching workflows, create conditional edges as appropriate
4. All edges should be enabled by default

## Example Conversion

For the input:
"Fetch patient data from CommCare daily and send it to our FHIR system"

The output should be:
```yaml
workflow-1:
  name: Fetch Patient Data Workflow
  jobs:
    fetch-patient-data:
      name: Fetch patient data from CommCare
      adaptor: "@openfn/language-commcare@latest"
      body: "| // Add operations here"
    send-to-fhir:
      name: Send data to FHIR system
      adaptor: "@openfn/language-fhir@latest"
      body: "| // Add operations here"
  triggers:
    daily-schedule:
      type: cron
      cron: "0 0 * * *"
      enabled: true
  edges:
    - source_trigger: daily-schedule
      target_job: fetch-patient-data
      condition_type: always
      enabled: true
    - source_job: fetch-patient-data
      target_job: send-to-fhir
      condition_type: on_job_success
      enabled: true
name: open-project
description: Auto-generated workflow based on provided steps.
```

## Output Format

You MUST provide the output as a proper YAML file that follows the structure above. Do not include any explanations or commentary.
"""

In [None]:
# `gen_yaml_system_prompt` uses the named placeholder `{adaptor_list}`, so the summaries
# must be passed by keyword; a positional argument raises KeyError('adaptor_list').
gen_yaml_system_prompt_formatted = gen_yaml_system_prompt.format(adaptor_list=adaptor_summaries)

In [None]:

# User-message template for YAML generation; its single `str.format` placeholder is
# `{user_question}`.
generate_yaml_user_prompt = """The user's automation task is as follows: "{user_question}" """

def generate_yaml(user_question):
    """Send an automation task description to the model and return its reply text.

    The task is inserted into ``generate_yaml_user_prompt`` and sent as a single user
    message, with ``gen_yaml_system_prompt_formatted`` as the system prompt.

    Args:
        user_question: Natural-language description of the workflow to generate.

    Returns:
        The ``text`` of the first content block in the model's reply.
    """
    user_text = generate_yaml_user_prompt.format(user_question=user_question)
    text_block = {"type": "text", "text": user_text}
    request = {
        "model": "claude-3-7-sonnet-20250219",
        "max_tokens": 1000,
        "temperature": 0,
        "system": gen_yaml_system_prompt_formatted,
        "messages": [{"role": "user", "content": [text_block]}],
    }
    reply = client.messages.create(**request)
    first_block = reply.content[0]
    return first_block.text