In [1]:
import json
import openai
import os

In [25]:
def get_tree(rootDir=None, ignore=('node_modules', 'images', '_helpers', '.next', '.git')):
    """List every file below ``rootDir`` as a path relative to ``rootDir``.

    Args:
        rootDir: Directory to walk. ``None`` (the default) means the current
            working directory, looked up when the function is called rather
            than when it is defined, so a later ``os.chdir`` is honoured.
        ignore: Directory names that are pruned from the walk at any depth.

    Returns:
        A list of relative file paths, in the order ``os.walk`` yields them.
    """
    if rootDir is None:
        rootDir = os.getcwd()

    tree = []
    for root, dirs, files in os.walk(rootDir):
        # Assign through the slice so os.walk itself skips the ignored directories.
        dirs[:] = [d for d in dirs if d not in ignore]
        for file in files:
            absolute_path = os.path.join(root, file)
            tree.append(os.path.relpath(absolute_path, rootDir))

    return tree

# Snapshot the project's file list using get_tree's default root and ignore list.
tree = get_tree()


In [30]:
import ast

class CodeAnalyzer(ast.NodeVisitor):
    """Collect the imports, classes, functions and call names found in an AST.

    After ``visit(tree)`` the results are available on the instance:

    * ``imports``   - module names from ``import`` / ``from ... import`` statements
    * ``classes``   - names of every ``class`` definition
    * ``functions`` - names of every ``def`` and ``async def`` (methods included)
    * ``calls``     - name of each called function, or the attribute name for
      ``obj.method()`` style calls; other callee expressions are not recorded

    All four are lists in visiting order and may contain duplicates.
    """

    def __init__(self):
        self.imports = []
        self.classes = []
        self.functions = []
        self.calls = []

    def visit_Import(self, node):
        """Record each module named in an ``import a, b`` statement."""
        for alias in node.names:
            self.imports.append(alias.name)
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        """Record the source module of a ``from ... import ...`` statement."""
        # ``from . import x`` has no module name (node.module is None); record
        # the relative-import dots instead of appending None to the list.
        if node.module is not None:
            self.imports.append(node.module)
        else:
            self.imports.append("." * node.level)
        self.generic_visit(node)

    def visit_ClassDef(self, node):
        """Record the class name, then descend into its body."""
        self.classes.append(node.name)
        self.generic_visit(node)

    def visit_FunctionDef(self, node):
        """Record the function name, then descend into its body."""
        self.functions.append(node.name)
        self.generic_visit(node)

    # ``async def`` produces a separate node type; treat it like a regular def
    # so coroutine functions are not silently left out of ``functions``.
    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_Call(self, node):
        """Record the called name for ``f()`` and ``obj.f()`` style calls."""
        if isinstance(node.func, ast.Name):
            self.calls.append(node.func.id)
        elif isinstance(node.func, ast.Attribute):
            self.calls.append(node.func.attr)
        self.generic_visit(node)

def analyze_code(file_path):
    """Parse one Python file and print a summary of what it defines and uses.

    Prints four lines: the imports, class names, function names and the set of
    called function/method names collected by ``CodeAnalyzer``.

    Args:
        file_path: Path of the Python source file to analyze.

    Raises:
        OSError: If the file cannot be opened.
        SyntaxError: If the file is not valid Python source.
    """
    # Read raw bytes so ast.parse applies Python's own source-encoding rules
    # (UTF-8 by default, or the file's coding cookie) instead of the locale's.
    with open(file_path, "rb") as source_code:
        source = source_code.read()

    tree = ast.parse(source, filename=file_path)
    analyzer = CodeAnalyzer()
    analyzer.visit(tree)

    print("Imports:", analyzer.imports)
    print("Classes:", analyzer.classes)
    print("Functions:", analyzer.functions)
    print("Called functions/methods:", set(analyzer.calls))

# Summarize the chat API entry point; change the path below to analyze a different Python file.
# The path is relative, so it is resolved against the notebook's working directory.
analyze_code('backend/chat_api/main.py')


Imports: ['fastapi', 'fastapi.middleware.cors', 'pydantic', 'typing', 'redis', 'os', 'json', 'langchain.vectorstores', 'langchain.chat_models', 'langchain.embeddings', 'langchain.llms', 'langchain.chains.qa_with_sources', 'langchain.chains.question_answering', 'langchain.memory', 'langchain.chains', 'langchain.prompts.prompt', 'langchain.schema', 'langchain.agents', 'langchain.memory', 'langchain.chat_models', 'langchain.agents', 'langchain.agents', 'uvicorn']
Classes: ['BadRequestError', 'MissingUserIdError', 'MissingDatasetError', 'MissingQuestionError', 'LlmAnswerRequest', 'ClearCacheRequest']
Functions: ['hello_world', 'llm_answer', 'clear_cache']
Called functions/methods: {'keys', 'ConversationBufferMemory', 'PromptTemplate', 'ElasticVectorSearch', 'add_middleware', 'post', 'OpenAIEmbeddings', 'Tool', 'clear', 'load_qa_with_sources_chain', 'get', 'pop', 'as_retriever', 'OpenAI', 'RetrievalQAWithSourcesChain', 'ChatOpenAI', 'MissingQuestionError', 'FastAPI', 'replace', 'HTTPExcepti

In [None]:
import json
import uuid

def parse_practice_problems_to_json(content):
    """Send ``content`` to the chat model and decode the reply as JSON.

    The content is embedded in a fixed prompt and sent to ``gpt-3.5-turbo`` at
    temperature 0.0 through ``openai.ChatCompletion.create``.

    Args:
        content: Text interpolated into the prompt after ``CONTENT:``.

    Returns:
        The decoded JSON value, or ``None`` (after printing a notice) when the
        model's reply is not valid JSON.
    """
    # NOTE(review): the prompt asks for excerpts and never requests JSON output,
    # so the JSON-decoding failure path below is presumably hit often - confirm intent.
    prompt = f"""
Your job is to parse through python code and documentation with the goal of returning only
information that would required or helpful to another agent answering higher level questions. 
The agent will give you a question or task and your response should be exerpts from the context 
provided. 



CONTENT: {content}
ANSWER:"""

    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": "You are a helpful AI Assistant."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.0
    )

    reply = response['choices'][0]['message']['content']
    try:
        return json.loads(reply)
    except json.JSONDecodeError:
        print(f"Failed to parse the following content into JSON:\n{content}")
        return None




In [None]:

# NOTE(review): find_practice_problems is not defined or imported in any cell visible here,
# so this cell relies on kernel state from elsewhere - confirm where it comes from.
practice_problems = find_practice_problems("./_helpers/output/level_1_volume_1.json")


In [None]:
# NOTE(review): parse_questions (plural) is not defined in the visible cells; the only
# similarly named function here is parse_question - confirm which one is intended.
questions = parse_questions(practice_problems[1]['content'])

In [None]:
# Pick the entry at index 22 as the sample question and show it as the cell output.
q = questions[22]
q

In [None]:
from pydantic import BaseModel
from typing import List, Optional
import pandas as pd
from marvin import ai_fn, ai_model

# Pydantic model describing one parsed question, wrapped by marvin's ai_model
# decorator configured with model="gpt-4".
@ai_model(model="gpt-4")
class Question(BaseModel):
    # Required string field (presumably the question text - confirm).
    question: str
    # Required list of strings (presumably the answer choices - confirm).
    options: List[str]
    # Optional list of dicts; defaults to None when absent.
    question_data: Optional[List[dict]] = None


# The function body is only a docstring: whatever parse_question returns is produced
# by marvin's ai_fn decorator, not by code written here.
# NOTE(review): the return annotation is list[Question] while the docstring speaks of a
# single Question object - confirm which shape is intended.
@ai_fn
def parse_question(question: str) -> list[Question]:
    """
    Parse a question string into a structured Question object with fields, "question", "options", and "question_data" (OPTIONAL)
    """

# `q` is the sample question selected in an earlier cell.
print(parse_question(q))

In [None]:
# NOTE(review): `obj` is not defined in any cell visible here - this cell depends on
# kernel state from elsewhere and will fail on a fresh run unless that is restored.
parse_practice_problems_to_json(obj[47])

In [None]:
import spacy
from spacy import displacy
import en_core_web_sm
import re

def preprocess_text(text):
    """Swap the periods after problem numbers and choice letters for placeholders.

    A run of digits followed by a period becomes ``<digits><@>``, and an
    ``A``, ``B`` or ``C`` followed by a period becomes ``<letter><#>``.
    The number pass runs first, then the choice pass.
    """
    substitutions = (
        (r'(\d+)\.', r'\1<@>'),
        (r'(A|B|C)\.', r'\1<#>'),
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text

def postprocess_text(text):
    """Turn every ``<@>`` and ``<#>`` placeholder back into a period."""
    for placeholder in ('<@>', '<#>'):
        text = text.replace(placeholder, '.')
    return text

In [None]:
def parse_text_to_json(text):
    """Split practice-problem text into a list of problem dictionaries.

    The text is run through ``preprocess_text`` so the periods after problem
    numbers and choice letters become ``<@>`` / ``<#>`` placeholders, then cut
    into sentences with spaCy's ``sentencizer``. Each sentence is classified:

    * contains ``<@>`` - starts a new problem
    * contains ``<#>`` - a choice belonging to the current problem
    * otherwise        - more text for the current problem

    Sentences that appear before the first numbered problem are ignored.

    Args:
        text: Raw practice-problem text.

    Returns:
        A list of dicts with keys ``"problem_number"``, ``"problem_text"`` and
        ``"choices"``; an empty list when no numbered problem is found.
    """
    text = preprocess_text(text)

    nlp = spacy.blank('en')
    nlp.add_pipe('sentencizer')
    doc = nlp(text)

    problems_json = []
    current_problem = None

    for sentence in doc.sents:
        sentence_text = sentence.text
        if "<@>" in sentence_text:
            # A problem number marks the start of a new problem; flush the previous one.
            if current_problem is not None:
                problems_json.append(current_problem)
            parts = sentence_text.split('<@>')
            current_problem = {"problem_number": postprocess_text(parts[0].strip()), "problem_text": "", "choices": []}
            problem_text = parts[1].strip()
            if "<#>" in problem_text:
                # The first choice shares a sentence with the problem text: split them apart.
                pieces = problem_text.split('<#>')
                current_problem["problem_text"] = postprocess_text(pieces[0].strip())
                current_problem["choices"].append('A.' + postprocess_text(pieces[1].strip()))
            else:
                current_problem["problem_text"] = postprocess_text(problem_text)
        elif current_problem is None:
            # Text before the first numbered problem has nothing to attach to;
            # skip it instead of failing on the missing problem dict.
            continue
        elif "<#>" in sentence_text:
            # A choice for the current problem.
            current_problem["choices"].append(postprocess_text(sentence_text.strip()))
        else:
            # Neither a number nor a choice: continuation of the problem text.
            current_problem["problem_text"] += postprocess_text(sentence_text.strip())

    # Flush the last problem, but never emit a None placeholder for empty input.
    if current_problem is not None:
        problems_json.append(current_problem)

    return problems_json
# NOTE(review): the [18:] slice presumably drops a fixed-length header from the
# content before parsing - confirm the offset against the input file.
print(parse_text_to_json(practice_problems[0]['content'][18:]))


In [None]:
# NOTE(review): identical to the call at the end of the previous cell; one of the two can go.
print(parse_text_to_json(practice_problems[0]['content'][18:]))


In [None]:
# Display the raw input that the parsing cells above operate on (same [18:] slice).
practice_problems[0]['content'][18:]

In [43]:
import os
import ast
import pprint
import site

def get_directories_to_analyze(project_dir='backend/chat_api'):
    """Return the directories whose Python files should be analyzed.

    Args:
        project_dir: The project's own source directory. The default is
            relative to the working directory, matching the other paths used
            in this notebook (a leading ``/`` would point at the filesystem
            root instead of the project).

    Returns:
        A list starting with ``project_dir`` followed by every directory
        reported by ``site.getsitepackages()``.
    """
    directories = [project_dir]
    # Third-party code lives in the interpreter's site-packages directories.
    directories.extend(site.getsitepackages())
    return directories

def analyze_directories(directories):
    """Analyze several directories and merge their per-module import data.

    Args:
        directories: Iterable of directory paths passed to ``analyze_directory``.

    Returns:
        One dict mapping module name to its set of imported module names.
        When the same module name occurs in more than one directory, the
        directory processed last wins.
    """
    codebase_data = {}
    for directory in directories:
        # analyze_directory returns a (codebase_data, dependency_graph) pair;
        # only the first element is a mapping that can be merged here.
        directory_data, _dependency_graph = analyze_directory(directory)
        codebase_data.update(directory_data)
    return codebase_data

# NOTE(review): computed here but not referenced by any later cell visible in this notebook.
directories_to_analyze = get_directories_to_analyze()

class DependencyAnalyzer(ast.NodeVisitor):
    """Collect the set of module names that one module imports.

    Args:
        module_name: Name of the module being analyzed; stored unchanged on
            the instance as ``module_name``.

    After ``visit(tree)``, ``dependencies`` holds every module named in an
    ``import`` or ``from ... import`` statement. A relative import without a
    module name (``from . import x``) is recorded as its leading dots.
    """

    def __init__(self, module_name):
        self.module_name = module_name
        self.dependencies = set()

    def visit_Import(self, node):
        """Record each module named in an ``import a, b`` statement."""
        for alias in node.names:
            self.dependencies.add(alias.name)
        self.generic_visit(node)

    def visit_ImportFrom(self, node):
        """Record the source module of a ``from ... import ...`` statement."""
        # node.module is None for ``from . import x``; store the dots rather
        # than letting None end up in the dependency set.
        if node.module is not None:
            self.dependencies.add(node.module)
        else:
            self.dependencies.add("." * node.level)
        self.generic_visit(node)

def analyze_directory(directory_path):
    """Collect import data for every ``.py`` file below ``directory_path``.

    Modules are keyed by bare file name without extension (no package path),
    so two files with the same name in different sub-directories share one
    key and the one visited last wins in ``codebase_data``.

    Files that cannot be parsed as Python are skipped with a warning so that
    one bad file does not abort the whole walk.

    Args:
        directory_path: Root directory to walk recursively.

    Returns:
        A ``(codebase_data, dependency_graph)`` tuple:
        ``codebase_data`` maps module name to the set of modules it imports;
        ``dependency_graph`` maps an imported module to the set of module
        names that import it.
    """
    import warnings  # local import: only used for the skip notice below

    codebase_data = {}
    dependency_graph = {}

    for root, dirs, files in os.walk(directory_path):
        for file in files:
            if not file.endswith(".py"):
                continue
            file_path = os.path.join(root, file)
            module_name = os.path.splitext(file)[0]

            # Read bytes so ast.parse applies Python's source-encoding rules
            # instead of decoding with the locale's default encoding.
            with open(file_path, "rb") as source_code:
                source = source_code.read()
            try:
                tree = ast.parse(source, filename=file_path)
            except (SyntaxError, ValueError) as exc:
                warnings.warn(f"Skipping unparsable file {file_path}: {exc}")
                continue

            analyzer = DependencyAnalyzer(module_name)
            analyzer.visit(tree)
            codebase_data[module_name] = analyzer.dependencies
            for dependency in analyzer.dependencies:
                dependency_graph.setdefault(dependency, set()).add(module_name)

    return codebase_data, dependency_graph

In [47]:
# NOTE(review): the TypeError recorded as this cell's output reports a missing
# 'directory_path' argument even though one positional argument is passed here, so that
# output was presumably captured against a different definition - re-run to confirm.
codebase_data, dependency_graph = analyze_directory("./backend/chat_api/")

# pp = pprint.PrettyPrinter(indent=4)
# print("Codebase Data:")
# pp.pprint(codebase_data)
# print("\nDependency Graph:")
# pp.pprint(dependency_graph)

TypeError: analyze_directory() missing 1 required positional argument: 'directory_path'

In [45]:
# Look up the import sets of a few langchain modules in codebase_data.
# NOTE(review): analyze_directory keys codebase_data by bare file name (e.g. 'main'),
# so these dotted names never match and the result is the empty dict shown in the
# recorded output - confirm whether dotted module names or dependency filtering was intended.
relevant_files = ['langchain.chat_models', 'langchain.embeddings', 
                  'langchain.llms', 'langchain.memory', 'langchain.vectorstores']

# Keep only the names that are actually present as keys.
relevant_data = {file: codebase_data[file] for file in relevant_files if file in codebase_data}

pp = pprint.PrettyPrinter(indent=4)
pp.pprint(relevant_data)

{}


In [42]:
# Display the full module -> imports mapping.
# NOTE(review): this cell's execution count is lower than that of the cells defining and
# populating codebase_data, so the shown output presumably predates them - re-run in order.
codebase_data

{'asgi': {'fastapi', 'main'},
 'main': {'fastapi',
  'fastapi.middleware.cors',
  'json',
  'langchain.agents',
  'langchain.chains',
  'langchain.chains.qa_with_sources',
  'langchain.chains.question_answering',
  'langchain.chat_models',
  'langchain.embeddings',
  'langchain.llms',
  'langchain.memory',
  'langchain.prompts.prompt',
  'langchain.schema',
  'langchain.vectorstores',
  'os',
  'pydantic',
  'redis',
  'typing',
  'uvicorn'}}