In [None]:
!pip install spacy
!python -m spacy download en_core_web_sm


Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m36.7 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [None]:
import yaml
import spacy
from spacy.tokens import Doc


# Custom Doc attributes that carry the OpenFn metadata extracted from each
# instruction. force=True overwrites an existing registration, so this cell
# can be re-run without spaCy complaining that the extension already exists.
Doc.set_extension("openfn_action", default=None, force=True)
Doc.set_extension("openfn_adaptor", default=None, force=True)

# Load the spaCy pipeline (replace with a custom-trained NER model to get
# ADAPTER entities).
try:
    nlp = spacy.load("en_core_web_sm")
except OSError as exc:
    # Raise rather than call exit(): in a notebook, exit() talks to the kernel
    # instead of cleanly aborting this cell, leaving `nlp` undefined for the
    # cells that follow.
    raise OSError(
        "spaCy model 'en_core_web_sm' not found. Install it with "
        "`python -m spacy download en_core_web_sm` and restart the kernel."
    ) from exc

def preprocess_instruction(instruction):
    """Run the spaCy pipeline over one workflow step and attach OpenFn metadata.

    Sets two custom ``Doc`` extensions:
      * ``openfn_action``  - the stripped instruction text (used as job name).
      * ``openfn_adaptor`` - the adaptor referenced by the step, or ``None``.

    ``ADAPTER`` entities are honoured when the loaded model produces them
    (a custom NER model). The stock ``en_core_web_sm`` label set does not
    include ``ADAPTER``, so as a fallback the raw text is scanned for an
    explicit adaptor reference such as ``language-kobotoolbox@latest`` or
    ``@openfn/language-twilio@0.3.4``, normalised to ``@openfn/language-x@ver``
    (``@latest`` when no version is given).

    Args:
        instruction: One plain-English workflow step.

    Returns:
        The processed spaCy ``Doc``.
    """
    import re

    doc = nlp(instruction)
    doc._.openfn_action = instruction.strip()
    for ent in doc.ents:
        if ent.label_ == "ADAPTER":  # only emitted by a custom-trained NER model
            doc._.openfn_adaptor = ent.text

    if doc._.openfn_adaptor is None:
        # Version suffix must end on a word char so trailing punctuation
        # ("...@latest.") is not swallowed.
        match = re.search(
            r"(?:@openfn/)?\b(language-[a-z0-9-]+)(@[\w.\-]*\w)?",
            instruction,
            re.IGNORECASE,
        )
        if match:
            version = match.group(2) or "@latest"
            doc._.openfn_adaptor = f"@openfn/{match.group(1).lower()}{version}"
    return doc


def generate_yaml(instructions, workflow_name="Sample-Workflow"):
    """Build a project YAML string containing one linear workflow.

    Each instruction becomes one job. Jobs are chained in input order: the
    first is fed by the webhook trigger (condition ``"always"``) and every
    later job follows its predecessor (condition ``"on_job_success"``).

    Args:
        instructions: Iterable of plain-English step descriptions.
        workflow_name: Key and display name of the generated workflow.

    Returns:
        The project serialised with ``yaml.dump`` default options (keys are
        emitted in sorted order).
    """
    jobs = []
    edges = []
    project = {
        "name": "untitled-project",
        "workflows": {
            workflow_name: {
                "name": workflow_name,
                # NOTE(review): this trigger has no name, yet the first edge
                # refers to it as "webhook"; confirm against the target schema.
                "triggers": [{"type": "webhook"}],
                "jobs": jobs,
                "edges": edges,
            }
        },
    }

    prev_job = None
    for instruction in instructions:
        doc = preprocess_instruction(instruction)
        action = doc._.openfn_action
        # Fall back to the generic HTTP adaptor when the step names none.
        adaptor = doc._.openfn_adaptor or "@openfn/language-http@latest"

        jobs.append({
            "name": action,
            "adaptor": adaptor,
            "enabled": True,
            # Plain job source: yaml.dump handles quoting/block style itself,
            # so a literal "|" here would end up inside the job code.
            "body": "fn(state => state);",
        })

        if prev_job is None:
            # The first job is connected to the webhook trigger.
            edges.append({
                "source_trigger": "webhook",
                "target_job": action,
                "condition": "always",
            })
        else:
            edges.append({
                "source_job": prev_job,
                "target_job": action,
                "condition": "on_job_success",
            })
        prev_job = action

    return yaml.dump(project)


In [None]:
# One plain-English step per element. Every element needs a trailing comma:
# Python silently concatenates adjacent string literals into a single step.
instructions = [
    "Get Data from DHIS2",
    "Filter out children under 2",
    "Aggregate the data",
    "Make a comment on Asana",
    "Fetch submissions from KoboCollect with language-kobotoolbox@latest",
    "Push the data to the a postgresSQL database with language-postgresql@latest",
    "Send text message to an admin using language-twilio@0.3.4 with status of sent message",
]
yaml_string = generate_yaml(instructions)
print(yaml_string)

name: untitled-project
workflows:
  Sample-Workflow:
    edges:
    - condition: always
      source_trigger: webhook
      target_job: Get Data from DHIS2
    - condition: on_job_success
      source_job: Get Data from DHIS2
      target_job: Filter out children under 2Aggregate the data
    - condition: on_job_success
      source_job: Filter out children under 2Aggregate the data
      target_job: Make a comment on AsanaFetch submissions from KoboCollect with language-kobotoolbox@latest
    - condition: on_job_success
      source_job: Make a comment on AsanaFetch submissions from KoboCollect with language-kobotoolbox@latest
      target_job: Push the data to the a postgresSQL database with language-postgresql@latest
    - condition: on_job_success
      source_job: Push the data to the a postgresSQL database with language-postgresql@latest
      target_job: Send text message to an admin using language-twilio@0.3.4 with status
        of sent message
    jobs:
    - adaptor: '@openf

In [None]:
import yaml

# Short adaptor key -> fully-qualified OpenFn adaptor spec. Every value follows
# the "@openfn/language-<key>@latest" pattern, so the mapping is generated from
# the key list. This rules out stray whitespace in a value and makes a
# repeated key easy to spot.
ADAPTOR_KEYS = (
    "primero", "telerivet", "dhis2", "http", "asana", "azure-storage",
    "beyonic", "bigquery", "cartodb", "commcare", "common", "dynamics",
    "facebook", "fhir", "godata", "googlehealthcare", "googlesheets", "hive",
    "khanacademy", "kobotoolbox", "magpi", "mailchimp", "mailgun", "maximo",
    "medicmobile", "mogli", "mongodb", "msgraph", "mssql", "mysql", "nexmo",
    "ocl", "openfn", "openhim", "openimis", "openmrs", "openspp", "postgresql",
    "progress", "rapidpro", "resourcemap", "salesforce", "satusehat", "sftp",
    "smpp", "surveycto", "template", "twilio", "vtiger", "zoho",
)
adaptors = {key: f"@openfn/language-{key}@latest" for key in ADAPTOR_KEYS}

# Edge condition strings used when wiring the workflow graph:
#   default_condition - on the edge from the webhook trigger to the first job
#   success_condition - on each job -> next-job edge
#   failure_condition - defined for completeness; not referenced in this cell
default_condition, success_condition, failure_condition = (
    "always",
    "on_job_success",
    "on_job_failure",
)

def _resolve_adaptor(adaptor_ref):
    """Map a user-written adaptor reference to a full OpenFn adaptor spec.

    Accepts a short key ("kobotoolbox") or a package name with optional scope
    and version ("language-kobotoolbox@latest", "@openfn/language-twilio@0.3.4").
    An explicit version is kept. Unknown or empty references fall back to
    "@openfn/language-common@latest".
    """
    ref = adaptor_ref.strip().lower()
    if ref.startswith("@openfn/"):
        ref = ref[len("@openfn/"):]
    name, _, version = ref.partition("@")
    if name.startswith("language-"):
        name = name[len("language-"):]
    if name not in adaptors:
        return "@openfn/language-common@latest"
    if version:
        return f"@openfn/language-{name}@{version}"
    return adaptors[name]


def parse_steps(steps):
    """Turn plain-English steps into job definitions and a linear edge chain.

    A step may name its adaptor after its last " with ", e.g.
    "Fetch submissions with language-kobotoolbox@latest". Steps without
    " with " keep their full text as the job name and use the common adaptor.
    Job names are the action text with whitespace runs replaced by "-".

    Args:
        steps: Sequence of step strings, in execution order.

    Returns:
        ``(parsed_steps, edges)``. ``parsed_steps`` is a list of job dicts
        (name, adaptor, enabled, body). ``edges`` links the webhook trigger to
        the first job (``default_condition``) and each job to the next
        (``success_condition``).
    """
    parsed_steps = []
    edges = []

    for i, step in enumerate(steps):
        action, sep, adaptor_ref = step.rpartition(" with ")
        if not sep:
            # rpartition puts the whole string in the *last* slot when the
            # separator is absent; without this the job name would be "".
            action, adaptor_ref = step, ""
        step_name = "-".join(action.split())

        parsed_steps.append({
            "name": step_name,
            "adaptor": _resolve_adaptor(adaptor_ref),
            "enabled": True,
            "body": "fn(state => state);",
        })

        if i == 0:
            edges.append({
                "source_trigger": "webhook",
                "target_job": step_name,
                "condition": default_condition,
            })
        else:
            edges.append({
                "source_job": parsed_steps[i - 1]["name"],
                "target_job": step_name,
                "condition": success_condition,
            })

    return parsed_steps, edges

def generate_project_yaml(project_name, description, workflow_name, steps, output_path="project3.yaml"):
    """Write a project YAML file describing one linear workflow.

    Jobs are emitted as a mapping keyed by job name, edges as a mapping keyed
    by "<source>-><target>", and a single trigger named "webhook". Key order is
    preserved in the output (``sort_keys=False``).

    Args:
        project_name: Top-level project name.
        description: Free-text project description.
        workflow_name: Key and display name of the workflow.
        steps: Plain-English steps, forwarded to ``parse_steps``.
        output_path: Destination file. Defaults to the previously hard-coded
            "project3.yaml".

    Returns:
        The path that was written, so callers can report it accurately.
    """
    parsed_steps, edges = parse_steps(steps)

    # Steps that normalise to the same job name overwrite each other here.
    jobs = {step["name"]: step for step in parsed_steps}

    # An edge's source is either the trigger or the preceding job.
    edge_dict = {}
    for edge in edges:
        source = edge["source_trigger"] if "source_trigger" in edge else edge["source_job"]
        edge_dict[f"{source}->{edge['target_job']}"] = edge

    project_structure = {
        "name": project_name,
        "description": description,
        "workflows": {
            workflow_name: {
                "name": workflow_name,
                "jobs": jobs,
                "triggers": {
                    "webhook": {
                        "type": "webhook"
                    }
                },
                "edges": edge_dict
            }
        }
    }

    # Explicit encoding keeps the file independent of the platform default.
    with open(output_path, "w", encoding="utf-8") as file:
        yaml.dump(project_structure, file, sort_keys=False)
    return output_path

if __name__ == "__main__":
    project_name = "project_a"
    description = "wbawbawba"
    workflow_name = "Some-Workflow"
    # Every element needs a trailing comma: adjacent string literals are
    # concatenated, which would merge two steps into one job.
    steps = [
        "Get Data from DHIS2",
        "Filter out children under 2",
        "Aggregate the data",
        "Make a comment on Asana",
    ]
    generate_project_yaml(project_name, description, workflow_name, steps)
    # generate_project_yaml writes to "project3.yaml" by default.
    print("project3.yaml file has been generated.")


project.yaml file has been generated.
