In [1]:
import numpy as np
import time
np.random.seed(42)

n_docs = [1_000, 10_000, 100_000, 500_000]
n_dims = 768

# for n in n_docs:
#     # Генерируем векторы для оценки времени выполнения запроса
#     embeddings = np.random.randn(n, n_dims)
#     # Генерируем сам запрос
#     query = np.random.randn(768)
    
#     # Замеряем время на ранжирование ближайших соседей
#     t0 = time.time()
#     similarities = embeddings.dot(query)
#     sorted_ix = np.argsort(-similarities)
#     t1 = time.time()

#     total = t1-t0
#     print(f"Время выполнения 1 поиска среди {n} документов в базе: {np.round(total,3)} секунд")


for n in n_docs:
    # Random document embeddings stand in for a vector store holding n documents.
    embeddings = np.random.randn(n, n_dims)
    # The query must share the documents' dimensionality, so derive it from
    # n_dims instead of repeating the literal 768.
    query = np.random.randn(n_dims)

    # Time the brute-force ranking only: one dot product per document followed by
    # a full sort of the scores (descending, hence the negation).
    # perf_counter() is used because it is the clock intended for measuring
    # short durations; time.time() can jump if the system clock is adjusted.
    t0 = time.perf_counter()
    similarities = embeddings.dot(query)
    sorted_ix = np.argsort(-similarities)
    t1 = time.perf_counter()

    total = t1-t0
    print(f"Execution time for 1 search among {n} documents in the database: {np.round(total,3)} seconds")


Время выполнения 1 поиска среди 1000 документов в базе: 0.001 секунд
Время выполнения 1 поиска среди 10000 документов в базе: 0.006 секунд
Время выполнения 1 поиска среди 100000 документов в базе: 0.064 секунд
Время выполнения 1 поиска среди 500000 документов в базе: 0.311 секунд


In [2]:
similarities

array([-35.00990696,  37.17055079, -10.75623493, ..., -32.84975455,
        -1.2313611 ,  15.66084796])

In [4]:
len(similarities)

500000

In [3]:
sorted_ix

array([309285, 103005, 312889, ..., 397555, 327487, 318532])

In [5]:
len(sorted_ix)

500000

# ChromaDB

In [2]:
# !pip install chromadb

In [3]:
import chromadb
# Create a Chroma client with the library defaults (no arguments are passed).
# NOTE(review): presumably this is a non-persistent client — confirm against the Chroma docs.
chroma_client = chromadb.Client()

In [4]:
# Create the collection used by the demo cells below.
# NOTE(review): re-running this cell against the same client may fail if the
# name already exists — confirm, or switch to get_or_create_collection.
collection = chroma_client.create_collection(name="my_collection")

In [5]:
# Store two documents with identical metadata. No embeddings are supplied, so
# the collection has to compute them itself; on the recorded run this triggered
# a download of the all-MiniLM-L6-v2 ONNX model (see the cell output).
collection.add(
    documents=["This is a document", "This is another document"],
    metadatas=[{"source": "my_source"}, {"source": "my_source"}],
    ids=["id1", "id2"]
)

/home/koffi369/.cache/chroma/onnx_models/all-MiniLM-L6-v2/onnx.tar.gz: 100%|█| 7


In [None]:
# collection.add(
#     embeddings=[[1.2, 2.3, 4.5], [6.7, 8.2, 9.2]],
#     documents=["This is a document", "This is another document"],
#     metadatas=[{"source": "my_source"}, {"source": "my_source"}],
#     ids=["id1", "id2"]
# )

In [10]:
# Ask for the 3 nearest documents to a free-text query. Only two documents are
# stored, so Chroma logs a warning and lowers n_results to 2 (see the cell output).
results = collection.query(
    query_texts=["This is a query document"],
    n_results=3
)

Number of requested results 3 is greater than number of elements in index 2, updating n_results = 2


In [11]:
results

{'ids': [['id1', 'id2']],
 'distances': [[0.7111214399337769, 1.0109773874282837]],
 'metadatas': [[{'source': 'my_source'}, {'source': 'my_source'}]],
 'embeddings': None,
 'documents': [['This is a document', 'This is another document']],
 'uris': None,
 'data': None}

In [12]:
!pip show chromadb

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Name: chromadb
Version: 0.4.22
Summary: Chroma.
Home-page: 
Author: 
Author-email: Jeff Huber <jeff@trychroma.com>, Anton Troynikov <anton@trychroma.com>
License: 
Location: /home/koffi369/anaconda3/lib/python3.11/site-packages
Requires: bcrypt, build, chroma-hnswlib, fastapi, grpcio, importlib-resources, kubernetes, mmh3, numpy, onnxruntime, opentelemetry-api, opentelemetry-exporter-otlp-proto-grpc, opentelemetry-instrumentation-fastapi, opentelemetry-sdk, overrides, posthog, pulsar-client, pydantic, pypika, PyYAML, requests, tenacity, tokenizers, tqdm, typer, typing-extensions, uvicorn
Required-by: 


In [1]:
from gpt4all import GPT4All
# Smoke test for the gpt4all bindings. On the recorded run the progress bar in
# the cell output shows a 1.98G model file being fetched.
model = GPT4All("orca-mini-3b-gguf2-q4_0.gguf")
# max_tokens=3 keeps the completion very short.
output = model.generate("The capital of France is ", max_tokens=3)
print(output)

100%|█████████████████████████████████████| 1.98G/1.98G [05:44<00:00, 5.74MiB/s]


201


# Test

## All classes

In [1]:
################################################################### Utils
import numpy as np
from functools import wraps
from termcolor import colored


def cosine_similarity(a: np.ndarray, b: np.ndarray):
    """Cosine similarity between vectors or between batches of row vectors.

    The previous version divided by ``np.linalg.norm`` of the *whole* array,
    which is the Frobenius norm for 2-D input and therefore produced wrong
    values for batches. Norms are now taken per row (last axis).

    Args:
        a: A vector of shape ``(d,)`` or a batch of row vectors ``(m, d)``.
        b: A vector of shape ``(d,)`` or a batch of row vectors ``(n, d)``.

    Returns:
        A scalar for two vectors, a 1-D array when exactly one argument is a
        batch, or an ``(m, n)`` matrix of pairwise similarities for two batches.
        Zero-norm inputs still yield ``nan``/``inf`` (with a NumPy warning).
    """
    a = np.asarray(a)
    b = np.asarray(b)

    # Per-row norms; 0-d input has no axis to reduce over, so fall back to the
    # plain norm to keep accepting scalars.
    a_norm = np.linalg.norm(a, axis=-1 if a.ndim else None)
    b_norm = np.linalg.norm(b, axis=-1 if b.ndim else None)

    if a.ndim == 2 and b.ndim == 2:
        # Pairwise case: the denominator must be the outer product of the row norms.
        scale = a_norm[:, None] * b_norm[None, :]
    else:
        scale = a_norm * b_norm

    return np.dot(a, b.T) / scale




def logging(enabled = True, message = "", color = "yellow"):
    """Decorator factory that prints a coloured ``LOG:`` line before each call.

    Args:
        enabled: When falsy, the wrapped function is called without printing.
        message: Text shown after the ``LOG:`` prefix.
        color: Colour name forwarded to ``termcolor.colored``.

    Returns:
        A decorator that wraps a function and keeps its metadata (``functools.wraps``).
    """
    def _decorate(func):
        @wraps(func)
        def _logged(*args, **kwargs):
            # The message is rendered on every call, not once at decoration time.
            if not enabled:
                return func(*args, **kwargs)
            banner = colored(message, color = color)
            print(f"LOG: {banner}")
            return func(*args, **kwargs)
        return _logged
    return _decorate



################################################################ Embeddings

import torch
import os
import numpy as np
from transformers import AutoModel, AutoTokenizer
from chromadb import EmbeddingFunction
from gpt4all import Embed4All
# from dotenv import dotenv_values

# env = dotenv_values(".env")
# os.environ['HUGGINGFACE_HUB_CACHE'] = env['HUGGINGFACE_HUB_CACHE']



class BaseEmbedder(EmbeddingFunction):
    """Common interface for the embedders used in this notebook.

    Subclasses provide ``get_embeddings``; calling an instance forwards to it.
    """

    def __init__(self):
        """Nothing to initialise at this level."""

    def get_embeddings(self, texts):
        """Return embeddings for ``texts``; must be overridden by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses should implement this!")

    def __call__(self, text):
        """Make the embedder callable by delegating to ``get_embeddings``."""
        embeddings = self.get_embeddings(text)
        return embeddings


class GPT4AllEmbedder(BaseEmbedder):
    """Embedder backed by gpt4all's ``Embed4All``."""

    def __init__(self):
        self.embedder = Embed4All() # default: all-MiniLM-L6-v2

    def get_embeddings(self, texts):
        """Embed one text or a sequence of texts.

        Args:
            texts: A single string or an iterable of strings.

        Returns:
            A list with one embedding per input text, in input order. A single
            string yields a one-element list.
        """
        # isinstance (rather than an exact type comparison) also treats str
        # subclasses as a single text instead of iterating over their characters.
        if isinstance(texts, str):
            texts = [texts]

        return [self.embedder.embed(text) for text in texts]

    def __call__(self, text):
        """Make the embedder callable by delegating to ``get_embeddings``."""
        return self.get_embeddings(text)


################################################################### VectDB
import chromadb
import uuid
import datetime
# from embedder import BaseEmbedder, HFEmbedder
# from dotenv import dotenv_values

# env = dotenv_values(".env")
# DB_PATH = env["DB_PATH"]
DB_PATH = './'

class CollectionOperator():
    """Convenience wrapper around one collection of a persistent Chroma client."""

    # The annotation is a string because ``Optional`` is only imported further
    # down in this cell; string annotations are not evaluated at definition time.
    def __init__(self, collection_name, db_path = DB_PATH, embedder: "Optional[BaseEmbedder]" = None):
        """Open (or create) ``collection_name`` in the database at ``db_path``.

        Args:
            collection_name: Name of the collection to get or create.
            db_path: Directory passed to ``chromadb.PersistentClient``.
            embedder: Embedder whose ``get_embeddings`` is used as the
                collection's embedding function. When ``None`` the argument is
                omitted so Chroma applies its own default (previously this
                raised ``AttributeError`` on ``None.get_embeddings``).
        """
        self.embedder = embedder
        self.client = chromadb.PersistentClient(path = db_path)

        collection_kwargs = {"name": collection_name}
        if self.embedder is not None:
            collection_kwargs["embedding_function"] = self.embedder.get_embeddings
        self.collection = self.client.get_or_create_collection(**collection_kwargs)

    def add(self, text, metadata = None):
        """Store ``text`` under a fresh UUID, stamping the metadata with the current time.

        Args:
            text: Document to store.
            metadata: Optional mapping of extra metadata. It is copied, so neither
                the caller's dict nor a shared default is mutated.
        """
        # A mutable default ({}) would be shared between calls and the
        # 'timestamp' write would leak into the caller's dict; work on a copy.
        metadata = dict(metadata) if metadata else {}
        metadata['timestamp'] = str(datetime.datetime.now())

        self.collection.add(
            documents = [text],
            metadatas = [metadata],
            ids = [str(uuid.uuid4())]
        )

    def delete(self, id):
        """Delete the entry (or entries) identified by ``id`` from the collection."""
        self.collection.delete(id)

    def query(self, query, n_results, return_text = True):
        """Run a text query against the collection.

        Args:
            query: Query text(s), forwarded as ``query_texts``.
            n_results: Maximum number of hits to request.
            return_text: When true, return only the documents of the first
                query; otherwise return the raw result mapping.

        Returns:
            ``results['documents'][0]`` or the full result mapping.
        """
        # Use a separate name so the incoming query is not shadowed by the result.
        results = self.collection.query(
            query_texts = query,
            n_results = n_results,
        )

        if return_text:
            return results['documents'][0]
        return results





######################################################################### llm

# from typing import List, Optional, Any
# from gpt4all import GPT4All
# from llama_cpp import Llama

# class BaseLLM():
#     def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
#         self.user = "### Instructions" #"USER"
#         self.assistant = "### Response" #"ASSISTANT"
#         self.input = "### Input"
#         self.streaming = False

#         # additional linking contexts
#         # self.memory_context = lambda question: f"""
#         # By considering below input memories from me, answer the question if its provided in memory, else just answer without memory: {question}
#         # """ 
#         ##############
#         self.memory_context = lambda question: f"""
#         Here is a context that represent all informations in the memory of a robot that he has to use to answer a question. 
#         Based on this memory context answer the following question. 
#         Here is the question: {question}
        
#         Rules: 
#         1 - If there is no relevent information in the memory to answer the question you must answer by "no relevent information "
#         2 - You are not allowed any type of information outside of the provided context to answer the question
#         """
#         ##############

#         # self.search_context = lambda question: f"""
#         # You have been given access to the Internet. By considering below search results, summarize the information if its provided in search result, else just answer without search results: {question}
#         # """

#     def generate(self, request: str, streaming: bool) -> Any:
#         raise NotImplementedError

#     def response(self, request: str) -> Any:
#         return self.generate(f"{self.user}:\n{request}\n{self.assistant}:\n", streaming = self.streaming)

#     ############################################################################################################################
#     def memory_response(self, request: str, memory_queries: List[str]) -> Any:
#         queries = f"{self.user}:\n{self.memory_context(request)}\n{self.input}:\n"

#         for i, query in enumerate(memory_queries):
#             queries += f"MEMORY CHUNK {i}: {query}\n"

#         queries += f"{self.assistant}:\n"

#         return self.generate(queries, streaming = self.streaming)
#     ############################################################################################################################

#     # def search_response(self, request: str, search_results: List[dict[str, str, str]]) -> Any:
#     #     queries = f"{self.user}:\n{self.search_context(request)}\n{self.input}:\n"

#     #     for i, query in enumerate(search_results):
#     #         queries += f"SEARCH TITLE: {query['title']}\nSEARCH LINK: {query['link']}\nSEARCH CONTENT: {query['content']}\n"

#     #     queries += f"{self.assistant}:\n"

#     #     return self.generate(queries, streaming = self.streaming)

# class GPT4AllLLM(BaseLLM):
#     def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
#         super().__init__(model_name, model_path)
        
#         self.gpt = GPT4All(model_name = model_name, model_path = model_path, verbose=False)

#     def generate(self, request: str, streaming: bool) -> Any:
#         return self.gpt.generate(prompt = request, streaming = streaming)

# class LlamaCPPLLM(BaseLLM):
#     def __init__(self, model_name: Optional[str] = None) -> None:
#         super().__init__(model_name)
        
#         self.gpt = Llama(model_path = model_name, n_ctx=2048, verbose=False)

#     def generate(self, request: str, streaming: bool) -> Any:
#         return self.gpt.create_completion(prompt = request, stream = streaming, stop=[f"{self.user}:"])


 
######################################################################### new llm
from typing import List, Optional, Any
from gpt4all import GPT4All
from llama_cpp import Llama

class BaseLLM():
    """Prompt-building base class shared by the concrete LLM back ends.

    Subclasses implement ``generate``; ``response`` and ``memory_response``
    assemble the prompt text from the templates stored on the instance and
    hand it to ``generate``.
    """

    def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
        """Set up the prompt templates. ``model_name``/``model_path`` are not used here."""
        # Whether generate() is asked to stream its output.
        self.streaming = False

        # Header line that precedes the retrieved memory chunks.
        self.input = "Here are the pieces of information from the memory:"

        # Preamble placed at the start of every prompt.
        self.user = '''We have a database that serves as the memory of a robot. This memory is used by the robot to provide relevant and highly accurate 
        information to solve tasks or answer questions. Your goal is to analyze the information in the memory and assist the robot in providing the most 
        relevant information to solve the task.'''

        # Rules block placed at the end of every prompt, right before the model's answer.
        self.assistant = '''
        Here are the rules you should follow explicitly: 
        Rules: 
        1 - If there is no relevant information in the memory to answer the question, you must answer with "No relevant information."
        2 - You are not allowed to use any information outside of the provided memory to answer the question.
        
        Remember: There might be cases where the information from the memory is not relevant to solve the task. In such cases, you should explicitly answer with "No relevant information."
        
        Your Response:'''

        # Wraps the user's task for memory-backed prompts.
        self.memory_context = lambda question: f""" 
        Here is the task  \n 
        Task: {question} \n
        """

    def generate(self, request: str, streaming: bool) -> Any:
        """Produce a completion for ``request``; implemented by subclasses."""
        raise NotImplementedError

    def response(self, request: str) -> Any:
        """Answer ``request`` without any memory context."""
        prompt = f"{self.user}:\n{request}\n{self.assistant}:\n"
        return self.generate(prompt, streaming = self.streaming)

    ############################################################################################################################
    def memory_response(self, request: str, memory_queries: List[str]) -> Any:
        """Answer ``request`` using the given memory chunks as context.

        Args:
            request: The user's task or question.
            memory_queries: Retrieved memory texts, listed in the prompt as
                ``MEMORY CHUNK <index>`` lines in the given order.
        """
        header = f"{self.user}:\n{self.memory_context(request)}\n{self.input}:\n"
        chunk_lines = "".join(
            f"MEMORY CHUNK {index}: {chunk}\n"
            for index, chunk in enumerate(memory_queries)
        )
        prompt = header + chunk_lines + f"{self.assistant}:\n"

        return self.generate(prompt, streaming = self.streaming)
    ############################################################################################################################

    # def search_response(self, request: str, search_results: List[dict[str, str, str]]) -> Any:
    #     queries = f"{self.user}:\n{self.search_context(request)}\n{self.input}:\n"

    #     for i, query in enumerate(search_results):
    #         queries += f"SEARCH TITLE: {query['title']}\nSEARCH LINK: {query['link']}\nSEARCH CONTENT: {query['content']}\n"

    #     queries += f"{self.assistant}:\n"

    #     return self.generate(queries, streaming = self.streaming)

class GPT4AllLLM(BaseLLM):
    """``BaseLLM`` back end that runs the model through gpt4all's ``GPT4All``."""

    def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
        """Set up the prompt templates, then load the model with verbose output off."""
        super().__init__(model_name, model_path)

        backend_options = dict(model_name = model_name, model_path = model_path, verbose = False)
        self.gpt = GPT4All(**backend_options)

    def generate(self, request: str, streaming: bool) -> Any:
        """Forward the finished prompt to ``GPT4All.generate``."""
        completion = self.gpt.generate(prompt = request, streaming = streaming)
        return completion

class LlamaCPPLLM(BaseLLM):
    """``BaseLLM`` back end that runs the model through llama-cpp's ``Llama``."""

    def __init__(self, model_name: Optional[str] = None) -> None:
        """Set up the prompt templates, then load the model file ``model_name``."""
        super().__init__(model_name)

        llama_options = dict(model_path = model_name, n_ctx = 2048, verbose = False)
        self.gpt = Llama(**llama_options)

    def generate(self, request: str, streaming: bool) -> Any:
        """Forward the finished prompt to ``Llama.create_completion``.

        The preamble followed by a colon is passed as the stop sequence.
        """
        stop_sequences = [f"{self.user}:"]
        return self.gpt.create_completion(prompt = request, stream = streaming, stop = stop_sequences)



######################################################################### summarizer


import os
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import pipeline
from dotenv import dotenv_values

# env = dotenv_values(".env")
# os.environ['HUGGINGFACE_HUB_CACHE'] = env['HUGGINGFACE_HUB_CACHE']

# checkpoint = "t5-small"
# checkpoint = "google/mt5-small"
# checkpoint = "facebook/bart-large-cnn"
checkpoint = "sshleifer/distilbart-cnn-12-6"

# class Summarizer():
#     def __init__(self, model = checkpoint) -> None:
#         self.summarizer = pipeline("summarization", model = model)#, min_length = 30, max_length = 300

#     def summarize(self, text: str, min_length_ratio = 0.3, max_length_ratio = 1.):
#         if len(text) < 5:
#             return text

#         prompt = f"summarize: {text}"

#         return self.summarizer(prompt,  min_length = int(min_length_ratio * len(prompt.split(" "))), max_length = int(max_length_ratio * len(prompt.split(" "))))[0]['summary_text']

#     def __call__(self, text, min_length_ratio = 0.3, max_length_ratio = 1.):
#         return self.summarize(text, min_length_ratio, max_length_ratio)



# https://discuss.huggingface.co/t/summarization-on-long-documents/920/23
# https://www.width.ai/post/4-long-text-summarization-methods

class Summarizer():
    """Summarizes text with a seq2seq model, chunk by chunk for long inputs."""

    def __init__(self, model = checkpoint) -> None:
        """Load the model and tokenizer for checkpoint ``model`` (GPU when available)."""
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model)

    def summarize(self, text: str, min_length = 30, max_length = 100):
        """Summarize ``text`` using fixed-size token chunks.

        The text is tokenized without truncation, cut into consecutive chunks of
        ``tokenizer.model_max_length`` tokens, and every chunk is summarized
        independently; the partial summaries are joined with newlines.

        Args:
            text: Text to summarize.
            min_length: ``min_length`` passed to ``model.generate`` for each chunk.
            max_length: ``max_length`` passed to ``model.generate`` for each chunk.

        Returns:
            ``text`` unchanged when it has fewer than 30 tokens, otherwise the
            newline-joined chunk summaries.
        """
        token_ids = self.tokenizer(text, max_length=None, return_tensors='pt', truncation=False)['input_ids'][0]
        if len(token_ids) < 30:
            return text

        chunk_size = self.tokenizer.model_max_length  # == 1024 for Bart

        chunk_summaries = []
        # range() stops before len(token_ids), so no empty trailing chunk is
        # produced when the token count is an exact multiple of chunk_size
        # (the former `<=` loop condition sent an empty tensor to generate()).
        for chunk_start in range(0, len(token_ids), chunk_size):
            chunk = torch.unsqueeze(token_ids[chunk_start:chunk_start + chunk_size], 0)
            generated = self.model.generate(
                chunk.to(self.device),
                num_beams=4,
                min_length=min_length,
                max_length=max_length,
                early_stopping=True,
            )
            # One sequence goes in, so only the first generated sequence is used.
            chunk_summaries.append(
                self.tokenizer.decode(generated[0], skip_special_tokens=True, clean_up_tokenization_spaces=False)
            )

        return '\n'.join(chunk_summaries)

    def __call__(self, text, min_length = 30, max_length = 100):
        """Shorthand for ``summarize``."""
        return self.summarize(text, min_length, max_length)

######################################################################### llm Agent


# from llm import BaseLLM
# # from search_engine import SearchEngine
# from summarizer import Summarizer
# from query_db import CollectionOperator


# from utils import logging




enable_logging = True

class LLMAgent():
    """Routes chat requests to plain generation, memory lookup, or memory storage.

    ``generate`` inspects the start of the request (case-insensitively):
    ``MEM...`` answers from the vector-store memory, ``REMEM...`` stores the
    text in memory first, anything else goes straight to the LLM.
    """

    def __init__(
        self, 
        llm: BaseLLM = None, 
        tm_qdb: CollectionOperator = None, 
        summarizer: Summarizer = None, 
        # search_engine: SearchEngine = None,
        use_summarizer = True,
       
    ) -> None:
        """Store the collaborators and the retrieval settings.

        Args:
            llm: Language-model back end used to produce answers.
            tm_qdb: Collection wrapper used as the agent's memory.
            summarizer: Callable ``(text, min_length, max_length)``; only
                invoked when ``use_summarizer`` is true.
            use_summarizer: Whether texts are summarized before being stored.
        """
        self.llm = llm
        self.tm_qdb = tm_qdb
        # A memory hit is used only if its reported distance is below this value.
        self.memory_access_threshold = 1.5
        # self.similarity_threshold = 0.5 # [0; 1]
        self.db_n_results = 3
        self.se_n_results = 3
        self.use_summarizer = use_summarizer
       
        self.summarizer = summarizer
        # self.search_engine = search_engine
       

    @logging(enable_logging, message = "[Adding to memory]")
    def add(self, request):
        """Store ``request`` (summarized if enabled) in memory, then answer it."""
        # summary = self.summarizer(f"{self.llm.user}:\n{request}\n{self.llm.assistant}:\n{''.join(response)}")
        
        summary = self.summarize(request) if self.use_summarizer else request

        # Empty texts are not worth storing.
        if summary != "":
            self.tm_qdb.add(summary)

        return self.llm.response(request)
        

    ############################################################################################################################
    @logging(enable_logging, message = "[Querying memory]")
    def memory_response(self, request):
        """Answer ``request`` from memory, falling back to a plain answer.

        The ``db_n_results`` closest memories are fetched; those whose distance
        is below ``memory_access_threshold`` are passed to the LLM as context.
        If none qualifies, the request is answered without memory.
        """
        hits = self.tm_qdb.query(request, n_results = self.db_n_results, return_text = False)
        documents = hits['documents'][0]
        distances = hits['distances'][0]

        acceptable_memory_queries = [
            document
            for document, distance in zip(documents, distances)
            if distance < self.memory_access_threshold
            # if (1 - distance) >= self.similarity_threshold
        ]

        if acceptable_memory_queries:
            return self.llm.memory_response(request, acceptable_memory_queries)
        return self.llm.response(request) #TODO: add another solution
    ############################################################################################################################
    # @logging(enable_logging, message = "[Searching]")
    # def search(self, request):
    #     search_response = self.search_engine.search(request, n_results = self.se_n_results)

    #     for response in search_response:
    #         response['content'] = self.summarize(response['content'])

    #     return self.llm.search_response(request, search_response)

    # @logging(enable_logging, message = "[Summarizing]", color = "green")
    def summarize(self, text, min_length = 30, max_length = 100):
        """Summarize ``text`` with the configured summarizer."""
        return self.summarizer(text, min_length, max_length)


    @logging(enable_logging, message = "[Response]")
    def response(self, request):
        """Answer ``request`` directly, without touching memory."""
        return self.llm.response(request)

    
    def generate(self, request: str):
        """Dispatch ``request`` according to its command prefix.

        The prefix is matched case-insensitively and removed; the remaining
        payload is stripped of surrounding whitespace so that the separator
        after the command does not end up in prompts or stored memories, and
        so that a bare ``remem`` yields an empty payload that ``add`` skips.
        """
        command = request.upper()

        if command.startswith("MEM"):
            return self.memory_response(request[len("MEM"):].strip())
        if command.startswith("REMEM"): #and len(acceptable_memory_queries) == 0
            return self.add(request[len("REMEM"):].strip())
        # if command.startswith("WEB"):
        #     return self.search(request[len("WEB"):].strip())
        return self.response(request)





# Chat

In [5]:
# import os

# # Set the model path
# os.environ['MODEL_PATH'] = 'Memory_agent/mistral-7b-openorca.Q4_0.gguf'

# # Now you can access this path anywhere in your code using:
# model_path = os.environ.get('MODEL_PATH')

In [2]:
# from llm_agent import LLMAgent
# from llm import LlamaCPPLLM, GPT4AllLLM
# from embedder import HFEmbedder, GPT4AllEmbedder
# from search_engine import SearchEngine
# from summarizer import Summarizer
# from query_db import CollectionOperator
# from dotenv import dotenv_values

# env = dotenv_values(".env")

def chat_gpt4all():
    """Run an interactive console chat through the GPT4All back end.

    Uses the module-level ``llm_agent``. The loop ends cleanly when input is
    closed (EOF) or interrupted (Ctrl-C / kernel interrupt) at the prompt;
    before, the only way out was an unhandled exception.
    """
    # llm_agent.llm.streaming = True
    llm_agent.llm.streaming = False
    system_template = 'A chat between a curious user and an artificial intelligence assistant.'

    with llm_agent.llm.gpt.chat_session(system_template):
        while True:
            try:
                user_text_request = input("You > ")
            except (EOFError, KeyboardInterrupt):
                print()
                break

            bot_text_response = llm_agent.generate(user_text_request)
            
            if llm_agent.llm.streaming:
                # flush so each token shows up immediately instead of waiting
                # for the newline at the end of the answer.
                print("Bot <", end = ' ', flush = True)
                for token in bot_text_response:
                    print(token, end = '', flush = True)
                print()
            else:
                print(f"Bot < {bot_text_response}")

def chat_llama_cpp():
    """Run an interactive console chat through the llama-cpp back end.

    Uses the module-level ``llm_agent``. The system template is tokenized and
    evaluated once before the loop. The loop ends cleanly when input is closed
    (EOF) or interrupted (Ctrl-C / kernel interrupt) at the prompt; before, the
    only way out was an unhandled exception.
    """
    llm_agent.llm.streaming = True
    system_template = '<<SYS>>A chat between a curious user and an artificial intelligence assistant.<</SYS>>'

    llm_agent.llm.gpt.eval(llm_agent.llm.gpt.tokenize(system_template.encode("utf-8")))

    while True:
        try:
            user_text_request = input("You > ")
        except (EOFError, KeyboardInterrupt):
            print()
            break

        bot_text_response = llm_agent.generate(user_text_request)
        
        if llm_agent.llm.streaming:
            # flush so each streamed piece shows up immediately instead of
            # waiting for the newline at the end of the answer.
            print("Bot <", end = ' ', flush = True)
            for token in bot_text_response:
                print(token['choices'][0]['text'], end = '', flush = True)
            print()
        else:
            print(f"Bot < {bot_text_response['choices'][0]['text']}")


# if __name__ == "__main__":
#     port_lib_name = "LLAMA_CPP"

#     if port_lib_name == "LLAMA_CPP":
#         LLM = LlamaCPPLLM
#         chat = chat_llama_cpp
#     else:
#         LLM = GPT4AllLLM
#         chat = chat_gpt4all

#     llm = LLM(env['LLM_PATH'])
    

#     """Keep in mind that user and helper tokens may vary between LLMs."""
#     llm.user = "### Instruction" #"USER"
#     llm.assistant = "### Response" #"ASSISTANT"
    
#     embedder = HFEmbedder()
#     search_engine = SearchEngine()
#     summarizer = Summarizer()

#     total_memory_co = CollectionOperator("total-memory", embedder = embedder)

#     llm_agent = LLMAgent(llm, total_memory_co, summarizer, search_engine, use_summarizer = False)

#     chat()

In [None]:


import os
# Set the model path
os.environ['LLM_PATH'] = 'nous-hermes-llama2-13b.Q4_0.gguf'

if __name__ == "__main__":
    # Select the back end: "LLAMA_CPP" uses llama-cpp, anything else uses GPT4All.
    port_lib_name = "LLAMA_CPP"
    # port_lib_name = "none"

    if port_lib_name == "LLAMA_CPP":
        LLM = LlamaCPPLLM
        chat = chat_llama_cpp
    else:
        LLM = GPT4AllLLM
        chat = chat_gpt4all

    # Now you can access this path anywhere in your code using:
    llm = LLM(os.environ.get('LLM_PATH'))
    

    # Keep in mind that user and helper tokens may vary between LLMs.
    # llm.user = "### Instruction" #"USER"
    # llm.assistant = "### Response" #"ASSISTANT"
    
    # Summarization of stored memories is switched off for this run, so the
    # summarization model is not loaded at all (it was previously constructed
    # and then never used).
    use_summarizer = False

    embedder = GPT4AllEmbedder()
    # search_engine = SearchEngine()
    summarizer = Summarizer() if use_summarizer else None

    total_memory_co = CollectionOperator("total-memory", embedder = embedder)

    llm_agent = LLMAgent(llm, total_memory_co, summarizer, use_summarizer = use_summarizer)

    chat()

llama_model_loader: loaded meta data with 19 key-value pairs and 363 tensors from nous-hermes-llama2-13b.Q4_0.gguf (version GGUF V2 (latest))
llama_model_loader: - tensor    0:                token_embd.weight q4_0     [  5120, 32032,     1,     1 ]
llama_model_loader: - tensor    1:              blk.0.attn_q.weight q4_0     [  5120,  5120,     1,     1 ]
llama_model_loader: - tensor    2:              blk.0.attn_k.weight q4_0     [  5120,  5120,     1,     1 ]
llama_model_loader: - tensor    3:              blk.0.attn_v.weight q4_0     [  5120,  5120,     1,     1 ]
llama_model_loader: - tensor    4:         blk.0.attn_output.weight q4_0     [  5120,  5120,     1,     1 ]
llama_model_loader: - tensor    5:            blk.0.ffn_gate.weight q4_0     [  5120, 13824,     1,     1 ]
llama_model_loader: - tensor    6:              blk.0.ffn_up.weight q4_0     [  5120, 13824,     1,     1 ]
llama_model_loader: - tensor    7:            blk.0.ffn_down.weight q4_0     [ 13824,  5120,     1,   

bert_load_from_file: gguf version     = 2
bert_load_from_file: gguf alignment   = 32
bert_load_from_file: gguf data offset = 695552
bert_load_from_file: model name           = BERT
bert_load_from_file: model architecture   = bert
bert_load_from_file: model file type      = 1
bert_load_from_file: bert tokenizer vocab = 30522


You >  mem Who is Koffivi Gbagbe


LOG: [33m[Querying memory][0m
Bot <         Based on the memory chunk provided, I can determine that Koffivi Gbagbe is a person and their name is Koffivi Gbagbe. Therefore, the response to the task would be: Koffivi Gbagbe is a person and their name is Koffivi Gbagbe.


You >  mem Where is the apple


LOG: [33m[Querying memory][0m
Bot <  The apple is on the manipulator table


You >  mem where is the banana


LOG: [33m[Querying memory][0m
Bot <  Task:  where is the banana 
Response: No relevant information.


In [10]:
# # import chromadb

# # client = chromadb.get_collection("chroma.sqlite3")

# # # client.heartbeat() # returns a nanosecond heartbeat. Useful for making sure the client remains connected.
# # # client.reset() # Empties and completely resets the database. ⚠️ This is destructive and not reversible.

# import sqlite3

# # Connect to the SQLite database
# conn = sqlite3.connect('chroma.sqlite3')


# # Create a cursor object
# cur = conn.cursor()

# # # Let's assume you have a table named 'my_table' with columns 'id' and 'value'

# # # Insert a row of data
# # cur.execute("INSERT INTO my_table VALUES (1, 'Hello World')")

# # # Save (commit) the changes
# # conn.commit()

# # Retrieve data
# # cur.execute('SELECT * FROM my_table')
# print(cur.fetchall())  # It will print: [(1, 'Hello World')]

# # Close the connection when you are done
# # conn.close()

[]


In [9]:
conn

<sqlite3.Connection at 0x7fa280d316c0>

In [None]:

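    # Add a new memory: prefix your message with "remem" (the text is stored in the vector DB).
    # Query memory: prefix your message with "mem" (the most relevant memories are retrieved from the DB).
    # Web search: prefix your message with "web" (Google search) - currently disabled; the WEB branch in LLMAgent.generate is commented out.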


In [2]:
# self.memory_context = lambda question: f"""
# Here is a context that represent all informations in the memory of a robot that he has to use to answer a question. 
# Based on this memory context answer the following question. 
# Here is the question: {question}

# Remember: If there is no relevent information in the memory to answer the question you can just answer by "no relevent information ".
# """

NameError: name 'self' is not defined

In [None]:
# ######################################################################### llm

# from typing import List, Optional, Any
# from gpt4all import GPT4All
# from llama_cpp import Llama

# class BaseLLM():
#     def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
#         self.user = '''We have a database that represents the memory of a robot. Using this memory the robot is 
#         supposed to provide relevent and extremely accurate informations in order to solve a task or answer a question. 
#         Your goal is to analyse the informations in the memory and and help the robot to provide the most relevent informations to solve the task ''' #"USER"
        
#         self.assistant = "Your Response:" #"ASSISTANT"
#         self.input = "Here is the informations from the memory:"
#         self.streaming = False

#         ##############
#         self.memory_context = lambda question: f""" 
#         Here is the task  \n 
#         Task: {question} \n
#         Here is the set of rules you should explicetly follow  : \n 
#         Rules: 
#         1 - If there is no relevent information in the memory to answer the question you must answer by "no relevent information "
#         2 - You are not allowed any type of information outside of the provided memory to answer the question
#         """
#         ##############

#         # self.search_context = lambda question: f"""
#         # You have been given access to the Internet. By considering below search results, summarize the information if its provided in search result, else just answer without search results: {question}
#         # """

#     def generate(self, request: str, streaming: bool) -> Any:
#         raise NotImplementedError

#     def response(self, request: str) -> Any:
#         return self.generate(f"{self.user}:\n{request}\n{self.assistant}:\n", streaming = self.streaming)

#     ############################################################################################################################
#     def memory_response(self, request: str, memory_queries: List[str]) -> Any:
#         queries = f"{self.user}:\n{self.memory_context(request)}\n{self.input}:\n"


#         for i, query in enumerate(memory_queries):
#             queries += f"MEMORY CHUNK {i}: {query}\n"

#         queries += f"{self.assistant}:\n"

#         return self.generate(queries, streaming = self.streaming)
#     ############################################################################################################################

#     # def search_response(self, request: str, search_results: List[dict[str, str, str]]) -> Any:
#     #     queries = f"{self.user}:\n{self.search_context(request)}\n{self.input}:\n"

#     #     for i, query in enumerate(search_results):
#     #         queries += f"SEARCH TITLE: {query['title']}\nSEARCH LINK: {query['link']}\nSEARCH CONTENT: {query['content']}\n"

#     #     queries += f"{self.assistant}:\n"

#     #     return self.generate(queries, streaming = self.streaming)

# class GPT4AllLLM(BaseLLM):
#     def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
#         super().__init__(model_name, model_path)
        
#         self.gpt = GPT4All(model_name = model_name, model_path = model_path, verbose=False)

#     def generate(self, request: str, streaming: bool) -> Any:
#         return self.gpt.generate(prompt = request, streaming = streaming)

# class LlamaCPPLLM(BaseLLM):
#     def __init__(self, model_name: Optional[str] = None) -> None:
#         super().__init__(model_name)
        
#         self.gpt = Llama(model_path = model_name, n_ctx=2048, verbose=False)

#     def generate(self, request: str, streaming: bool) -> Any:
#         return self.gpt.create_completion(prompt = request, stream = streaming, stop=[f"{self.user}:"])



In [None]:

######################################################################### new llm

from typing import List, Optional, Any
from gpt4all import GPT4All
from llama_cpp import Llama

class BaseLLM():
    """Backend-agnostic prompt builder for a memory-grounded assistant.

    The constructor stores the prompt fragments (``user`` preamble, ``assistant``
    rules, ``input`` header and the ``memory_context`` task formatter) as instance
    attributes so callers can override them after construction. Subclasses supply
    the actual text generation by overriding :meth:`generate`.
    """

    def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
        """Initialise the prompt fragments.

        ``model_name`` and ``model_path`` are accepted for signature parity with
        the subclasses; this base class does not read them.
        """
        self.streaming = False

        # Preamble placed at the very start of every prompt.
        self.user = '''We have a database that serves as the memory of a robot. This memory is used by the robot to provide relevant and highly accurate 
        information to solve tasks or answer questions. Your goal is to analyze the information in the memory and assist the robot in providing the most 
        relevant information to solve the task.'''

        # Header announcing the memory chunks.
        self.input = "Here are the pieces of information from the memory:"

        # Answering rules appended at the end of every prompt.
        self.assistant = '''
        Here are the rules you should follow explicitly: 
        Rules: 
        1 - If there is no relevant information in the memory to answer the question, you must answer with "No relevant information."
        2 - You are not allowed to use any information outside of the provided memory to answer the question.
        
        Remember: There might be cases where the information from the memory is not relevant to solve the task. In such cases, you should explicitly answer with "No relevant information."
        
        Your Response:'''

        def memory_context(question):
            # Wraps the task/question in the "Here is the task" section.
            return f""" 
        Here is the task  \n 
        Task: {question} \n
        """

        self.memory_context = memory_context

    def generate(self, request: str, streaming: bool) -> Any:
        """Run the model on ``request``; must be provided by a subclass."""
        raise NotImplementedError

    def response(self, request: str) -> Any:
        """Answer ``request`` directly, without any memory chunks."""
        prompt = f"{self.user}:\n{request}\n{self.assistant}:\n"
        return self.generate(prompt, streaming = self.streaming)

    ############################################################################################################################
    def memory_response(self, request: str, memory_queries: List[str]) -> Any:
        """Answer ``request`` using the numbered ``memory_queries`` as context."""
        header = f"{self.user}:\n{self.memory_context(request)}\n{self.input}:\n"
        numbered_chunks = [f"MEMORY CHUNK {i}: {query}\n" for i, query in enumerate(memory_queries)]
        footer = f"{self.assistant}:\n"

        return self.generate(header + "".join(numbered_chunks) + footer, streaming = self.streaming)
    ############################################################################################################################

    # def search_response(self, request: str, search_results: List[dict[str, str, str]]) -> Any:
    #     queries = f"{self.user}:\n{self.search_context(request)}\n{self.input}:\n"

    #     for i, query in enumerate(search_results):
    #         queries += f"SEARCH TITLE: {query['title']}\nSEARCH LINK: {query['link']}\nSEARCH CONTENT: {query['content']}\n"

    #     queries += f"{self.assistant}:\n"

    #     return self.generate(queries, streaming = self.streaming)

class GPT4AllLLM(BaseLLM):
    """``BaseLLM`` backend that generates text through a gpt4all ``GPT4All`` model."""

    def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
        """Set up the prompt fragments, then load the GPT4All model (non-verbose)."""
        super().__init__(model_name, model_path)

        backend_options = {"model_name": model_name, "model_path": model_path, "verbose": False}
        self.gpt = GPT4All(**backend_options)

    def generate(self, request: str, streaming: bool) -> Any:
        """Forward ``request`` as the prompt and return whatever ``GPT4All.generate`` yields."""
        completion = self.gpt.generate(prompt = request, streaming = streaming)
        return completion

class LlamaCPPLLM(BaseLLM):
    """``BaseLLM`` backend that generates text through llama-cpp's ``Llama``."""

    def __init__(self, model_name: Optional[str] = None) -> None:
        """Set up the prompt fragments, then load the model file at ``model_name``."""
        super().__init__(model_name)

        # Here ``model_name`` is used as the path of the model file; context window fixed at 2048.
        self.gpt = Llama(model_path = model_name, n_ctx=2048, verbose=False)

    def generate(self, request: str, streaming: bool) -> Any:
        """Run a completion for ``request`` and return the raw ``create_completion`` result."""
        # NOTE(review): the stop sequence is the full ``self.user`` preamble followed
        # by ":" — confirm this is the text the model is expected to emit.
        stop_sequences = [f"{self.user}:"]
        return self.gpt.create_completion(prompt = request, stream = streaming, stop=stop_sequences)



In [None]:
    ############################################################################################################################
    def memory_response(self, request: str, memory_queries: List[str]) -> Any:
        """Build the memory-grounded prompt for ``request`` and pass it to ``self.generate``.

        The prompt is: user preamble, task section, input header, one numbered
        "MEMORY CHUNK" line per entry of ``memory_queries``, then the assistant rules.
        """
        opening = f"{self.user}:\n{self.memory_context(request)}\n{self.input}:\n"
        chunk_lines = [f"MEMORY CHUNK {i}: {query}\n" for i, query in enumerate(memory_queries)]
        closing = f"{self.assistant}:\n"

        return self.generate(opening + "".join(chunk_lines) + closing, streaming = self.streaming)
    ############################################################################################################################


        # self.user = '''We have a database that represents the memory of a robot. Using this memory the robot is 
        # supposed to provide relevent and extremely accurate informations in order to solve a task or answer a question. 
        # Your goal is to analyse the informations in the memory and and help the robot to provide the most relevent informations to solve the task ''' #"USER"
        

        # self.memory_context = lambda question: f""" 
        # Here is the task  \n 
        # Task: {question} \n
        # """

        # self.input = "Here are the informations from the memory:"


        # for i, query in enumerate(memory_queries):
        #             queries += f"MEMORY CHUNK {i}: {query}\n"
        
        #         queries += f"{self.assistant}:\n"

        # self.assistant = '''
        # Here is the set of rules you should explicetly follow  : 
        # Rules: 
        # 1 - If there is no relevent information in the memory to answer the question you must answer by "no relevent information "
        # 2 - You are not allowed any type of information outside of the provided memory to answer the question

        # Remember: Their might be a case where the informations from the memory is not relevent to solve the task 
        # and in such case you should explicitly answer by "no relevent information "
        
        # Your Response:''' 


# self.user = '''We have a database that serves as memory to a robot. This memory is used by the robot to provide relevant and highly accurate 
# informations that are useful for solving a task an answering a question. Your goal is to analyze the information in the memory and assist the robot in providing the most 
# relevant information to solve the task.'''

# self.memory_context = lambda question: f""" 
# Here is the task: 
# Task: {question}
# """

# self.input = "Here are the pieces of information from the memory:"

# for i, query in enumerate(memory_queries):
#     queries += f"MEMORY CHUNK {i}: {query}\n"

# queries += f"{self.assistant}:\n"

# self.assistant = '''
# Here are the rules you should follow explicitly: 
# Rules: 
# 1 - If there is no relevant information in the memory to answer the question, you must answer with "No relevant information."
# 2 - You are not allowed to use any information outside of the provided memory to answer the question.

# Remember: There might be cases where the information from the memory is not relevant to solve the task. In such cases, you should explicitly answer with the term 

# Your Response:'''


# Scratch area: two successive drafts of the "MEMORY CHUNK" prompt fragments.
# NOTE(review): `self`, `memory_queries` and (in the first draft) `queries` are
# never bound at this level of the cell, so these statements are only meaningful
# once pasted back into a class such as BaseLLM — TODO confirm before running.

# --- Draft 1: preamble sent before the task ---
self.user = '''I will provide you some MEMORY CHUNKs retrieved from a database. Your goal is given a task or a question  and  the provided memory chunks
        to  help to solve the task or answer the question by only using the informations from the memory chunks'''


# --- Draft 1: answering rules ---
self.assistant = '''
Here are some very important rules you should follow explicitly when answering: 

 
Rule 1: If there is no relevant informations in the MEMORY CHUNKs to solvethe task or aswer the question, you must explicitly answer with the term  "None"
Rule 2: You are not allowed to use any information outside of the provided MEMORY CHUNKs to answer the question.
Rule 3: There might be cases where the information from the memory is not relevant to solve the task. In such cases, you should explicitly answer with the term "None"
RUle 4: Answer the question straight without any additional comments.
'''

# Formats the task/question section of the prompt.
self.memory_context = lambda question: f""" 
Here is the task: 
Task: {question}
"""

# Header announcing the memory chunks.
self.input = "Here are the MEMORY CHUNKs to explicitly use:"

# Appends one numbered line per memory chunk (expects `queries` to exist already).
for i, query in enumerate(memory_queries):
    queries += f"MEMORY CHUNK {i}: {query}\n"

# The rules are appended last, followed by ":" and a newline.
queries += f"{self.assistant}:\n"

##################################################
# --- Draft 2: reworded preamble ---
self.user = """
I will provide you with MEMORY CHUNKs retrieved from a database. Your goal is to use these MEMORY CHUNKs to solve a task or answer a question. 
Rely solely on the information from the MEMORY CHUNKs for solving the task or responding to the query.
"""

# --- Draft 2: reworded answering rules ---
self.assistant = """
Please adhere to these essential rules when formulating your responses:

Rule 1: If the MEMORY CHUNKs do not contain relevant information to solve the task or answer the question, respond explicitly with "None".
Rule 2: Do not use any information beyond what is provided in the MEMORY CHUNKs.
Rule 3: In cases where the MEMORY CHUNKs' information is irrelevant to the task, respond explicitly with "None".
Rule 4: Provide a direct answer to the question without any additional commentary.
"""

# Formats the task/question section of the prompt.
self.memory_context = lambda question: f""" 
Here is the task: 
Task: {question}
"""

# Header announcing the memory chunks.
self.input = "Utilize the following MEMORY CHUNKs explicitly:"

# Unlike draft 1, the accumulator starts empty here and the rules are appended without ":".
queries = ""
for i, query in enumerate(memory_queries):
    queries += f"MEMORY CHUNK {i}: {query}\n"

queries += f"{self.assistant}\n"









In [8]:

# Preview of the "MEMORY CHUNK" prompt: assemble the fragments for a sample
# request and compare the raw text with its newline-stripped form.

# Preamble sent before the rules.
user = """
I will provide you with MEMORY CHUNKs retrieved from a database. Your goal is to use these MEMORY CHUNKs to solve a task or answer a question. 
Rely solely on the information from the MEMORY CHUNKs for solving the task or responding to the query.
"""

# Answering rules.
assistant = """
Please adhere to these essential rules when formulating your responses:

Rule 1: If the MEMORY CHUNKs do not contain relevant information to solve the task or answer the question, respond explicitly with "None".
Rule 2: Do not use any information beyond what is provided in the MEMORY CHUNKs.
Rule 3: In cases where the MEMORY CHUNKs information is irrelevant to the task, respond explicitly with "None".
Rule 4: Provide a direct answer to the question without any additional commentary.
"""

# Formats the task/question section of the prompt.
memory_context = lambda question: f""" 
Here is the task: 
Task: {question} 
"""

# Named `input_header` rather than `input`: a global called `input` would shadow
# the builtin `input()` for every later cell run in the same kernel.
input_header = "Utilize the following MEMORY CHUNKs explicitly:"

request = "allez les bleus"

queries = f"{user}{assistant}{memory_context(request)}{input_header}"

# Same prompt with every newline removed (displayed in the next cell).
output = queries.replace('\n', '')

queries

'\nI will provide you with MEMORY CHUNKs retrieved from a database. Your goal is to use these MEMORY CHUNKs to solve a task or answer a question. \nRely solely on the information from the MEMORY CHUNKs for solving the task or responding to the query.\n\nPlease adhere to these essential rules when formulating your responses:\n\nRule 1: If the MEMORY CHUNKs do not contain relevant information to solve the task or answer the question, respond explicitly with "None".\nRule 2: Do not use any information beyond what is provided in the MEMORY CHUNKs.\nRule 3: In cases where the MEMORY CHUNKs information is irrelevant to the task, respond explicitly with "None".\nRule 4: Provide a direct answer to the question without any additional commentary.\n \nHere is the task: \nTask: allez les bleus \nUtilize the following MEMORY CHUNKs explicitly:'

In [9]:
output

'I will provide you with MEMORY CHUNKs retrieved from a database. Your goal is to use these MEMORY CHUNKs to solve a task or answer a question. Rely solely on the information from the MEMORY CHUNKs for solving the task or responding to the query.Please adhere to these essential rules when formulating your responses:Rule 1: If the MEMORY CHUNKs do not contain relevant information to solve the task or answer the question, respond explicitly with "None".Rule 2: Do not use any information beyond what is provided in the MEMORY CHUNKs.Rule 3: In cases where the MEMORY CHUNKs information is irrelevant to the task, respond explicitly with "None".Rule 4: Provide a direct answer to the question without any additional commentary. Here is the task: Task: allez les bleus Utilize the following MEMORY CHUNKs explicitly:'

In [None]:

# Preview of the single-"Document" prompt variant for a sample request.

# Preamble sent before the rules.
user = """
I will provide you with a Document retrieved from a database. Your goal is to answer a question or solve a task by only using the information from this Document.
"""

# Answering rules (typos fixed and rules numbered consecutively so the model
# is not shown a missing rule).
assistant = """
Here are some important rules for your responses:

Rule 1: If the Document does not contain any relevant information to solve the task or answer the question, respond explicitly with "None".
Rule 2: Do not use any information except what is provided in the Document.
Rule 3: Provide a direct answer to the question without any additional comment.
"""

# Formats the task/question section of the prompt.
memory_context = lambda question: f""" 
Here is the task: 
Task: {question} 
"""

# Named `input_header` rather than `input`: a global called `input` would shadow
# the builtin `input()` for every later cell run in the same kernel.
input_header = "Here is the Document:"

request = "allez les bleus"

queries = f"{user}{assistant}{memory_context(request)}{input_header}"

# Same prompt with every newline removed.
output = queries.replace('\n', '')

queries

##############################




## removing chat

In [None]:


# from llm import BaseLLM
# # from search_engine import SearchEngine
# from summarizer import Summarizer
# from query_db import CollectionOperator


# from utils import logging




enable_logging = True

class LLMAgent():
    """Agent that couples an LLM with a vector-store "memory" collection.

    Text can be stored in the collection (optionally summarised first) and later
    retrieved to ground the LLM's answer. Retrieved documents are kept only when
    their reported distance is below ``memory_access_threshold``.
    """

    def __init__(
        self, 
        llm: BaseLLM = None, 
        tm_qdb: CollectionOperator = None, 
        summarizer: Summarizer = None, 
        # search_engine: SearchEngine = None,
        use_summarizer = True,
    ) -> None:
        """Store the collaborators and the retrieval settings.

        Args:
            llm: Object exposing ``response`` and ``memory_response``.
            tm_qdb: Collection wrapper exposing ``add`` and ``query``.
            summarizer: Callable ``(text, min_length, max_length) -> str``.
            use_summarizer: When true, text is summarised before being stored.
        """
        self.llm = llm
        self.tm_qdb = tm_qdb
        self.summarizer = summarizer
        self.use_summarizer = use_summarizer

        # A retrieved document is used only when its distance is strictly below this value.
        self.memory_access_threshold = 1.5
        # Number of documents requested from the collection per query.
        self.db_n_results = 3
        # Result count for the (currently disabled) web search.
        self.se_n_results = 3

    def _store(self, request):
        """Summarise ``request`` if enabled and add it to memory unless it is empty.

        Returns the text that was considered for storage, so callers can tell
        whether anything was written (it is skipped when equal to ``""``).
        """
        summary = self.summarize(request) if self.use_summarizer else request
        if summary != "":
            self.tm_qdb.add(summary)
        return summary

    def _acceptable_memories(self, request):
        """Query the collection and keep the documents under the distance threshold."""
        results = self.tm_qdb.query(request, n_results = self.db_n_results, return_text = False)
        # Results are nested per query text; a single query is sent, hence index 0.
        documents = results['documents'][0]
        distances = results['distances'][0]
        return [
            document
            for document, distance in zip(documents, distances)
            if distance < self.memory_access_threshold
        ]

    @logging(enable_logging, message = "[Adding to memory]")
    def add(self, request):
        """Store ``request`` in memory, then return the LLM's plain answer to it."""
        self._store(request)
        return self.llm.response(request)

    ###################################### New Info to Memory without chat

    def add_info_to_mem(self, request):
        """Store ``request`` in memory without asking the LLM; reports the outcome on stdout."""
        summary = self._store(request)
        # Only claim success when something was actually written to the collection.
        if summary != "":
            print(f" '{summary}' was added to the memory")
        else:
            print(" nothing was added to the memory (empty text)")

    ###################################### Get response without chat #####

    def memory_response_retriver(self, request):
        """Answer ``request`` strictly from memory.

        Returns the LLM's memory-grounded answer, or the fixed string
        ``"No relevant information."`` when no stored document is close enough.
        """
        acceptable_memory_queries = self._acceptable_memories(request)
        if acceptable_memory_queries:
            return self.llm.memory_response(request, acceptable_memory_queries)
        return "No relevant information."

    ############################################################################################################################
    @logging(enable_logging, message = "[Querying memory]")
    def memory_response(self, request):
        """Answer ``request`` from memory, falling back to a plain LLM answer."""
        acceptable_memory_queries = self._acceptable_memories(request)
        if acceptable_memory_queries:
            return self.llm.memory_response(request, acceptable_memory_queries)
        return self.llm.response(request) #TODO: add another solution
    ############################################################################################################################

    def summarize(self, text, min_length = 30, max_length = 100):
        """Delegate to the configured summarizer."""
        return self.summarizer(text, min_length, max_length)

    @logging(enable_logging, message = "[Response]")
    def response(self, request):
        """Return the LLM's answer to ``request`` without touching memory."""
        return self.llm.response(request)

    def generate(self, request: str):
        """Dispatch ``request`` on its case-insensitive prefix.

        ``MEM...`` queries memory, ``REMEM...`` stores the text and answers it,
        anything else is answered directly. The prefix is removed before the
        remainder is forwarded.
        """
        if request.upper().startswith("MEM"):
            response = self.memory_response(request[len("MEM"):])
        elif request.upper().startswith("REMEM"):
            response = self.add(request[len("REMEM"):])
        else:
            response = self.response(request)

        return response




In [None]:
# Entry-point cell: choose the LLM backend, build the agent and start the chat loop.
import os
# Set the model path
os.environ['LLM_PATH'] = 'nous-hermes-llama2-13b.Q4_0.gguf'

if __name__ == "__main__":
    # Backend selector: "LLAMA_CPP" picks llama-cpp, any other value picks GPT4All.
    port_lib_name = "LLAMA_CPP"
    # port_lib_name = "none"

    # NOTE(review): `chat_llama_cpp` / `chat_gpt4all` are not defined in this cell;
    # presumably they come from another cell — TODO confirm.
    if port_lib_name == "LLAMA_CPP":
        LLM = LlamaCPPLLM
        chat = chat_llama_cpp
    else:
        LLM = GPT4AllLLM
        chat = chat_gpt4all

    # Now you can access this path anywhere in your code using:
    llm = LLM(os.environ.get('LLM_PATH'))
    

    """Keep in mind that user and helper tokens may vary between LLMs."""
    # llm.user = "### Instruction" #"USER"
    # llm.assistant = "### Response" #"ASSISTANT"
    
    # Embedder used by the vector-store collection.
    embedder = GPT4AllEmbedder()
    # search_engine = SearchEngine()
    summarizer = Summarizer()

    # Collection named "total-memory" that backs the agent's memory.
    total_memory_co = CollectionOperator("total-memory", embedder = embedder)

    # Summarisation before storing is switched off for this run.
    llm_agent = LLMAgent(llm, total_memory_co, summarizer, use_summarizer = False)

    chat()

In [None]:
# Builds the agent without starting a chat loop; summarisation before storing is enabled.
import os
# Set the model path
os.environ['LLM_PATH'] = 'nous-hermes-llama2-13b.Q4_0.gguf'

# NOTE(review): `LLM` is not assigned in this cell; it appears to rely on the
# backend-selection cell having been run first — TODO confirm.
llm = LLM(os.environ.get('LLM_PATH'))

# Embedder used by the vector-store collection.
embedder = GPT4AllEmbedder()
# search_engine = SearchEngine()
summarizer = Summarizer()

# Collection named "total-memory" that backs the agent's memory.
total_memory_co = CollectionOperator("total-memory", embedder = embedder)

llm_agent = LLMAgent(llm, total_memory_co, summarizer, use_summarizer = True)



In [None]:
# Text to store in the agent's memory (placeholder: a single space).
info_to_add = " "



# Stores the text via the agent; requires `llm_agent` from an earlier cell.
llm_agent.add_info_to_mem(info_to_add)

In [None]:
# Question to answer from memory (placeholder: a single space).
user_text_request = " "

# Memory-only answer; requires `llm_agent` from an earlier cell.
bot_text_response = llm_agent.memory_response_retriver(user_text_request)

In [None]:
    def generate(self, request: str):
        """Route ``request`` to a handler chosen by its case-insensitive prefix.

        ``MEM`` -> ``memory_response``, ``REMEM`` -> ``add``, ``WEB`` -> ``search``;
        the prefix is sliced off before forwarding. Anything else goes to
        ``response`` unchanged.
        """
        command = request.upper()

        if command.startswith("MEM"):
            return self.memory_response(request[len("MEM"):])
        if command.startswith("REMEM"):
            return self.add(request[len("REMEM"):])
        if command.startswith("WEB"):
            return self.search(request[len("WEB"):])

        return self.response(request)

# Bulshitting Code

In [None]:
from flask import Flask, request


################################################################### Utils
import numpy as np
from functools import wraps
from termcolor import colored


def cosine_similarity(a: np.ndarray, b: np.ndarray):
    """Cosine similarity between vectors or between the rows of two matrices.

    Args:
        a: 1-D vector of shape ``(d,)`` or 2-D array of shape ``(m, d)``.
        b: 1-D vector of shape ``(d,)`` or 2-D array of shape ``(n, d)``.

    Returns:
        A scalar for two vectors, a 1-D array when exactly one argument is a
        vector, and an ``(m, n)`` matrix of pairwise similarities otherwise.
        Any pair involving an all-zero vector yields ``0.0`` instead of NaN.
    """
    a_arr = np.asarray(a, dtype=float)
    b_arr = np.asarray(b, dtype=float)
    a_rows = np.atleast_2d(a_arr)
    b_rows = np.atleast_2d(b_arr)

    dots = a_rows @ b_rows.T
    # Norms are taken per row: a single norm over a whole matrix would be the
    # Frobenius norm, which does not give per-pair cosine values.
    scale = np.linalg.norm(a_rows, axis=1)[:, None] * np.linalg.norm(b_rows, axis=1)[None, :]
    # Entries with a zero denominator keep the 0.0 from `out`.
    sims = np.divide(dots, scale, out=np.zeros_like(dots), where=scale != 0)

    # Collapse the axes that were added for 1-D inputs.
    if a_arr.ndim <= 1 and b_arr.ndim <= 1:
        return sims[0, 0]
    if a_arr.ndim <= 1:
        return sims[0]
    if b_arr.ndim <= 1:
        return sims[:, 0]
    return sims




def logging(enabled = True, message = "", color = "yellow"):
    """Decorator factory that prints a coloured ``LOG:`` line before each call.

    Args:
        enabled: When false, the wrapped function is called without printing.
        message: Text shown after the ``LOG:`` prefix.
        color: Colour name handed to ``termcolor.colored``.

    Returns:
        A decorator; the wrapper it produces keeps the wrapped function's
        metadata and returns its result unchanged.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not enabled:
                return func(*args, **kwargs)

            banner = colored(message, color = color)
            print(f"LOG: {banner}")
            return func(*args, **kwargs)

        return wrapper

    return decorator



################################################################ Embeddings

import torch
import os
import numpy as np
from transformers import AutoModel, AutoTokenizer
from chromadb import EmbeddingFunction
from gpt4all import Embed4All
# from dotenv import dotenv_values

# env = dotenv_values(".env")
# os.environ['HUGGINGFACE_HUB_CACHE'] = env['HUGGINGFACE_HUB_CACHE']



class BaseEmbedder(EmbeddingFunction):
    """Common interface for the embedders in this notebook.

    Subclasses implement ``get_embeddings``; calling the instance forwards to it.
    """

    def __init__(self):
        # Nothing to set up at this level.
        pass

    def get_embeddings(self, texts):
        """Return embeddings for ``texts``; must be overridden by subclasses."""
        raise NotImplementedError("Subclasses should implement this!")

    def __call__(self, text):
        """Make the embedder callable by delegating to ``get_embeddings``."""
        return self.get_embeddings(text)


class GPT4AllEmbedder(BaseEmbedder):
    """Embedder backed by gpt4all's ``Embed4All``."""

    def __init__(self):
        self.embedder = Embed4All() # default: all-MiniLM-L6-v2

    def get_embeddings(self, texts):
        """Embed one text or a sequence of texts.

        Args:
            texts: A single string or an iterable of strings.

        Returns:
            A list with one ``Embed4All.embed`` result per input text.
        """
        # isinstance (not an exact type match) so str subclasses are treated as
        # one text instead of being iterated character by character.
        if isinstance(texts, str):
            texts = [texts]

        return [self.embedder.embed(text) for text in texts]

    def __call__(self, text):
        """Delegate to ``get_embeddings``."""
        return self.get_embeddings(text)
    

class HFEmbedder(BaseEmbedder):
    """Embedder backed by a Hugging Face ``AutoModel``; returns L2-normalised vectors."""

    # def __init__(self, model = 'princeton-nlp/sup-simcse-roberta-large'): #sentence-transformers/all-MiniLM-L6-v2 
    def __init__(self, model = '/app/weights/sup-simcse-roberta-large'):
        """Load the model and tokenizer from ``model`` onto CUDA when available, else CPU."""
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = AutoModel.from_pretrained(model).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model)

    def get_embeddings(self, texts):
        """Embed one text or a list of texts.

        Args:
            texts: A single string or a list of strings.

        Returns:
            A list of lists of floats: the model's ``pooler_output`` for each
            text, scaled to unit L2 norm.
        """
        # isinstance (not an exact type match) so str subclasses are treated as
        # one text rather than handed to the tokenizer as a raw sequence.
        if isinstance(texts, str):
            texts = [texts]

        inputs = self.tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(self.device)

        # Inference only: gradients are not needed.
        with torch.no_grad():
            embeddings = self.model(**inputs, output_hidden_states=True, return_dict=True).pooler_output.detach().cpu().numpy()

        # Row-wise normalisation to unit length.
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        normalized_embeddings = embeddings / norms

        return normalized_embeddings.tolist()

    def __call__(self, text):
        """Delegate to ``get_embeddings``."""
        return self.get_embeddings(text)




################################################################### VectDB
import chromadb
import uuid
import datetime
# from embedder import BaseEmbedder, HFEmbedder
# from dotenv import dotenv_values

# env = dotenv_values(".env")
# DB_PATH = env["DB_PATH"]
# Directory in which the persistent Chroma database is stored.
DB_PATH = './'

class CollectionOperator():
    """Thin wrapper around one persistent Chroma collection (add / delete / query)."""

    def __init__(self, collection_name, db_path = DB_PATH, embedder: "BaseEmbedder" = None):
        """Open (or create) ``collection_name`` in the database at ``db_path``.

        Args:
            collection_name: Name of the collection to get or create.
            db_path: Directory of the persistent Chroma database.
            embedder: Optional embedder; its ``get_embeddings`` is used as the
                collection's embedding function. When omitted, no embedding
                function is passed and Chroma's own default applies.
        """
        self.embedder = embedder
        self.client = chromadb.PersistentClient(path = db_path)

        collection_options = {"name": collection_name}
        # Only dereference the embedder when one was supplied; previously the
        # default ``embedder=None`` raised AttributeError here.
        if embedder is not None:
            collection_options["embedding_function"] = embedder.get_embeddings
        self.collection = self.client.get_or_create_collection(**collection_options)

    def add(self, text, metadata = None):
        """Store ``text`` under a fresh UUID with a ``timestamp`` metadata entry.

        Args:
            text: Document to store.
            metadata: Optional mapping of extra metadata. It is copied, so
                neither the caller's dict nor a shared default is mutated.
        """
        record_metadata = dict(metadata) if metadata else {}
        record_metadata['timestamp'] = str(datetime.datetime.now())

        self.collection.add(
            documents = [text],
            metadatas = [record_metadata],
            ids = [str(uuid.uuid4())]
        )

    def delete(self, id):
        """Delete the entry (or entries) identified by ``id`` from the collection."""
        self.collection.delete(id)

    def query(self, query, n_results, return_text = True):
        """Search the collection for ``query``.

        Args:
            query: Query text passed as ``query_texts``.
            n_results: Number of results to request.
            return_text: When true, return only the documents of the first
                query; otherwise return the full result mapping.
        """
        results = self.collection.query(
            query_texts = query,
            n_results = n_results,
        )

        if return_text:
            return results['documents'][0]
        return results




######################################################################### new llm

from typing import List, Optional, Any
from gpt4all import GPT4All
from llama_cpp import Llama

class BaseLLM():
    """Backend-agnostic prompt builder for a memory-grounded assistant.

    The constructor stores the prompt fragments as instance attributes so they
    can be overridden after construction. Subclasses supply the actual text
    generation by overriding :meth:`generate`.
    """

    def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
        """Initialise the prompt fragments.

        ``model_name`` and ``model_path`` are accepted for signature parity with
        the subclasses; this base class does not read them.
        """

        # Preamble placed at the start of the memory prompt.
        self.user = """
            We have a database that serves as the memory of a robot. This memory is used by the robot to provide relevant and highly accurate 
        information to solve tasks or answer questions. Your goal is to analyze the information in the memory and assist the robot in providing the most 
        relevant information to solve the task.
            """

        # Answering rules.
        self.assistant = """
        Please adhere to these essential rules when formulating your responses:
        
        Rule 1: If the text do not contain relevant information to solve the task or answer the question, respond explicitly with "None".
        Rule 2: Do not use any information beyond what is provided in the text.
        Rule 3: In cases where the text information is irrelevant to the task, respond explicitly with "None".
        Rule 4: Provide a direct answer to the question without any additional commentary.
        """
        # Header announcing the memory chunks.
        self.input = "Here are the pieces of information from the memory:"
        self.streaming = False

        ##############
        # Formats the task/question section of the prompt.
        self.memory_context = lambda question: f""" 
        Here is the task  
        Task: {question} 
        """
        ##############

    def generate(self, request: str, streaming: bool) -> Any:
        """Run the model on ``request``; must be provided by a subclass."""
        raise NotImplementedError

    def response(self, request: str) -> Any:
        """Answer ``request`` directly using instruction/response markers, without memory."""
        user = "###Instructions" 
        assistant = " ###Response" 
        return self.generate(f"{user}:\n{request}\n{assistant}:\n", streaming = self.streaming)

    ############################################################################################################################
    def memory_response(self, request: str, memory_queries: List[str]) -> Any:
        """Answer ``request`` using ``memory_queries`` as the only context.

        The prompt is the preamble, the rules, the task section, the input
        header and then every memory chunk, flattened to a single line.

        Args:
            request: The task or question.
            memory_queries: Retrieved memory texts to include.

        Returns:
            Whatever :meth:`generate` returns for the assembled prompt.
        """
        queries = f"{self.user}\n{self.assistant}\n{self.memory_context(request)}\n{self.input}:\n"
        queries += "".join(f"{query}\n" for query in memory_queries)

        # Flatten to one line. Newlines become spaces rather than being deleted,
        # otherwise adjacent chunks (and the lines of a multi-line chunk) are
        # fused into one word, e.g. "...table.The table...".
        queries = queries.replace('\n', ' ')

        return self.generate(queries, streaming = self.streaming)
    ############################################################################################################################

class GPT4AllLLM(BaseLLM):
    """``BaseLLM`` backend that generates text through a gpt4all ``GPT4All`` model."""

    def __init__(self, model_name: Optional[str] = None, model_path: Optional[str] = None) -> None:
        """Set up the prompt fragments, then load the GPT4All model (non-verbose)."""
        super().__init__(model_name, model_path)

        loader_options = {"model_name": model_name, "model_path": model_path, "verbose": False}
        self.gpt = GPT4All(**loader_options)

    def generate(self, request: str, streaming: bool) -> Any:
        """Forward ``request`` as the prompt and return whatever ``GPT4All.generate`` yields."""
        result = self.gpt.generate(prompt = request, streaming = streaming)
        return result

class LlamaCPPLLM(BaseLLM):
    """``BaseLLM`` backend that generates text through llama-cpp's ``Llama``."""

    def __init__(self, model_name: Optional[str] = None) -> None:
        """Set up the prompt fragments, then load the model file at ``model_name``."""
        super().__init__(model_name)

        # Here ``model_name`` is used as the path of the model file; context window fixed at 2048.
        self.gpt = Llama(model_path = model_name, n_ctx=2048, verbose=False)

    def generate(self, request: str, streaming: bool) -> Any:
        """Run a completion for ``request`` and return the raw ``create_completion`` result."""
        # NOTE(review): the stop sequence is the full ``self.user`` preamble followed
        # by ":" — confirm this is the text the model is expected to emit.
        stop_markers = [f"{self.user}:"]
        return self.gpt.create_completion(prompt = request, stream = streaming, stop=stop_markers)



######################################################################### summarizer


import os
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from transformers import pipeline
from dotenv import dotenv_values

# Default Hugging Face checkpoint used for summarisation.
checkpoint = "sshleifer/distilbart-cnn-12-6"

class Summarizer():
    """Seq2seq summariser that handles long inputs by fixed-size token chunking."""

    # Inputs with fewer tokens than this are returned unchanged.
    SHORT_TEXT_TOKEN_LIMIT = 30

    def __init__(self, model = checkpoint) -> None:
        """Load the model and tokenizer from ``model`` onto CUDA when available, else CPU."""
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model)

    def summarize(self, text: str, min_length = 30, max_length = 100):
        """Summarise ``text`` chunk by chunk (fixed-size chunking).

        The text is tokenised without truncation, cut into consecutive windows
        of at most ``tokenizer.model_max_length`` tokens, and each window is
        summarised on its own.

        Args:
            text: Text to summarise.
            min_length: ``min_length`` passed to ``model.generate`` per chunk.
            max_length: ``max_length`` passed to ``model.generate`` per chunk.

        Returns:
            ``text`` itself when it has fewer than 30 tokens; otherwise the
            chunk summaries joined with newlines.
        """
        token_ids = self.tokenizer(text, max_length=None, return_tensors='pt', truncation=False)['input_ids'][0]
        n_tokens = len(token_ids)
        if n_tokens < self.SHORT_TEXT_TOKEN_LIMIT:
            return text

        # Clamp the window: some tokenizers report a very large sentinel as
        # model_max_length, which is not usable as a slice bound.
        window = min(self.tokenizer.model_max_length, n_tokens)

        summaries = []
        # range() stops before n_tokens, so no empty trailing chunk is produced
        # when the token count is an exact multiple of the window.
        for start in range(0, n_tokens, window):
            chunk = torch.unsqueeze(token_ids[start:start + window], 0)  # add batch dimension
            summary_ids = self.model.generate(
                chunk.to(self.device),
                num_beams=4,
                min_length=min_length,
                max_length=max_length,
                early_stopping=True,
            )
            summaries.append(
                self.tokenizer.decode(summary_ids[0], skip_special_tokens=True, clean_up_tokenization_spaces=False)
            )

        return '\n'.join(summaries)

    def __call__(self, text, min_length = 30, max_length = 100):
        """Alias for :meth:`summarize`."""
        return self.summarize(text, min_length, max_length)

######################################################################### llm Agent

enable_logging = True

class LLMAgent():
    """Chat agent that pairs an LLM with a vector-store backed memory.

    Requests are routed by :meth:`generate`: a ``MEM`` prefix answers from
    memory, a ``REMEM`` prefix stores the request before answering, and any
    other request is sent straight to the LLM.
    """

    def __init__(
        self,
        llm: BaseLLM = None,
        tm_qdb: CollectionOperator = None,
        summarizer: Summarizer = None,
        # search_engine: SearchEngine = None,
        use_summarizer = True,
    ) -> None:
        """Store the collaborators and the retrieval settings.

        Args:
            llm: Object providing ``response(request)`` and
                ``memory_response(request, memories)``.
            tm_qdb: Memory collection providing ``add(text)`` and
                ``query(text, n_results=..., return_text=...)``.
            summarizer: Callable ``(text, min_length, max_length) -> str``.
                May be ``None``, in which case entries are stored verbatim.
            use_summarizer: When true (and a summarizer is set), text is
                summarized before it is written to memory.
        """
        self.llm = llm
        self.tm_qdb = tm_qdb
        # Stored documents are used only if their distance is below this value.
        self.memory_access_threshold = 2 #1.5
        self.db_n_results = 3
        self.se_n_results = 3
        self.use_summarizer = use_summarizer
        self.summarizer = summarizer

    # ------------------------------------------------------------------ helpers

    def _memory_entry(self, request):
        """Return the text to store for ``request``: its summary or the raw text.

        Falls back to the raw text when no summarizer was supplied, so the
        default construction (``summarizer=None``, ``use_summarizer=True``)
        does not fail with "'NoneType' object is not callable".
        """
        if self.use_summarizer and self.summarizer is not None:
            return self.summarize(request)
        return request

    def _store(self, entry):
        """Add ``entry`` to memory unless it is the empty string.

        Returns:
            ``True`` when the entry was handed to ``tm_qdb.add``, else ``False``.
        """
        if entry == "":
            return False
        self.tm_qdb.add(entry)
        return True

    def _acceptable_memories(self, request):
        """Query memory and keep documents closer than the access threshold."""
        found = self.tm_qdb.query(request, n_results = self.db_n_results, return_text = False)
        # Index 0 selects the result list belonging to the single query text.
        documents = found['documents'][0]
        distances = found['distances'][0]
        return [
            document
            for document, distance in zip(documents, distances)
            if distance < self.memory_access_threshold
        ]

    # --------------------------------------------------------------- memory I/O

    @logging(enable_logging, message = "[Adding to memory from chat]")
    def add(self, request):
        """Store ``request`` (or its summary) in memory, then answer it with the LLM."""
        self._store(self._memory_entry(request))
        return self.llm.response(request)

    ###################################### New Info to Memory without chat
    @logging(enable_logging, message = "[Adding to memory]")
    def add_info_to_mem(self, request):
        """Store ``request`` (or its summary) in memory without querying the LLM."""
        summary = self._memory_entry(request)
        # Report success only when something was written; an empty entry is skipped.
        if self._store(summary):
            print(f" '{summary}' was added to the memory")

    ###################################### Get response without chat #####
    @logging(enable_logging, message = "[Querying memory]", color = "blue")
    def memory_response_retriver(self, request):
        """Answer ``request`` from memory only.

        Returns:
            The result of ``llm.memory_response`` when at least one stored
            document passes the distance threshold, otherwise the literal
            string ``"None"``.
        """
        acceptable_memory_queries = self._acceptable_memories(request)
        if acceptable_memory_queries:
            return self.llm.memory_response(request, acceptable_memory_queries)
        return "None"

    @logging(enable_logging, message = "[Querying memory from chat]")
    def memory_response(self, request):
        """Answer ``request`` from memory, falling back to a plain LLM response.

        Unlike :meth:`memory_response_retriver`, an empty memory hit list leads
        to ``llm.response(request)`` instead of the ``"None"`` sentinel.
        """
        acceptable_memory_queries = self._acceptable_memories(request)
        if acceptable_memory_queries:
            return self.llm.memory_response(request, acceptable_memory_queries)
        return self.llm.response(request) #TODO: add another solution

    # @logging(enable_logging, message = "[Summarizing]", color = "green")
    def summarize(self, text, min_length = 30, max_length = 100):
        """Delegate to the configured summarizer."""
        return self.summarizer(text, min_length, max_length)

    @logging(enable_logging, message = "[Response]")
    def response(self, request):
        """Return the LLM's answer to ``request`` without touching memory."""
        return self.llm.response(request)

    def generate(self, request: str):
        """Dispatch ``request`` according to its (case-insensitive) prefix.

        ``MEM...``   -> :meth:`memory_response` on the text after the prefix.
        ``REMEM...`` -> :meth:`add` on the text after the prefix.
        otherwise    -> :meth:`response` on the whole request.
        """
        command = request.upper()
        if command.startswith("MEM"):
            return self.memory_response(request[len("MEM"):])
        if command.startswith("REMEM"): #and len(acceptable_memory_queries) == 0
            return self.add(request[len("REMEM"):])
        # elif request.upper().startswith("WEB"):
        #     response = self.search(request[len("WEB"):])
        return self.response(request)







#########################################################################################

import os

###############################################################################
# Set the model path
os.environ['LLM_PATH'] = 'nous-hermes-llama2-13b.Q4_0.gguf'

port_lib_name = "LLAMA_CPP"
# port_lib_name = "none"

# Only "LLAMA_CPP" selects the llama.cpp wrapper; any other value selects GPT4All.
# The conditional expression evaluates just the chosen class name.
LLM = LlamaCPPLLM if port_lib_name == "LLAMA_CPP" else GPT4AllLLM

# Build the components the agent is assembled from.
llm = LLM(os.environ['LLM_PATH'])
embedder = HFEmbedder()
summarizer = Summarizer()
total_memory_co = CollectionOperator("total-memory", embedder = embedder)

# Summarization is switched off here: entries are stored verbatim.
# llm_agent = LLMAgent(llm, total_memory_co, summarizer, use_summarizer = True)
llm_agent = LLMAgent(
    llm,
    total_memory_co,
    summarizer,
    use_summarizer = False,
)




def add_info(info_to_add):
    """Write ``info_to_add`` to the agent's memory after coercing it to ``str``."""
    llm_agent.add_info_to_mem(str(info_to_add))

def retrieve_info(user_text_request):
    """Answer ``user_text_request`` from the agent's memory and return plain text.

    ``llm_agent.memory_response_retriver`` yields either a string (the
    ``"None"`` sentinel when no stored memory is close enough) or a
    completion-style mapping; in the latter case the text of the first choice
    is extracted.

    Returns:
        The response text as a string.
    """
    bot_text_response = llm_agent.memory_response_retriver(user_text_request)
    # isinstance (not an exact type comparison) so str subclasses are also
    # returned as-is instead of being indexed like a completion mapping.
    if isinstance(bot_text_response, str):
        return bot_text_response
    return bot_text_response['choices'][0]['text']

 
#########################################################################################

# Flask application object; the @app.route handlers below are registered on it.
app = Flask(__name__)

# # Define your functions here
# def add_info(info_to_add):
#     info_to_add = str(info_to_add)
#     llm_agent.add_info_to_mem(info_to_add)

# def retrieve_info(user_text_request):
#     bot_text_response = llm_agent.memory_response_retriever(user_text_request)
#     print(bot_text_response)
#     bot_text_response = dict(bot_text_response)
#     return bot_text_response['choices'][0]['text']

# Define your API endpoints
@app.route('/addinfo', methods=['POST'])
def addinfo():
    """POST /addinfo: store the ``info`` field of the JSON body in memory.

    Returns:
        ``'Info added successfully'`` once ``add_info`` has run, or a 400
        response when the body is not a JSON object with an ``info`` key.
    """
    data = request.json
    # Reject malformed payloads explicitly instead of failing with a
    # KeyError/TypeError (HTTP 500) on the lookup below.
    if not isinstance(data, dict) or 'info' not in data:
        return "Request body must be a JSON object with an 'info' field", 400

    info_to_add = data['info']

    # f-string rather than "+": a non-string JSON value (number, list, ...)
    # must not raise TypeError here, since add_info() stringifies it anyway.
    print(f"info_to_add: {info_to_add}")

    add_info(info_to_add)
    return 'Info added successfully'

@app.route('/retrieveinfo', methods=['POST'])
def retrieveinfo():
    """POST /retrieveinfo: answer the ``text`` field of the JSON body from memory.

    Returns:
        The text produced by ``retrieve_info``, or a 400 response when the
        body is not a JSON object with a ``text`` key.
    """
    data = request.json
    # Reject malformed payloads explicitly instead of failing with a
    # KeyError/TypeError (HTTP 500) on the lookup below.
    if not isinstance(data, dict) or 'text' not in data:
        return "Request body must be a JSON object with a 'text' field", 400

    user_text_request = data['text']

    # f-string rather than "+" so a non-string JSON value cannot raise
    # TypeError in this debug print.
    print(f"user_text_request: {user_text_request}")

    return retrieve_info(user_text_request)

# Start the Flask server only when this file is executed as a script.
# host='0.0.0.0' listens on all network interfaces and the route handlers above
# perform no authentication themselves.
# NOTE(review): confirm this exposure is intended before deploying.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7778)