# In [1]:
import sys
import os
import json
import time
import uuid
import logging
from typing import List, Dict, Union, Optional, Tuple, Any
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import re
import nltk
from nltk.tokenize import sent_tokenize
import seaborn as sns
from tqdm.auto import tqdm
from google.cloud import aiplatform
from google.cloud.aiplatform.gapic.schema import predict
import vertexai
from vertexai.language_models import TextGenerationModel
from vertexai.preview.generative_models import GenerativeModel
from langchain_google_vertexai import VertexAIEmbeddings, ChatVertexAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document

#!/usr/bin/env python
# coding: utf-8

# # Google Vertex AI LLM Data Processing Pipeline
# 
# A comprehensive demo showcasing how to build a data processing pipeline
# using Google Vertex AI and Llama 3.3 LLM.

# ## 1. Setup and Installation

# Install required packages.
# NOTE(review): `!pip` is an IPython/Jupyter shell escape, not Python syntax, so
# these lines only run inside a notebook/IPython session. In this export the
# import statements at the top of the file execute BEFORE these installs, so in
# a fresh environment an import such as the sklearn one fails first (see the
# ModuleNotFoundError captured at the end of this export). Run the installs in
# an earlier cell, before any of the imports.
!pip install --quiet google-cloud-aiplatform pandas matplotlib seaborn nltk tqdm python-dotenv scikit-learn
!pip install --quiet langchain langchain-google-vertexai


# Data processing

# Text processing: fetch the sentence-tokenizer data used by sent_tokenize.
# NLTK >= 3.9 loads the 'punkt_tab' resource (older releases load 'punkt');
# downloading only 'punkt' makes sent_tokenize raise LookupError on new NLTK,
# so fetch both.
for _nltk_resource in ("punkt", "punkt_tab"):
    nltk.download(_nltk_resource, quiet=True)

# Visualization
import matplotlib.pyplot as plt

# Progress tracking

# Google Cloud and Vertex AI

# LangChain integration

# Configure logging: INFO-and-above records go through the root logger using a
# timestamped format; `logger` is this module's named logger.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# ## 2. Google Cloud Setup and Authentication

def setup_google_cloud(project_id: str, location: str = "us-central1") -> None:
    """
    Initialize the Vertex AI SDK entry points for one project and region.

    Calls ``vertexai.init`` and then ``aiplatform.init`` with the same
    ``project``/``location`` pair. Any failure is logged and re-raised as-is.

    Args:
        project_id: Google Cloud Project ID
        location: Google Cloud region (default: us-central1)
    """
    target = dict(project=project_id, location=location)
    try:
        vertexai.init(**target)
        aiplatform.init(**target)
        logger.info(f"Successfully initialized Vertex AI with project: {project_id} in {location}")
    except Exception as err:
        logger.error(f"Failed to initialize Google Cloud: {str(err)}")
        raise

# ## 3. Data Ingestion

def load_data(data_path: str, source_type: str = "local") -> pd.DataFrame:
    """
    Load data from a specified location (local or GCS).
    
    Args:
        data_path: Path to the CSV file
        source_type: 'local' or 'gcs'
    
    Returns:
        Pandas DataFrame containing the loaded data
    """
    try:
        if source_type.lower() == "local":
            df = pd.read_csv(data_path)
        elif source_type.lower() == "gcs":
            # For GCS paths, use pandas' ability to read directly from GCS with gs:// prefix
            df = pd.read_csv(f"gs://{data_path.lstrip('gs://')}")
        else:
            raise ValueError("Source type must be 'local' or 'gcs'")
        
        logger.info(f"Successfully loaded data from {data_path}: {df.shape[0]} rows, {df.shape[1]} columns")
        return df
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        raise

def validate_data(df: pd.DataFrame, text_column: str) -> pd.DataFrame:
    """
    Validate and perform basic cleaning on the dataset.

    Drops rows whose text is missing (NaN/None) or blank after stripping
    whitespace, then adds a ``text_length`` column (character count of the
    text). The input frame is never modified; a new DataFrame is returned.

    Args:
        df: Input DataFrame
        text_column: Name of the column containing text data

    Returns:
        Validated and cleaned DataFrame with an added ``text_length`` column

    Raises:
        ValueError: If ``text_column`` is not a column of ``df``.
    """
    logger.info("Starting data validation...")

    # Check if text column exists
    if text_column not in df.columns:
        raise ValueError(f"Text column '{text_column}' not found in data")

    # Check for missing values in text column
    missing_count = df[text_column].isna().sum()
    if missing_count > 0:
        logger.warning(f"Found {missing_count} missing values in text column")
        df = df.dropna(subset=[text_column])
        logger.info(f"Dropped {missing_count} rows with missing text values")

    # Remove texts that are empty or whitespace-only (compute the mask once)
    is_blank = df[text_column].str.strip() == ""
    empty_count = is_blank.sum()
    if empty_count > 0:
        logger.warning(f"Found {empty_count} empty text fields")
        df = df[~is_blank]
        logger.info(f"Dropped {empty_count} rows with empty text")

    # Basic statistics. assign() returns a new frame: it neither adds the column
    # to the caller's DataFrame nor writes into a filtered slice
    # (which would trigger SettingWithCopyWarning).
    df = df.assign(text_length=df[text_column].str.len())
    logger.info(f"Text length statistics: Min={df['text_length'].min()}, "
                f"Max={df['text_length'].max()}, "
                f"Avg={df['text_length'].mean():.1f}")

    return df

# ## 4. Text Preprocessing

def preprocess_text(text: str) -> str:
    """
    Clean and normalize text.

    Steps, in order:
      1. strip HTML tags, replacing each with a space so adjacent words do not fuse;
      2. remove URLs (tokens starting with ``http`` or ``www`` up to the next whitespace);
      3. collapse runs of whitespace into a single space and trim both ends.

    Tags are removed before URLs so a link inside a tag such as
    ``<a href="http://...">`` cannot swallow the tag's closing ``>``.
    Whitespace is collapsed last so the removals leave no double spaces behind.

    Args:
        text: Raw input text (non-strings are converted with ``str``)

    Returns:
        Preprocessed text
    """
    # Convert to string just in case
    text = str(text)

    # Remove HTML tags; [^>]* (unlike .*?) also matches tags spanning newlines
    text = re.sub(r'<[^>]*>', ' ', text)

    # Remove URLs (http\S+ already covers https)
    text = re.sub(r'http\S+|www\S+', '', text)

    # Normalize whitespace last
    return re.sub(r'\s+', ' ', text).strip()

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """
    Split text into chunks of appropriate size for LLM processing.
    
    Args:
        text: Text to split
        chunk_size: Maximum number of characters per chunk
        overlap: Number of characters to overlap between chunks
    
    Returns:
        List of text chunks
    """
    if not text or len(text) <= chunk_size:
        return [text] if text else []
    
    # Use sentence tokenization for more natural splits
    sentences = sent_tokenize(text)
    chunks = []
    current_chunk = []
    current_length = 0
    
    for sentence in sentences:
        sentence_len = len(sentence)
        
        # If adding this sentence exceeds chunk size, save current chunk and start new one
        if current_length + sentence_len > chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            
            # Keep some sentences for overlap
            overlap_size = 0
            overlap_sentences = []
            
            # Add sentences from the end until we reach desired overlap
            for s in reversed(current_chunk):
                if overlap_size + len(s) <= overlap:
                    overlap_sentences.insert(0, s)
                    overlap_size += len(s) + 1  # +1 for space
                else:
                    break
            
            current_chunk = overlap_sentences
            current_length = overlap_size
        
        current_chunk.append(sentence)
        current_length += sentence_len + 1  # +1 for space
    
    # Add the last chunk if it's not empty
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    
    return chunks

def generate_embeddings(texts: List[str], project_id: str, location: str = "us-central1") -> List[List[float]]:
    """
    Generate embeddings for text chunks using Vertex AI embeddings.

    Uses LangChain's ``VertexAIEmbeddings`` with the ``textembedding-gecko@latest``
    model and embeds all texts in one ``embed_documents`` call.

    Args:
        texts: List of text chunks
        project_id: Google Cloud Project ID
        location: Google Cloud region

    Returns:
        List of embedding vectors, one per input text (empty list for empty input)

    Raises:
        Any error from model construction or the embedding call is logged and re-raised.
    """
    # Nothing to embed: skip the client/API call entirely. Without this guard
    # the dimension lookup below would raise IndexError on an empty result.
    if not texts:
        return []

    try:
        # Initialize embedding model
        embedding_model = VertexAIEmbeddings(
            model_name="textembedding-gecko@latest",
            project=project_id,
            location=location
        )

        # Generate embeddings (batch processing)
        logger.info(f"Generating embeddings for {len(texts)} text chunks")
        embeddings = embedding_model.embed_documents(texts)

        dimension = len(embeddings[0]) if embeddings else 0
        logger.info(f"Successfully generated embeddings of dimension {dimension}")
        return embeddings
    except Exception as e:
        logger.error(f"Error generating embeddings: {str(e)}")
        raise

# ## 5. Vertex AI and Llama 3.3 Integration

def initialize_llama_model(model_name: str = "llama-3-8b-text-001") -> Any:
    """
    Load a text model through ``TextGenerationModel.from_pretrained``.

    NOTE(review): the section heading calls this "Llama 3.3", but the default
    ID ``llama-3-8b-text-001`` is passed straight to ``TextGenerationModel`` —
    confirm that this ID is served by that API in your project/region.

    Args:
        model_name: Model identifier handed to ``from_pretrained``

    Returns:
        The loaded model object

    Raises:
        Whatever ``from_pretrained`` raises, after logging it.
    """
    try:
        llm = TextGenerationModel.from_pretrained(model_name)
    except Exception as exc:
        logger.error(f"Failed to initialize Llama model: {str(exc)}")
        raise
    else:
        logger.info(f"Successfully initialized Llama model: {model_name}")
        return llm

def initialize_generative_model(model_name: str = "gemini-1.5-pro") -> Any:
    """
    Construct a Vertex AI ``GenerativeModel`` (an alternative to the Llama path).

    Args:
        model_name: Model identifier passed to ``GenerativeModel``

    Returns:
        The constructed model object

    Raises:
        Whatever ``GenerativeModel`` raises, after logging it.
    """
    try:
        generator = GenerativeModel(model_name)
    except Exception as exc:
        logger.error(f"Failed to initialize generative model: {str(exc)}")
        raise
    else:
        logger.info(f"Successfully initialized generative model: {model_name}")
        return generator

# ## 6. LLM Operations

def process_chunk_with_llm(
    model: Any,
    text_chunk: str,
    prompt_template: str,
    temperature: float = 0.2,
    max_output_tokens: int = 1024,
    top_p: float = 0.95,
    top_k: int = 40
) -> Dict[str, Any]:
    """
    Send a text chunk to the LLM and get the response.

    Supports both model styles used in this file:
      * objects with ``predict(prompt, **params)`` (e.g. ``TextGenerationModel``);
      * objects with ``generate_content(prompt, generation_config=...)``
        (e.g. ``GenerativeModel``), which have no ``predict`` method.

    Args:
        model: Initialized LLM model
        text_chunk: Text to process
        prompt_template: Template with {text} placeholder
        temperature: Controls randomness (lower is more deterministic)
        max_output_tokens: Maximum tokens to generate in response
        top_p: Nucleus sampling parameter
        top_k: Top-k sampling parameter

    Returns:
        Dictionary with keys ``input_text``, ``response_text``, ``processing_time``
        (seconds), ``prompt``, ``success``, ``timestamp`` (epoch seconds) and, on
        failure, ``error``. Model/API errors are captured here rather than raised.

    Raises:
        KeyError/ValueError/IndexError from ``str.format`` if the template is
        malformed. This is a programming error, so it is deliberately not caught.
    """
    # Prepare the prompt by inserting the text into the template
    prompt = prompt_template.format(text=text_chunk)

    params = {
        "temperature": temperature,
        "max_output_tokens": max_output_tokens,
        "top_p": top_p,
        "top_k": top_k,
    }

    # perf_counter is monotonic; time.time() can jump with wall-clock changes.
    start_time = time.perf_counter()
    try:
        if hasattr(model, "predict"):
            response = model.predict(prompt, **params)
        else:
            response = model.generate_content(prompt, generation_config=params)
        # .text may itself raise (e.g. a blocked/empty candidate), so read it inside the try
        response_text = response.text
    except Exception as e:
        logger.error(f"Error processing chunk with LLM: {str(e)}")
        return {
            "input_text": text_chunk,
            "response_text": "",
            "error": str(e),
            "processing_time": time.perf_counter() - start_time,
            "prompt": prompt,
            "success": False,
            "timestamp": time.time()
        }

    return {
        "input_text": text_chunk,
        "response_text": response_text,
        "processing_time": time.perf_counter() - start_time,
        "prompt": prompt,
        "success": True,
        "timestamp": time.time()
    }

def batch_process_with_llm(
    model: Any,
    text_chunks: List[str],
    prompt_template: str,
    batch_size: int = 5,
    max_retries: int = 3,
    retry_delay: int = 2,
    **model_params
) -> List[Dict[str, Any]]:
    """
    Process text chunks with an LLM sequentially, with per-chunk retry logic.

    Requests are sent one at a time, in input order. ``batch_size`` does not
    parallelize anything. It only groups chunks so that a 1-second pause can
    be inserted between groups to ease API rate limiting.

    Args:
        model: Initialized LLM model (passed through to ``process_chunk_with_llm``)
        text_chunks: List of text chunks to process
        prompt_template: Template for prompt with {text} placeholder
        batch_size: Number of chunks sent between rate-limit pauses (must be >= 1)
        max_retries: Extra attempts per chunk after a failure (negative values
            are treated as 0)
        retry_delay: Seconds to wait before each retry
        model_params: Extra keyword arguments forwarded to
            ``process_chunk_with_llm`` (e.g. temperature, max_output_tokens)

    Returns:
        List of response dictionaries, exactly one per input chunk and in the
        same order. A chunk that never succeeds keeps its last attempt's result.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    # A negative retry count would make the attempt loop empty and silently drop
    # chunks from the output, misaligning results with their inputs.
    max_retries = max(0, max_retries)

    all_results = []
    total = len(text_chunks)

    for start in tqdm(range(0, total, batch_size), desc="Processing batches"):
        for chunk in text_chunks[start:start + batch_size]:
            # Process with retry logic; `result` ends as the last attempt's outcome
            for attempt in range(max_retries + 1):
                result = process_chunk_with_llm(model, chunk, prompt_template, **model_params)
                if result["success"]:
                    break
                if attempt < max_retries:
                    logger.warning(f"Retry {attempt + 1}/{max_retries} after error: {result.get('error', 'Unknown error')}")
                    time.sleep(retry_delay)
                else:
                    logger.error(f"Failed to process chunk after {max_retries} retries: {chunk[:100]}...")
            all_results.append(result)

        # Add a small delay between groups (not after the last) to avoid rate limiting
        if start + batch_size < total:
            time.sleep(1)

    # Log processing statistics
    success_count = sum(1 for r in all_results if r["success"])
    logger.info(f"Processing complete: {success_count}/{len(all_results)} chunks successful")

    return all_results

# ## 7. Results and Visualization

def analyze_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Analyze the results from LLM processing.

    The returned dict always has the same keys, so callers can index any of
    them without checking first. Length and time metrics are computed over
    successful results only and are 0 when there are none.

    Args:
        results: List of LLM response dictionaries (each needs ``success``;
            successful ones also need ``response_text`` and ``processing_time``)

    Returns:
        Dictionary with ``success_rate``, ``total_chunks``, ``successful_chunks``,
        ``avg_response_length``, ``min_response_length``, ``max_response_length``
        (characters of ``response_text``), ``avg_processing_time`` and
        ``total_processing_time`` (seconds).
    """
    successful = [r for r in results if r["success"]]
    total = len(results)
    n_ok = len(successful)

    if not successful:
        logger.warning("No successful results to analyze")
        return {
            "success_rate": 0,
            "total_chunks": total,
            "successful_chunks": 0,
            "avg_response_length": 0,
            "min_response_length": 0,
            "max_response_length": 0,
            "avg_processing_time": 0,
            "total_processing_time": 0,
        }

    response_lengths = [len(r["response_text"]) for r in successful]
    processing_times = [r["processing_time"] for r in successful]

    # n_ok >= 1 and total >= n_ok here, so no division by zero
    return {
        "success_rate": n_ok / total,
        "total_chunks": total,
        "successful_chunks": n_ok,
        "avg_response_length": sum(response_lengths) / n_ok,
        "min_response_length": min(response_lengths),
        "max_response_length": max(response_lengths),
        "avg_processing_time": sum(processing_times) / n_ok,
        "total_processing_time": sum(processing_times),
    }

def visualize_results(results: List[Dict[str, Any]], analysis: Dict[str, Any],
                      output_path: str = "llm_results_visualization.png"):
    """
    Visualize the results from LLM processing as a 2x2 figure.

    Panels: success/failure pie chart, histogram of response lengths, histogram
    of processing times, and an input-length vs output-length scatter plot.
    The figure is saved to ``output_path``, shown, and then closed.

    Args:
        results: List of LLM response dictionaries
        analysis: Analysis metrics from analyze_results (reads
            ``total_chunks`` and ``successful_chunks``)
        output_path: Image file to write (default keeps the original filename)
    """
    # Filter successful results
    successful = [r for r in results if r["success"]]

    if not successful:
        logger.warning("No successful results to visualize")
        return

    response_lengths = [len(r["response_text"]) for r in successful]
    processing_times = [r["processing_time"] for r in successful]
    input_lengths = [len(r["input_text"]) for r in successful]

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    ax_pie, ax_resp, ax_time, ax_scatter = axes.flat

    # 1. Success rate pie chart. Zero-sized slices are dropped so no stray,
    #    overlapping "0.0%" label is drawn when every chunk succeeded.
    failed = analysis['total_chunks'] - analysis['successful_chunks']
    slices = [
        (label, size, color)
        for label, size, color in (
            ('Success', analysis['successful_chunks'], '#66b3ff'),
            ('Failed', failed, '#ff9999'),
        )
        if size > 0
    ]
    ax_pie.pie([s for _, s, _ in slices], labels=[l for l, _, _ in slices],
               colors=[c for _, _, c in slices], autopct='%1.1f%%', startangle=90)
    ax_pie.axis('equal')
    ax_pie.set_title('Processing Success Rate')

    # 2. Response length distribution
    sns.histplot(response_lengths, kde=True, bins=20, ax=ax_resp)
    ax_resp.set(title='Distribution of Response Lengths',
                xlabel='Response Length (characters)', ylabel='Frequency')

    # 3. Processing time distribution
    sns.histplot(processing_times, kde=True, bins=20, ax=ax_time)
    ax_time.set(title='Distribution of Processing Times',
                xlabel='Processing Time (seconds)', ylabel='Frequency')

    # 4. Input vs Output length scatter plot
    ax_scatter.scatter(input_lengths, response_lengths, alpha=0.5)
    ax_scatter.set(title='Input Length vs. Output Length',
                   xlabel='Input Length (characters)', ylabel='Output Length (characters)')

    fig.tight_layout()
    fig.savefig(output_path)
    plt.show()
    # Release the figure: under non-interactive backends show() is a no-op, and
    # repeated calls would otherwise keep accumulating open figures.
    plt.close(fig)

    logger.info(f"Visualizations generated and saved to '{output_path}'")

def save_results(results: List[Dict[str, Any]], output_path: str = "llm_results.json"):
    """
    Write a JSON summary of processing results to ``output_path``.

    Each record keeps ``response_text``, ``success``, ``processing_time`` and
    ``timestamp`` (the latter two as None when absent). ``input_text`` is
    truncated to 100 characters plus "..." when longer, and ``error`` is
    stringified when present. Write failures are logged, not raised.

    Args:
        results: List of LLM response dictionaries
        output_path: File path for saving results
    """
    def _preview(text):
        # Shorten long inputs so the JSON stays readable
        return text if len(text) <= 100 else text[:100] + "..."

    payload = []
    for item in results:
        record = {
            "input_text": _preview(item["input_text"]),
            "response_text": item["response_text"],
            "success": item["success"],
            "processing_time": item.get("processing_time"),
            "timestamp": item.get("timestamp"),
        }
        if "error" in item:
            record["error"] = str(item["error"])
        payload.append(record)

    try:
        with open(output_path, 'w') as handle:
            json.dump(payload, handle, indent=2)
        logger.info(f"Results saved to {output_path}")
    except Exception as exc:
        logger.error(f"Error saving results: {str(exc)}")

# ## 8. Example Pipeline Execution Function

def run_llm_processing_pipeline(
    data_path: str,
    text_column: str,
    project_id: str,
    prompt_template: str,
    location: str = "us-central1",
    source_type: str = "local",
    chunk_size: int = 1000,
    chunk_overlap: int = 100,
    model_name: str = "llama-3-8b-text-001",
    batch_size: int = 3,
    sample_size: Optional[int] = None,
    temperature: float = 0.2,
    max_output_tokens: int = 1024,
    output_path: str = "llm_results.json",
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Execute the full LLM processing pipeline.

    Steps: init Vertex AI -> load + validate CSV -> optional sampling ->
    preprocess -> sentence-aware chunking -> LLM calls -> analysis, plots and
    a JSON dump of the results. Chunks from all rows are flattened into one
    list, so results are not linked back to their source rows.

    Args:
        data_path: Path to the data file
        text_column: Name of the column containing text
        project_id: Google Cloud Project ID
        prompt_template: Template for LLM prompts with {text} placeholder
        location: Google Cloud region
        source_type: Data source type ('local' or 'gcs')
        chunk_size: Maximum size of text chunks
        chunk_overlap: Overlap between chunks
        model_name: Vertex AI model name
        batch_size: Batch size for LLM processing
        sample_size: Optional limit on number of rows to process (a falsy value
            processes every row)
        temperature: Sampling temperature forwarded to the model
        max_output_tokens: Maximum tokens generated per chunk
        output_path: JSON file the results are saved to

    Returns:
        Tuple of (results list, analysis dictionary)
    """
    # 1. Setup
    setup_google_cloud(project_id, location)

    # 2. Data ingestion
    df = load_data(data_path, source_type)
    df = validate_data(df, text_column)

    # Optional sampling for testing
    if sample_size and sample_size < len(df):
        df = df.sample(sample_size, random_state=42)
        logger.info(f"Using sample of {sample_size} records for processing")

    # 3. Text preprocessing. assign() returns a new frame, so this never writes
    #    into a filtered slice (no SettingWithCopyWarning).
    logger.info("Preprocessing text data...")
    df = df.assign(processed_text=df[text_column].apply(preprocess_text))

    # 4. Text chunking (flattened across all documents)
    logger.info(f"Chunking text with chunk_size={chunk_size}, overlap={chunk_overlap}")
    all_chunks = [
        chunk
        for text in tqdm(df['processed_text'], desc="Chunking texts")
        for chunk in chunk_text(text, chunk_size=chunk_size, overlap=chunk_overlap)
    ]

    logger.info(f"Created {len(all_chunks)} chunks from {len(df)} documents")

    # 5. Initialize Llama model
    model = initialize_llama_model(model_name)

    # 6. Process chunks with LLM
    logger.info(f"Processing {len(all_chunks)} chunks with LLM in batches of {batch_size}")
    results = batch_process_with_llm(
        model=model,
        text_chunks=all_chunks,
        prompt_template=prompt_template,
        batch_size=batch_size,
        temperature=temperature,
        max_output_tokens=max_output_tokens
    )

    # 7. Analyze, visualize and persist results
    analysis = analyze_results(results)
    logger.info(f"Analysis complete: {analysis['success_rate']:.1%} success rate")

    visualize_results(results, analysis)
    save_results(results, output_path)

    return results, analysis

# Example usage of the pipeline
if __name__ == "__main__":
    # Demo settings: replace the placeholder values before running on real data.
    demo_project = "your-gcp-project-id"  # Replace with actual project ID
    # Prompt built line by line; the leading/trailing whitespace is part of the prompt text.
    demo_prompt = "\n".join([
        "",
        "    Please analyze the following text and extract the main topics, key entities, and sentiment:",
        "    ",
        "    {text}",
        "    ",
        "    Response format:",
        "    - Main topics: [List the key topics]",
        "    - Key entities: [List important entities]",
        "    - Sentiment: [Positive/Negative/Neutral with brief explanation]",
        "    ",
    ])

    demo_kwargs = dict(
        data_path="sample_data.csv",        # Replace with actual data path
        text_column="content",              # Replace with actual column name
        project_id=demo_project,
        prompt_template=demo_prompt,
        sample_size=10,                     # Small sample for demonstration
    )
    results, analysis = run_llm_processing_pipeline(**demo_kwargs)

    print(f"Pipeline execution complete. Processed {len(results)} text chunks.")
    print(f"Success rate: {analysis['success_rate']:.1%}")

# Cell output captured in this notebook export:
# ModuleNotFoundError: No module named 'sklearn'