In [1]:
! nvidia-smi -L

GPU 0: NVIDIA GeForce MX330 (UUID: GPU-997e1912-4b02-eb94-fde9-258b23b45a72)


In [None]:
%%time

# ! pip install -qq -U langchain tiktoken pypdf faiss-gpu
! pip install -qq -U langchain tiktoken faiss-gpu
! pip install -qq -U transformers InstructorEmbedding sentence_transformers
! pip install -qq -U accelerate bitsandbytes xformers einops

In [None]:
pip install gradio

In [None]:
import warnings
warnings.filterwarnings("ignore")

import os
import glob
import textwrap
import time

import langchain

# loaders
# from langchain.document_loaders import PyPDFLoader
# from langchain.document_loaders import DirectoryLoader
from langchain.document_loaders import TextLoader


# splits
from langchain.text_splitter import RecursiveCharacterTextSplitter

# prompts
from langchain import PromptTemplate, LLMChain

# vector stores
from langchain.vectorstores import FAISS

# models
from langchain.llms import HuggingFacePipeline
from InstructorEmbedding import INSTRUCTOR
from langchain.embeddings import HuggingFaceInstructEmbeddings

# retrievers
from langchain.chains import RetrievalQA

import torch
import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

print(langchain.__version__)

In [None]:
import requests
from bs4 import BeautifulSoup
import difflib

# Function to search for a book by name and return the best match URL
def search_book_by_name(book_name):
    """Search Project Gutenberg for ``book_name`` and return the best match URL.

    The search results page is scraped for ``<li class="booklink">`` entries and
    each entry's title is compared to ``book_name`` (case-insensitively) with
    ``difflib.SequenceMatcher``. The entry with the highest similarity ratio wins.

    Args:
        book_name: Free-text book title typed by the user.

    Returns:
        Absolute URL of the best matching book page, or ``""`` when the results
        page contains no usable entry.

    Raises:
        requests.RequestException: if the search request fails or times out.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import quote_plus, urljoin

    base_url = "https://www.gutenberg.org/"
    # quote_plus turns spaces into '+' (as before) and also escapes characters
    # such as '&', '#' or '?' that would otherwise corrupt the query string.
    search_url = base_url + "ebooks/search/?query=" + quote_plus(book_name) + "&submit_search=Go%21"

    response = requests.get(search_url, timeout=30)
    soup = BeautifulSoup(response.content, "html.parser")

    # Find the best match link based on similarity ratio
    wanted_title = book_name.lower()
    best_match_ratio = 0
    best_match_url = ""

    for entry in soup.find_all("li", class_="booklink"):
        title_tag = entry.find("span", class_="title")
        anchor = entry.find("a")
        href = anchor.get("href") if anchor is not None else None
        # Skip malformed entries instead of crashing on a missing tag.
        if title_tag is None or not href:
            continue

        similarity_ratio = difflib.SequenceMatcher(
            None, wanted_title, title_tag.get_text().lower()
        ).ratio()
        if similarity_ratio > best_match_ratio:
            best_match_ratio = similarity_ratio
            # urljoin avoids the double slash produced by base_url + "/ebooks/..."
            best_match_url = urljoin(base_url, href)

    return best_match_url

# Function to get the "Plain Text UTF-8" download link from the book page
def get_plain_text_link(book_url):
    """Return the href of the "Plain Text UTF-8" download on a book page.

    Fetches ``book_url``, walks every table row and looks at the
    ``<td class="unpadded icon_save">`` cell of each one. The href of the first
    cell whose text contains "Plain Text UTF-8" is returned as found in the
    page; ``""`` is returned when no such cell exists.
    """
    page = requests.get(book_url)
    parsed_page = BeautifulSoup(page.content, "html.parser")

    for table_row in parsed_page.find_all("tr"):
        download_cell = table_row.find("td", class_="unpadded icon_save")
        if not download_cell or "Plain Text UTF-8" not in download_cell.get_text():
            continue
        # First matching row wins.
        return download_cell.find("a").get("href")

    return ""


# Function to get the content of the "Plain Text UTF-8" link
def get_plain_text_content(plain_text_link):
    """Download ``plain_text_link`` and return the response body as text."""
    return requests.get(plain_text_link).text


# Main function
def load_book(book_name):
    """Download a book from Project Gutenberg and save it as ``<book_name>.txt``.

    The book is located with ``search_book_by_name``, its plain-text link is
    resolved with ``get_plain_text_link`` and the text is fetched with
    ``get_plain_text_content``. A leading BOM is stripped before the text is
    written (UTF-8) to ``book_name + ".txt"`` in the working directory.

    Args:
        book_name: Title to search for; also used verbatim as the file stem.

    Returns:
        The book text on success, or the sentinel string ``"web site error"``
        when no matching book or no plain-text link was found.
    """
    # Local import keeps this cell self-contained.
    from urllib.parse import urljoin

    best_match_url = search_book_by_name(book_name)
    if not best_match_url:
        print("No matching book found.")
        return "web site error"

    plain_text_link = get_plain_text_link(best_match_url)
    if not plain_text_link:
        print("No Plain Text UTF-8 link found.")
        return "web site error"

    # urljoin handles root-relative ("/ebooks/..."), relative and absolute hrefs;
    # plain concatenation only worked for the root-relative form.
    full_plain_text_link = urljoin("https://www.gutenberg.org", plain_text_link)

    # Remove the BOM character if it exists
    book_text = get_plain_text_content(full_plain_text_link).lstrip('\ufeff')

    # Keep the path and the file handle under different names.
    file_name = book_name + ".txt"
    with open(file_name, "w", encoding="utf-8") as book_file:
        book_file.write(book_text)

    return book_text

# def clean_book_content(book_text):
#     cleaned_book_text = book_text.replace("\n", " ")
#     cleaned_book_text = cleaned_book_text.replace("\r", " ")
#     cleaned_book_text = cleaned_book_text.replace("\ufeff", "")
#     return cleaned_book_text

In [None]:
class CFG:
    """Notebook-wide settings for the LLM, text splitting, embeddings and retrieval."""

    # LLMs
    model_name = 'llama2-13b' # wizardlm, bloom, falcon, llama2-7b, llama2-13b
    # Generation settings must be plain numbers: a trailing comma
    # (``temperature = 0,``) would silently create the tuple ``(0,)``.
    temperature = 0
    top_p = 0.95
    repetition_penalty = 1.15

    # splitting
    split_chunk_size = 800
    split_overlap = 0

    # embeddings
    embeddings_model_repo = 'hkunlp/instructor-base'

    # similar passages
    k = 3

In [None]:
def get_model(model = CFG.model_name):
    """Load the tokenizer and 4-bit quantized causal LM for a supported model key.

    Args:
        model: One of 'wizardlm', 'llama2-7b', 'llama2-13b', 'bloom', 'falcon'.
            Defaults to ``CFG.model_name`` as evaluated when this cell runs.

    Returns:
        Tuple ``(tokenizer, model, max_len)`` where ``max_len`` is the maximum
        generation length configured for that model key.

    Raises:
        ValueError: if ``model`` is not one of the supported keys. (Previously
            this path printed a message and then failed with UnboundLocalError.)
    """
    # key -> (hub repo, max_len, extra tokenizer kwargs, extra model kwargs)
    # NOTE(review): max_len 8192 for llama2-13b is much larger than the 2048 used
    # for llama2-7b -- confirm it matches the checkpoint's context window.
    model_specs = {
        'wizardlm': ('TheBloke/wizardLM-7B-HF', 1024, {}, {}),
        'llama2-7b': ('daryl149/llama-2-7b-chat-hf', 2048,
                      {'use_fast': True}, {'trust_remote_code': True}),
        'llama2-13b': ('daryl149/llama-2-13b-chat-hf', 8192,
                       {'use_fast': True}, {'trust_remote_code': True}),
        'bloom': ('bigscience/bloom-7b1', 1024, {}, {}),
        'falcon': ('h2oai/h2ogpt-gm-oasst1-en-2048-falcon-7b-v2', 1024,
                   {}, {'trust_remote_code': True}),
    }

    if model not in model_specs:
        raise ValueError(
            "Not implemented model (tokenizer and backbone): "
            f"{model!r}. Supported: {', '.join(model_specs)}"
        )

    print('\nDownloading model: ', model, '\n\n')

    model_repo, max_len, tokenizer_kwargs, extra_model_kwargs = model_specs[model]

    tokenizer = AutoTokenizer.from_pretrained(model_repo, **tokenizer_kwargs)

    # Loading options shared by every supported model.
    backbone = AutoModelForCausalLM.from_pretrained(
        model_repo,
        load_in_4bit=True,
        device_map='auto',
        torch_dtype=torch.float16,
        low_cpu_mem_usage=True,
        **extra_model_kwargs
    )

    return tokenizer, backbone, max_len

In [None]:
%%time

tokenizer, model, max_len = get_model(model = CFG.model_name)

In [None]:
# Build a Hugging Face text-generation pipeline from the model and tokenizer
# returned by get_model, using the generation settings stored on CFG.
pipe = pipeline(
    task = "text-generation",
    model = model,
    tokenizer = tokenizer,
    # the tokenizer's EOS token id is reused as the padding token id
    pad_token_id = tokenizer.eos_token_id,
    # max_len is the per-model value returned by get_model
    max_length = max_len,
    temperature = CFG.temperature,
    top_p = CFG.top_p,
    repetition_penalty = CFG.repetition_penalty
)

# Wrap the pipeline so it can be passed to LangChain as `llm`.
llm = HuggingFacePipeline(pipeline = pipe)

In [None]:
llm

In [None]:
### testing model, not using the book yet
### answer is not necessarily related to the book
query = "Give me 5 examples of cool potions and explain what they do"
llm(query)

In [None]:
# Prompt text for retrieval QA; {context} and {question} are the two
# placeholders declared as input variables on PROMPT below.
prompt_template = """
Don't try to make up an answer, if you don't know just say that you don't know.
Answer in the same language the question was asked.
Use only the following pieces of context to answer the question at the end.

{context}

Question: {question}
Answer:"""


# Template object passed to the QA chain through chain_type_kwargs in llm_ans.
PROMPT = PromptTemplate(
    template = prompt_template, 
    input_variables = ["context", "question"]
)

In [None]:
def loadForEmbeddings(book_txt):
    """Read the text file at ``book_txt`` and split it into chunks.

    The file is loaded as UTF-8 with ``TextLoader`` and split by a
    ``RecursiveCharacterTextSplitter`` configured from
    ``CFG.split_chunk_size`` and ``CFG.split_overlap``.

    Returns:
        The list of split documents produced by the splitter.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size = CFG.split_chunk_size,
        chunk_overlap = CFG.split_overlap
    )
    raw_documents = TextLoader(book_txt, encoding="utf-8").load()
    return splitter.split_documents(raw_documents)


In [None]:
def wrap_text_preserve_newlines(text, width=200): # 110
    """Wrap ``text`` to ``width`` columns while keeping its existing line breaks.

    Each newline-separated line is wrapped on its own with ``textwrap.fill``
    and the results are joined back together with newlines.
    """
    return '\n'.join(
        textwrap.fill(segment, width=width) for segment in text.split('\n')
    )


def process_llm_response(llm_response):
    """Format a QA chain response as wrapped answer text plus its sources.

    Args:
        llm_response: Mapping with a ``'result'`` string and, optionally, a
            ``'source_documents'`` list whose items carry a ``metadata`` dict.

    Returns:
        The wrapped answer followed by a ``Sources:`` section listing each
        distinct ``metadata['source']`` once, in first-seen order. With a
        single source this is identical to the previous output; with no source
        documents the section is simply empty instead of raising IndexError.
    """
    ans = wrap_text_preserve_newlines(llm_response['result'])

    sources_used = []
    for document in llm_response.get('source_documents') or []:
        source = document.metadata.get('source')
        if source is None:
            continue
        source = str(source)
        # Chunks of the same file share a source; list it only once.
        if source not in sources_used:
            sources_used.append(source)

    return ans + '\n\nSources: \n' + '\n'.join(sources_used)

In [None]:
def llm_ans(query):
    """Answer ``query`` against the submitted book and append the elapsed time.

    A ``RetrievalQA`` "stuff" chain is built from the global ``llm``, the
    global ``retriever`` and ``PROMPT``; its response is formatted with
    ``process_llm_response``.

    Returns:
        The formatted answer followed by ``Time elapsed: N s``, or a short
        instruction message when no book has been submitted yet (the global
        ``retriever`` is missing or ``None``).
    """
    # The retriever only exists after submit_book has run; check for it up front
    # rather than failing inside the chain constructor.
    active_retriever = globals().get("retriever")
    if active_retriever is None:
        return "Please submit a book in the SubmitBook tab first."

    start = time.time()
    qa_chain = RetrievalQA.from_chain_type(
        llm = llm,
        chain_type = "stuff", # map_reduce, map_rerank, stuff, refine
        retriever = active_retriever,
        chain_type_kwargs = {"prompt": PROMPT},
        return_source_documents = True,
        verbose = False
    )
    llm_response = qa_chain(query)
    ans = process_llm_response(llm_response)
    end = time.time()

    time_elapsed = int(round(end - start, 0))
    time_elapsed_str = f'\n\nTime elapsed: {time_elapsed} s'
    return ans + time_elapsed_str

In [None]:
import gradio as gr

title = "GutenbergChat Hub"
vectordb = None
retriever = None

# Submit book
def submit_book(book_name):
    """Download a book, embed it and publish a retriever for the Q/A tab.

    Side effects: writes ``<book_name>.txt`` (via ``load_book``), saves the
    FAISS index to ``faiss_index_hp`` and rebinds the globals ``vectordb`` and
    ``retriever``.

    Args:
        book_name: Title entered in the Gradio text box.

    Returns:
        ``"done"`` on success, otherwise a message explaining what is missing
        or that the download failed.
    """
    global vectordb, retriever
    if not book_name:
        return "Please enter the name of the book."

    book_text = load_book(book_name)
    # load_book signals failure with this sentinel and writes no file; stop here
    # instead of embedding a missing (or stale) <book_name>.txt.
    if book_text == "web site error":
        return "Could not find or download this book from Project Gutenberg."

    file = book_name + ".txt"
    texts = loadForEmbeddings(file)

    ### download embeddings model
    # Fall back to CPU when no CUDA device is available.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    instructor_embeddings = HuggingFaceInstructEmbeddings(
        model_name = CFG.embeddings_model_repo,
        model_kwargs = {"device": device}
    )

    ### create embeddings and DB
    vectordb = FAISS.from_documents(
        documents = texts,
        embedding = instructor_embeddings
    )

    ### persist vector database
    vectordb.save_local("faiss_index_hp")

    # search_type is an argument of as_retriever itself, not a search kwarg.
    retriever = vectordb.as_retriever(
        search_type = "similarity",
        search_kwargs = {"k": CFG.k}
    )

    return "done"


# Q/A
def get_response(prompt):
    """Gradio callback for the Q/A tab: answer ``prompt`` using ``llm_ans``.

    Returns:
        The text produced by ``llm_ans``, or a short reminder when the prompt
        is empty or whitespace-only (no model call is made in that case).
    """
    if not prompt or not prompt.strip():
        return "Please enter the prompt."

    return llm_ans(prompt)

# Interface 1: text in / text out form that calls submit_book
submitBook = gr.Interface(fn=submit_book, inputs="text", outputs="text", title="Submit your book here first")
# Interface 2: text in / text out form that calls get_response
chatBot = gr.Interface(
        fn=get_response,
        inputs="text",
        outputs="text",
        title=title + " - Q/A-Bot",
        description="Enter the name of the book in previous tab, then ask questions here",
        examples=["What are the characters in the book?"]
    )

# Show both interfaces as tabs of a single app and start it.
demo = gr.TabbedInterface([submitBook, chatBot], ["SubmitBook", "Q/A-Bot"])
demo.launch()

In [None]:
# ### test if vector DB was loaded correctly
# vectordb.similarity_search('magic creatures')

In [None]:
# ### testing MMR search
# question = "What are the characters"
# vectordb.max_marginal_relevance_search(question, k = CFG.k)

In [None]:
# # Gradio app
# def chatbot_interface(book_name, query):
#     if not book_name:
#         return "Please enter the name of the book."
    
#     book_text = load_book(book_name)  # Implement the function to load the book
    
#     # Generate book summary
#     book_summary = generate_summary(book_text)
    
#     llm_response = qa_chain(query)  # Implement the function to get the LLM response
    
#     return f"Book Summary: {book_summary}\n\nYou: {query}\nBot: {llm_response}"

# if __name__ == "__main__":
#     iface = gr.Interface(
#         fn=chatbot_interface,
#         inputs=["text", "text"],
#         outputs="text",
#         title="Chatbot Gradio Web App",
#         description="Enter the name of the book, then ask questions to the chatbot.",
#         examples=[["Book Name", "What are the characters in the book?"]]
#     )
#     iface.launch()

In [None]:
# pip install streamlit

In [None]:
# import streamlit as st
# from transformers import BartTokenizer, BartForConditionalGeneration

# # Load pre-trained BART model and tokenizer
# tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn')
# model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn')

# # Function to generate book summary
# def generate_summary(book_text):
#     # Split the source document into chunks
#     chunk_size = 10000 # Adjust as needed
#     chunks = [book_text[i:i + chunk_size] for i in range(0, len(book_text), chunk_size)]

#     # Generate summaries for each chunk
#     summaries = []
#     for chunk in chunks:
#         inputs = tokenizer.encode("summarize: " + chunk, return_tensors="pt", max_length=1024, truncation=True)
#         summary_ids = model.generate(inputs, max_length=300, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
#         summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
#         summaries.append(summary)

#     # Combine the summaries of all chunks
#     combined_summary = " ".join(summaries)
    
#     # Tokenize and summarize the combined summary
#     combined_inputs = tokenizer.encode("summarize: " + combined_summary, return_tensors="pt", max_length=1024, truncation=True)
#     combined_summary_ids = model.generate(combined_inputs, max_length=300, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
#     final_summary = tokenizer.decode(combined_summary_ids[0], skip_special_tokens=True)
    
#     return final_summary

# # Streamlit app
# def main():
#     st.title("Chatbot Web App")
    
#     # Input for the book name
#     book_name = st.text_input("Enter the name of the book:")
    
#     if book_name:
#         book_text = load_book(book_name)  # Implement the function to load the book
        
#         # Generate and display the book summary
#         book_summary = generate_summary(book_text)
#         st.subheader("Book Summary:")
#         st.write(book_summary)
#     else:
#         st.warning("Please enter the name of the book.")
    
#     st.sidebar.title("Chatbot")
    
#     # Interactive chatbot section
#     query = st.sidebar.text_input("Enter your question:")
    
#     if query:
#         llm_response = qa_chain(query)  # Implement the function to get the LLM response
#         st.sidebar.subheader("You:")
#         st.sidebar.write(query)
        
#         st.sidebar.subheader("Bot:")
#         st.sidebar.write(llm_response)

# if __name__ == "__main__":
#     main()


In [None]:
# !streamlit run /opt/conda/lib/python3.10/site-packages/ipykernel_launcher.py

In [None]:
# pip install gradio

In [None]:
# import gradio as gr
# from transformers import BartTokenizer, BartForConditionalGeneration

# # Load pre-trained BART model and tokenizer
# tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn')
# model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn')

# # Function to generate book summary
# def generate_summary(book_text):
#     # Split the source document into chunks
#     chunk_size = 10000 # Adjust as needed
#     chunks = [book_text[i:i + chunk_size] for i in range(0, len(book_text), chunk_size)]

#     # Generate summaries for each chunk
#     summaries = []
#     for chunk in chunks:
#         inputs = tokenizer.encode("summarize: " + chunk, return_tensors="pt", max_length=1024, truncation=True)
#         summary_ids = model.generate(inputs, max_length=300, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
#         summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
#         summaries.append(summary)

#     # Combine the summaries of all chunks
#     combined_summary = " ".join(summaries)
    
#     # Tokenize and summarize the combined summary
#     combined_inputs = tokenizer.encode("summarize: " + combined_summary, return_tensors="pt", max_length=1024, truncation=True)
#     combined_summary_ids = model.generate(combined_inputs, max_length=300, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
#     final_summary = tokenizer.decode(combined_summary_ids[0], skip_special_tokens=True)
    
#     return final_summary

# # Gradio app
# def chatbot_interface(book_name, query):
#     if not book_name:
#         return "Please enter the name of the book."
    
#     book_text = load_book(book_name)  # Implement the function to load the book
    
#     # Generate book summary
#     book_summary = generate_summary(book_text)
    
#     llm_response = qa_chain(query)  # Implement the function to get the LLM response
    
#     return f"Book Summary: {book_summary}\n\nYou: {query}\nBot: {llm_response}"

# if __name__ == "__main__":
#     iface = gr.Interface(
#         fn=chatbot_interface,
#         inputs=["text", "text"],
#         outputs="text",
#         title="Chatbot Gradio Web App",
#         description="Enter the name of the book, then ask questions to the chatbot.",
#         examples=[["Book Name", "What are the characters in the book?"]]
#     )
#     iface.launch()

In [None]:
# import requests
# from bs4 import BeautifulSoup
# import difflib

# # Function to search for a book by name and return the best match URL
# def search_book_by_name(book_name):
#     base_url = "https://www.gutenberg.org/"
#     search_url = base_url + "ebooks/search/?query=" + book_name.replace(" ", "+") + "&submit_search=Go%21"
    
#     response = requests.get(search_url)
#     soup = BeautifulSoup(response.content, "html.parser")

#     # Find the best match link based on similarity ratio
#     best_match_ratio = 0
#     best_match_url = ""

#     for link in soup.find_all("li", class_="booklink"):
#         link_title = link.find("span", class_="title").get_text()
#         similarity_ratio = difflib.SequenceMatcher(None, book_name.lower(), link_title.lower()).ratio()
#         if similarity_ratio > best_match_ratio:
#             best_match_ratio = similarity_ratio
#             best_match_url = base_url + link.find("a").get("href")

#     return best_match_url

# # Function to get the "Plain Text UTF-8" download link from the book page
# def get_plain_text_link(book_url):
#     response = requests.get(book_url)
#     soup = BeautifulSoup(response.content, "html.parser")
    
#     plain_text_link = ""
    
#     for row in soup.find_all("tr"):
#         format_cell = row.find("td", class_="unpadded icon_save")
#         if format_cell and "Plain Text UTF-8" in format_cell.get_text():
#             plain_text_link = format_cell.find("a").get("href")
#             break
    
#     return plain_text_link


# # Function to get the content of the "Plain Text UTF-8" link
# def get_plain_text_content(plain_text_link):
#     response = requests.get(plain_text_link)
#     content = response.text
#     return content


# # Main function
# def load_book(book_name):
#     best_match_url = search_book_by_name(book_name)

#     if best_match_url:
#         plain_text_link = get_plain_text_link(best_match_url)
#         if plain_text_link:
#             full_plain_text_link = "https://www.gutenberg.org" + plain_text_link
#             plain_text_content = get_plain_text_content(full_plain_text_link)
# #             print("Plain Text UTF-8 content:", plain_text_content)
            
#             book_text = plain_text_content
            
#             file = book_name + ".txt"

#             # Remove the BOM character if it exists
#             book_text = book_text.lstrip('\ufeff')

#             # Choose an appropriate encoding, such as 'utf-8'
#             with open(file, "w", encoding="utf-8") as file:
#                 file.write(book_text)
                
#             return book_text
#         else:
#             print("No Plain Text UTF-8 link found.")
#             return "web site error"
#     else:
#         print("No matching book found.")
#         return "web site error"

# # def clean_book_content(book_text):
# #     cleaned_book_text = book_text.replace("\n", " ")
# #     cleaned_book_text = cleaned_book_text.replace("\r", " ")
# #     cleaned_book_text = cleaned_book_text.replace("\ufeff", "")
# #     return cleaned_book_text

In [None]:
# from transformers import BartTokenizer, BartForConditionalGeneration

# # Load pre-trained BART model and tokenizer
# tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn')
# model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn')

# # Function to generate book summary
# def generate_summary(book_text):
#     # Split the source document into chunks
#     chunk_size = 100000 # Adjust as needed
#     chunks = [book_text[i:i + chunk_size] for i in range(0, len(book_text), chunk_size)]

#     # Generate summaries for each chunk
#     summaries = []
#     for chunk in chunks:
#         inputs = tokenizer.encode("summarize: " + chunk, return_tensors="pt", max_length=1024, truncation=True)
#         summary_ids = model.generate(inputs, max_length=300, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
#         summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
#         summaries.append(summary)

#     # Combine the summaries of all chunks
#     combined_summary = " ".join(summaries)
    
#     # Tokenize and summarize the combined summary
#     combined_inputs = tokenizer.encode("summarize: " + combined_summary, return_tensors="pt", max_length=1024, truncation=True)
#     combined_summary_ids = model.generate(combined_inputs, max_length=300, min_length=50, length_penalty=2.0, num_beams=4, early_stopping=True)
#     final_summary = tokenizer.decode(combined_summary_ids[0], skip_special_tokens=True)
    
#     return final_summary

In [None]:
import re
from transformers import BartTokenizer, BartForConditionalGeneration

# Load pre-trained BART model and tokenizer
# NOTE(review): these assignments rebind the module-level names `tokenizer` and
# `model` that the LLM cell earlier in the notebook also assigns. Re-running the
# pipeline cell after this one would pick up the BART objects -- confirm whether
# separate names are wanted.
tokenizer = BartTokenizer.from_pretrained('facebook/bart-large-cnn')
model = BartForConditionalGeneration.from_pretrained('facebook/bart-large-cnn')

# Function to generate book summary
def generate_summary(book_text):
    """Summarize the body of a Project Gutenberg plain-text book with BART.

    The body is the text between the ``*** START OF THE/THIS PROJECT GUTENBERG
    EBOOK <title> ***`` line and the matching ``*** END OF ... <title> ***``
    line. Either "THE" or "THIS" is accepted in both markers.

    Only the beginning of the body is summarized: the tokenizer truncates the
    input to 1024 tokens before generation.

    Args:
        book_text: Full text of the book as returned by ``load_book``.

    Returns:
        The stripped summary text, or the sentinel ``"book content error"``
        when a marker is missing or there is no text between the markers.
    """
    start_match = re.search(
        r"\*\*\* START OF (?:THIS|THE) PROJECT GUTENBERG EBOOK (.+?) \*\*\*",
        book_text,
    )
    if start_match is None:
        print("Start marker not found.")
        return "book content error"

    book_name = start_match.group(1)

    # The title is escaped because it is arbitrary text, and the end marker may
    # say "THIS" as well as "THE", mirroring the start marker.
    end_pattern = re.compile(
        r"\*\*\* END OF (?:THIS|THE) PROJECT GUTENBERG EBOOK "
        + re.escape(book_name)
        + r" \*\*\*"
    )
    end_match = end_pattern.search(book_text, start_match.end())
    if end_match is None:
        print(f"End marker not found for {book_name}.")
        return "book content error"

    text_to_summarize = book_text[start_match.end():end_match.start()]
    if not text_to_summarize.strip():
        print(f"No text found between the markers for {book_name}.")
        return "book content error"

    # Encode the text using the tokenizer (input is cut off at 1024 tokens)
    input_ids = tokenizer.encode(text_to_summarize, return_tensors='pt', max_length=1024, truncation=True)

    # Generate the summary with beam search; min_length/max_length are token
    # counts (150-300 tokens), not word counts.
    summary_ids = model.generate(input_ids, max_length=300, min_length=150, length_penalty=2.0, num_beams=4, early_stopping=True)

    # Decode the summary_ids to get the human-readable summary
    summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True).strip()

    # Print the generated summary
    print("Generated Summary:", summary)
    return summary

In [None]:
%%time

# Download the book text for a fixed example title (load_book also writes it to disk)
book_text = load_book("The changed brides")
    
# Generate the summary for the downloaded text
book_summary = generate_summary(book_text)

print(book_summary)

In [None]:
import gradio as gr

title = "GutenbergChat Hub"

# Summary
def get_summary(book_name):
    """Gradio callback for the Summary tab.

    Loads the named book with ``load_book``, summarizes it with
    ``generate_summary`` and returns the result prefixed with
    ``Book Summary:``. An empty name yields a reminder message instead.
    """
    if book_name:
        summary_text = generate_summary(load_book(book_name))
        return f"Book Summary: {summary_text}\n"

    return "Please enter the name of the book."

# TODO: Q/A
# Q/A
def get_response(book_name, prompt):
    """Placeholder Q/A callback: validate both inputs and echo the prompt.

    Returns a reminder naming whichever input is missing; when both are
    present, returns the prompt prefixed with ``#TODO ``.
    """
    has_book = bool(book_name)
    has_prompt = bool(prompt)

    if has_book and has_prompt:
        return "#TODO " + prompt
    if has_book:
        return "Please enter the prompt."
    if has_prompt:
        return "Please enter the name of the book."
    return "Please enter the name of the book and the prompt."

# Interface 1: one text input, calls get_summary
summaryBot = gr.Interface(fn=get_summary, inputs="text", outputs="text", title=title + " - Summary")
# Interface 2: two text inputs (book name, prompt), calls get_response
chatbot = gr.Interface(fn=get_response, inputs=["text", "text"], outputs="text", title=title + " - Q/A-Bot")

# Show both interfaces as tabs of a single app and start it.
demo2 = gr.TabbedInterface([summaryBot, chatbot], ["Summary", "Q/A-Bot"])
demo2.launch()
