In [353]:
!pip install pydantic anytree networkx matplotlib inflect openapi-core jsonref prance datamodel-code-generator


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.3.1[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [354]:
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

In [355]:
import json

def parse_spec(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)

In [356]:
# filepath = "../openapi_specs/resolved/stripe-08-10-24.json"
filepath = "../openapi_specs/resolved/sgp-09-21-24.json"

spec = parse_spec(filepath)

In [357]:
# !datamodel-codegen  --input "../openapi_specs/stripe-08-10-24.yaml" --input-file-type openapi --output model.py

In [358]:
resource_blacklist = ['delete', 'query', 'cancel', 'batch', 'verify', 'process', 'validate', 'approve', 'publish', 'history', 'approve-batch', 'batch-delete']

In [359]:
import re
from inflect import engine


def standardize(name: str) -> str:
    # ignored words for singularization
    invariant_words = {'synthesis', 'analysis', 'basis', 'thesis'}

    # Convert to lowercase, kebab case
    name = name.lower().replace('_', '-')
    name = name.lower().replace(' ', '-')
    name = re.sub(r'[^a-z0-9-]', '', name)
    parts = name.split('-')

    # singularize
    p = engine()
    parts = [part if part in invariant_words else (p.singular_noun(part) or part) for part in parts]

    standardized_name = '-'.join(parts)
    standardized_name = re.sub(r'-ids?$', '', standardized_name) # strip id
    return standardized_name

In [360]:
from pydantic import BaseModel
from collections import defaultdict


class RouteMethodDescription(BaseModel):
    method: str
    description: str


# fetch all resources
def extract_resources(openapi: dict):
    resource_to_routes = defaultdict()  # resource -> {paths} -> {methods} -> description

    for path, methods in openapi['paths'].items():
        # not a _real_ resource
        if 'get' not in methods.keys() and 'post' not in methods.keys():
            continue
            
        # if path == '/v4/evaluation-datasets/{evaluation_dataset_id}/test-cases':
        #     print(methods['post']['parameters'])

        # find current resource  
        def find_last_unwrapped_element(path_list):
            for element in reversed(path_list):
                if not (element.startswith('{') and element.endswith('}')):
                    return element
            return None

        resource = find_last_unwrapped_element(path.split('/'))

        if not resource:
            continue
        
        resource = standardize(resource)
        
        if resource in resource_blacklist:
            continue
        
        if resource not in resource_to_routes:
            resource_to_routes[resource] = {}

        resource_to_routes[resource][path] = {}

        for m, v in methods.items():
            resource_to_routes[resource][path][m] = v.get('description')

    return resource_to_routes

In [361]:
extraction = extract_resources(spec)
resources = extraction.keys()

resources

dict_keys(['knowledge-base', 'async-job', 'chunk', 'upload-file', 'upload', 'artifact', 'rank', 'synthesis', 'execute', 'completion', 'chat-completion', 'embedding', 'reranking', 'deployment', 'usage-statistic', 'model-deployment', 'model', 'user-info', 'user', 'account', 'question-set', 'claim-task', 'contributor-metric', 'evaluation-metric', 'hybrid-eval-metric', 'evaluation-config', 'evaluation-dataset', 'evaluation-dataset-version', 'test-case', 'studio-project', 'application-spec', 'evaluation', 'test-case-result', 'question', 'knowledge-base-datum-source', 'upload-schedule', 'autogenerated-draft-test-case', 'generation-job', 'model-group', 'model-template', 'fine-tuning-job', 'event', 'training-dataset', 'content', 'install', 'copy-to-control-plane-hook', 'install-async', 'application-variant', 'application-deployment', 'application-variant-report', 'application-test-case-output', 'application-with-variant', 'application-schema', 'proces', 'thread', 'message', 'dashboard', 'scala

In [376]:
print(extraction['thread'])

{'/v4/applications/{application_variant_id}/threads': {'get': None, 'post': None}}


In [362]:
from typing import List, Tuple


# GOAL: create resource dependency tree
# Thesis: to create a resource you need to do POST, dependents are ids and other resource names

def organize_resources(openapi):
    edges = [] # list(parent, child)

    for path, methods in openapi['paths'].items():
        # not a _real_ resource
        if 'post' not in methods.keys():
            continue
        
        path_list = path.split('/')
        
        # edges from path structure
        flag = None
        for p in path_list:
            pf = standardize(p)
            if pf in resources and flag:
                edges.append((pf, flag))
                # pass
            if pf in resources:
                flag = pf    
        
        if flag is None:
            continue
            
        # edges from request shape
        def extract_param_names(obj) -> List[str]:
            param_names = []
            
            def recursive_extract(current_obj):
                if isinstance(current_obj, dict):
                    if 'properties' in current_obj:
                        param_names.extend(current_obj['properties'].keys())
                    else:
                        for value in current_obj.values():
                            recursive_extract(value)
                elif isinstance(current_obj, list):
                    for item in current_obj:
                        recursive_extract(item)
            
            recursive_extract(obj)
            return list(set(param_names))
        
        def is_in_main_list(string, main_list):
            return string in main_list
        
        def filter_strings(main_list, filter_list):
            conditions = [
                is_in_main_list,
            ]
            
            return [
                string for string in filter_list 
                if any(condition(string, main_list) for condition in conditions)
            ]
        
        params = []
        if 'requestBody' in methods['post']:
            params.extend(extract_param_names(methods['post']['requestBody']))
        if 'parameters' in methods['post']:
            params.extend(extract_param_names(methods['post']['parameters']))
        if not params:
            params = extract_param_names(methods['post'])
        
        params = [standardize(p) for p in params]
        for r in filter_strings(params, resources):
            edges.append((flag, r))
    
    def clean_edges(inp_e: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        return [(e1, e2) for e1, e2 in inp_e if e1 and e2 and e1 != e2]
    
    return clean_edges(edges)

In [363]:
edgs = organize_resources(spec)
edgs = list(set(edgs))

edgs

[('theme', 'account'),
 ('evaluation-dataset-version', 'account'),
 ('claim-task', 'evaluation'),
 ('test-case', 'account'),
 ('studio-project', 'account'),
 ('upload-schedule', 'knowledge-base-datum-source'),
 ('deployment', 'account'),
 ('thread', 'application-variant'),
 ('test-case-result', 'application-spec'),
 ('application-deployment', 'application-variant'),
 ('evaluation', 'application-spec'),
 ('proces', 'thread'),
 ('test-case-result', 'evaluation'),
 ('chat-completion', 'message'),
 ('artifact', 'knowledge-base'),
 ('embedding', 'model-deployment'),
 ('training-dataset', 'account'),
 ('reranking', 'model-deployment'),
 ('model', 'model-group'),
 ('evaluation-config', 'question-set'),
 ('execute', 'model-deployment'),
 ('application-variant-report', 'account'),
 ('evaluation-dataset-version', 'autogenerated-draft-test-case'),
 ('upload', 'chunk'),
 ('generation-job', 'evaluation-dataset'),
 ('application-spec', 'account'),
 ('autogenerated-draft-test-case', 'account'),
 ('ch

In [364]:
len(edgs)

88

In [365]:
from networkx import DiGraph
import networkx as nx


def build_dependency_tree(edges: []) -> DiGraph:
    graph = nx.DiGraph()
    graph.add_edges_from(edges)

    return graph

In [366]:
g = build_dependency_tree(edgs)

In [367]:
import networkx as nx
from anytree import Node, RenderTree
from collections import deque

def print_tree_for_node(graph: nx.DiGraph, start_node_name: str):
    if start_node_name not in graph.nodes():
        print(f"Node '{start_node_name}' not found in the graph.")
        return

    # Create a dictionary to store anytree Nodes
    node_dict = {}
    visited = set()  # To keep track of visited nodes

    def create_tree(nx_node):
        queue = deque([(nx_node, None)])
        while queue:
            current_node, parent = queue.popleft()
            
            if current_node in visited:
                continue
            visited.add(current_node)
            
            # Create anytree Node if it doesn't exist
            if current_node not in node_dict:
                node_dict[current_node] = Node(str(current_node), parent=parent)
            elif parent:
                # If node exists but with different parent, create a new node
                node_dict[current_node] = Node(f"{current_node}_dup", parent=parent)
            
            # Add child nodes to the queue
            for child in graph.successors(current_node):
                if child not in visited:
                    queue.append((child, node_dict[current_node]))

    # Create the tree starting from the specified node
    create_tree(start_node_name)

    # Print the tree
    print(f"\nTree rooted at {start_node_name}:")
    print(RenderTree(node_dict[start_node_name]))

# Example usage
# edges = [
#     ('A', 'B'), ('A', 'C'), ('B', 'D'), ('B', 'E'),
#     ('C', 'F'), ('E', 'G'), ('F', 'H'),
#     ('I', 'J'), ('J', 'K'),
#     ('D', 'B')  # Adding a cycle to test
# ]
# 
# graph = nx.DiGraph(edges)
# 
# # Print tree for node 'A'
# print_tree_for_node(graph, 'A')
# 
# # Print tree for node 'B'
# print_tree_for_node(graph, 'B')
# # Try to print tree for a non-existent node
# print_tree_for_node(graph, 'Z')

In [373]:
print_tree_for_node(g, 'chat-completion')


Tree rooted at chat-completion:
Node('/chat-completion')
├── Node('/chat-completion/message')
├── Node('/chat-completion/model')
│   ├── Node('/chat-completion/model/model-group')
│   └── Node('/chat-completion/model/model-template')
├── Node('/chat-completion/account')
└── Node('/chat-completion/model-deployment')
    └── Node('/chat-completion/model-deployment/deployment')


In [369]:
def print_graph(graph: nx.DiGraph):
    print("Nodes:")
    for node in graph.nodes():
        print(f"  {node}")
    
    print("\nEdges:")
    for edge in graph.edges():
        print(f"  {edge[0]} -> {edge[1]}")

In [370]:
print_graph(g)

Nodes:
  theme
  account
  evaluation-dataset-version
  claim-task
  evaluation
  test-case
  studio-project
  upload-schedule
  knowledge-base-datum-source
  deployment
  thread
  application-variant
  test-case-result
  application-spec
  application-deployment
  proces
  chat-completion
  message
  artifact
  knowledge-base
  embedding
  model-deployment
  training-dataset
  reranking
  model
  model-group
  evaluation-config
  question-set
  execute
  application-variant-report
  autogenerated-draft-test-case
  upload
  chunk
  generation-job
  evaluation-dataset
  fine-tuning-job
  application-test-case-output
  completion
  rank
  copy-to-control-plane-hook
  install-async
  question
  model-template
  install
  upload-file
  synthesis

Edges:
  theme -> account
  evaluation-dataset-version -> account
  evaluation-dataset-version -> autogenerated-draft-test-case
  evaluation-dataset-version -> evaluation-dataset
  claim-task -> evaluation
  claim-task -> account
  evaluation -> a

In [371]:
# steps:
# plan routes needed to execute
# execute routes in sequence