In [1]:
import os

In [2]:
# Step one directory up from the current working directory.
# NOTE(review): this is relative to wherever the kernel currently is, so re-running the
# cell climbs one more level each time — presumably it is meant to land on the project
# root (the config files are later loaded via relative paths); confirm.
os.chdir("../")

In [3]:
%pwd

'c:\\Users\\Maza\\Desktop\\Pinecone_pipeline'

In [15]:
from dataclasses import dataclass
from pathlib import Path
from vector_db_pipeline.utils.common import read_yaml, create_directories
from vector_db_pipeline.constants import *


@dataclass(frozen=True)
class DataUploadConfig:
    """Immutable bundle of settings consumed by the data-upload stage."""
    # Directory that ConfigurationManager.get_data_upload_config creates before building this object.
    root_dir: Path
    # Path passed to pd.read_json by DataUpload.pinecon_vector.
    read_data_dir: Path
    # File that DataUpload appends its progress lines to.
    STATUS_FILE: str
    # Index settings; DataUpload reads INDEX_NAME, DIMENSIONS, METRIC, ENVIROMENT and NAMESPACE from it
    # via attribute access. NOTE(review): annotated as dict but used with attribute access — confirm type.
    index_info: dict
    # Batch-size setting taken from params.BATCH_SIZE.
    # NOTE(review): DataUpload.batch_upload reads `.BATCH_SIZE` off this value, so it may not be a plain int — confirm.
    batch_size: int

In [31]:
class ConfigurationManager:
    """Reads the config, schema and params YAML files and builds stage configs from them."""

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        schema_filepath = SCHEMA_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH):
        """
        Loads the three YAML files through read_yaml.

        Args:
            config_filepath: Location of the config file. Defaults to CONFIG_FILE_PATH.
            schema_filepath: Location of the schema file. Defaults to SCHEMA_FILE_PATH.
            params_filepath: Location of the params file. Defaults to PARAMS_FILE_PATH.
        """
        self.config = read_yaml(config_filepath)
        self.schema = read_yaml(schema_filepath)
        self.params = read_yaml(params_filepath)

    def get_data_upload_config(self) -> DataUploadConfig:
        """
        Builds the DataUploadConfig for the upload stage.

        Reads the `data_load` section of the config plus INDEX_INFO and BATCH_SIZE
        from the params, then makes sure the stage's root directory exists.

        Returns:
            DataUploadConfig: Settings for the data-upload stage.
        """
        section = self.config.data_load
        # Resolve both params up front so a missing key fails before any directory is created.
        settings = {
            "index_info": self.params.INDEX_INFO,
            "batch_size": self.params.BATCH_SIZE,
        }

        create_directories([section.root_dir])

        return DataUploadConfig(
            root_dir=section.root_dir,
            read_data_dir=section.read_data_dir,
            STATUS_FILE=section.STATUS_FILE,
            **settings,
        )

In [32]:
import pandas as pd
from langchain.embeddings.openai import OpenAIEmbeddings
from pinecone import Pinecone, PodSpec
import math
from dotenv import load_dotenv
from vector_db_pipeline.utils.common import read_yaml, create_directories
from vector_db_pipeline.constants import *
from vector_db_pipeline import logger
import time
from pathlib import Path


In [33]:
class DataUpload:
    """
    Handles data upload to Pinecone indexes.

    Attributes:
        config (DataUploadConfig): Configuration object containing settings for data upload.
        pc (Pinecone): Client built from the PINECONE_API_KEY environment variable.
        index_info: `config.index_info`; INDEX_NAME, DIMENSIONS, METRIC, ENVIROMENT and
            NAMESPACE are read from it via attribute access.
        index_name: `index_info.INDEX_NAME`.

    Methods:
        del_index(): Deletes the specified index if it exists.
        recreate_index(): Creates the index with the configured dimensions, metric, and
            environment when it does not exist yet.
        pinecon_vector(): Converts the JSON records file to a list of vector dictionaries.
        batch_upload(pinecone_vector): Uploads vectors to a Pinecone index in batches.
    """

    def __init__(self, config: "DataUploadConfig"):
        """
        Initializes DataUpload class with provided configuration.

        Loads environment variables with load_dotenv() and creates the Pinecone client
        from PINECONE_API_KEY.

        Args:
            config (DataUploadConfig): Configuration object containing settings for data upload.
        """
        self.config = config

        # initialize db
        load_dotenv()
        pinecone_api_key = os.getenv("PINECONE_API_KEY")
        self.pc = Pinecone(api_key=pinecone_api_key)
        self.index_info = self.config.index_info
        self.index_name = self.index_info.INDEX_NAME

    def _existing_index_names(self):
        """Returns the names of all indexes reported by the client."""
        return [index_info["name"] for index_info in self.pc.list_indexes()]

    def del_index(self):
        """
        Deletes the specified index if it exists.
        """
        if self.index_name in self._existing_index_names():
            self.pc.delete_index(self.index_name)
            logger.info(f"Index '{self.index_name}' deleted ")

    def recreate_index(self):
        """
        Creates the index with the configured dimensions, metric, and environment.

        Nothing is created when an index with the same name already exists; call
        del_index() first to force a rebuild. The index statistics are logged either way.
        """
        dim = self.index_info.DIMENSIONS
        met = self.index_info.METRIC
        env = self.index_info.ENVIROMENT  # (sic) key spelling comes from the params file

        if self.index_name not in self._existing_index_names():
            self.pc.create_index(
                name=self.index_name,
                dimension=dim,
                metric=met,
                spec=PodSpec(
                    environment=env
                )
            )
            # Wait for index to be initialized
            while not self.pc.describe_index(self.index_name).status['ready']:
                time.sleep(1)
            logger.info("Index created")
        else:
            # Previously "Index created" was logged even when nothing was created.
            logger.info(f"Index '{self.index_name}' already exists")

        index = self.pc.Index(self.index_name)
        logger.info(index.describe_index_stats())

    def pinecon_vector(self):
        """
        Converts the JSON records file at `config.read_data_dir` to a list of vector dictionaries.

        Each record must provide 'id', 'values', 'text', 'host', 'page_title' and 'url'.
        The number of records is appended to `config.STATUS_FILE`.

        Returns:
            pinecone_vect (list): One dict per record with keys 'id', 'values' and
                'metadata' ('text', 'host', 'page_title', 'url').

        Raises:
            KeyError: If a record lacks one of the required fields.
        """
        data_read_path = self.config.read_data_dir
        df = pd.read_json(data_read_path, orient='records')

        pinecone_vect = [
            {
                'id': record['id'],
                'values': record['values'],
                'metadata': {
                    'text': record['text'],
                    'host': record['host'],
                    'page_title': record['page_title'],
                    'url': record['url'],
                },
            }
            for record in df.to_dict(orient='records')
        ]

        logger.info("Data ready for upload")
        with open(self.config.STATUS_FILE, 'a') as f:
            f.write(f"Data size: {len(pinecone_vect)}\n")
        return pinecone_vect

    def batch_upload(self, pinecone_vector, wait_seconds=30):
        """
        Uploads vectors to a Pinecone index in batches.

        Upload is best-effort: a failing batch is logged as an error and the remaining
        batches are still attempted. The final status line records any failed batches
        instead of unconditionally reporting success.

        Args:
            pinecone_vector (list): List of vector dictionaries to be uploaded.
            wait_seconds (float): Pause before the index statistics are read back.
                Defaults to 30, the delay that was previously hard-coded.

        Raises:
            ValueError: If the configured batch size is not a positive integer.
        """
        # The configured value may be a plain number or an object exposing BATCH_SIZE
        # (the original code read `.BATCH_SIZE` from it); accept both.
        raw_batch_size = self.config.batch_size
        batch_size = int(getattr(raw_batch_size, "BATCH_SIZE", raw_batch_size))
        if batch_size <= 0:
            raise ValueError(f"batch size must be positive, got {batch_size}")
        namespace = self.index_info.NAMESPACE

        index = self.pc.Index(self.index_name)
        data_size = len(pinecone_vector)

        # Calculate the number of batches required
        batch_num = math.ceil(data_size / batch_size)
        logger.info(f"Uploading: {data_size} vectors, in {batch_num} batches")

        failed_batches = []
        for i in range(batch_num):
            start_idx = i * batch_size
            batch_vectors = pinecone_vector[start_idx:start_idx + batch_size]
            try:
                # Upload the vectors to the Pinecone index
                index.upsert(vectors=batch_vectors, namespace=namespace)
                logger.info(f"Batch {i+1} uploaded")
            except Exception as e:
                failed_batches.append(i + 1)
                logger.error(f"Error encountered in batch {i+1}: {e}")

        if wait_seconds > 0:
            time.sleep(wait_seconds)
        logger.info(index.describe_index_stats())
        with open(self.config.STATUS_FILE, 'a') as f:
            if failed_batches:
                f.write(
                    f"Data upload completed with {len(failed_batches)} failed batch(es): "
                    f"{failed_batches}\n"
                )
            else:
                f.write("Data upload completed\n")


In [35]:
# __main__ section
if __name__ == "__main__":
    # Run the upload stage end to end: load config, build the vectors, push them to Pinecone.
    try:
        config = ConfigurationManager()
        data_upload_config = config.get_data_upload_config()
        data_upload = DataUpload(config=data_upload_config)
        # Index management is opt-in; uncomment to rebuild the index before uploading.
        # data_upload.del_index()
        # data_upload.recreate_index()
        pinecone_vector = data_upload.pinecon_vector()
        data_upload.batch_upload(pinecone_vector)
    except Exception:
        # The previous handler only re-raised; record the failure in the log as well,
        # then re-raise with a bare `raise` so the original traceback is kept intact.
        logger.exception("Data upload stage failed")
        raise
    

[2024-04-15 10:49:38,839: INFO: common: yaml file: config\config.yaml loaded successfully:]
[2024-04-15 10:49:38,842: INFO: common: yaml file: schema.yaml loaded successfully:]
[2024-04-15 10:49:38,846: INFO: common: yaml file: params.yaml loaded successfully:]
[2024-04-15 10:49:38,849: INFO: common: Directory already exists: artifacts/data_upload:]
[2024-04-15 10:49:38,905: INFO: 2620151098: Data ready for upload:]
[2024-04-15 10:49:38,911: INFO: 2620151098: Uploading: 236 vectors, in 2 batches:]
[2024-04-15 10:49:41,778: INFO: 2620151098: Batch 1 uploaded:]
[2024-04-15 10:49:43,294: INFO: 2620151098: Batch 2 uploaded:]
[2024-04-15 10:50:13,469: INFO: 2620151098: {'dimension': 1536,
 'index_fullness': 0.00236,
 'namespaces': {'blog': {'vector_count': 236}},
 'total_vector_count': 236}:]


In [28]:
# Display the list returned by data_upload.pinecon_vector().
# NOTE(review): this prints every vector in full (the index stats report dimension 1536);
# a slice such as pinecone_vector[:1] would keep the saved notebook small.
pinecone_vector

[{'id': 'blog#0',
  'values': [0.0073613964,
   -0.0216906355,
   0.0233809822,
   -0.0282842783,
   -0.0153937706,
   0.026761675800000002,
   0.0323617577,
   0.0044129669,
   -0.018297037500000002,
   -0.0200648051,
   0.0292907456,
   0.0337811343,
   -0.0081678603,
   -0.0307875412,
   -0.0090581954,
   0.014245367,
   0.027432654600000002,
   -0.000910497,
   -0.013167932,
   -0.0306068926,
   -0.017277669500000002,
   -0.0198970609,
   0.0085743174,
   -0.0017193797,
   -0.0173679929,
   0.0001874019,
   0.0193938282,
   -0.0084646383,
   -0.015355060200000001,
   -0.0160002302,
   0.0049194257000000005,
   0.010425957100000001,
   -0.0059323438,
   -0.0030323018,
   -0.0008229957,
   0.012896959900000001,
   0.0134066448,
   0.0063775113,
   0.0289036433,
   -0.0153163498,
   0.0168776627,
   0.011825977100000001,
   -0.015664741700000002,
   -0.0201938399,
   0.019368021200000002,
   0.0207744934,
   -0.0041290918,
   0.013871168100000001,
   0.0193293117,
   0.0340392,
   0.0