Data Ingestion

In [1]:
# import requests, zipfile, io
# from pathlib import Path

# zip_url = "https://public.ukp.informatik.tu-darmstadt.de/thakur/BEIR/datasets/fiqa.zip"
# out_dir = Path("fiqa")
# out_dir.mkdir(parents=True, exist_ok=True)

# # Download the ZIP into memory (fine for small/medium files)
# resp = requests.get(zip_url, timeout=120)
# resp.raise_for_status()

# # Extract all files
# with zipfile.ZipFile(io.BytesIO(resp.content)) as z:
#     z.extractall(out_dir)

# print(f"Extracted to: {out_dir.resolve()}")

In [2]:
import json
import os
from pathlib import Path
from typing import List, Dict, Iterable

from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

Get Data Ready

In [None]:
def read_jsonl(path: str) -> Iterable[Dict]:
    """Lazily parse a JSON Lines file.

    Args:
        path: Location of a UTF-8 encoded ``.jsonl`` file.

    Yields:
        One decoded JSON value per line, in file order; lines that are empty
        or whitespace-only are skipped.
    """
    with open(path, "r", encoding="utf-8") as handle:
        non_blank = (raw for raw in handle if raw.strip())
        yield from map(json.loads, non_blank)

def record_to_document(rec: Dict, source: str) -> Document:
    """Convert one BEIR-style JSONL record into a LangChain ``Document``.

    When the record has a non-blank title, it is prepended to the text and
    separated from it by a blank line to form ``page_content``.

    Args:
        rec: Parsed record. Reads the ``_id``, ``title``, ``text`` and
            ``metadata`` keys; any of them may be missing or ``None``.
        source: Tag stored under ``metadata["source"]``
            (e.g. ``'fiqa_corpus'`` or ``'fiqa_queries'``).

    Returns:
        A ``Document`` whose metadata is a copy of ``rec["metadata"]`` plus
        ``source``, ``id`` and, when the title is non-blank, ``title``.
    """
    # `or ""` also covers keys that are present but null in the JSON.
    title = (rec.get("title") or "").strip()
    text = (rec.get("text") or "").strip()

    # Join only non-empty parts so a title-only record has no dangling separator.
    content = "\n\n".join(part for part in (title, text) if part)

    # Copy the nested metadata so the caller's record is never mutated.
    meta = dict(rec.get("metadata") or {})
    meta.update({
        "source": source,   # 'fiqa_corpus' or 'fiqa_queries'
        "id": rec.get("_id"),
    })
    # Stored stripped, and only when non-blank, matching what went into content.
    if title:
        meta["title"] = title

    return Document(page_content=content, metadata=meta)

In [None]:
# Character-based chunking. Neighbouring chunks share CHUNK_OVERLAP characters,
# so text near a boundary is repeated in both chunks.
# NOTE(review): the dense model truncates at 256 tokens; ~1000 characters of
# English is close to that limit, so long chunks may be cut off — confirm.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    # Prefer paragraph breaks, then line breaks, then spaces, then any character.
    separators=["\n\n", "\n", " ", ""],
)

def preprocess_fiqa_corpus(corpus_path: str) -> List[Document]:
    """Load the FiQA corpus and split it into chunks for indexing.

    Args:
        corpus_path: Path to the BEIR ``corpus.jsonl`` file.

    Returns:
        The chunked documents produced by the module-level ``splitter``. Each
        chunk is tagged ``source='fiqa_corpus'``.
    """
    corpus_docs = [
        record_to_document(record, source="fiqa_corpus")
        for record in read_jsonl(corpus_path)
    ]
    return splitter.split_documents(corpus_docs)

def preprocess_fiqa_queries(queries_path: str) -> List[Document]:
    """Load the FiQA queries as whole (unchunked) documents.

    Args:
        queries_path: Path to the BEIR ``queries.jsonl`` file.

    Returns:
        One ``Document`` per query record, tagged ``source='fiqa_queries'``.
    """
    return [
        record_to_document(query_record, source="fiqa_queries")
        for query_record in read_jsonl(queries_path)
    ]

In [7]:
# Resolve the FiQA files from a configurable data directory instead of a
# machine-specific absolute path. Override it with the FIQA_DATA_DIR env var.
DATA_DIR = Path(os.environ.get("FIQA_DATA_DIR", "fiqa"))
corpus_path = str(DATA_DIR / "corpus.jsonl")
queries_path = str(DATA_DIR / "queries.jsonl")

# Fail fast with an actionable message rather than a bare open() error.
for _required in (corpus_path, queries_path):
    if not Path(_required).is_file():
        raise FileNotFoundError(
            f"{_required} not found - run the download cell or set FIQA_DATA_DIR"
        )

fiqa_chunked_corpus: List[Document] = preprocess_fiqa_corpus(corpus_path)
fiqa_queries: List[Document] = preprocess_fiqa_queries(queries_path)

# The chunked corpus is what gets indexed in Pinecone.
pinecone_docs = fiqa_chunked_corpus

In [8]:
# Peek at the first chunks to sanity-check their text and metadata.
pinecone_docs[:10]

[Document(metadata={'source': 'fiqa_corpus', 'id': '3'}, page_content="I'm not saying I don't like the idea of on-the-job training too, but you can't expect the company to do that. Training workers is not their job - they're building software. Perhaps educational systems in the U.S. (or their students) should worry a little about getting marketable skills in exchange for their massive investment in education, rather than getting out with thousands in student debt and then complaining that they aren't qualified to do anything."),
 Document(metadata={'source': 'fiqa_corpus', 'id': '31'}, page_content="So nothing preventing false ratings besides additional scrutiny from the market/investors, but there are some newer controls in place to prevent institutions from using them. Under the DFA banks can no longer solely rely on credit ratings as due diligence to buy a financial instrument, so that's a plus. The intent being that if financial institutions do their own leg work then *maybe* they'

In [13]:
import os
from dotenv import load_dotenv, find_dotenv
from pathlib import Path

# Load the .env file BEFORE reading the keys. Reading os.environ first
# returns None for keys that exist only in the .env file.
ENV_PATH = Path.cwd() / "env" / ".env"
# find_dotenv() returns "" when it finds no .env file on its search path.
# In that case, fall back to the project's env/.env if it exists.
# override=False keeps variables already set in the real environment.
dotenv_path = find_dotenv() or (str(ENV_PATH) if ENV_PATH.is_file() else "")
load_dotenv(dotenv_path, override=False)

openai_api_key = os.environ.get("OPENAI_API_KEY")
pinecone_api_key = os.environ.get("PINECONE_API_KEY")

# Report presence only; never print the secrets themselves.
print("Has OPENAI_API_KEY:", bool(openai_api_key))
print("Has PINECONE_API_KEY:", bool(pinecone_api_key))

Has OPENAI_API_KEY: True
Has PINECONE_API_KEY: True


In [16]:
from pinecone import Pinecone

# Pinecone client, used below to check for, create and open the index.
# It authenticates with the key read in the environment-setup cell.
pc = Pinecone(api_key=pinecone_api_key)

In [None]:
from pinecone import ServerlessSpec

# Pinecone sparse-dense (hybrid) queries require a dotproduct index. The dense
# dimension must equal the embedding model's output size: 384 for
# sentence-transformers/all-MiniLM-L6-v2.
index_name = "fiqa-hybrid"
EMBEDDING_DIM = 384
INDEX_METRIC = "dotproduct"

if not pc.has_index(index_name):
    pc.create_index(
        name=index_name,
        dimension=EMBEDDING_DIM,
        metric=INDEX_METRIC,
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
else:
    # An existing index is reused as-is, so fail fast if it was created with
    # settings the hybrid retriever cannot use.
    description = pc.describe_index(index_name)
    if description.dimension != EMBEDDING_DIM or description.metric != INDEX_METRIC:
        raise ValueError(
            f"Index {index_name!r} has dimension={description.dimension}, "
            f"metric={description.metric}; expected {EMBEDDING_DIM}/{INDEX_METRIC}. "
            "Delete it or choose another index_name."
        )

# Data-plane handle used for upserts and queries.
index = pc.Index(name=index_name)

# View index stats (vector count, dimension, metric).
index.describe_index_stats()

  from .autonotebook import tqdm as notebook_tqdm


{'dimension': 384,
 'index_fullness': 0.0,
 'metric': 'dotproduct',
 'namespaces': {},
 'total_vector_count': 0,
 'vector_type': 'dense'}

In [18]:
# Echo the Index client; its repr only identifies the object type.
index

<pinecone.db_data.index.Index at 0x26106418080>

Embedding model

In [None]:
from langchain_community.embeddings import HuggingFaceEmbeddings

# Dense encoder for the hybrid index. Its printed repr shows 384-dim,
# L2-normalised outputs (matching the index dimension) and a 256-token input limit.
# NOTE(review): the warning emitted here looks like a langchain_community
# deprecation notice for HuggingFaceEmbeddings — confirm and consider migrating.
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME)
embeddings

  embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


HuggingFaceEmbeddings(client=SentenceTransformer(
  (0): Transformer({'max_seq_length': 256, 'do_lower_case': False, 'architecture': 'BertModel'})
  (1): Pooling({'word_embedding_dimension': 384, 'pooling_mode_cls_token': False, 'pooling_mode_mean_tokens': True, 'pooling_mode_max_tokens': False, 'pooling_mode_mean_sqrt_len_tokens': False, 'pooling_mode_weightedmean_tokens': False, 'pooling_mode_lasttoken': False, 'include_prompt': True})
  (2): Normalize()
), model_name='sentence-transformers/all-MiniLM-L6-v2', cache_folder=None, model_kwargs={}, encode_kwargs={}, multi_process=False, show_progress=False)

BM25 encoder

In [20]:
from pinecone_text.sparse import BM25Encoder

# pinecone-text's default() is a static factory, so call it on the class
# rather than on a throwaway instance.
# NOTE(review): per the pinecone-text docs it loads BM25 statistics pre-fitted
# on MS MARCO; this encoder is replaced by one fitted on FiQA further down.
bm25_encoder = BM25Encoder.default()
bm25_encoder

[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\ainao\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt_tab.zip.


<pinecone_text.sparse.bm25_encoder.BM25Encoder at 0x26156020260>

In [28]:
# Re-check the leading chunks before flattening them into lists for indexing.
pinecone_docs[:5]

[Document(metadata={'source': 'fiqa_corpus', 'id': '3'}, page_content="I'm not saying I don't like the idea of on-the-job training too, but you can't expect the company to do that. Training workers is not their job - they're building software. Perhaps educational systems in the U.S. (or their students) should worry a little about getting marketable skills in exchange for their massive investment in education, rather than getting out with thousands in student debt and then complaining that they aren't qualified to do anything."),
 Document(metadata={'source': 'fiqa_corpus', 'id': '31'}, page_content="So nothing preventing false ratings besides additional scrutiny from the market/investors, but there are some newer controls in place to prevent institutions from using them. Under the DFA banks can no longer solely rely on credit ratings as due diligence to buy a financial instrument, so that's a plus. The intent being that if financial institutions do their own leg work then *maybe* they'

In [None]:
# Flatten the chunked corpus into the parallel lists the hybrid retriever needs.
#
# The text splitter copies each source document's metadata onto every chunk,
# so all chunks of one FiQA document share the same metadata["id"].
# Pinecone upserts by vector id, so reusing that id would make later chunks
# overwrite earlier ones. Each vector id therefore gets a per-document chunk
# counter ("<doc id>-<n>"), while metadata["id"] keeps the original document id.
texts: List[str] = []
metadatas: List[Dict] = []
ids: List[str] = []
chunk_counts: Dict[str, int] = {}

for d in pinecone_docs:
    t = (getattr(d, "page_content", None) or "").strip()
    if not t:
        continue  # skip chunks that are empty after trimming whitespace
    meta = dict(d.metadata or {})
    raw_id = meta.get("id")
    doc_id = str(raw_id) if raw_id is not None else f"auto{len(ids)}"
    chunk_idx = chunk_counts.get(doc_id, 0)
    chunk_counts[doc_id] = chunk_idx + 1
    meta["chunk_index"] = chunk_idx
    texts.append(t)
    metadatas.append(meta)
    ids.append(f"{doc_id}-{chunk_idx}")

assert len(texts) == len(metadatas) == len(ids)
assert len(set(ids)) == len(ids), "vector ids must be unique for Pinecone upserts"
print(f"Prepared {len(texts)} texts / {len(metadatas)} metadatas / {len(ids)} ids")

Prepared 78139 texts
78139
78139


(None, None, None)

Fit the BM25 encoder and encode the chunks

In [42]:
from pinecone_text.sparse import BM25Encoder

# Fitting BM25 on ~78k chunks takes minutes, so cache the learned statistics
# on disk and reload them on later runs. Set REFIT_BM25 = True (or delete the
# file) after changing the corpus or chunking; otherwise the cache is stale.
BM25_PARAMS_PATH = Path("bm25_values.json")
REFIT_BM25 = False

if BM25_PARAMS_PATH.is_file() and not REFIT_BM25:
    bm25_encoder = BM25Encoder()
    bm25_encoder.load(str(BM25_PARAMS_PATH))
else:
    # Use a plain BM25Encoder(): default() would first download MS MARCO
    # statistics, which fit() then replaces with FiQA statistics anyway.
    bm25_encoder = BM25Encoder()
    bm25_encoder.fit(texts)
    bm25_encoder.dump(str(BM25_PARAMS_PATH))

bm25_encoder

100%|██████████| 78139/78139 [01:37<00:00, 803.79it/s] 


<pinecone_text.sparse.bm25_encoder.BM25Encoder at 0x26160aeb2f0>

In [45]:
# BM25 sparse vectors (dicts with "indices"/"values") for every chunk in `texts`.
# In the visible code, these are only used by the next cell to drop chunks
# whose sparse vector is empty.
# NOTE(review): empty vectors presumably come from chunks whose tokens were all
# filtered out, e.g. stop-words or punctuation — confirm.
# NOTE(review): PineconeHybridSearchRetriever.add_texts appears to re-encode texts
# itself, so this pass may be used only for filtering — confirm.
doc_sparse_list = bm25_encoder.encode_documents(texts) 

Upsert chunked documents

In [None]:
from langchain_community.retrievers import PineconeHybridSearchRetriever

# zip() silently truncates, so check that every text got a sparse vector.
assert len(doc_sparse_list) == len(texts), "sparse vectors out of sync with texts"

# Keep only chunks with a non-empty BM25 vector; each upserted vector carries
# both dense and sparse values.
# NOTE(review): presumably Pinecone rejects sparse vectors with no indices — confirm.
keep_texts, keep_metas, keep_ids = [], [], []
for t, sp, m, i_ in zip(texts, doc_sparse_list, metadatas, ids):
    if sp and sp.get("indices") and sp.get("values"):
        keep_texts.append(t)
        keep_metas.append(m)
        keep_ids.append(i_)

print(f"Kept {len(keep_texts)} / {len(texts)} chunks (non-empty sparse)")

retriever = PineconeHybridSearchRetriever(
    embeddings=embeddings,
    sparse_encoder=bm25_encoder,  # already fitted
    index=index,
)

# A full upsert took over an hour and a half, so re-running the notebook
# should not repeat it when the index is already populated.
# Set FORCE_UPSERT = True to re-upsert, e.g. after changing the chunking or
# the id scheme. Clear the index first so vectors stored under old ids
# do not linger.
FORCE_UPSERT = False
existing_vectors = index.describe_index_stats()["total_vector_count"]
if existing_vectors and not FORCE_UPSERT:
    print(f"Index already holds {existing_vectors} vectors; skipping upsert")
else:
    # This will upsert with both dense & sparse values
    retriever.add_texts(keep_texts, metadatas=keep_metas, ids=keep_ids)

Upserting 78130 / 78139 chunks (non-empty sparse)


100%|██████████| 2442/2442 [1:42:14<00:00,  2.51s/it]  


Make a RAG Pipeline

In [None]:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.chains import create_retrieval_chain
from langchain.chains import RetrievalQA
from langchain.chains import create_retrieval_chain
from langchain import hub

In [None]:
## ChatPrompt Template
from langchain_core.prompts import ChatPromptTemplate
prompt = ChatPromptTemplate.from_template("""
Answer the following question based only on the provided context. 
Think step by step before providing a detailed answer. 
I will tip you $1000 if the user finds the answer helpful. 
<context>
{context}
</context>
Question: {input}""")

llm = ChatOpenAI(model="gpt-4o-mini")
document_chain = create_stuff_documents_chain(llm, prompt)
rag_chain = create_retrieval_chain(retriever, document_chain)

query = "What are the SEC requirements for an accredited investor?"
result = rag_chain.invoke({"input": query})
print(result["answer"])

Failed to multipart ingest runs: langsmith.utils.LangSmithError: Failed to POST https://api.smith.langchain.com/runs/multipart in LangSmith API. HTTPError('403 Client Error: Forbidden for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Forbidden"}\n')
Failed to multipart ingest runs: langsmith.utils.LangSmithError: Failed to POST https://api.smith.langchain.com/runs/multipart in LangSmith API. HTTPError('403 Client Error: Forbidden for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Forbidden"}\n')


The SEC requirements for an accredited investor, as defined in Rule 501 of Regulation D, are as follows:

1. **Net Worth Criteria**: A natural person must have an individual net worth, or joint net worth with their spouse, that exceeds $1 million at the time of purchase. This calculation excludes the value of the person's primary residence.

2. **Income Criteria**: A natural person must have an income exceeding $200,000 per year (or $300,000 together with a spouse) for the last two years, with the expectation of earning the same or a higher income in the current year.

3. **Entities**: Certain entities qualify as accredited investors, including:
   - Banks, insurance companies, registered investment companies, business development companies, or small business investment companies.
   - Employee benefit plans with total assets exceeding $5 million, if a bank, insurance company, or registered investment adviser makes the investment decisions.
   - Charitable organizations, corporations, 

Failed to multipart ingest runs: langsmith.utils.LangSmithError: Failed to POST https://api.smith.langchain.com/runs/multipart in LangSmith API. HTTPError('403 Client Error: Forbidden for url: https://api.smith.langchain.com/runs/multipart', '{"error":"Forbidden"}\n')
