### Imports

In [None]:
!pip install -U accelerate datasets

In [None]:
!pip install -q chromadb

In [1]:
import torch.nn as nn
import numpy as np
# import Datasets
import chromadb
from chromadb import Collection
from sentence_transformers import (
    SentenceTransformer, models, losses, util, InputExample, evaluation, SentenceTransformerTrainingArguments, SentenceTransformerTrainer
)
from accelerate import Accelerator
import glob
from openai import OpenAI
import os
from dotenv import load_dotenv
import ollama
import json
# from langchain.text_splitter import NLTKTextSplitter
# from langchain_community.document_loaders import (
#     # PDFLoader,
#     # WordDocumentLoader,
#     TextLoader,
#     # ExcelLoader,
#     CSVLoader,
#     # PowerPointLoader
# )

  from tqdm.autonotebook import tqdm, trange


## Basic functions

In [2]:
def getEmbeddingList(model, sentences):
  """ Encode the given sentences with the model and hand the result back as plain Python lists.

  @param model: SentenceTransformer: The model whose encode() method produces the embeddings.
  @param sentences: list: The sentences to embed.
  @return: The output of model.encode(sentences) converted with .tolist(). """

  return model.encode(sentences).tolist()

In [3]:
def getModel() -> SentenceTransformer:
  """ Assemble the SentenceTransformer used for embedding: the 'sentence-transformers/all-MiniLM-L6-v2'
  transformer followed by a pooling layer sized to its word-embedding dimension.

  An Accelerator is instantiated only so that its process count can be printed.

  @return: SentenceTransformer: The assembled (transformer + pooling) model. """

  # Report the number of processes accelerate sees
  print(f"Using GPUs: {Accelerator().num_processes}")

  # Token-level encoder the sentence model is built on
  transformer = models.Transformer('sentence-transformers/all-MiniLM-L6-v2')

  # Pooling layer that turns the token embeddings into one "sentence embedding"
  pooling = models.Pooling(transformer.get_word_embedding_dimension())

  return SentenceTransformer(modules=[transformer, pooling])

In [None]:
# Starting vocabulary of actions the AI can choose between
SET_OF_ACTIONS = [
    'new file',
    'search web',
    'search files',
    'resize window',
    'choose option',
    'open file',
    'close file',
    'minimize window',
    'maximize window',
    'scroll up',
    'scroll down',
    'scroll left',
    'scroll right',
    'open',
    'close',
    'upload',
]
# SET_OF_ACTIONS = ['new file', 'search', 'resize window', 'choose option', 'scroll', 'open file', 'close file', 'minimize window', 'maximize window', 'scroll up', 'scroll down', 'scroll left', 'scroll right', 'copy', 'paste', 'cut', 'undo', 'redo', 'drag and drop', 'select', 'deselect', 'save', 'save as', 'open', 'close', 'upload']

# Basic file gathering and putting into database

### Getting file data

In [4]:
def list_files(initdir: str, file_extensions: list):
    '''
    Return the paths of all files under initdir (searched recursively)
    whose extension is one of file_extensions.

    The extension is the text after the last '.' in the file name and is
    compared case-insensitively. A name that contains no '.' has no
    extension and is never matched.

    @param initdir: str: The directory to start the search from.
    @param file_extensions: list: Extensions to keep, e.g. ['txt', 'py'];
        letter case and a leading '.' are ignored.
    @return: list: The matching file paths (directory joined with the file
        name), in the order os.walk yields them.
    '''
    # Normalise once so that 'TXT', '.txt' and 'txt' all select the same files
    wanted = {ext.lower().lstrip('.') for ext in file_extensions}

    file_list = []
    for root, _, files in os.walk(initdir):
        for file in files:
            # rpartition leaves `dot` empty when the name has no '.', so a file
            # literally named "txt" is not mistaken for a .txt file
            stem, dot, ext = file.rpartition('.')
            if dot and ext.lower() in wanted:
                file_list.append(os.path.join(root, file))
    return file_list

In [12]:
# Sanity check: collect the .txt, .c and .py files under the 'test' directory
list_files('test', ['txt', 'c', 'py'])

['test/resolutions.txt',
 'test/sorting.py',
 'test/random.py',
 'test/example.txt',
 'test/buhao.c']

In [5]:
def get_document_info(file_path: str):
    '''
    Read the text file at the given path and return it together with its path.

    @param file_path: str: The path of the file to be opened.
    @return: tuple: (file_path, content) on success; None when the file
        cannot be opened or its bytes are not valid UTF-8.
    '''
    try:
        # Explicit encoding: the platform default differs between systems
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
    except (OSError, ValueError):
        # OSError covers a missing file, a directory, or missing permissions;
        # ValueError covers undecodable bytes (UnicodeDecodeError).
        # Anything else, e.g. KeyboardInterrupt, is allowed to propagate.
        return None
    return (file_path, content)

    

In [69]:
# Sanity check: read one of the sample files and show the (path, content) tuple
get_document_info('test/resolutions.txt')

('test/resolutions.txt',
 '1. Exercise regularly and stay fit.\n2. Learn a new programming language.\n3. Read at least one book every month.\n4. Spend more time with family and friends.\n5. Travel to at least two new places.\n6. Save money and stick to a budget.\n7. Volunteer for a good cause.\n8. Improve my communication skills.\n9. Learn a musical instrument.\n10. Practice mindfulness and reduce stress.')

### Ollama implementation for better semantics (you need an Ollama server running in the background for this to work)

In [6]:
# Modelfile for a custom Ollama model: llama3 as the base, plus a system prompt
# that restricts replies to a description of the file contents.
modelfile = '''
FROM llama3
SYSTEM You have to give a clear and detailed and accurate description of the file contents and NOTHING else.
'''

# Register the model under the name 'example' so the chat calls below can use it
ollama.create(model='example', modelfile=modelfile)

{'status': 'success'}

In [71]:
# Send the raw text of test/resolutions.txt to the 'example' model as a single
# user message. The text is passed directly: an f-string that reuses its own
# quote character inside the braces is a SyntaxError before Python 3.12.
ollama.chat(model="example", messages=[
    {
        'role': 'user',
        'content': get_document_info('test/resolutions.txt')[1]
    }])

{'model': 'example',
 'created_at': '2024-07-31T16:44:48.64024Z',
 'message': {'role': 'assistant',
  'content': '**Text File Contents:**\n\n1. Exercise regularly and stay fit.\n2. Learn a new programming language.\n3. Read at least one book every month.\n4. Spend more time with family and friends.\n5. Travel to at least two new places.\n6. Save money and stick to a budget.\n7. Volunteer for a good cause.\n8. Improve my communication skills.\n9. Learn a musical instrument.\n10. Practice mindfulness and reduce stress.\n\n**File Type:** Text File (UTF-8 encoded)\n\n**Size:** 276 bytes\n\n**Last Modified:** Not available\n\n**Checksum:** Not available'},
 'done': True,
 'total_duration': 22521467334,
 'load_duration': 14137200500,
 'prompt_eval_count': 130,
 'prompt_eval_duration': 1196688000,
 'eval_count': 124,
 'eval_duration': 7174357000}

In [7]:
def get_ollama_description(file_path: str, modelfile: str):
    """
    Ask the Ollama model for a description of the file at file_path.

    The model 'example' is (re)created from modelfile on every call and is then
    sent one user message containing the file's base name and its full content.

    @param file_path: str: The file with the document.
    @param modelfile: str: The modelfile for the Ollama model.
    @return: str: The text of the model's reply.
    @raise ValueError: If the file cannot be read.
    """
    document = get_document_info(file_path)
    if document is None:
        # get_document_info signals an unreadable file with None; fail here with
        # a clear message instead of a TypeError when indexing None below.
        raise ValueError(f'Could not read file: {file_path!r}')
    file_name, file_content = document

    ollama.create(model='example', modelfile=modelfile)
    response = ollama.chat(model="example", messages=[
        {
            'role': 'user',
            'content': f'Filename: {os.path.basename(file_name)}, File content:{file_content}'
        }])
    return response['message']['content']

In [44]:
# Sanity check: ask the model to describe one of the sample source files
get_ollama_description('test/buhao.c', modelfile)

'A C program that takes two integers as input from the user and compares them to determine if one is greater than, less than, or equal to the other.'

### Database

In [8]:
# Sentence-embedding model shared by the indexing and query cells below
embedmodel = getModel()

Using GPUs: 1


In [9]:
# Chroma client with default settings.
# NOTE(review): the name `client` is rebound to an OpenAI client in a later cell.
client = chromadb.Client()

# Collection the file embeddings are stored in and queried from
doc_collection = client.get_or_create_collection("docs2")

In [10]:
def add_to_database(file_list: list, collection: Collection, embedmodel: SentenceTransformer, modelfile: str):
    '''
    Describe every file with the Ollama model, embed the descriptions and store
    them in the collection.

    The stored document for each entry is the file path (not the description),
    so a query against the collection yields paths.

    @param file_list: list: Paths of the files to index.
    @param collection: Collection: The collection to add the entries to.
    @param embedmodel: SentenceTransformer: The model to be used for getting the embeddings.
    @param modelfile: str: The modelfile for the Ollama model.
    '''
    if not file_list:
        # Nothing to describe, embed or store
        return

    sentences = [
        get_ollama_description(file_path=file, modelfile=modelfile)
        for file in file_list
    ]
    embeds = getEmbeddingList(model=embedmodel, sentences=sentences)

    # Continue numbering after the entries already stored, so that a second
    # call does not reuse 'id0', 'id1', ... (on an empty collection the ids
    # are the same as before: id0, id1, ...)
    first_id = collection.count()
    collection.add(
        embeddings=embeds,
        documents=file_list,
        ids=[f'id{first_id + offset}' for offset in range(len(file_list))],
    )

In [11]:
# Index every .txt, .c and .py file under 'test' into doc_collection
add_to_database(file_list=list_files('test', ['txt', 'c', 'py']), collection=doc_collection, embedmodel=embedmodel, modelfile=modelfile)

### Fine-tune this or look at online examples, because the current outputs are bad

In [61]:
# bad (the top hit for this query is not a good match)
# Named query_text rather than `input` so the builtin input() is not shadowed.
query_text = "Python codes for web scraping"

query_result = doc_collection.query(
    query_embeddings=[getEmbeddingList(embedmodel, query_text)],
    n_results=1,
)

print(query_result['documents'][0][0])

test/random.py


In [62]:
# bad (inspect the full ranking and distances for this query)
# Named query_text rather than `input` so the builtin input() is not shadowed.
query_text = "new years resolutions"

query_result = doc_collection.query(
    query_embeddings=[getEmbeddingList(embedmodel, query_text)],
    n_results=5,
)

print(query_result)

{'ids': [['id0', 'id3', 'id2', 'id1', 'id4']], 'distances': [[30.731735229492188, 48.405677795410156, 54.165950775146484, 57.87548065185547, 58.31592559814453]], 'metadatas': [[None, None, None, None, None]], 'embeddings': None, 'documents': [['test/resolutions.txt', 'test/example.txt', 'test/random.py', 'test/sorting.py', 'test/buhao.c']], 'uris': None, 'data': None, 'included': ['metadatas', 'documents', 'distances']}


# Commands

In [12]:
def make_file(file_path: str, content: str):
    '''
    Create (or overwrite) the file at the given path with the given content.

    @param file_path: str: The path of the file to be created.
    @param content: str: The content of the file.
    '''
    # Explicit UTF-8 so that any text can be written regardless of the
    # platform's default encoding
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)

In [78]:
def remove_file(file_path: str):
    '''
    Delete a single file from disk.

    @param file_path: str: The path of the file to be removed.
    '''
    # os.unlink is the same operation as os.remove under its other name
    os.unlink(file_path)

In [13]:
def search_files(query: str, collection: Collection, embedmodel: SentenceTransformer):
    '''
    Look up the single closest entry to the query in the collection.

    @param query: str: The query to search for.
    @param collection: Collection: The collection to search in.
    @param embedmodel: SentenceTransformer: The model to be used for getting the embeddings.
    @return: The first document of the first result list returned by
        collection.query (add_to_database stores file paths as documents).
    '''
    query_vector = getEmbeddingList(embedmodel, query)
    hits = collection.query(query_embeddings=[query_vector], n_results=1)
    best_match = hits['documents'][0][0]
    return best_match

In [14]:
def open_file(file_path: str):
    '''
    Open the file at the given file path with the system's default application.

    The path is handed to the launcher as a single argument and never passes
    through a shell, so spaces or shell metacharacters in it are harmless.
    The launcher's exit status is not checked.

    @param file_path: str: The path of the file to be opened.
    @raise OSError: If the platform's launcher cannot be started.
    '''
    import subprocess
    import sys

    if sys.platform.startswith('win'):
        # Windows has no launcher executable; os.startfile does the same job
        os.startfile(file_path)
        return

    # macOS ships `open`; other POSIX desktops provide `xdg-open`
    launcher = 'open' if sys.platform == 'darwin' else 'xdg-open'
    subprocess.run([launcher, file_path], check=False)

In [81]:
# Sanity check: open one of the sample files
open_file('test/random.py')

In [63]:
# Testing input to open file:
# Named query_text rather than `input` so the builtin input() is not shadowed.

query_text = "open my new years resolutions"

query_result = doc_collection.query(
    query_embeddings=[getEmbeddingList(embedmodel, query_text)],
    n_results=1,
)

# Open the best-matching file path
open_file(query_result['documents'][0][0])

In [15]:
# Load variables from a .env file (if present) into the environment
load_dotenv()

# Fail early with an actionable message when the key is missing
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY is not set; add it to the environment or to a .env file")

# NOTE: this rebinds `client`, which referred to the Chroma client until now;
# the cells below use it for chat completions.
client = OpenAI()

In [21]:
# Testing function calling using GPT-4o-mini

# Conversation so far: a single user request
messages = [{'role': 'user', 'content': "open my new years resolutions"}]

# Tool schemas offered to the model. Only arguments the model can actually
# produce are declared: the collection and the embedding model are Python
# objects, and the code that executes a search_files call supplies them itself.
tools = [
    {
        "type": "function",
        "function": {
            "name": "open_file",
            "description": "Open the file at the given file path.",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The path of the file to be opened."
                    },
                },
                "required": ["file_path"],
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_files",
            "description": "Search for the file that is most similar to the query and return the file path.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The query to search for the file. This query should be something that is semantically similar to what we are looking for.",
                    },
                },
                "required": ["query"],
            }
        }
    }
]

# First round: let the model decide which tool (if any) to call
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=messages,
    tools=tools,
    tool_choice="auto",
)
# print(response)
# print(response)

In [23]:
response_message = response.choices[0].message
tool_calls = response_message.tool_calls

# Local functions the model is allowed to request, keyed by tool name
available_functions = {
    "open_file": open_file,
    "search_files": search_files,
}


def _execute_tool_call(tool_call):
    '''
    Run the local function requested by one tool call and return its result.

    The keyword arguments are chosen by function name: search_files receives
    the model's query plus the notebook's doc_collection and embedmodel;
    open_file receives the model's file_path.

    @param tool_call: One entry of a response message's tool_calls.
    @return: Whatever the called function returns.
    @raise KeyError: If the requested name is not in available_functions.
    '''
    function_name = tool_call.function.name
    function_to_call = available_functions[function_name]
    function_args = json.loads(tool_call.function.arguments)

    if function_name == "search_files":
        return function_to_call(
            query=function_args.get("query"),
            collection=doc_collection,
            embedmodel=embedmodel,
        )
    return function_to_call(file_path=function_args.get("file_path"))


# Stays None when the model answered without requesting a tool
second_response = None

if tool_calls:
    messages.append(response_message)

    for tool_call in tool_calls:
        function_response = _execute_tool_call(tool_call)

        # print(function_response)

        messages.append(
            {
                "tool_call_id": tool_call.id,
                "role": "tool",
                "name": tool_call.function.name,
                # Sent as text; open_file returns None, so acknowledge with "ok"
                "content": "ok" if function_response is None else str(function_response),
            }
        )

    # Ask for the next step once, after every tool call of this round has
    # been answered
    second_response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        tools=tools,
        tool_choice="auto"
    )

if second_response is not None:
    response_message = second_response.choices[0].message
    tool_calls = response_message.tool_calls

    # Second round: execute whatever the model requested next (typically
    # open_file with the path found by the search)
    for tool_call in tool_calls or []:
        function_response = _execute_tool_call(tool_call)

test/resolutions.txt


In [None]:
# Now we need to get a vocal dataset from hf to do tests
