<a href="https://colab.research.google.com/github/exdsgift/SocialResearch/blob/main/chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# notebook can read and write files stored there.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Change into the Drive folder that holds the repositories, clone the
# SocialResearch project there and list the folder contents.
# NOTE(review): presumably `git clone` reports an error when this cell is re-run
# and the SocialResearch directory already exists - confirm this is acceptable.
%cd /content/drive/MyDrive/Github Repository
!git clone https://github.com/exdsgift/SocialResearch
!ls

In [None]:
import torch

# Pick the compute device: the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Report which device was selected (for the GPU, also its name).
if device.type == "cuda":
    print("GPU disponibile:", torch.cuda.get_device_name(0))
else:
    print("GPU non disponibile, verrà utilizzata la CPU.")

In [None]:
pip install -q jupyter_bokeh

In [None]:
# Import necessary libraries
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import DocArrayInMemorySearch
from langchain.chains import ConversationalRetrievalChain
from langchain.document_loaders import PyPDFLoader
from langchain.llms import HuggingFacePipeline
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import panel as pn
import param

# Initialize the Panel extension before any Panel widgets or layouts are
# created further down in this cell.
pn.extension()

# Load Hugging Face model and tokenizer
def load_llm(model_name="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"):
    """Build a LangChain LLM backed by a Hugging Face causal language model.

    Args:
        model_name: Identifier of the checkpoint whose tokenizer and weights
            are loaded with ``from_pretrained``.

    Returns:
        A ``HuggingFacePipeline`` wrapping a ``text-generation`` pipeline that
        generates at most 200 new tokens per call.

    Note:
        The model is moved to the module-level ``device`` selected in the
        GPU-check cell, so that cell has to run first.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    model.to(device)

    # Fall back to the EOS token for padding only when the tokenizer has no pad
    # token of its own; an existing pad token is left untouched.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    hf_pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=200,
        # temperature is only applied in sampling mode; enable it explicitly so
        # the setting does not depend on the checkpoint's generation defaults.
        do_sample=True,
        temperature=0.1,
        truncation=True,  # Ensure truncation is enabled
        pad_token_id=tokenizer.pad_token_id,
    )
    return HuggingFacePipeline(pipeline=hf_pipeline)

# Load and process documents
def load_db(file, chain_type, k, llm):
    """Index one PDF in memory and return a conversational retrieval chain over it.

    Args:
        file: Path of the PDF handed to ``PyPDFLoader``.
        chain_type: Chain type forwarded to ``ConversationalRetrievalChain.from_llm``.
        k: Number of chunks the similarity retriever returns per query.
        llm: Language model used by the chain.

    Returns:
        The configured ``ConversationalRetrievalChain``; it is asked to return
        both the source documents and the generated question.
    """
    # Read the PDF and cut it into 300-character chunks with 30 characters of overlap
    pages = PyPDFLoader(file).load()
    splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=30)
    chunks = splitter.split_documents(pages)

    # Embed the chunks into an in-memory vector store
    embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    store = DocArrayInMemorySearch.from_documents(chunks, embedder)

    # Wire the similarity retriever and the LLM into the chain
    return ConversationalRetrievalChain.from_llm(
        llm=llm,
        chain_type=chain_type,
        retriever=store.as_retriever(search_type="similarity", search_kwargs={"k": k}),
        return_source_documents=True,
        return_generated_question=True,
    )

# Chatbot class
class ChatBot(param.Parameterized):
    """Conversational PDF question-answering bot behind the Panel dashboard.

    The reactive parameters below drive the dashboard views. The callbacks
    read the module-level Panel widgets ``file_input``, ``button_load`` and
    ``inp`` when they run, so those widgets must exist before the callbacks
    are triggered.
    """

    # (question, answer) tuples handed back to the chain on every turn
    chat_history = param.List([])
    # Last answer, or error message, produced by convchain
    answer = param.String("")
    # "generated_question" value returned by the last chain call
    db_query = param.String("")
    # "source_documents" value returned by the last chain call
    db_response = param.List([])

    def __init__(self, **params):
        """Load the language model and index the default reference PDF."""
        super().__init__(**params)
        # Rows rendered in the conversation view, oldest first
        self.panels = []
        self.loaded_file = "references/daminov-2024-relationship-between-the-type-of-media-consumption-and-political-trust-in-the-european-union-evidence-from.pdf"
        self.llm = load_llm()
        self.qa = load_db(self.loaded_file, "stuff", 4, self.llm)

    def call_load_db(self, count):
        """Rebuild the retrieval chain from the uploaded PDF, if there is one.

        Args:
            count: Click count of the "Load DB" button; 0 means the button has
                not been pressed yet.

        Returns:
            A Markdown pane naming the file that is currently loaded.
        """
        # Nothing to do on the initial render or when no file has been uploaded
        if count == 0 or not file_input.value:
            return pn.pane.Markdown(f"Loaded File: {self.loaded_file}")

        file_input.save("temp.pdf")
        self.loaded_file = file_input.filename
        self.qa = load_db("temp.pdf", "stuff", 4, self.llm)
        button_load.button_style = "solid"
        # The previous history refers to the previous document
        self.clr_history()
        return pn.pane.Markdown(f"Loaded File: {self.loaded_file}")

    def is_repetitive(self, text, threshold=3):
        """Return True when ``text`` contains an immediately repeated word run.

        Args:
            text: Text to inspect; it is split on whitespace.
            threshold: Length, in words, of the run that must appear twice in
                a row. Values below 1 never match.

        Returns:
            True if some ``threshold`` consecutive words are directly followed
            by the same ``threshold`` words, False otherwise.
        """
        if threshold < 1:
            # A zero-length window would compare two empty slices and match everything
            return False
        words = text.split()
        # Only start positions that leave room for two full windows can match
        last_start = len(words) - 2 * threshold
        return any(
            words[i:i + threshold] == words[i + threshold:i + 2 * threshold]
            for i in range(last_start + 1)
        )

    def convchain(self, query):
        """Answer ``query`` with the retrieval chain and refresh the conversation view.

        Args:
            query: Text entered by the user; an empty value only renders an
                empty placeholder row.

        Returns:
            A ``pn.WidgetBox`` with the whole conversation so far, or a single
            highlighted error row when the chain raises.
        """
        if not query:
            return pn.WidgetBox(pn.Row('User:', pn.pane.Markdown("", width=600)), scroll=True)

        try:
            result = self.qa.invoke({"question": query, "chat_history": self.chat_history})
            answer = result["answer"]

            # Replace degenerate, looping generations with a fixed apology
            if self.is_repetitive(answer):
                answer = "I'm sorry, I seem to be repeating myself. Could you please rephrase your question?"

            # Assign a new list instead of mutating in place: only assignment
            # fires the parameter event that get_chats depends on.
            self.chat_history = self.chat_history + [(query, answer)]
            self.db_query = result["generated_question"]
            self.db_response = result["source_documents"]
            self.answer = answer
            self.panels.extend([
                pn.Row('User:', pn.pane.Markdown(query, width=600)),
                pn.Row('ChatBot:', pn.pane.Markdown(self.answer, width=600))
            ])
            inp.value = ''  # Clear input field
            return pn.WidgetBox(*self.panels, scroll=True)
        except Exception as e:
            # UI boundary: show the failure in the dashboard instead of raising
            self.answer = f"Error: {str(e)}"
            return pn.WidgetBox(pn.Row('ChatBot:', pn.pane.Markdown(self.answer, width=600, styles={'background-color': '#FFCCCB'})))

    @param.depends('db_query')
    def get_lquest(self):
        """Render the last question sent to the vector store, or a placeholder."""
        if not self.db_query:
            return pn.Column(
                pn.Row(pn.pane.Markdown("Last question to DB:", styles={'background-color': '#F6F6F6'})),
                pn.Row(pn.pane.Str("no DB accesses so far"))
            )
        return pn.Column(
            pn.Row(pn.pane.Markdown("DB query:", styles={'background-color': '#F6F6F6'})),
            pn.pane.Str(self.db_query)
        )

    @param.depends('db_response')
    def get_sources(self):
        """Render the source documents of the last lookup; None when there are none."""
        if not self.db_response:
            return
        rlist = [pn.Row(pn.pane.Markdown("Result of DB lookup:", styles={'background-color': '#F6F6F6'}))]
        rlist.extend(pn.Row(pn.pane.Str(doc)) for doc in self.db_response)
        return pn.WidgetBox(*rlist, width=600, scroll=True)

    @param.depends('chat_history')
    def get_chats(self):
        """Render the raw chat history, one row per (question, answer) tuple."""
        if not self.chat_history:
            return pn.WidgetBox(pn.Row(pn.pane.Str("No History Yet")), width=600, scroll=True)
        rlist = [pn.Row(pn.pane.Markdown("Current Chat History variable"))]
        rlist.extend(pn.Row(pn.pane.Str(exchange)) for exchange in self.chat_history)
        return pn.WidgetBox(*rlist, width=600, scroll=True)

    def clr_history(self, count=0):
        """Reset the chat history.

        Args:
            count: Unused; accepts the argument passed when the method is
                registered as a button click callback.
        """
        self.chat_history = []
        return

# Instantiate the chatbot (its constructor loads the LLM and indexes the default PDF)
cb = ChatBot()

# Create Panel widgets; ChatBot's callbacks look these up by name at call time
file_input = pn.widgets.FileInput(accept='.pdf')
button_load = pn.widgets.Button(name="Load DB", button_type='primary')
button_clearhistory = pn.widgets.Button(name="Clear History", button_type='warning')
inp = pn.widgets.TextInput(placeholder='Enter text here…')

# Bind widgets to functions
button_clearhistory.on_click(cb.clr_history)
bound_button_load = pn.bind(cb.call_load_db, button_load.param.clicks)
conversation = pn.bind(cb.convchain, inp)

# Conversation tab: text input on top, running conversation below it
conversation_view = pn.panel(conversation, loading_indicator=True, height=300)
tab1 = pn.Column(pn.Row(inp), pn.layout.Divider(), conversation_view, pn.layout.Divider())

# Database tab: last generated query followed by the retrieved sources
tab2 = pn.Column(pn.panel(cb.get_lquest), pn.layout.Divider(), pn.panel(cb.get_sources))

# Chat History tab: raw chat history as passed to the chain
tab3 = pn.Column(pn.panel(cb.get_chats), pn.layout.Divider())

# Configure tab: PDF upload / reload controls and the clear-history button
load_row = pn.Row(file_input, button_load, bound_button_load)
clear_row = pn.Row(
    button_clearhistory,
    pn.pane.Markdown("Clears chat history. Can use to start a new topic"),
)
tab4 = pn.Column(load_row, clear_row, pn.layout.Divider())

# Assemble dashboard: title row above the four tabs
title_row = pn.Row(pn.pane.Markdown('# SocialResearch Bot! Ask me something!'))
tabs = pn.Tabs(
    ('Conversation', tab1),
    ('Database', tab2),
    ('Chat History', tab3),
    ('Configure', tab4),
)
dashboard = pn.Column(title_row, tabs)

# Serve the dashboard
dashboard.servable()

### Simple version (not really working)

In [None]:
!pip install -q langchain chromadb transformers sentence-transformers PyPDF2 langchain-community

In [None]:
import torch

# Select the GPU when CUDA is available and fall back to the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Print the outcome of the check (including the GPU name when one is used).
if device.type == "cuda":
    print("GPU disponibile:", torch.cuda.get_device_name(0))
else:
    print("GPU non disponibile, verrà utilizzata la CPU.")

In [None]:
import os
from PyPDF2 import PdfReader
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

# 1. Vector database configuration: a Chroma store kept under
# `persist_directory`, embedding texts with the all-MiniLM-L6-v2 model.
persist_directory = 'docs/chroma/'
embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vectordb = Chroma(persist_directory=persist_directory, embedding_function=embedding)

# 2. Extract the text of a PDF file
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of the PDF at ``pdf_path`` as one string.

    Pages are joined with a newline so the last word of one page is not fused
    with the first word of the next one. A page whose extraction yields
    nothing contributes an empty string.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The concatenated page texts.
    """
    reader = PdfReader(pdf_path)
    # `or ""` keeps the join safe if a page yields no text at all
    return "\n".join(page.extract_text() or "" for page in reader.pages)

# 3. Folder containing the PDF files
pdf_folder = '/content/drive/MyDrive/Github Repository/SocialResearch/references'  # Replace with the path of your folder

# 4. Read every PDF file in the folder and extract its text
documents = []
for filename in os.listdir(pdf_folder):
    if filename.endswith(".pdf"):
        pdf_path = os.path.join(pdf_folder, filename)
        print(f"Processing {filename}...")
        text = extract_text_from_pdf(pdf_path)
        documents.append(text)

# Split the extracted text into chunks. `documents` holds plain strings at this
# point, so split_text is the right call: split_documents expects Document
# objects and fails on strings.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
documents = [
    chunk
    for raw_text in documents
    for chunk in text_splitter.split_text(raw_text)
]

# 5. Add the chunks to the vector database (add_texts takes a list of strings);
# skipped when no PDF produced any text.
if documents:
    vectordb.add_texts(documents)

# 6. Save the database (optional)
vectordb.persist()

In [None]:
# Sanity check of the vector store: fetch the 4 chunks most similar to a sample
# question and display how many were returned.
question = "Trust is important in politics?"
docs = vectordb.similarity_search(question,k=4)
len(docs)

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM

# Checkpoint to load, together with its tokenizer
model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# Place the model on the device chosen in the GPU-check cell
model.to(device)

# Prompt to complete
prompt = "Who is Elon Musk"

# Encode the prompt as PyTorch tensors and move them to the same device
inputs = tokenizer(prompt, return_tensors="pt", padding=True).to(device)
attention_mask = inputs.attention_mask

# Generation settings: attention mask of the prompt, cap on the number of
# newly generated tokens and temperature controlling randomness
generation_kwargs = {
    "attention_mask": attention_mask,
    "max_new_tokens": 200,
    "temperature": 0.1,
}
outputs = model.generate(inputs.input_ids, **generation_kwargs)

# Decode the first returned sequence without special tokens and print it
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
print(generated_text)

In [None]:
from langchain.prompts import PromptTemplate

# Prompt used by the "stuff" chain: retrieved context first, then the question
template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. Use three sentences maximum. Keep the answer as concise as possible. Always say "thanks for asking!" at the end of the answer.
{context}
Question: {question}
Helpful Answer:"""
QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question"],template=template,)

# Run chain
from langchain.chains import RetrievalQA
from langchain.llms import HuggingFacePipeline
from transformers import pipeline

# RetrievalQA expects a LangChain LLM, not a bare transformers model, so wrap
# the model and tokenizer loaded in the previous cell in a text-generation
# pipeline first.
llm = HuggingFacePipeline(
    pipeline=pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=200,
    )
)

question = "Trust is important in politics?"
qa_chain = RetrievalQA.from_chain_type(llm,
                                       retriever=vectordb.as_retriever(),
                                       return_source_documents=True,
                                       chain_type_kwargs={"prompt": QA_CHAIN_PROMPT})


# invoke() is the supported entry point; calling the chain object directly is deprecated
result = qa_chain.invoke({"query": question})
result["result"]

### Compact Version

In [None]:
import os
from PyPDF2 import PdfReader
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document

# 1. Vector database configuration: a Chroma store kept under
# `persist_directory`, embedding texts with the all-MiniLM-L6-v2 model.
persist_directory = 'docs/chroma/'
embedding = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vectordb = Chroma(persist_directory=persist_directory, embedding_function=embedding)

# 2. Extract the text of a PDF file
def extract_text_from_pdf(pdf_path):
    """Return the text of every page of the PDF at ``pdf_path`` as one string.

    Pages are joined with a newline so the last word of one page is not fused
    with the first word of the next one. A page whose extraction yields
    nothing contributes an empty string.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The concatenated page texts.
    """
    reader = PdfReader(pdf_path)
    # `or ""` keeps the join safe if a page yields no text at all
    return "\n".join(page.extract_text() or "" for page in reader.pages)

# 3. Folder containing the PDF files
pdf_folder = '/content/drive/MyDrive/Github Repository/SocialResearch/references'  # Replace with the path of your folder

# 4. Read every PDF file in the folder and wrap its text in a Document
documents = []
for filename in os.listdir(pdf_folder):
    if not filename.endswith(".pdf"):
        continue  # ignore anything that is not a PDF
    pdf_path = os.path.join(pdf_folder, filename)
    print(f"Processing {filename}...")
    text = extract_text_from_pdf(pdf_path)
    documents.append(Document(page_content=text))

# 5. Split documents into chunks of 500 characters with 50 characters of overlap
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_documents = text_splitter.split_documents(documents)

# 6. Add the chunk texts to the vector database
chunk_texts = [chunk.page_content for chunk in split_documents]
vectordb.add_texts(chunk_texts)

# 7. Save the database (optional)
vectordb.persist()

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain_huggingface import HuggingFacePipeline
from langchain.text_splitter import RecursiveCharacterTextSplitter
import torch

# Load the tokenizer and model
model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# Check GPU availability and pick the device
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Move the model to the specified device
model.to(device)

# Define a prompt template: retrieved context first, then the question
template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. Use three sentences maximum. Keep the answer as concise as possible. Always say "thanks for asking!" at the end of the answer. Write only the answer, without any oders informations! Dont report the prompt for any reasons!
{context}.
Question: {question}
Helpful Answer:"""
QA_CHAIN_PROMPT = PromptTemplate(input_variables=["context", "question"], template=template)

# Wrap the model in a HuggingFacePipeline
from transformers import pipeline

pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    # `device` is a torch.device, which does not compare equal to the string
    # "cuda"; test its type so the pipeline runs where the model was moved.
    device=0 if device.type == "cuda" else -1,
    max_new_tokens=100,
    top_p=0.95,
    temperature=0.1
)

llm = HuggingFacePipeline(pipeline=pipe)

# Run chain: "stuff" puts the retrieved chunks into the prompt's {context}
question = "Trust is important in politics?"
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectordb.as_retriever(),
    return_source_documents=False,
    chain_type_kwargs={"prompt": QA_CHAIN_PROMPT}
)

# Get the result
result = qa_chain.invoke({"query": question})

# Print only the question and the response
print(f"Question: {question}")
print(f"Response: {result['result']}")