In [21]:
import os
from dotenv import load_dotenv 
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from tavily import TavilyClient

In [22]:
from ebooklib import epub 

load_dotenv(dotenv_path="/app/.env")
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
os.environ["TAVILY_API_KEY"] = os.getenv("TAVILY_API_KEY")
os.environ["PINECONE_API_KEY"] = os.getenv("PINECONE_API_KEY")

#intiialize LLM
llm = ChatOpenAI(
    model="gpt-5-nano"
)

In [23]:
tavily_client = TavilyClient()

def get_search(topic: str) -> str: 
    """ Performs a web search on a certain topic """
    response = tavily_client.search(topic)
    return response

def generate_response(prompt):
    msg = ChatPromptTemplate.from_messages([ 
        ("system", "You are a helpful assistant that will help summarize blocks of info from a given search topic"), 
        ("user", "{prompt}") ])    
    #dont forget this step 
    formatted_msg = msg.format_messages(prompt=prompt)
    result = llm.invoke(formatted_msg) 
    return result.content

In [24]:
#search_query = get_search("tell me about the history of south korea")
#generate llm response 
#answer = generate_response(search_query)
#print(answer)


In [25]:
book_directories = []

In [26]:
#test file searching
from bs4 import BeautifulSoup
import json 

#trawl the non fiction directory 
def crawl_directories(root):
    for folders in os.listdir(root):
        folder_path = os.path.join(root, folders)
        if not os.path.isdir(folder_path):
            continue
            
        for files in os.listdir(folder_path):
            file_path = os.path.join(folder_path, files)
            if not files.lower().endswith(".epub"):
                continue
            try:
                nihao = epub.read_epub(file_path)
                title = nihao.get_metadata("DC", "title")
                author = nihao.get_metadata("DC", "creator")
                description = nihao.get_metadata("DC", "description")
                if len(description) == 0:
                    desc = generate_description(title, author)
                    nihao.add_metadata("DC", "description", desc)
                    epub.write_epub(file_path, nihao)

                if file_path not in book_directories:
                    add_to_array(file_path)
            except Exception as e:
                #identify the corrupt epubs
                print("BAD EPUB:", file_path, "=>", e)
                continue

def add_to_array(file_path):
    book_directories.append(file_path)
    print(file_path, "appended")
    return

#if the description is missing, fill it out
def generate_description(title, author):
    prompt = ChatPromptTemplate([
        ("system", """You are a professional summarizer well versed in all the books in the world. You are able to recall
        any information about a book. Your task will be to add in a description of what the book is about in less than 120 words.
        You should not spoil too much about the book, and only fill in the parts to let any reader understand what they are 
        about to read.

        Only return the summary and nothing else. Do not introduce yourself, and do not ask any other questions. Your job is only to print out 
        the description, and the description only. Do not mention the author or the title in your answer.
        
        Description:
        """),
        ("user", "{title}, {author}")
    ])

    formatted_prompt = prompt.format_messages(title=title, author=author)
    result = llm.invoke(formatted_prompt)
    return result.content

    
    

In [27]:
#testing
#generate_description("i, robot", "isaac asimov")

In [28]:
BOOK_DIR = "/ebooks"
OUTPUT= "/output"
FICTION = BOOK_DIR+"/Fiction"
NON_FICTION = BOOK_DIR+"/Non-Fiction"




In [29]:
#TRAWL DAT 
#crawl_directories(NON_FICTION)
#crawl_directories(FICTION)

In [30]:
#trawl the directory 
def read_books(root):
    for folders in os.listdir(root):
        folder_path = os.path.join(root, folders)
        if not os.path.isdir(folder_path):
            continue
            
        for files in os.listdir(folder_path):
            file_path = os.path.join(folder_path, files)
            if not files.lower().endswith(".epub"):
                continue
            try:
                nihao = epub.read_epub(file_path)
                title = nihao.get_metadata("DC", "title")
                author = nihao.get_metadata("DC", "creator")
                description = nihao.get_metadata("DC", "description")

                if file_path not in book_directories:
                    add_to_array(file_path)
            except Exception as e:
                #identify the corrupt epubs
                print("BAD EPUB:", file_path, "=>", e)
                continue

    

In [48]:
read_books(NON_FICTION)
#read_books(FICTION)

BAD EPUB: /ebooks/Non-Fiction/Science/Neil deGrasse Tyson - Astrophysics for Young People in a Hurry.epub => "There is no item named 'EPUB/toc.ncx' in the archive"
BAD EPUB: /ebooks/Non-Fiction/Economics/Ray Dalio - Principles for Dealing with the Changing World Orderï€º Why Nations Succeed and Fail.epub => "There is no item named 'EPUB/toc.ncx' in the archive"
BAD EPUB: /ebooks/Non-Fiction/Economics/Ray Dalio - Big Debt Crises.epub => 'Bad Zip file'
BAD EPUB: /ebooks/Non-Fiction/Self-Help/Oliver Burkeman - Four Thousand Weeks.epub => "There is no item named 'EPUB/toc.ncx' in the archive"
BAD EPUB: /ebooks/Non-Fiction/Self-Help/Michael Easter - The Comfort Crisis.epub => "There is no item named 'EPUB/9780593138779_ncx.ncx' in the archive"


In [49]:
#prepare for encoding and chunking
import html2text

#convert all epubs to text, add to array 

def epub_to_text(path: str) -> str:
    book = epub.read_epub(path)
    texts = []

    for item in book.get_items():
        if isinstance(item, epub.EpubHtml):
            html = item.get_content().decode("utf-8", errors="ignore")

            # basic HTML tag removal
            cleaned = (
                html.replace("<br>", "\n")
                    .replace("<br/>", "\n")
                    .replace("</p>", "\n\n")
            )

            import re
            cleaned = re.sub("<.*?>", "", cleaned)  # strip remaining tags

            texts.append(cleaned)

    return "\n".join(texts)



In [50]:
all_texts = []
for path in book_directories:
    text = epub_to_text(path)
    all_texts.append(text)

In [51]:
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore

In [52]:
splitter = RecursiveCharacterTextSplitter(
    chunk_size=2000,
    chunk_overlap=200
)

In [53]:
#chunk dat data
all_chunks = []

for path in book_directories:
    text = epub_to_text(path)
    chunks = splitter.split_text(text)

    #associate metadata
    for chunk in chunks:
        all_chunks.append({
            "text": chunk,
            "source": path
        })

In [54]:
#chroma 
import chromadb
from langchain_community.vectorstores import Chroma

embeddings = OpenAIEmbeddings()

#create chroma
vectorstore = Chroma(
    collection_name="ebook_texts", 
    embedding_function=embeddings,
    persist_directory="/outputs/chroma_db"
)


In [38]:
vectorstore.add_texts(
    texts=[c["text"] for c in all_chunks],
    metadatas=[{"source": c["source"]} for c in all_chunks]
)

BadRequestError: Error code: 400 - {'error': {'message': 'Requested 329414 tokens, max 300000 tokens per request', 'type': 'max_tokens_per_request', 'param': None, 'code': 'max_tokens_per_request'}}

In [47]:
texts=[c["text"] for c in all_chunks]
for c in all_chunks:
    print(len(c["text"]))
print(len(texts))

1826
1247
1304
1139
1947
1985
1990
1999
1939
1952
1979
1970
1350
1401
1406
1857
600
1996
378
1891
392
1923
1916
1777
1683
1953
1725
1649
1951
1661
1312
1688
1991
1892
1509
1672
899
1947
1678
1607
1559
1292
1676
1109
1736
1721
1576
1488
1342
1844
1975
1711
1773
1661
1762
1470
1739
1219
1887
1789
1527
1482
1994
802
1516
1754
1335
1471
1628
1659
1898
1923
1480
1569
1505
1986
1976
1534
1954
717
1854
766
1426
1737
1780
1154
1619
1747
1862
1665
1705
1494
1817
1504
1224
1053
1186
1152
1906
1912
1115
1536
1545
869
1241
1744
1000
1844
1310
1215
1675
1767
819
1748
1813
787
1997
428
1627
1828
1756
1766
1345
670
1679
1884
1519
1398
1900
1720
1438
1770
1966
546
1513
1444
1846
1672
1610
1725
952
1858
1468
1684
1663
1481
1591
1775
1711
1898
1129
1795
936
1683
1872
1598
1313
1065
1498
1753
1719
1533
1944
1460
1355
1257
1551
1273
1026
1602
1990
1005
1814
1838
1419
1393
1900
1937
1830
1930
1685
1889
1784
1751
1849
925
1886
1159
1752
1732
1896
1652
1482
1661
1884
1898
1826
1325
1612
924
1936
1506
1795
18

In [56]:
#because the total size of the arrays are too big, you must batch

#batching! 

BATCH_SIZE = 100
def batch_then_store():
    for i in range(0, len(all_chunks), BATCH_SIZE):
        batch = all_chunks[i:i+BATCH_SIZE]
        vectorstore.add_texts(
            texts= [c["text"] for c in batch],
            metadatas= [{"source": c["source"]} for c in batch]
        )

In [57]:
print("batching and adding to vectorstore")
batch_then_store()

batching and adding to vectorstore


In [None]:
###create vector store (using pinecone)
"""
vectorstore = PineconeVectorStore.from_existing_index(
    embedding=embeddings,
    index_name=index_name
)



#dis too large, batch it
batch_size = 100
def batch_list(data, batch_size):
    for i in range(0, len(data), batch_size):
        yield data[i:i+batch_size]

for batch in batch_list(all_chunks, batch_size):
    texts = [c["text"] for c in batch]
    metadata = [{"source": os.path.basename(c["source"])} for c in batch]
    vectorstore.add_texts(
     texts=texts,
     metadatas=metadata
    )

"""


In [65]:
retriever = vectorstore.as_retriever()

In [78]:
from operator import itemgetter

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_openai import ChatOpenAI

system_prompt = """
You are a strict RAG model. 
Only answer using the provided context.
If the answer is not contained in the context, respond with:
"I don't know â€” no supporting text found."
"""

def build_prompt(input):
    return f"""
        {system_prompt}
        
        Question:
        {input["query"]}
        
        Context:
        {input["context"]}
    """

chain = (
    {"query": RunnablePassthrough(), "context": retriever | RunnableLambda(lambda docs: "\n\n".join([d.page_content for d in docs]))}
    | RunnableLambda(build_prompt)
    | llm
    | StrOutputParser()
)


chain.invoke("what happens in the three bodies problem? how does it end?")


'- In this context, the three-body problem is the chaotic gravitational interaction of three suns. It is described as a problem with sensitive dependence on initial conditions and no simple general solution (PoincarÃ©â€™s remark and Sundmanâ€™s note are mentioned).\n- The three suns create a tri-solar day that can destroy civilizations; Civilization Number 183 is wiped out by such a day.\n- After a long time, life and civilization begin again, moving forward through the unpredictable world of Three Body.\n- The excerpt ends with the note that the game has entered the second level, i.e., no final ending is shown in this passage.'