# TTYD using RAG

#### RAG:
https://python.langchain.com/docs/use_cases/question_answering/ 

#### PDF Loader:
https://github.com/langchain-ai/langchain/blob/master/cookbook/Semi_structured_multi_modal_RAG_LLaMA2.ipynb 


## Docs class

In [1]:
from pypdf import PdfReader 

####################################
#                                  #
####################################
class pdf_doc:
#private---------------------------------
    _doc = ""
    _doc_title = ""
    _number_of_pages = ""
    _text_extract = []


#public----------------------------------
    def __init__(self, path, doc_title):
        self._doc = PdfReader(path)
        self._doc_title = doc_title
        self._number_of_pages = len(self._doc.pages)

        for i in range(self._number_of_pages):
            self._text_extract.append(self._doc.pages[i].extract_text())
    
    def g_number_of_pages(self):
        return self._number_of_pages

    def g_page_text(self, page_number):
        return self._text_extract[page_number]

    def g_title(self):
        return self._doc_title
####################################
#                                  #
####################################

## Docs manager class

In [2]:
import os
import os.path

####################################
#                                  #
####################################
class doc_manager:
#private---------------------------
    _docs = []
    _path = ""

#public----------------------------
    def __init__(self, path):
        self._path = path

        for file in os.listdir(self._path):
            if file.endswith(".pdf"):
                self._docs.append(pdf_doc(os.path.join(self._path, file), file))

    def g_docs_length(self):
        return len(self._docs)
    
    def g_doc_title(self, index):
        return self._docs[index].g_title()
    
    def process_data_to_text(self):
        text = ""

        for doc in self._docs:
            for i in range(doc.g_number_of_pages()):
                text += doc.g_page_text(i)
        
        return text

####################################
#                                  #
####################################

## Parser class

In [3]:
#############################################################
# a class to parse text to tje console and/or to a directory#
#############################################################
class parser:

#private-----------------------------------------------------
    #default path to the data directory
    __default_path = "data"
    #path to the data directory. it is _default_path by default
    __path = __default_path
    #name when writing or creating a text file
    __file_name = "test_file"
    #index that tracks the new files
    __index = 1

#public------------------------------------------------------
    
# s_ prefix stands for a setter function

    #sets the path
    @staticmethod
    def s_default_path(path):
        parser.__path = path

    #sets the file name
    @staticmethod
    def s_file_name(name):
         parser.__file_name = name
    
    #resets the index for p_new_files() method
    @staticmethod
    def s_reset_index():
        parser.__index = 0

# p_ prefix stands for a printing function
    #prints to console
    @staticmethod
    def p_console(data):
        print(data)

    #appends text to a text file. creates a new file is it doesnt exists 
    @staticmethod
    def p_append_file(data, dir = __path, file_name = __file_name):
        file_exists = os.path.exists(f"{dir}/{file_name}.txt")

        if file_exists :
            f = open(f"{dir}/{file_name}.txt", "a")
            f.write("\n\n\n")
            f.write(data)
            f.close()
        else:
            f = open(f"{dir}/{file_name}.txt", "x")
            f.write(data)
            f.close()
    
    #overites an existing file. creates a new file is it doesnt exists 
    @staticmethod
    def p_overwrite_file(data, dir = __path, file_name = __file_name ):
        file_exists = os.path.exists(f"{dir}/{file_name}.txt")

        if file_exists :
            f = open(f"{dir}/{file_name}.txt", "w")
            f.write(data)
            f.close()
        else:
            f = open(f"{dir}/{file_name}.txt", "x")
            f.write(data)
            f.close()

    #will create a new file and write to it every time this function is called
    #WARNING!!! - it will overwrite if the files allready exists
    @staticmethod
    def p_new_files(data, dir = __path, file_name = __file_name):
        file_exists = os.path.exists(f"{dir}/{file_name}_{parser.__index}.txt")
        
        if file_exists:
            f = open(f"{dir}/{file_name}_{parser.__index}.txt", "w")
            f.write(data)
            f.close()
        else:
            f = open(f"{dir}/{file_name}_{parser.__index}.txt", "x")
            f.write(data)
            f.close()

        parser.__index += 1

    #clears a file of its data
    @staticmethod
    def clear_file(dir = __path, file_name = __file_name ):
        file_exists = os.path.exists(f"{dir}/{file_name}.txt")

        if file_exists :
            f = open(f"{dir}/{file_name}.txt", "x")
            f.write("")
            f.close()
        else :
            raise Exception("file that you are trying to clear doesnt exists")
###############################################################
# end of the parser class                                     #
###############################################################

## TTYD class

In [4]:
from langchain.chains import LLMChain
from langchain_community.llms import Ollama
from langchain_core.prompts import PromptTemplate

####################################
#                                  #
####################################
class ttyd:
#private---------------------------------
    __llm = Ollama(temperature= 0.9, model="mistral")
    
    __promt_give_sumary = PromptTemplate(
        input_variables=["data"],
        template="give me a sumary of this data: {data}"
    )

    __promt_user_input = PromptTemplate(
        input_variables=["data", "question"],
        template="answer the question regarding to the data. data: {data}, question: {question}"
    )

#public----------------------------------
    def __init__(self, path):
        self.__data_path = path
        self.__file_manager = doc_manager(self.__data_path)

        self.__raw_data_text = self.converte_data_to_text()
    
    def g_document_count(self):
        return self.__file_manager.g_docs_length()
        
    def dict_to_str(dict):
        return str(dict.get('text'))
    
    def generate_summary(self):
        chain = LLMChain(llm=self.__llm, prompt=self.__promt_give_sumary)
        return self.dict_to_str(chain.invoke(self.__raw_data_text))
    
    def talk_to_data(self, question):
        chain = LLMChain(llm=self.__llm, prompt=self.__promt_user_input)
        return self.dict_to_str(chain.invoke(input={'data': self.__raw_data_text, 'question' : question}))
    
    def converte_data_to_text(self):
        return self.__file_manager.process_data_to_text()
    


    
####################################
#                                  #
####################################



# Executable 

In [5]:

test = ttyd("data")

print(test.talk_to_data("give a very brief summary"))



TypeError: ttyd.dict_to_str() takes 1 positional argument but 2 were given