In [1]:
!pwd

/home/dephinate/ASU/DL/MedicalChatBot/research


In [2]:
import os
from pathlib import Path

In [3]:
# Move to the project root only while the kernel is still inside research/.
# An unconditional os.chdir("../") climbs one level higher on every re-run of
# this cell, which later breaks relative paths such as config/config.yaml.
if Path.cwd().name == "research":
    os.chdir("../")

In [4]:
!pwd

/home/dephinate/ASU/DL/MedicalChatBot


In [5]:
# !pip install python-box
# !pip install ensure
# !pip install -e .
# !pip install unstructured
# !pip install pdf2image
# !pip install pdfminer
# !pip uninstall pdfminer
# !pip install pillow_heif
# !pip install opencv-python
# !pip install unstructured-inference
# !pip install pytesseract
# !pip install pikepdf

In [6]:
# Only confirm that the key is configured; never display the secret itself,
# because cell outputs are saved with the notebook. The key that was shown
# here previously should be treated as compromised and rotated.
bool(os.getenv('PINECONE_API_KEY'))

True

Helper Functions

In [7]:
# Print distribution
import numpy as np
def print_stats(len_list, docs=None):
    """Print summary statistics for a collection of chunk lengths.

    Args:
        len_list: Sequence of numeric lengths, one entry per chunk.
        docs: Unused. Accepted (and optional) so existing calls that pass it
            keep working.

    Prints the number of entries, the index and value of the smallest and
    largest entries, and the mean and standard deviation. For an empty
    ``len_list`` only the count is printed, since the other statistics are
    undefined.
    """
    len_arr = np.array(len_list)
    print("num of chunks: ",len(len_arr))
    if len_arr.size == 0:
        # np.argmin / np.argmax raise ValueError on an empty array.
        return
    print("min: ",np.argmin(len_arr) ,np.min(len_arr))
    print("max: ",np.argmax(len_arr) ,np.max(len_arr))
    print("avg :",np.mean(len_arr))
    print("std :",np.std(len_arr))

In [8]:
# Formatting
import re
from unstructured.cleaners.core import clean,group_broken_paragraphs
def format_docs(doc):
    """Print a document's raw text, its regrouped text, and its metadata.

    Args:
        doc: Object exposing ``page_content`` (text) and ``metadata``.

    The text is printed as-is first, then again after being passed through
    ``group_broken_paragraphs``, and finally the metadata is printed.
    """
    raw_text = doc.page_content
    print(raw_text)

    # Pattern handed to group_broken_paragraphs as the paragraph separator:
    # three consecutive newline groups, each with optional surrounding whitespace.
    paragraph_break = re.compile(r"(\s*\n\s*){3}")
    regrouped = group_broken_paragraphs(raw_text, paragraph_split=paragraph_break)
    print("\nRestructured: ", regrouped)

    print("\nSource: ", doc.metadata)

In [9]:
import re
from unstructured.cleaners.core import clean,group_broken_paragraphs
para_split_re = re.compile(r"(\s*\n\s*){3}")

Check configurations

In [13]:
from medicalChatBot.config.configurations import ConfigurationManager

In [14]:
configurationManager = ConfigurationManager()

FileNotFoundError: [Errno 2] No such file or directory: 'config/config.yaml'

In [None]:
dataloader_config = configurationManager.get_dataloader_config()
datasplitter_config = configurationManager.get_datasplitter_config()
vectorization_config =  configurationManager.get_vectorization_config()
model_config = configurationManager.get_model_config()

In [None]:
print(dataloader_config)
print(datasplitter_config)
print(vectorization_config)
print(model_config)

Data Loader

In [None]:
# Loader
import pypdf
from langchain.document_loaders import PyPDFLoader, DirectoryLoader
from medicalChatBot.entity  import DataLoaderConfig


class DataLoader:
    """Loads documents from a directory described by a ``DataLoaderConfig``."""

    def __init__(self,
                 config : DataLoaderConfig)->None :
        # load_pdf reads ``data_path`` and ``file_types`` from this object.
        self.config = config

    def load_pdf(self):
        """Load every file under ``config.data_path`` matching ``config.file_types``.

        Each matched file is handed to ``PyPDFLoader``; a progress bar is shown
        while loading.

        Returns:
            The result of ``DirectoryLoader.load()``.
        """
        directory_loader = DirectoryLoader(
            path=self.config.data_path,
            glob=self.config.file_types,
            show_progress=True,
            loader_cls=PyPDFLoader,
        )
        return directory_loader.load()

In [None]:
dataLoader = DataLoader(config=dataloader_config)

In [None]:
extracted_data = dataLoader.load_pdf()

Data Splitter

In [None]:
# Data Splitter
from langchain.text_splitter import RecursiveCharacterTextSplitter
from medicalChatBot.config.configurations import ConfigurationManager
from medicalChatBot.entity  import SplitterConfig

class Splitter:
    """Splits loaded documents into overlapping text chunks."""

    def __init__(self,
                 config: "SplitterConfig") -> None:
        # split_recursive reads ``chunk_size`` and ``chunk_overlap`` from this object.
        self.config = config

    def split_recursive(self, extracted_data: list):
        """Split documents with a ``RecursiveCharacterTextSplitter``.

        Args:
            extracted_data: The documents to split (for example the output of
                the data loader).

        Returns:
            The chunks produced by ``split_documents``. Splitting is attempted
            on blank lines first, then newlines, then '.', then ','.
        """
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=['\n\n', '\n', '.', ','],
        )
        return splitter.split_documents(extracted_data)

In [None]:
splitter = Splitter(config=datasplitter_config)

In [None]:
chunks = splitter.split_recursive(extracted_data=extracted_data)

In [None]:
lens = [len(chunk.page_content)for chunk in chunks]
print_stats(lens,docs=None)

Vectorization

In [None]:

from pinecone import Pinecone
from langchain_pinecone import PineconeVectorStore
from langchain.embeddings import HuggingFaceEmbeddings
from medicalChatBot.entity import VectorizationConfig
from medicalChatBot.utils.common import load_env



class Vectorizer:
    """Helpers for building embeddings and working with a Pinecone index.

    ``index_name``, ``namespace`` and the encoder name fall back to the values
    on ``config`` whenever the corresponding argument is omitted (or falsy).
    """

    def __init__(self,
                 config: "VectorizationConfig" = None) -> None:
        self.config = config

    def download_embeddings_from_huggingface(self,model_name:str = None):
        """Return a ``HuggingFaceEmbeddings`` for ``model_name`` (default: ``config.encoder_name``)."""
        model_name = model_name or self.config.encoder_name
        embeddings = HuggingFaceEmbeddings(model_name=model_name)
        return embeddings

    def create_pinecone_instance(self,env_file_path:str = None):
        """Create a Pinecone client from the ``PINECONE_API_KEY`` environment variable.

        Args:
            env_file_path: Passed to ``load_env`` before the variable is read.

        Returns:
            The ``Pinecone`` client.

        Raises:
            ValueError: If ``PINECONE_API_KEY`` is unset or empty after
                ``load_env`` has run.
        """
        load_env(env_file_path=env_file_path)
        # The key is a secret: it is deliberately never printed or logged.
        api_key = os.getenv('PINECONE_API_KEY')
        if not api_key:
            # Formatting a missing key through an f-string would silently send
            # the literal string "None" as the API key.
            raise ValueError(
                "PINECONE_API_KEY is not set; define it in the environment or in the env file."
            )
        return Pinecone(api_key=api_key)

    def check_pinecone_index_status(self,db_instance, index_name = None):
        """Return ``describe_index_stats()`` for the index (default: ``config.index_name``)."""
        index_name = index_name or self.config.index_name
        index = db_instance.Index(index_name)
        return index.describe_index_stats()

    def create_pinecone_vectorstore_instance(self,db_instance=None,namespace=None,index_name=None,embeddings=None):
        """Build a ``PineconeVectorStore`` bound to the given index and namespace.

        Args:
            db_instance: Object exposing ``Index(name)`` (the Pinecone client).
            namespace: Namespace to use (default: ``config.namespace``).
            index_name: Index to use (default: ``config.index_name``).
            embeddings: Embedding object passed to the vector store.
        """
        index_name = index_name or self.config.index_name
        namespace = namespace or self.config.namespace

        vectorstore = PineconeVectorStore(
            index=db_instance.Index(index_name),
            embedding=embeddings,
            namespace=namespace,
            index_name=index_name
        )
        return vectorstore

    def clean_pinecone_db(self, db_instance,index_name:str=None,namespace:str=None):
        """Delete all records in ``namespace`` of the index (``delete_all=True``)."""
        index_name = index_name or self.config.index_name
        namespace = namespace or self.config.namespace
        db_instance.Index(index_name).delete(delete_all=True,namespace=namespace)

    def add_records_pinecone_db(self,vectorstore_instance, chunks):
        """Add the ``page_content`` of every chunk to the vector store.

        Only the text is sent; nothing else from the chunk is passed to ``add_texts``.
        """
        vectorstore_instance.add_texts(texts=[t.page_content for t in chunks])


    

In [None]:
# initialize Vectorizer
print(vectorization_config)
vectorizer = Vectorizer(config=vectorization_config)


In [None]:
# embeddings
embeddings = vectorizer.download_embeddings_from_huggingface()
# Instantiate vectordb
pc = vectorizer.create_pinecone_instance(env_file_path=".env")
index_list = pc.list_indexes()
index_status = vectorizer.check_pinecone_index_status(db_instance=pc)

print("embeddings :",embeddings)
print("\nindex_list:", index_list)
print("\nindex_status :",index_status)



In [None]:
# Create vectorstore and test
vector_store = vectorizer.create_pinecone_vectorstore_instance(db_instance= pc,embeddings=embeddings)


In [None]:

query = "What is Acne?"

chunks_retrieved = vector_store.similarity_search_with_relevance_scores(
    query,  # our search query
    k=3,  # return 3 most relevant docs
    # fetch_k = 30

)
chunks_retrieved

In [None]:
# Print every retrieved result with its position; each result is indexed with
# [0] to reach the object that carries page_content.
for i, chunk in enumerate(chunks_retrieved):
    print(i,":",group_broken_paragraphs(chunk[0].page_content,paragraph_split=para_split_re),"\n")

In [None]:

query = "What is acne?"

chunks_retrieved = vector_store.max_marginal_relevance_search(
    query,  # our search query
    k=3,  # return 3 most relevant docs
    fetch_k = 30

)
chunks_retrieved

In [None]:
# Print every retrieved chunk with its position, after regrouping its text.
for i, chunk in enumerate(chunks_retrieved):
    print(i,":",group_broken_paragraphs(chunk.page_content,paragraph_split=para_split_re),"\n")

In [None]:
# # Testing cleanup and check status
# vectorizer.clean_pinecone_db(db_instance=pc)

# index_status = vectorizer.check_pinecone_index_status(db_instance=pc)
# index_status

In [None]:
# # Testing db loading
# vectorizer.add_records_pinecone_db(vectorstore_instance=vector_store,chunks=chunks)

In [None]:
# Test status
index_status = vectorizer.check_pinecone_index_status(db_instance=pc)
index_status

Load Model

In [None]:
from langchain.llms import CTransformers
from langchain_community.llms import LlamaCpp
from medicalChatBot.entity import ModelConfig

class LoadModel:
    """Builds an LLM through either CTransformers or LlamaCpp.

    Every loader argument is optional; an argument left as ``None`` is read
    from ``config`` instead.
    """

    def __init__(self,
                 config: "ModelConfig" = None) -> None:
        self.config = config

    def _setting(self, name, override):
        """Return ``override`` unless it is None; otherwise read ``name`` from the config.

        A plain ``override or config.name`` would discard legitimate falsy
        overrides such as ``temperature=0``, ``n_gpu_layers=0`` or
        ``f16_kv=False``. The config is only touched when it is needed.
        """
        if override is not None:
            return override
        return getattr(self.config, name)

    def load_model_from_ctransformers(self, model_path = None, model_type=None,max_new_tokens=None,n_ctx:int=None,temperature=None):
        """Create a ``CTransformers`` LLM.

        Args:
            model_path: Model location (default: ``config.model_path``).
            model_type: Model type (default: ``config.model_type``).
            max_new_tokens: Default ``config.max_new_tokens``.
            n_ctx: Sent as ``context_length`` (default: ``config.n_ctx``).
            temperature: Default ``config.temperature``; ``0`` is honoured.

        Returns:
            The constructed ``CTransformers`` object.
        """
        model_path = model_path or self.config.model_path
        model_type = model_type or self.config.model_type
        max_new_tokens = self._setting("max_new_tokens", max_new_tokens)
        context_length = self._setting("n_ctx", n_ctx)
        temperature = self._setting("temperature", temperature)

        # Echo the effective settings so the notebook shows what was used.
        print(model_path)
        print(model_type)
        print(max_new_tokens)
        print(temperature)
        print(context_length)

        llm=CTransformers(model=model_path,
                        model_type=model_type,
                        config={'max_new_tokens':max_new_tokens,
                                'temperature':temperature,
                                'context_length': context_length})

        return llm

    def load_model_from_llamacpp(self,model_path:str=None, n_gpu_layers:int=None, n_batch:int=None, n_ctx:int=None, f16_kv:bool=None, temperature:float=None):
        """Create a ``LlamaCpp`` LLM.

        Args:
            model_path: Model location (default: ``config.model_path``).
            n_gpu_layers: Default ``config.n_gpu_layers``; ``0`` is honoured.
            n_batch: Default ``config.n_batch``.
            n_ctx: Default ``config.n_ctx``.
            f16_kv: Default ``config.f16_kv``; ``False`` is honoured.
            temperature: Default ``config.temperature``; ``0`` is honoured.

        Returns:
            The constructed ``LlamaCpp`` object.
        """
        model_path = model_path or self.config.model_path
        n_gpu_layers = self._setting("n_gpu_layers", n_gpu_layers)
        n_batch = self._setting("n_batch", n_batch)
        n_ctx = self._setting("n_ctx", n_ctx)
        f16_kv = self._setting("f16_kv", f16_kv)
        temperature = self._setting("temperature", temperature)

        # Echo the effective settings so the notebook shows what was used.
        print(model_path)
        print(n_gpu_layers)
        print(n_batch)
        print(n_ctx)
        print(f16_kv)
        print(temperature)

        lcpp_llm = LlamaCpp(
            model_path=model_path,
            n_gpu_layers=n_gpu_layers,
            n_batch=n_batch,
            n_ctx=n_ctx,
            f16_kv=f16_kv,
            temperature = temperature
            )
        return lcpp_llm

    def load_model(self):
        """Load the model with the backend named by ``config.implementation``.

        The name is matched case-insensitively: a value containing
        'ctransformers' selects CTransformers, otherwise one containing
        'llama' selects LlamaCpp.

        Raises:
            ValueError: If the implementation name matches neither backend.
        """
        implementation_lower = self.config.implementation.lower()
        print(implementation_lower)
        if 'ctransformers' in implementation_lower:
            return self.load_model_from_ctransformers()
        if 'llama' in implementation_lower:  # also covers 'llamacpp'
            return self.load_model_from_llamacpp()
        raise ValueError(
            f"Unsupported model implementation: {self.config.implementation!r}"
        )

Compressing

In [None]:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

In [None]:
model_config = configurationManager.get_model_config()
loadModel = LoadModel(config=model_config)
# Default loader
model_default = loadModel.load_model()


In [None]:
compressor = LLMChainExtractor.from_llm(model_default)

In [None]:
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vector_store.as_retriever()
)

In [None]:
question = "what is acne?"
compressed_docs = compression_retriever.get_relevant_documents(question)
compressed_docs

Prompting

In [None]:
from importlib import import_module
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from medicalChatBot.prompts import *


class PromptQueryHandler:
    """Looks up prompt templates by name and wires them into a RetrievalQA chain."""

    def __init__(self) -> None:
        pass

    def load_prompt(self, template_name: str = None):
        """Return a ``PromptTemplate`` built from a named template.

        Templates are attributes of the ``medicalChatBot.prompts`` module. If
        ``template_name`` is empty, or the module has no attribute with that
        name, the ``default_template`` attribute is used and a message saying
        so is printed.
        """
        default_template_name = "default_template"
        prompt_module = import_module("medicalChatBot.prompts")

        if template_name and hasattr(prompt_module, template_name):
            template = getattr(prompt_module, template_name)
        else:
            # Fall back first, then report why the fallback was taken.
            template = getattr(prompt_module, default_template_name)
            if template_name:
                print(f"Template '{template_name}' not found. Using default template: '{default_template_name}'.")
            else:
                print(f"No template name provided. Using default template: '{default_template_name}'.")

        return PromptTemplate.from_template(template)

    def initialize_chain_with_retrievalqa(self,llm,vector_store,return_source_documents:bool,prompt=None,k:int=3):
        """Create a "stuff" RetrievalQA chain over ``vector_store``.

        Args:
            llm: Language model handed to the chain.
            vector_store: Object exposing ``as_retriever``.
            return_source_documents: Forwarded to the chain unchanged.
            prompt: Name of the template to load via ``load_prompt``.
            k: Value sent to the retriever as ``search_kwargs={'k': k}``.

        Returns:
            The chain returned by ``RetrievalQA.from_chain_type``.
        """
        prompt = self.load_prompt(template_name=prompt)
        print("prompt: ",prompt)

        retriever = vector_store.as_retriever(search_kwargs={'k': k})
        chain_options = {"prompt": prompt}
        return RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=return_source_documents,
            chain_type_kwargs=chain_options,
            verbose=True,
        )


Test Load model

In [None]:
model_config = configurationManager.get_model_config()
model_config

In [None]:
loadModel = LoadModel(config=model_config)

In [None]:
# Default loader
model_default = loadModel.load_model_from_llamacpp(temperature=0)

In [None]:
# CTransformers loader
model_ctran = loadModel.load_model_from_ctransformers()

In [None]:
model_ctran.invoke("How are you?")

In [None]:
# LlamaCpp loader
model_llamacpp = loadModel.load_model_from_llamacpp()

In [None]:
model_llamacpp.invoke("How are you?")

Test Prompting

In [None]:
query = "How to treat hairfall?"

In [None]:
promptQueryHandler = PromptQueryHandler()

In [None]:
# Using CTransformers
qa_ct = promptQueryHandler.initialize_chain_with_retrievalqa(llm=model_ctran,vector_store=vector_store,return_source_documents=True)

In [None]:
response_ct = qa_ct.invoke({"query":query})

In [None]:
#Using LLamaCPP
qa_llama = promptQueryHandler.initialize_chain_with_retrievalqa(llm=model_default,vector_store=vector_store,return_source_documents=True,prompt="template4")

In [None]:
response_llama = qa_llama.invoke({"query":"What is Caffiene?"})

In [None]:
response_llama['query']

In [None]:
print(response_llama['result'])

In [None]:
model_default.invoke("What is Caffiene?")

In [None]:
response_llama['source_documents']

Speech to Text

In [None]:
# !pip install openai-whisper
# !pip install pytube

In [None]:
import tempfile
import whisper
from pytube import YouTube



In [None]:
!pwd

In [None]:
# Download model to a directory
whisper_model = whisper.load_model(name="medium",download_root="artifacts/model")

In [None]:
# Load model from a directory
# NOTE(review): the previous cell downloads the "medium" model into
# artifacts/model; confirm that artifacts/model/whisper_base.pt actually
# exists before running this cell.
whisper_model1 = whisper.load_model(name="artifacts/model/whisper_base.pt")

In [None]:
# whisper_model1

In [None]:
# Transcribe the video's audio once and cache the text on disk.
# The same path is used for the existence check, the read and the write, so a
# second run reuses the cached file instead of transcribing again.
TRANSCRIPTION_PATH = "artifacts/data/transcription.txt"

if os.path.exists(TRANSCRIPTION_PATH):
    # Cache hit: load the text so `transcription` is defined on every run.
    with open(TRANSCRIPTION_PATH, encoding="utf-8") as transcript_file:
        transcription = transcript_file.read()
else:
    # NOTE(review): YOUTUBE_VIDEO is not defined in any visible cell; it must
    # be set to the video URL before this cell runs.
    youtube = YouTube(YOUTUBE_VIDEO)
    audio = youtube.streams.filter(only_audio=True).first()

    # The audio is only needed for transcription, so it is downloaded into a
    # temporary directory that is removed afterwards. `whisper_model` is the
    # model loaded in an earlier cell.
    with tempfile.TemporaryDirectory() as tmpdir:
        audio_path = audio.download(output_path=tmpdir)
        transcription = whisper_model.transcribe(audio_path, fp16=False)["text"].strip()

    os.makedirs(os.path.dirname(TRANSCRIPTION_PATH), exist_ok=True)
    with open(TRANSCRIPTION_PATH, "w", encoding="utf-8") as transcript_file:
        transcript_file.write(transcription)