# Data Ingestion

In this tutorial we show how to create a data ingestion pipeline that adds data to a vector database.

We will use `Pinecone` as the vector database. Other vector databases are available too, for example `Chroma`, `Weaviate`, and `Faiss`.

We will cover the following in this session:
- How to load documents.
- How to add metadata to each document.
- How to use a text splitter to split documents.
- How to generate embeddings for each text chunk.
- How to insert the embeddings into a vector database.


## Pinecone

You will need a [Pinecone](https://www.pinecone.io/) API key. You can [sign up](https://app.pinecone.io/?sessionType=signup) for free to get a starter account, then get the API key after signing up.

## OpenAI

You will need an [OpenAI](https://openai.com/) API key for this session. Log in to your [platform.openai.com](https://platform.openai.com/) account, click on your profile picture in the upper right corner, and choose 'API Keys' from the menu. Create an API key and save it.

## Environment Variables

Create a `.env` file in your project directory and save the following.

```
PINECONE_API_KEY = "<your api key>"
OPENAI_API_KEY = "<your api key>"
LANGCHAIN_API_KEY = "<your api key>"
```

### Load Environment Variables

The `python-dotenv` package loads the `.env` file we just created, as shown below. The `os` module can then be used to read and set the environment variables.

To install: `pip install python-dotenv`

In [1]:
import os

from dotenv import load_dotenv
load_dotenv()

os.environ["LANGCHAIN_TRACING_V2"] = "true"

# load_dotenv() has already copied the keys from .env into os.environ, so we only check them
for key in ("LANGCHAIN_API_KEY", "OPENAI_API_KEY", "PINECONE_API_KEY"):
    if not os.getenv(key):
        raise RuntimeError(f"{key} is not set; add it to your .env file")

# LANGCHAIN_PROJECT is not in the .env file above, so fall back to a placeholder name
os.environ.setdefault("LANGCHAIN_PROJECT", "default")
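
A missing key often surfaces much later as an authentication error that is hard to trace back. The sketch below shows one possible way to report every missing variable up front. `missing_env_vars` is a hypothetical helper (not part of `python-dotenv`), and the dictionary stands in for `os.environ` so the example runs without real keys.

```python
import os

def missing_env_vars(required, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]

# Usage with a plain dict standing in for os.environ (values are placeholders)
example_env = {"PINECONE_API_KEY": "abc", "OPENAI_API_KEY": ""}
missing = missing_env_vars(
    ["PINECONE_API_KEY", "OPENAI_API_KEY", "LANGCHAIN_API_KEY"], env=example_env
)
print(missing)  # ['OPENAI_API_KEY', 'LANGCHAIN_API_KEY']
```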

## Setup a Pinecone Index

In [2]:
import time  
from pinecone import Pinecone, ServerlessSpec

INDEX_NAME = 'earning-calls-euclidean'
USE_SERVERLESS = True

# configure client  
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])  

if USE_SERVERLESS:
    spec = ServerlessSpec(cloud='aws', region='us-east-1')
    # reuse the index if it already exists
    if INDEX_NAME in pc.list_indexes().names():
        print(f"Index `{INDEX_NAME}` already exists")
    # otherwise create a new index
    else:
        pc.create_index(
            name=INDEX_NAME,
            dimension=1536,  # dimensionality of text-embedding-ada-002
            metric='cosine',  # the index name says "euclidean"; change this if that is the intent
            spec=spec
        )
        # wait for index to be initialized
        while not pc.describe_index(INDEX_NAME).status['ready']:
            time.sleep(1)
        print(f"Index with name `{INDEX_NAME}` is created")
    index = pc.Index(INDEX_NAME)
    print(index.describe_index_stats())

Index `earning-calls-euclidean` already exists
{'dimension': 1536,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 3039}},
 'total_vector_count': 3039}
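
The index name mentions Euclidean distance while the setup cell uses `metric='cosine'`. For unit-length vectors the two metrics rank neighbours identically, because the squared Euclidean distance equals `2 - 2 * cosine_similarity`. The sketch below checks that identity on toy vectors in plain Python. The helper names are hypothetical, and whether your embedding model returns unit-length vectors is an assumption you should verify.

```python
import math

def normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def euclidean_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

# Toy vectors, normalized to unit length
a = normalize([1.0, 2.0, 3.0])
b = normalize([2.0, 1.0, 0.5])

cos = cosine_similarity(a, b)
dist = euclidean_distance(a, b)

# For unit vectors: dist**2 == 2 - 2 * cos (up to floating-point error)
print(math.isclose(dist ** 2, 2 - 2 * cos))  # True
```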


`Note:` To delete an existing index, use `pc.delete_index(INDEX_NAME)`.

## Building an Ingestion Pipeline

### Importing the required packages

In [3]:
from langchain.text_splitter import RecursiveCharacterTextSplitter # To split the text into smaller chunks
from langchain_openai import OpenAIEmbeddings # To create embeddings
from langchain_pinecone import PineconeVectorStore # To connect with the Vectorstore
from langchain_community.document_loaders import DirectoryLoader # To load files in a directory
from langchain_community.document_loaders import PyPDFLoader # To parse the PDFs

In [4]:
DATA_DIR_PATH = "DATA_2/" # Path to the Data directory
CHUNK_SIZE = 1024
CHUNK_OVERLAP = 204
INDEX_NAME = 'earning-calls-euclidean'

`Note:` Make sure to maintain the directory structure shown below, since we will use the Year and Quarter directory names in the metadata later.

<img src="images/data_dir_tree.png"/>

### Loading Files

Initialize a `DirectoryLoader` object and pass it the path to the data (`path`), a pattern for the files to load (`glob`), and the loader class (`loader_cls`), which in our case is `PyPDFLoader` since we are working with PDF files.

In [5]:
loader = DirectoryLoader(path=DATA_DIR_PATH, glob="**/*.pdf", loader_cls=PyPDFLoader)
docs = loader.load()

print(f"Total Documents loaded: {len(docs)}")

Total Documents loaded: 881
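
Before loading, it can help to check which files the glob pattern actually matches. The sketch below uses only `pathlib` on a throwaway directory, and the folder and file names are made up for the demo. The number of files will generally differ from the document count above, since `PyPDFLoader` produces one `Document` per page.

```python
import tempfile
from pathlib import Path

def find_pdfs(root):
    """Return all PDF paths under `root`, searched recursively, in sorted order."""
    return sorted(Path(root).glob("**/*.pdf"))

# Demo on a temporary directory tree (names are hypothetical)
with tempfile.TemporaryDirectory() as tmp:
    for rel in ["2023/Q1/call.pdf", "2023/Q2/call.pdf", "2023/Q2/notes.txt"]:
        path = Path(tmp, rel)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch()
    found = [p.relative_to(tmp).as_posix() for p in find_pdfs(tmp)]

print(found)  # ['2023/Q1/call.pdf', '2023/Q2/call.pdf']
```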


In [None]:
docs[0]

In [None]:
# we can convert the Document object to a python dict using the .dict() method.
print(f"keys associated with a Document: {docs[0].dict().keys()}")

In [None]:
print(f"{'-'*15}\nFirst 100 characters of the page content: {docs[0].page_content[:100]}\n{'-'*15}")
print(f"Metadata associated with the document: {docs[0].metadata}\n{'-'*15}")
print(f"Datatype of the document: {docs[0].type}\n{'-'*15}")

In [None]:
# We loop through each document and add additional metadata - filename, quarter, and year
from pathlib import Path

for doc in docs:
    source = doc.metadata["source"]
    # expected layout: <data dir>/<year>/<quarter>/<filename>
    year, quarter, filename = Path(source).parts[-3:]
    doc.metadata = {"filename": filename, "quarter": quarter, "year": year, "source": source, "page": doc.metadata["page"]}
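
The loop above assumes each file sits at `<data dir>/<year>/<quarter>/<filename>`. The sketch below isolates that parsing step so it can be checked on a sample path before running it over all documents. `parse_source` and the sample path are hypothetical.

```python
from pathlib import PurePosixPath

def parse_source(source):
    """Extract year, quarter and filename from a path ending in <year>/<quarter>/<filename>."""
    parts = PurePosixPath(source).parts
    if len(parts) < 3:
        raise ValueError(f"Expected <year>/<quarter>/<filename> in {source!r}")
    year, quarter, filename = parts[-3:]
    return {"year": year, "quarter": quarter, "filename": filename}

print(parse_source("DATA_2/2023/Q1/example_call.pdf"))
# {'year': '2023', 'quarter': 'Q1', 'filename': 'example_call.pdf'}
```

Raising an error for paths that are too short makes a misplaced file visible immediately instead of silently producing wrong metadata.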

In [None]:
# To verify that the metadata is indeed added to the document
print(f"Metadata associated with the document: {docs[0].metadata}\n{'-'*15}")
print(f"Metadata associated with the document: {docs[1].metadata}\n{'-'*15}")

### Chunking Text

As the name suggests, chunking is the process of dividing a large document into several smaller parts, so that each part can be embedded, stored, and retrieved on its own.

There are various ways to perform chunking, for example:
 - Character Chunking
 - Recursive Character Chunking
 - Document Specific Chunking

For this session we will use `Recursive Character Chunking`, for which LangChain has an implementation that we can use directly. To read more about it, refer to the [docs](https://python.langchain.com/v0.1/docs/modules/data_connection/document_transformers/recursive_text_splitter/).

`Additional Resource:` To explore the different chunking strategies, refer to the LangChain docs - [Link to Docs](https://python.langchain.com/v0.1/docs/modules/data_connection/document_transformers/)
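
To make chunk size and chunk overlap concrete, here is a sketch of plain character chunking with overlap, the first strategy in the list above. It is not LangChain's `RecursiveCharacterTextSplitter`, which additionally tries to break on separators such as paragraphs and newlines. `chunk_text` is a hypothetical helper.

```python
def chunk_text(text, chunk_size, chunk_overlap):
    """Split `text` into windows of `chunk_size` characters that share `chunk_overlap` characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks

chunks = chunk_text("abcdefghijklmnopqrstuvwxyz", chunk_size=10, chunk_overlap=3)
print(chunks)  # ['abcdefghij', 'hijklmnopq', 'opqrstuvwx', 'vwxyz']
```

Each chunk starts with the last three characters of the previous one, which is what the overlap buys: text near a boundary appears in both neighbouring chunks.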

In [None]:
# Split text into chunks 
text_splitter = RecursiveCharacterTextSplitter(
     chunk_size=CHUNK_SIZE,
     chunk_overlap=CHUNK_OVERLAP
)

documents = text_splitter.split_documents(docs)

In [None]:
len(docs), len(documents)

In [None]:
documents[0:4]

In [None]:
embeddings = OpenAIEmbeddings(model = "text-embedding-ada-002") # Initialize the embedding model

In [None]:
docs_already_in_pinecone = input("Are the vectors already added in DB: (Type Y/N)").strip().lower()

# check if the documents were already added to the vector database
if docs_already_in_pinecone == "y":
    docsearch = PineconeVectorStore(index_name=INDEX_NAME, embedding=embeddings)
    print("Existing Vectorstore is loaded")
# if not, then add the documents to the vector db
elif docs_already_in_pinecone == "n":
    docsearch = PineconeVectorStore.from_documents(documents, embeddings, index_name=INDEX_NAME)
    print("New vectorstore is created and loaded")
else:
    print("Please type Y - for yes and N - for no")
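
The Y/N prompt above is a manual guard against inserting the same chunks twice. One alternative, shown here purely as a sketch and not something this notebook does, is to derive a deterministic id for each chunk, so that re-running ingestion overwrites existing records instead of duplicating them (a Pinecone upsert replaces a record that has the same id). `chunk_id` is a hypothetical helper, and how ids are passed to the vector store depends on the API you use.

```python
import hashlib

def chunk_id(source, page, text):
    """Build a stable hex id from where a chunk came from and what it contains."""
    payload = f"{source}|{page}|{text}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()

# Sample values are made up for the demo
id_a = chunk_id("DATA_2/2023/Q1/example_call.pdf", 0, "Revenue grew this quarter.")
id_b = chunk_id("DATA_2/2023/Q1/example_call.pdf", 0, "Revenue grew this quarter.")
id_c = chunk_id("DATA_2/2023/Q1/example_call.pdf", 0, "Costs were flat.")

print(id_a == id_b)  # True: the same input always gives the same id
print(id_a == id_c)  # False: different text gives a different id
```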

In [None]:
# Here we define how to use the loaded vectorstore as a retriever
retriever = docsearch.as_retriever()

In [None]:
retriever.invoke("what is the income?")

#### Using metadata with the retriever

In [None]:
retriever = docsearch.as_retriever(search_kwargs={"filter": {"quarter": "Q1"}, "k": 2})

In [None]:
retriever.invoke("what is the income?")