In [0]:
# %pip install python-dotenv pinecone_haystack haystack-ai markdown-it-py mdit_plain chromadb pinecone boto3 chroma-haystack 

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# %pip install --upgrade nltk==3.9.1

Collecting nltk==3.9.1
  Downloading nltk-3.9.1-py3-none-any.whl.metadata (2.9 kB)
Downloading nltk-3.9.1-py3-none-any.whl (1.5 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.5 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/1.5 MB[0m [31m5.0 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.5/1.5 MB[0m [31m5.4 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━[0m [32m0.8/1.5 MB[0m [31m6.6 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━[0m [32m1.2/1.5 MB[0m [31m7.6 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.5/1.5 MB[0m [31m8.2 MB/s[0m eta [36m0:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m7.3 MB/s[0m e

In [0]:
import nltk

# NLTK resolves resources through the local filesystem, so both the download
# target and the search path must use the /dbfs FUSE mount. The previous
# 'dbfs:/mnt/nltk_data' entry is a Spark-style URI that NLTK cannot open,
# which meant the downloaded data was never on the search path.
NLTK_DATA_DIR = '/dbfs/mnt/nltk_data'

# NOTE(review): nltk 3.9.x tokenizers appear to look up 'punkt_tab' rather than
# 'punkt' - confirm whether this is still the resource the splitters need.
nltk.download('punkt', download_dir=NLTK_DATA_DIR)

# Guarded so re-running the cell does not keep appending the same entry.
if NLTK_DATA_DIR not in nltk.data.path:
    nltk.data.path.append(NLTK_DATA_DIR)

[nltk_data] Downloading package punkt to /dbfs/mnt/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [0]:
import boto3
from botocore.exceptions import ClientError, EndpointConnectionError
from haystack import Pipeline
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.document_stores.chroma import ChromaDocumentStore
from haystack_integrations.components.retrievers.pinecone import PineconeEmbeddingRetriever
from haystack_integrations.document_stores.pinecone import PineconeDocumentStore
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter, RecursiveDocumentSplitter
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.writers import DocumentWriter
from haystack.components.builders import PromptBuilder, ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.components.converters import MarkdownToDocument
from haystack.dataclasses.byte_stream import ByteStream
import requests
from dotenv import load_dotenv
import os
load_dotenv()


True

In [0]:
def get_s3_client():
    """Create a boto3 S3 client from credentials stored in environment variables.

    Reads ``AWS_ACCESS_KEY``, ``AWS_SECRET_ACCESS_KEY`` and ``AWS_REGION`` with
    ``os.getenv`` and passes them to ``boto3.client('s3', ...)``.

    Returns:
        The boto3 S3 client on success, or the integer sentinel ``-1`` if the
        client could not be constructed (sentinel kept for existing callers).
    """
    try:
        return boto3.client(
            's3',
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
            region_name=os.getenv('AWS_REGION'),
        )
    except Exception as e:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
        # propagate, and the cause is reported instead of silently discarded.
        print(f"Error: could not create the S3 client: {e}")
        return -1
    
    
def read_markdown_from_s3(s3_client, year, qtr, file_url):
    """Download one markdown object from S3 and return its raw bytes.

    The object key is ``{year}/{qtr}/{tool}/{name}`` where ``tool`` is the value
    of the Databricks ``tool`` widget and ``name`` is the last ``/``-separated
    segment of ``file_url``. The bucket comes from the ``BUCKET_NAME``
    environment variable.

    Args:
        s3_client: Client object exposing ``get_object(Bucket=..., Key=...)``.
        year: First segment of the object key.
        qtr: Second segment of the object key.
        file_url: URL or path whose final segment is the object file name.

    Returns:
        ``(content_bytes, file_url)`` on success. On failure an integer
        sentinel is returned instead of a tuple:
        ``-1`` if ``BUCKET_NAME``/``AWS_REGION`` is unset or a ``ClientError``
        occurred, ``-2`` on ``EndpointConnectionError``, ``-999`` on any other
        exception.
    """
    bucket = os.getenv('BUCKET_NAME')
    region = os.getenv('AWS_REGION')
    if any(setting is None for setting in (bucket, region)):
        return -1

    try:
        tool_dir = dbutils.widgets.get("tool")
        object_name = file_url.rsplit("/", 1)[-1]
        object_key = f'{year}/{qtr}/{tool_dir}/{object_name}'
        s3_object = s3_client.get_object(Bucket=bucket, Key=object_key)
        # The body is returned as raw bytes; no decoding happens here.
        return s3_object["Body"].read(), file_url
    except ClientError as err:
        if err.response['Error']['Code'] != "NoSuchKey":
            print(f"ClientError: {err}")
        else:
            print("Error: The specified file does not exist.")
        return -1
    except EndpointConnectionError:
        print("Error: Could not connect to the S3 endpoint. Check your configuration.")
        return -2
    except Exception as err:
        print(f"Unexpected error occurred: {err}")
        return -999

In [0]:
# Metadata index consumed by the loop below as {year: {quarter: {tool: file_url}}}.
METADATA_URL = 'https://rag-pipeline-data.s3.us-east-2.amazonaws.com/metadata/metadata_s3url.json'

# A timeout keeps the cell from hanging indefinitely on a stalled connection.
response = requests.get(METADATA_URL, timeout=30)
# Fail with a clear HTTP error instead of an obscure JSON decode error when the
# server answers with a non-2xx status.
response.raise_for_status()
metadata = response.json()

In [0]:
# Show the current value of the Databricks "tool" widget; the same widget value
# is used elsewhere in this notebook to pick the S3 key prefix and metadata entry.
print(dbutils.widgets.get("tool"))

In [0]:
# Build one ByteStream per (year, quarter) markdown file for the selected tool.

# Resolve the widget value and the S3 client once rather than per iteration.
selected_tool = dbutils.widgets.get("tool")
s3_client = get_s3_client()
if s3_client == -1:
    # get_s3_client() signals failure with the sentinel -1.
    raise RuntimeError("Could not create the S3 client; check the AWS_* environment variables.")

markdown_streams = []
for year, quarters in metadata.items():
    for qtr, tool_urls in quarters.items():
        file_url = tool_urls[selected_tool]
        result = read_markdown_from_s3(s3_client, year, qtr, file_url)
        # read_markdown_from_s3 returns (bytes, url) on success and an int error
        # code on failure; indexing the int used to raise an opaque TypeError.
        if not isinstance(result, tuple):
            raise RuntimeError(
                f"Failed to read {file_url} for {year}/{qtr} (error code {result})"
            )
        markdown_bytes = result[0]
        markdown_stream = ByteStream(
            data=markdown_bytes,
            mime_type="text/markdown",
            meta={'year': year, 'qtr': qtr},
        )
        markdown_streams.append(markdown_stream)

In [0]:
# --- Configuration ----------------------------------------------------------
# Values that were previously repeated inline for every store/embedder.
PINECONE_INDEX = "nvidia-vectors"
EMBEDDING_MODEL = "text-embedding-3-small"
EMBEDDING_DIM = 1536  # shared by the Pinecone stores and the embedders
# Chroma endpoint: can be overridden through the environment; the defaults are
# the values that used to be hard-coded. The port stays a string so the value
# handed to ChromaDocumentStore is unchanged.
CHROMA_HOST = os.getenv("CHROMA_HOST", "34.31.232.10")
CHROMA_PORT = os.getenv("CHROMA_PORT", "8000")


def _pinecone_store(namespace):
    """Return a PineconeDocumentStore for `namespace` inside the shared index."""
    return PineconeDocumentStore(index=PINECONE_INDEX, namespace=namespace, dimension=EMBEDDING_DIM)


def _chroma_store(collection_name):
    """Return a ChromaDocumentStore for `collection_name` on the configured server."""
    return ChromaDocumentStore(host=CHROMA_HOST, port=CHROMA_PORT, collection_name=collection_name)


def _document_embedder():
    """Return a fresh embedder; every pipeline component needs its own instance."""
    return OpenAIDocumentEmbedder(
        model=EMBEDDING_MODEL,
        meta_fields_to_embed=["year", "qtr"],
        dimensions=EMBEDDING_DIM,
    )


# --- Document stores: one namespace/collection per chunking strategy ---------
pinecone_document_store_cs1 = _pinecone_store("nvidia_cs_1")
pinecone_document_store_cs2 = _pinecone_store("nvidia_cs_2")
pinecone_document_store_cs3 = _pinecone_store("nvidia_cs_3")

chroma_document_store_cs1 = _chroma_store("nvidia_cs_1")
chroma_document_store_cs2 = _chroma_store("nvidia_cs_2")
chroma_document_store_cs3 = _chroma_store("nvidia_cs_3")

# --- Pre-processing components ----------------------------------------------
converter = MarkdownToDocument()
cleaner = DocumentCleaner()

# cs1: fixed groups of 5 sentences.
splitter_cs1 = DocumentSplitter(split_by="sentence", split_length=5)
# cs2: recursive split, 400 words per chunk with 40 words of overlap.
splitter_cs2 = RecursiveDocumentSplitter(
    split_length=400,
    split_overlap=40,
    split_unit="word",
    separators=["\n\n", "\n", "sentence", " "],
)
# cs3: recursive split, 1200 characters per chunk with 120 characters of overlap.
splitter_cs3 = RecursiveDocumentSplitter(
    split_length=1200,
    split_overlap=120,
    split_unit="char",
    separators=["\n\n", "\n", "sentence", " "],
)

embedder_cs1 = _document_embedder()
embedder_cs2 = _document_embedder()
embedder_cs3 = _document_embedder()

# --- Writers ----------------------------------------------------------------
# These writers are created but not added to the pipeline in this cell.
pinecone_writer_cs1 = DocumentWriter(pinecone_document_store_cs1, policy=DuplicatePolicy.OVERWRITE)
pinecone_writer_cs2 = DocumentWriter(pinecone_document_store_cs2, policy=DuplicatePolicy.OVERWRITE)
pinecone_writer_cs3 = DocumentWriter(pinecone_document_store_cs3, policy=DuplicatePolicy.OVERWRITE)

chroma_writer_cs1 = DocumentWriter(chroma_document_store_cs1, policy=DuplicatePolicy.OVERWRITE)
chroma_writer_cs2 = DocumentWriter(chroma_document_store_cs2, policy=DuplicatePolicy.OVERWRITE)
chroma_writer_cs3 = DocumentWriter(chroma_document_store_cs3, policy=DuplicatePolicy.OVERWRITE)

# --- Pipeline: converter -> cleaner -> {splitter_csN -> embedder_csN} --------
_splitters = {"cs1": splitter_cs1, "cs2": splitter_cs2, "cs3": splitter_cs3}
_embedders = {"cs1": embedder_cs1, "cs2": embedder_cs2, "cs3": embedder_cs3}

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", converter)
indexing_pipeline.add_component("cleaner", cleaner)
# Components are registered and connected in the same order as before.
for _strategy, _splitter in _splitters.items():
    indexing_pipeline.add_component(f"splitter_{_strategy}", _splitter)
for _strategy, _embedder in _embedders.items():
    indexing_pipeline.add_component(f"embedder_{_strategy}", _embedder)

indexing_pipeline.connect("converter.documents", "cleaner.documents")
# The cleaned documents fan out to all three splitters.
for _strategy in _splitters:
    indexing_pipeline.connect("cleaner.documents", f"splitter_{_strategy}.documents")
for _strategy in _splitters:
    indexing_pipeline.connect(f"splitter_{_strategy}.documents", f"embedder_{_strategy}.documents")

# Run the pipeline; `data` holds each embedder's output under
# data['embedder_csN']['documents'].
data = indexing_pipeline.run(data={"sources": markdown_streams})



Converting markdown files to Documents:   0%|          | 0/1 [00:00<?, ?it/s][A[A

Converting markdown files to Documents: 100%|██████████| 1/1 [00:00<00:00,  2.73it/s][A[AConverting markdown files to Documents: 100%|██████████| 1/1 [00:00<00:00,  2.72it/s]


Calculating embeddings: 0it [00:00, ?it/s][A[A

Calculating embeddings: 1it [00:02,  2.89s/it][A[A

Calculating embeddings: 2it [00:05,  2.58s/it][A[A

Calculating embeddings: 3it [00:07,  2.64s/it][A[A

Calculating embeddings: 4it [00:10,  2.60s/it][A[A

Calculating embeddings: 5it [00:12,  2.49s/it][A[A

Calculating embeddings: 6it [00:15,  2.45s/it][A[A

Calculating embeddings: 7it [00:17,  2.46s/it][A[A

Calculating embeddings: 8it [00:20,  2.62s/it][A[A

Calculating embeddings: 9it [00:22,  2.51s/it][A[A

Calculating embeddings: 10it [00:25,  2.51s/it][A[A

Calculating embeddings: 11it [00:28,  2.68s/it][A[A

Calculating embeddings: 12it [00:30,  2.59s/it][A[A

Calculating embe

[Trace(request_id=tr-b49492776b2a44cd92dda35f9740c6fc), Trace(request_id=tr-c0051f5ba1e145cd861f30db25fa94a1), Trace(request_id=tr-b6ff5a2309e24dc68bf48bf0232ee69a), Trace(request_id=tr-77dd1cc217534d0da1e8e4a8f1ff2be3), Trace(request_id=tr-0576f68231684e9f8be51720c01f8469), Trace(request_id=tr-7e9c1cab790949a0ade3d62012564f1f), Trace(request_id=tr-32a1d3a133e64b9393b666709d098462), Trace(request_id=tr-48605a0c40ff4e4d90f9427e5f762f1e), Trace(request_id=tr-512412d0f65e4d6e87d09291caa214d0), Trace(request_id=tr-ad25cbb07414454bbf1e1cafacaafe6c)]

In [0]:
# Write the embedded cs1 chunks to pinecone_document_store_cs1.
for doc in data['embedder_cs1']['documents']:
    # Remove the '_split_overlap' metadata entry before writing.
    # NOTE(review): presumably the store cannot hold this value - confirm.
    doc.meta.pop('_split_overlap', None)

# Keep only the documents that actually carry an embedding.
docs = [d for d in data['embedder_cs1']['documents'] if d.embedding]

# Left as the last expression so the call's return value is shown as cell output.
pinecone_document_store_cs1.write_documents(docs)

Upserted vectors:   0%|          | 0/397 [00:00<?, ?it/s]

397

In [0]:
# Write the embedded cs2 chunks to pinecone_document_store_cs2.
for doc in data['embedder_cs2']['documents']:
    # Remove the '_split_overlap' metadata entry before writing.
    # NOTE(review): presumably the store cannot hold this value - confirm.
    doc.meta.pop('_split_overlap', None)

# Keep only the documents that actually carry an embedding.
docs = [d for d in data['embedder_cs2']['documents'] if d.embedding]

# Left as the last expression so the call's return value is shown as cell output.
pinecone_document_store_cs2.write_documents(docs)

Upserted vectors:   0%|          | 0/290 [00:00<?, ?it/s]

290

In [0]:
# Write the embedded cs3 chunks to pinecone_document_store_cs3.
for doc in data['embedder_cs3']['documents']:
    # Remove the '_split_overlap' metadata entry before writing.
    # NOTE(review): presumably the store cannot hold this value - confirm.
    doc.meta.pop('_split_overlap', None)

# Keep only the documents that actually carry an embedding.
docs = [d for d in data['embedder_cs3']['documents'] if d.embedding]

# Left as the last expression so the call's return value is shown as cell output.
pinecone_document_store_cs3.write_documents(docs)

Upserted vectors:   0%|          | 0/882 [00:00<?, ?it/s]

882

In [0]:
# Write the embedded cs1 chunks to chroma_document_store_cs1.
for doc in data['embedder_cs1']['documents']:
    # Remove the '_split_overlap' metadata entry before writing.
    # NOTE(review): presumably the store cannot hold this value - confirm.
    doc.meta.pop('_split_overlap', None)

# Keep only the documents that actually carry an embedding.
docs = [d for d in data['embedder_cs1']['documents'] if d.embedding]

# Left as the last expression so the call's return value is shown as cell output.
chroma_document_store_cs1.write_documents(docs)

397

In [0]:
# Write the embedded cs2 chunks to chroma_document_store_cs2.
for doc in data['embedder_cs2']['documents']:
    # Remove the '_split_overlap' metadata entry before writing.
    # NOTE(review): presumably the store cannot hold this value - confirm.
    doc.meta.pop('_split_overlap', None)

# Keep only the documents that actually carry an embedding.
docs = [d for d in data['embedder_cs2']['documents'] if d.embedding]

# Left as the last expression so the call's return value is shown as cell output.
chroma_document_store_cs2.write_documents(docs)

290

In [0]:
# Write the embedded cs3 chunks to chroma_document_store_cs3.
for doc in data['embedder_cs3']['documents']:
    # Remove the '_split_overlap' metadata entry before writing.
    # NOTE(review): presumably the store cannot hold this value - confirm.
    doc.meta.pop('_split_overlap', None)

# Keep only the documents that actually carry an embedding.
docs = [d for d in data['embedder_cs3']['documents'] if d.embedding]

# Left as the last expression so the call's return value is shown as cell output.
chroma_document_store_cs3.write_documents(docs)

882