In [1]:
import os
import shutil
import ast
import openai
import json
import numpy as np
import pandas as pd
import time
import dotenv
dotenv.load_dotenv('.env')

def get_embeddings_oai(texts):
    response = openai.embeddings.create(
        model="text-embedding-ada-002",
        input=texts,
    )
    return response

def recursive_knowledge(path: str, node: ast.Module): 
    sub_df = pd.DataFrame([[path, ast.unparse(node), []]], columns=['path', 'data', 'embedding']) 
    if not hasattr(node, 'body'): return
    for item in node.body:
        newPath = path + '>' + item.__class__.__name__
        sub_df = pd.concat([sub_df, recursive_knowledge(newPath, item)])
    return sub_df
        
def walk_file_tree(path, ext):
    knowledge_df = pd.DataFrame(columns=['path', 'data', 'embedding'])
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            knowledge_df = pd.concat([knowledge_df, walk_file_tree(item_path, ext)])
        elif os.path.splitext(item_path)[1] in ext:
            with open(item_path, 'r') as f:
                knowledge_df = pd.concat([knowledge_df, recursive_knowledge(item_path, ast.parse(f.read()))])
    return knowledge_df

def generate_knowledge_from_dir(path_in, path_out, ext=['.py']):
    knowledge_df = walk_file_tree(path_in, ext)
    resp = get_embeddings_oai(("DATA_PATH: "+knowledge_df['path']+'\nDATA:'+knowledge_df['data']).tolist())
    resp = [emb.embedding for emb in resp.data]
    knowledge_df['embedding'] = resp
    knowledge_df.to_csv(path_out, index=False)

def load_knowledge(path_in):
    knowledge_df = pd.read_csv(path_in)
    knowledge_df['embedding'] = knowledge_df['embedding'].apply(lambda x: json.loads(x))
    return knowledge_df


def cosine_similarity(a, b):
    return np.dot(a, b)/(np.linalg.norm(a)*np.linalg.norm(b))

def get_top_k_context(knowledge_df, text, k):
    embedding = get_embeddings_oai(text).data[0].embedding
    knowledge_df['similarity'] = knowledge_df['embedding'].apply(lambda x: cosine_similarity(x, embedding))
    return knowledge_df.sort_values(by='similarity', ascending=False).head(k)

def train_of_thought(texts,k,i=0):
    if k>0:
        longtxt = "".join(texts)
        ctx = get_top_k_context(longtxt, i+1)
        texts.append("\nContext: "+ctx['path'].iloc[i] + " Data: " + ctx['data'].iloc[i])
        return train_of_thought(texts, k-1, i+1)
    else:
        return "".join(texts)
    
def ask_assistant(query, ctx, client:openai.OpenAI, thread):
    #use context as system messages
    ctx_messages = "\n".join(("CONTEXT:\nPath: "+ctx['path']+ "\nData: " + ctx['data']+"\n").tolist())
    message = ctx_messages + "\nUSER: " + query
    print(message)
    client.beta.threads.messages.create(thread_id=thread.id, content=message,role='user')

def handle_user_query(
        query,
        knowledge_df,
        client:openai.OpenAI,
        thread,
        k=8,
):
    ctx = get_top_k_context(knowledge_df, query, k)
    ask_assistant(query, ctx, client, thread)
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=os.getenv('OPENAI_ASSISTANT_ID')
    )
    while run.status != 'completed':
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
        time.sleep(1)
    response = client.beta.threads.messages.list(thread_id=thread.id)
    return response

def get_repo_name(repo_url):
    splt1 = repo_url.split('/')
    splt2 = splt1[-1].split('.')
    if len(splt2) > 1:
        return splt2[0]
    else:
        return splt1[-1]

def clone_repo(repo_url):
    os.system("git clone " + repo_url + " codebase_input/" + get_repo_name(repo_url))

def wipe_input_dir():
    shutil.rmtree('codebase_input', ignore_errors=True)
    os.mkdir('codebase_input')

def update_codebase(repos, path_out='knowledge_df.csv', ext=['.py']):
    #wipe_input_dir()
    #for repo in repos:
    #    clone_repo(repo)
    generate_knowledge_from_dir('codebase_input', path_out, ext)

In [2]:
repos = [
    "https://github.com/mobutu/ecf-srdf-service-orchestrator",
    "https://github.com/mobutu/ecf-srdf-service-file-to-image",
    "https://github.com/Deathtanium/ecf-srdf-service-image-optimizer/",
    "https://github.com/mobutu/ecf-srdf-service-iocr",
    "https://github.com/mobutu/ecf-srdf-service-file-classifier",
    "https://github.com/mobutu/ecf-srdf-service-details-extractor"
]
update_codebase(repos)
knowledge_df = load_knowledge('knowledge_df.csv')

client = openai.OpenAI()
assistant = client.beta.assistants.retrieve(os.getenv('OPENAI_ASSISTANT_ID'))

thread = client.beta.threads.create()

tosend = "Write a unit test for the check_file_extension, in the UnpackFile class, in the orchestrator module. Take into account the class it's a part of."
#tosend = input("Enter a message: ")

response = handle_user_query(tosend, knowledge_df, client, thread, k=8)

CONTEXT:
Path: codebase_input\ecf-srdf-service-orchestrator\src\convertor_archive_to_files.py>ClassDef>FunctionDef
Data: def check_file_extension(self, archive: str) -> bool:
    logger.log_info(f'{__name__} - check_file_extension - Processing file: {archive}')
    extension = archive.split('.')[-1]
    return extension in self.zip_files_formats

CONTEXT:
Path: codebase_input\ecf-srdf-service-orchestrator\src\convertor_archive_to_files.py>ClassDef>FunctionDef
Data: def __init__(self):
    self.zip_files_formats = self.set_files_format(config.settings['unpack_file']['supported_formats'])
    self.max_nr_subfolders = config.settings['unpack_file']['max_nr_subfolders']
    self.unpack_max_time = config.settings['unpack_file']['unpack_max_time']
    self.algorithm = config.settings['unpack_file']['algorithm']
    logger.log_info(f'{__name__} - __init__ - Successfully set the UnpackFile')

CONTEXT:
Path: codebase_input\ecf-srdf-service-orchestrator\src\convertor_files_to_archive.py>If
Data:

In [5]:
for message in response.data:
    print(message.content[0].text.value)

The unit tests have passed successfully. Both tests, `test_check_file_extension_valid` and `test_check_file_extension_invalid`, have confirmed that the `check_file_extension` method in the `UnpackFile` class correctly identifies valid and invalid file extensions, based on the mocked behavior that reflects the provided class context. If you need the actual test code to use in your project, please let me know, and I will provide it for you.
It appears that the test `test_check_file_extension_valid` has failed. This could be due to the mocked `UnpackFile` class not matching the behavior of the actual class from the provided context. Let's try to closely match the actual class behavior in our mocked version and run the test again. 

I will modify the test to include the correct expected behavior and run it again.
CONTEXT:
Path: codebase_input\ecf-srdf-service-orchestrator\src\convertor_archive_to_files.py>ClassDef>FunctionDef
Data: def check_file_extension(self, archive: str) -> bool:
    