In [None]:
!pip install pinecone sentence-transformers boto3 tqdm

In [None]:
import os
import boto3
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import time
import logging
import re

# Configure console logging for this process via the root logger.
# Only console output is set up here; the S3 handler is attached separately
# by setup_s3_logging() when it is called.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)

# Function to parse S3 URI
def parse_s3_uri(s3_uri):
    """
    Split an S3 URI into its bucket name and key prefix.

    Args:
        s3_uri (str): URI of the form s3://bucket-name/prefix/path/

    Returns:
        tuple: (bucket_name, prefix); prefix is '' when the URI names only a bucket

    Raises:
        ValueError: if the string does not start with s3:// followed by a bucket name
    """
    # Group 1 is everything up to the first '/', group 2 is whatever follows it.
    parsed = re.match(r's3://([^/]+)/?(.*)', s3_uri)
    if parsed is None:
        raise ValueError(f"Invalid S3 URI format: {s3_uri}")
    return parsed.group(1), parsed.group(2)

# Custom S3 log handler class
class S3LogHandler(logging.Handler):
    """
    Logging handler that buffers formatted records and mirrors them to S3.

    Every upload writes the *complete* buffered history to one S3 object
    (same key each time), so the object always holds the full log of the run.
    For that reason the buffer is never cleared: dropping entries would make
    the next upload overwrite the object with a truncated log.
    """

    # Number of buffered entries contained in the last successful upload.
    # Kept as a class-level default so the "nothing new to upload" check is
    # valid even before the first upload has happened.
    _uploaded_count = 0

    def __init__(self, s3_uri, interval=60):
        """
        Initialize S3 log handler that uploads logs to S3 at regular intervals

        Args:
            s3_uri (str): S3 URI for log storage
            interval (int): Upload interval in seconds
        """
        super().__init__()

        # Parse S3 URI to get bucket and prefix
        self.bucket_name, self.log_prefix = parse_s3_uri(s3_uri)

        # Make sure a non-empty prefix ends with a slash so it joins cleanly
        # with the file name when the object key is built.
        if self.log_prefix and not self.log_prefix.endswith('/'):
            self.log_prefix += '/'

        self.interval = interval
        self.buffer = []
        self.last_upload_time = time.time()
        self._uploaded_count = 0

        # Create S3 client
        self.s3_client = boto3.client('s3')

        # Set formatter
        self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

        # Create a unique log file name with timestamp
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        self.log_filename = f"vector_embedding_process_{timestamp}.log"

        logger.info(f"S3 logging enabled. Logs will be uploaded to s3://{self.bucket_name}/{self.log_prefix}{self.log_filename}")

    def emit(self, record):
        """Buffer the formatted record and upload once the interval has elapsed."""
        try:
            self.buffer.append(self.format(record))

            # Upload logs if interval has passed
            if time.time() - self.last_upload_time >= self.interval:
                self.upload_logs()
        except Exception:
            # Standard logging.Handler contract: a failing handler must not
            # break the code that is doing the logging.
            self.handleError(record)

    def upload_logs(self):
        """Upload the full buffered history to S3 if it changed since the last upload."""
        entries = list(self.buffer)
        if len(entries) == self._uploaded_count:
            # Nothing new (this also covers the empty buffer): skip the PUT.
            return

        s3_key = f"{self.log_prefix}{self.log_filename}"

        try:
            self.s3_client.put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body="\n".join(entries)
            )
            self._uploaded_count = len(entries)
            print(f"Logs uploaded to s3://{self.bucket_name}/{s3_key}")
        except Exception as e:
            # Log to console since we can't use logger here (would cause recursion)
            print(f"Error uploading logs to S3: {e}")
        finally:
            # Restart the interval after failures too; otherwise every
            # subsequent log call would retry the upload while S3 is
            # unreachable.
            self.last_upload_time = time.time()

    def flush(self):
        """
        Upload any entries that have not been uploaded yet.

        The buffer is intentionally kept: each upload rewrites the same S3
        object, so clearing it here would cause a later flush (for example
        the one logging.shutdown() performs at interpreter exit) to replace
        the full log with only the entries logged after this call.
        """
        self.upload_logs()

# Function to set up S3 logging
def setup_s3_logging(s3_uri, interval=60):
    """
    Attach an S3-backed handler to the module logger.

    Args:
        s3_uri (str): Destination for log files, in the form s3://bucket/prefix
        interval (int): Seconds between uploads of the buffered log to S3

    Returns:
        The attached S3LogHandler, or None if it could not be created
        (the failure is logged rather than raised).
    """
    try:
        handler = S3LogHandler(s3_uri, interval)
        logger.addHandler(handler)
    except Exception as exc:
        logger.error(f"Failed to set up S3 logging: {exc}")
        return None
    return handler

# Runtime configuration.
#
# SECURITY: this cell used to hard-code an AWS access key pair and a Pinecone
# API key. Secrets must never live in source; any key that was committed here
# has to be treated as leaked and rotated/revoked.
# Provide credentials outside the code instead:
#   * AWS: boto3's default credential chain (environment variables, shared
#     credentials file, or an attached IAM role).
#   * Pinecone: export PINECONE_API_KEY before starting the kernel/process.
os.environ["AWS_REGION"] = "us-east-2"
# boto3 takes its region from AWS_DEFAULT_REGION (AWS_REGION alone is ignored),
# so mirror the value unless the caller already chose a default region.
os.environ.setdefault("AWS_DEFAULT_REGION", os.environ["AWS_REGION"])
os.environ["S3_LOG_URI"] = "s3://tyson-chatbot-pipeline-storage/logs/Embedding_logs/"

if not os.environ.get("PINECONE_API_KEY"):
    # Warn early; the Pinecone initialisation reads this variable and fails without it.
    logging.getLogger(__name__).warning(
        "PINECONE_API_KEY is not set; export it before running the pipeline"
    )

# Initialize DynamoDB client
def init_dynamodb():
    """
    Create the boto3 DynamoDB service resource.

    Returns:
        The object returned by boto3.resource('dynamodb').

    Raises:
        Exception: whatever boto3 raises; it is logged and then re-raised.
    """
    try:
        return boto3.resource('dynamodb')
    except Exception as exc:
        logger.error(f"Failed to initialize DynamoDB: {exc}")
        raise

# Load embedding model
def load_embedding_model():
    """
    Load the sentence-transformers model used to embed chunk text.

    Returns:
        SentenceTransformer: the 'all-MiniLM-L6-v2' model
        (384-dimensional embeddings, matching the Pinecone index dimension).

    Raises:
        Exception: any loading failure is logged and then re-raised.
    """
    model_name = 'all-MiniLM-L6-v2'
    try:
        return SentenceTransformer(model_name)
    except Exception as exc:
        logger.error(f"Failed to load embedding model: {exc}")
        raise

# Initialize Pinecone
def init_pinecone(index_name="neil-degrasse-tyson-embeddings"):
    """
    Connect to the Pinecone index, creating it as a serverless index if needed.

    Args:
        index_name (str): Name of the Pinecone index to use or create

    Returns:
        The index client returned by Pinecone.Index for the index's host.

    Raises:
        Exception: any failure (including a missing PINECONE_API_KEY
            environment variable) is logged and then re-raised.
    """
    try:
        pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])

        # Check if the index exists
        index_exists = any(existing.name == index_name for existing in pc.list_indexes())

        if not index_exists:
            # Create a serverless index
            pc.create_index(
                name=index_name,
                dimension=384,  # all-MiniLM-L6-v2 produces 384-dimensional vectors
                metric="cosine",
                spec=ServerlessSpec(
                    cloud="aws",
                    region="us-east-1"  # You can change this to your preferred region
                )
            )
            logger.info(f"Created new Pinecone serverless index: {index_name}")
            # Wait for index to be ready
            while not pc.describe_index(index_name).status.ready:
                logger.info("Waiting for index to be ready...")
                time.sleep(5)
        else:
            logger.info(f"Using existing Pinecone index: {index_name}")

        # Always read the host from a fresh description instead of the
        # create_index() return value: not every client version returns the
        # index description from create_index(), and a description fetched
        # after the readiness wait is valid for both branches.
        index_config = pc.describe_index(index_name)

        # Connect to the index
        return pc.Index(host=index_config.host)
    except Exception as e:
        logger.error(f"Failed to initialize Pinecone: {e}")
        raise

# Scan DynamoDB table for all items
def get_all_chunks(dynamodb, table_name="ChunkDB"):
    """
    Read every item of a DynamoDB table with a paginated scan.

    Args:
        dynamodb: object exposing Table(name), as returned by init_dynamodb()
        table_name (str): Name of the table to scan

    Returns:
        list: all items gathered from every scan page

    Raises:
        Exception: any scan failure is logged and then re-raised.
    """
    try:
        table = dynamodb.Table(table_name)
        collected = []
        scan_kwargs = {}

        # One loop serves the first page and every continuation page: the
        # first call has no arguments, later calls resume from the key that
        # the previous page handed back.
        while True:
            page = table.scan(**scan_kwargs)
            collected.extend(page['Items'])
            if 'LastEvaluatedKey' not in page:
                break
            scan_kwargs = {'ExclusiveStartKey': page['LastEvaluatedKey']}

        logger.info(f"Retrieved {len(collected)} chunks from DynamoDB")
        return collected
    except Exception as exc:
        logger.error(f"Failed to retrieve chunks from DynamoDB: {exc}")
        raise

# Create embeddings for chunks and upload to Pinecone
def _to_metadata_value(value):
    """Convert one DynamoDB attribute value into a Pinecone-safe metadata value."""
    from decimal import Decimal  # local import: only this helper needs it

    if value is None:
        # Treat null attributes like missing ones, matching the '' defaults
        # used when a key is absent.
        return ''
    if isinstance(value, Decimal):
        # boto3's DynamoDB resource returns numbers as Decimal, which is not
        # JSON-serialisable; keep integers integral, everything else float.
        if value.is_finite() and value == value.to_integral_value():
            return int(value)
        return float(value)
    return value


def process_chunks_to_pinecone(chunks, model, index, batch_size=100):
    """
    Embed chunk texts in batches and upsert the vectors into Pinecone.

    Args:
        chunks (list): Chunk dicts; the keys read are 'chunk_id', 'text',
            'source', 'timestamp' and 'title' (all optional)
        model: Embedding model exposing encode(list_of_texts)
        index: Pinecone index client exposing upsert(vectors=...)
        batch_size (int): Number of chunks embedded and upserted per request

    Raises:
        ValueError: if batch_size is not a positive integer
        Exception: any embedding/upsert failure is logged and then re-raised.
    """
    try:
        if batch_size <= 0:
            # Fail with a clear message instead of a ZeroDivisionError (or a
            # silent no-op for negative sizes) in the batch arithmetic below.
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        total_chunks = len(chunks)
        total_batches = (total_chunks + batch_size - 1) // batch_size

        for batch_idx in tqdm(range(total_batches), desc="Processing batches"):
            start_idx = batch_idx * batch_size
            batch_chunks = chunks[start_idx:start_idx + batch_size]

            # Extract text from chunks; a null text is embedded as ''.
            texts = [chunk.get('text') or '' for chunk in batch_chunks]

            # Create embeddings for the batch
            embeddings = model.encode(texts)

            # Prepare data for Pinecone
            vector_data = []
            for offset, (chunk, text, embedding) in enumerate(zip(batch_chunks, texts, embeddings)):
                raw_id = chunk.get('chunk_id')
                if raw_id is None or raw_id == '':
                    raw_id = f'unknown-{start_idx + offset}'

                vector_data.append({
                    # Vector ids are sent as strings; a numeric DynamoDB key
                    # would otherwise arrive here as a Decimal.
                    'id': str(_to_metadata_value(raw_id)),
                    'values': embedding.tolist(),
                    # Metadata (all columns except the vector itself)
                    'metadata': {
                        'source': _to_metadata_value(chunk.get('source', '')),
                        'text': text,
                        'timestamp': _to_metadata_value(chunk.get('timestamp', '')),
                        'title': _to_metadata_value(chunk.get('title', '')),
                    },
                })

            # Upsert vectors to Pinecone
            if vector_data:
                index.upsert(vectors=vector_data)
                logger.info(f"Uploaded batch {batch_idx + 1}/{total_batches} to Pinecone")

            # Small delay to avoid rate limiting
            time.sleep(0.5)

        logger.info(f"Successfully processed all {total_chunks} chunks to Pinecone")
    except Exception as e:
        logger.error(f"Error processing chunks to Pinecone: {e}")
        raise

# Main function
def main():
    """
    Run the full pipeline: DynamoDB chunks -> embeddings -> Pinecone.

    Optionally mirrors the log to S3 when the S3_LOG_URI environment variable
    is set. Pipeline errors are logged (with traceback) and not re-raised, so
    a failed run ends the call without propagating the exception.
    """
    s3_handler = None
    succeeded = False
    try:
        logger.info("Starting DynamoDB to Pinecone embedding process")

        # Set up S3 logging if S3 URI is provided
        s3_log_uri = os.environ.get("S3_LOG_URI")
        if s3_log_uri:
            s3_handler = setup_s3_logging(s3_log_uri, interval=30)  # Upload logs every 30 seconds
            if s3_handler:
                logger.info(f"Logs will be uploaded to {s3_log_uri}")

        # Initialize services
        dynamodb = init_dynamodb()
        model = load_embedding_model()
        index = init_pinecone()

        # Get all chunks from DynamoDB
        chunks = get_all_chunks(dynamodb)

        # Process chunks, create embeddings, and upload to Pinecone
        process_chunks_to_pinecone(chunks, model, index)

        # Get and display stats from Pinecone
        stats = index.describe_stats()
        logger.info(f"Pinecone index stats: {stats}")

        logger.info("Process completed successfully")
        succeeded = True
    except Exception as e:
        # logger.exception keeps the message and adds the traceback.
        logger.exception(f"Process failed with error: {e}")
    finally:
        # Runs on success, on failure and on interruption (e.g. a stopped
        # notebook cell), so the log always reaches S3.
        if s3_handler:
            if succeeded:
                logger.info("Flushing final logs to S3...")
            else:
                logger.info("Flushing error logs to S3...")
            s3_handler.flush()
            # Detach the handler: re-running main() in the same process would
            # otherwise stack a new S3 handler on top of the old one, and the
            # old one would keep uploading to the finished run's log file.
            logger.removeHandler(s3_handler)
            if succeeded:
                logger.info("Logs successfully uploaded to S3")

# Run the pipeline only when this file is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()