# Enterprise RAG Chatbot - Data Pipeline
## Complete End-to-End Data Engineering Pipeline

This notebook demonstrates the complete data engineering pipeline for the RAG chatbot:
1. Data Ingestion from multiple sources
2. Text Processing and Cleaning
3. Intelligent Chunking
4. Embedding Generation
5. Vector Store Creation
6. Quality Validation

### Prerequisites
- Databricks environment with Spark
- Access to Foundation Models
- Vector Search endpoint configured

In [None]:
# Install required packages
%pip install transformers==4.30.2 "unstructured[pdf,docx]==0.10.30" langchain==0.0.319 llama-index==0.9.3 databricks-vectorsearch==0.20 pydantic==1.10.9 mlflow==2.9.0
dbutils.library.restartPython()

In [None]:
# Import required libraries
import sys
import os
from pathlib import Path

# Add src to path
sys.path.append('../src')

# Core imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc
import mlflow
import pandas as pd
import numpy as np
from datetime import datetime
import json

# Local imports
from data_engineering.data_ingestion import DataIngestionPipeline
from data_engineering.text_processing import TextProcessor, TextQualityMonitor
from feature_engineering.embeddings import EmbeddingGenerator, FeatureEngineer, EmbeddingQualityValidator
from modeling.vector_store import VectorStoreManager
from utils.config_manager import ConfigManager
from utils.logging_utils import setup_logging, get_logger

print("Libraries imported successfully")

## 1. Configuration and Setup

In [None]:
# Setup logging
setup_logging(log_level="INFO", log_format="structured")
logger = get_logger("data_pipeline", {"notebook": "01_data_pipeline"})

# Load configuration
config_manager = ConfigManager()
config = config_manager.load_config()

# Display configuration summary
config_summary = config_manager.get_config_summary()
print("Configuration Summary:")
for key, value in config_summary.items():
    print(f"  {key}: {value}")

logger.info("Configuration loaded successfully")
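
The later cells read a fixed set of nested keys from `config`. The sketch below checks a config dictionary for those key paths before any Spark work starts. The key paths are taken from this notebook; every value in `example_config` is a hypothetical placeholder, and `find_missing_keys` is an illustrative helper, not part of `ConfigManager`.

```python
# Hypothetical example config: only the key paths come from this notebook,
# all values are placeholders chosen for illustration.
example_config = {
    "infrastructure": {
        "spark": {"app_name": "rag-data-pipeline", "executor_memory": "4g", "driver_memory": "4g"},
        "mlflow": {"tracking_uri": "databricks", "experiment_name": "/Shared/rag-data-pipeline"},
    },
    "data": {"chunk_size": 512, "chunk_overlap": 64},
    "models": {"embedding": {"name": "example-embedding-model", "dimension": 1024}},
    "vector_db": {"provider": "example-provider", "index_name": "example_index"},
}

# Key paths that the cells in this notebook look up on `config`.
REQUIRED_KEY_PATHS = [
    ("infrastructure", "spark", "app_name"),
    ("infrastructure", "spark", "executor_memory"),
    ("infrastructure", "spark", "driver_memory"),
    ("infrastructure", "mlflow", "tracking_uri"),
    ("infrastructure", "mlflow", "experiment_name"),
    ("data", "chunk_size"),
    ("data", "chunk_overlap"),
    ("models", "embedding", "name"),
    ("models", "embedding", "dimension"),
    ("vector_db", "provider"),
    ("vector_db", "index_name"),
]


def find_missing_keys(cfg, required_paths=REQUIRED_KEY_PATHS):
    """Return the dotted key paths that are absent from the nested dict `cfg`."""
    missing = []
    for path in required_paths:
        node = cfg
        for key in path:
            if not isinstance(node, dict) or key not in node:
                missing.append(".".join(path))
                break
            node = node[key]
    return missing


print(find_missing_keys(example_config))
```

Running a check like this right after `load_config()` surfaces a missing key as one readable list rather than a `KeyError` partway through the pipeline.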

In [None]:
# Initialize Spark session with tuned settings.
# Note: on Databricks a session already exists, so getOrCreate() returns it and
# static settings such as executor/driver memory must be set on the cluster instead.
# The small Arrow batch size limits memory per batch when rows hold large documents.
spark = SparkSession.builder \
    .appName(config['infrastructure']['spark']['app_name']) \
    .config("spark.executor.memory", config['infrastructure']['spark']['executor_memory']) \
    .config("spark.driver.memory", config['infrastructure']['spark']['driver_memory']) \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "10") \
    .getOrCreate()

# Set MLflow experiment
mlflow.set_tracking_uri(config['infrastructure']['mlflow']['tracking_uri'])
mlflow.set_experiment(config['infrastructure']['mlflow']['experiment_name'])

print(f"Spark session initialized: {spark.version}")
print(f"MLflow tracking URI: {mlflow.get_tracking_uri()}")

logger.info("Spark and MLflow initialized")

## 2. Data Ingestion Pipeline

In [None]:
# Initialize data ingestion pipeline
ingestion_pipeline = DataIngestionPipeline(config, spark)

# Define data sources
arxiv_papers = [
    '2312.14565',  # Recent LLM research
    '2303.10130',  # GPT-4 technical report
    '2302.06476',  # LLaMA paper
    '2311.07071',  # Advanced RAG techniques
    '2304.07683',  # Vector databases
    '2310.06825',  # Retrieval augmented generation
    '2309.15217',  # Large language models
    '2308.07107'   # Embedding models
]

# Additional URLs for diverse content
additional_urls = [
    'https://arxiv.org/pdf/2312.00506.pdf',  # AI safety
    'https://arxiv.org/pdf/2311.16867.pdf'   # Multimodal AI
]

logger.info(f"Configured {len(arxiv_papers)} arXiv papers and {len(additional_urls)} additional URLs")

In [None]:
# Start an MLflow run for the data pipeline. The run is left open (no `with` block)
# so that metrics logged in later cells land in this same run.
if mlflow.active_run() is not None:
    mlflow.end_run()  # close any run left over from a previous execution of this cell
run = mlflow.start_run(run_name="data_pipeline_" + datetime.now().strftime("%Y%m%d_%H%M%S"))

# Log pipeline parameters
mlflow.log_params({
    "num_arxiv_papers": len(arxiv_papers),
    "num_additional_urls": len(additional_urls),
    "chunk_size": config['data']['chunk_size'],
    "chunk_overlap": config['data']['chunk_overlap'],
    "embedding_model": config['models']['embedding']['name'],
    "embedding_dimension": config['models']['embedding']['dimension']
})

# Step 1: Ingest arXiv papers
logger.info("Starting arXiv paper ingestion")
df_arxiv = ingestion_pipeline.ingest_arxiv_papers(arxiv_papers)

# Step 2: Ingest additional URLs
logger.info("Starting additional URL ingestion")
df_urls = ingestion_pipeline.ingest_from_urls(additional_urls)

# Step 3: Combine datasets (union matches columns by position, so both
# DataFrames must share the same column order)
df_raw = df_arxiv.union(df_urls)

# Cache for performance
df_raw.cache()

# Display ingestion results
total_docs = df_raw.count()
print("\nIngestion Results:")
print(f"Total documents ingested: {total_docs}")

# Show sample data
display(df_raw.select("file_name", "file_type", "file_size", "source_type").limit(10))

# Log ingestion metrics
mlflow.log_metrics({
    "total_documents_ingested": total_docs,
    "arxiv_documents": df_arxiv.count(),
    "url_documents": df_urls.count()
})

logger.info(f"Ingestion completed: {total_docs} documents")

In [None]:
# Validate data quality
quality_report = ingestion_pipeline.validate_data_quality(df_raw)

print("\nData Quality Report:")
print(json.dumps(quality_report, indent=2))

# Log quality metrics
mlflow.log_metrics({
    "duplicate_percentage": quality_report['duplicate_percentage'],
    "unique_documents": quality_report['unique_documents']
})

# Save raw data to Delta table
table_name = "rag_documents_raw"
ingestion_pipeline.save_to_delta_table(df_raw, table_name, mode="overwrite")

logger.info(f"Raw data saved to table: {table_name}")
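
The quality report above exposes `duplicate_percentage` and `unique_documents`. How `validate_data_quality` computes them is not shown in this notebook; the sketch below is one plausible approach that hashes document content and counts distinct hashes. The function name `summarize_duplicates`, the `total_documents` key, and the 0 to 100 percentage scale are assumptions made for illustration.

```python
import hashlib


def summarize_duplicates(documents):
    """Count exact-content duplicates among `documents` (a list of str or bytes).

    Returns a dict whose `unique_documents` and `duplicate_percentage` keys mirror
    the report keys used in this notebook; the percentage scale (0-100) is assumed.
    """
    content_hashes = set()
    for doc in documents:
        data = doc.encode("utf-8") if isinstance(doc, str) else doc
        content_hashes.add(hashlib.sha256(data).hexdigest())

    total = len(documents)
    unique = len(content_hashes)
    duplicate_percentage = 100.0 * (total - unique) / total if total else 0.0
    return {
        "total_documents": total,
        "unique_documents": unique,
        "duplicate_percentage": duplicate_percentage,
    }


report = summarize_duplicates(["paper A", "paper B", "paper A", "paper C"])
print(report)
```

Hashing catches only byte-identical copies. Near-duplicates, such as two versions of the same arXiv paper, would need a fuzzier comparison.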

## 3. Text Processing Pipeline

In [None]:
# Initialize text processor
text_processor = TextProcessor(config)
quality_monitor = TextQualityMonitor()

logger.info("Text processor initialized")

In [None]:
# Step 1: Extract text from documents
logger.info("Starting text extraction")
df_with_text = text_processor.extract_text_from_documents(df_raw)

# Show text extraction results
print("\nText Extraction Results:")
df_text_stats = df_with_text.select(
    "file_name", 
    "text_length", 
    "word_count"
).orderBy(desc("text_length"))

display(df_text_stats.limit(10))

# Log text extraction metrics (the average is None when the DataFrame is empty)
avg_text_length = df_with_text.agg({"text_length": "avg"}).collect()[0][0] or 0
avg_word_count = df_with_text.agg({"word_count": "avg"}).collect()[0][0] or 0

mlflow.log_metrics({
    "avg_text_length": avg_text_length,
    "avg_word_count": avg_word_count
})

logger.info(f"Text extraction completed. Avg length: {avg_text_length:.0f} chars")

In [None]:
# Step 2: Clean and preprocess text
logger.info("Starting text cleaning")
df_cleaned = text_processor.clean_and_preprocess_text(df_with_text)

# Show cleaning results
docs_before = df_with_text.count()
docs_after = df_cleaned.count()

print(f"\nText Cleaning Results:")
print(f"Documents before cleaning: {docs_before}")
print(f"Documents after cleaning: {docs_after}")
print(f"Documents filtered out: {docs_before - docs_after}")

# Sample cleaned text (first() returns None if every document was filtered out)
sample_text = df_cleaned.select("file_name", "cleaned_text").first()
if sample_text is not None:
    print(f"\nSample cleaned text from {sample_text.file_name}:")
    print(sample_text.cleaned_text[:500] + "...")

mlflow.log_metrics({
    "documents_after_cleaning": docs_after,
    "cleaning_filter_rate": (docs_before - docs_after) / docs_before if docs_before > 0 else 0
})

logger.info(f"Text cleaning completed. {docs_after} documents retained")

In [None]:
# Step 3: Create intelligent chunks
logger.info("Starting intelligent chunking")
df_chunks = text_processor.create_intelligent_chunks(df_cleaned)

# Cache chunks for performance
df_chunks.cache()

# Show chunking results
total_chunks = df_chunks.count()
print(f"\nChunking Results:")
print(f"Total chunks created: {total_chunks}")

# Chunk statistics
chunk_stats = df_chunks.groupBy("chunk_type").count().orderBy(desc("count"))
print("\nChunk type distribution:")
display(chunk_stats)

# Token count statistics
token_stats = df_chunks.select("token_count").describe()
print("\nToken count statistics:")
display(token_stats)

# Sample chunks
print("\nSample chunks:")
sample_chunks = df_chunks.select(
    "file_name", "chunk_id", "chunk_type", "token_count", "chunk_content"
).limit(3).collect()

for chunk in sample_chunks:
    print(f"\nFile: {chunk.file_name}, Chunk: {chunk.chunk_id}, Type: {chunk.chunk_type}, Tokens: {chunk.token_count}")
    print(f"Content: {chunk.chunk_content[:200]}...")

mlflow.log_metrics({
    "total_chunks_created": total_chunks,
    "avg_chunks_per_document": total_chunks / docs_after if docs_after > 0 else 0
})

logger.info(f"Chunking completed: {total_chunks} chunks created")
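
The `chunk_size` and `chunk_overlap` parameters logged earlier suggest a sliding-window scheme. The sketch below shows the basic fixed-size window with overlap over a token list. It is a simplified stand-in: `create_intelligent_chunks` presumably does more (it also assigns a `chunk_type`), and `chunk_tokens` is a hypothetical helper, not part of `TextProcessor`.

```python
def chunk_tokens(tokens, chunk_size, chunk_overlap):
    """Split `tokens` into windows of up to `chunk_size` items.

    Consecutive windows share `chunk_overlap` items, so context at a chunk
    boundary appears in both neighbouring chunks.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")

    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        # Stop once a window reaches the end, to avoid a trailing chunk
        # that only repeats the overlap region.
        if start + chunk_size >= len(tokens):
            break
    return chunks


# Whitespace splitting stands in for a real tokenizer here.
words = "retrieval augmented generation grounds model answers in retrieved source documents".split()
for chunk in chunk_tokens(words, chunk_size=4, chunk_overlap=1):
    print(" ".join(chunk))
```

A larger overlap improves recall for passages that straddle a boundary, at the cost of more chunks to embed and index.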

In [None]:
# Validate chunk quality
chunk_quality = text_processor.validate_chunk_quality(df_chunks)

print("\nChunk Quality Report:")
print(json.dumps(chunk_quality, indent=2))

# Log chunk quality metrics
mlflow.log_metrics({
    "chunk_quality_score": 1.0 if chunk_quality['validation_passed'] else 0.0,
    "short_chunks_percentage": chunk_quality['short_chunks_percentage'],
    "avg_tokens_per_chunk": chunk_quality['average_tokens_per_chunk']
})

# Track processing metrics
processing_metrics = quality_monitor.track_processing_metrics(df_with_text, df_chunks)
print("\nProcessing Metrics:")
print(json.dumps(processing_metrics, indent=2))

logger.info("Text processing quality validation completed")
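
The chunk quality report is read through three keys: `validation_passed`, `short_chunks_percentage`, and `average_tokens_per_chunk`. The sketch below derives values of that shape from a list of per-chunk token counts. The thresholds `min_tokens` and `max_short_percentage` are hypothetical, as is the pass/fail rule; the notebook does not show what `validate_chunk_quality` actually checks.

```python
def summarize_chunk_quality(token_counts, min_tokens=20, max_short_percentage=10.0):
    """Summarize chunk sizes from a list of per-chunk token counts.

    `min_tokens` and `max_short_percentage` are illustrative thresholds,
    not values taken from the pipeline.
    """
    total = len(token_counts)
    if total == 0:
        return {
            "validation_passed": False,
            "short_chunks_percentage": 0.0,
            "average_tokens_per_chunk": 0.0,
        }

    short_chunks = sum(1 for n in token_counts if n < min_tokens)
    short_percentage = 100.0 * short_chunks / total
    return {
        "validation_passed": short_percentage <= max_short_percentage,
        "short_chunks_percentage": short_percentage,
        "average_tokens_per_chunk": sum(token_counts) / total,
    }


print(summarize_chunk_quality([50, 10, 100, 40]))
```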

## 4. Feature Engineering Pipeline

In [None]:
# Initialize feature engineering components
embedding_generator = EmbeddingGenerator(config)
feature_engineer = FeatureEngineer(config)
embedding_validator = EmbeddingQualityValidator()

logger.info("Feature engineering components initialized")

In [None]:
# Step 1: Generate embeddings
logger.info("Starting embedding generation")
df_embeddings = embedding_generator.generate_embeddings(df_chunks, "chunk_content")

# Cache embeddings for performance
df_embeddings.cache()

# Show embedding results
embeddings_count = df_embeddings.count()
print(f"\nEmbedding Generation Results:")
print(f"Total embeddings generated: {embeddings_count}")

# Sample embedding
sample_embedding = df_embeddings.select("file_name", "chunk_content", "embedding").limit(1).collect()[0]
print(f"\nSample embedding from {sample_embedding.file_name}:")
print(f"Content: {sample_embedding.chunk_content[:100]}...")
print(f"Embedding dimension: {len(sample_embedding.embedding)}")
print(f"Embedding sample: {sample_embedding.embedding[:5]}...")

mlflow.log_metrics({
    "embeddings_generated": embeddings_count,
    "embedding_dimension": len(sample_embedding.embedding)
})

logger.info(f"Embedding generation completed: {embeddings_count} embeddings")

In [None]:
# Step 2: Validate embedding quality
embedding_quality = embedding_validator.validate_embeddings(df_embeddings)

print("\nEmbedding Quality Report:")
print(json.dumps(embedding_quality, indent=2))

# Log embedding quality metrics
mlflow.log_metrics({
    "embedding_quality_score": embedding_quality['quality_score'],
    "null_embeddings_percentage": embedding_quality['null_percentage'],
    "dimension_consistency": 1.0 if embedding_quality['dimension_consistency'] else 0.0
})

logger.info(f"Embedding quality validation completed. Score: {embedding_quality['quality_score']:.3f}")
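
Two of the logged checks, `null_percentage` and `dimension_consistency`, can be illustrated without Spark. The sketch below computes them over a plain list of vectors and adds a finite-value check. `summarize_embeddings` and the `all_finite` key are hypothetical, and the composite `quality_score` is left out because this notebook does not define how it is calculated.

```python
import math


def summarize_embeddings(embeddings, expected_dimension):
    """Check a list of embedding vectors (lists of floats, or None when missing)."""
    total = len(embeddings)
    present = [e for e in embeddings if e is not None]

    null_percentage = 100.0 * (total - len(present)) / total if total else 0.0
    # Every non-null vector must have exactly the configured dimension.
    dimension_consistency = all(len(e) == expected_dimension for e in present)
    # NaN or infinite components usually indicate a failed model call.
    all_finite = all(math.isfinite(x) for e in present for x in e)

    return {
        "null_percentage": null_percentage,
        "dimension_consistency": dimension_consistency,
        "all_finite": all_finite,
    }


report = summarize_embeddings([[0.1, 0.2], [0.3, 0.4], None, [0.5]], expected_dimension=2)
print(report)
```

Comparing against `config['models']['embedding']['dimension']` rather than the length of a sampled vector catches the case where every embedding is consistently the wrong size.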

In [None]:
# Step 3: Create additional features
logger.info("Creating additional text features")
df_with_features = feature_engineer.create_text_features(df_embeddings)

# Add semantic features
logger.info("Creating semantic features")
df_with_semantic = feature_engineer.create_semantic_features(df_with_features)

# Add contextual features
logger.info("Creating contextual features")
df_final = feature_engineer.create_contextual_features(df_with_semantic)

# Show feature engineering results
print("\nFeature Engineering Results:")
print(f"Final dataset shape: {df_final.count()} rows")

# Sample features
sample_features = df_final.select(
    "file_name", "chunk_id", "text_features", "semantic_features", "contextual_features"
).limit(1).collect()[0]

print(f"\nSample features from {sample_features.file_name}:")
print(f"Text features: {sample_features.text_features}")
print(f"Semantic features: {sample_features.semantic_features}")
print(f"Contextual features: {sample_features.contextual_features}")

logger.info("Feature engineering completed")

## 5. Vector Store Creation

In [None]:
# Save processed data to Delta table for vector store
embeddings_table = "rag_embeddings"

# Prepare final dataset with required columns
df_for_vector_store = df_final.select(
    col("content_hash").alias("chunk_id"),  # Use content_hash as unique ID
    col("chunk_content"),
    col("embedding"),
    col("file_name"),
    col("chunk_type"),
    col("token_count"),
    col("embedding_timestamp")
)

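# Save to Delta table with Change Data Feed enabled
df_for_vector_store.write \
    .format("delta") \
    .mode("overwrite") \
    .option("delta.enableChangeDataFeed", "true") \
    .saveAsTable(embeddings_table)

# Set the property explicitly as well: whether a writer option is persisted as a
# table property can depend on the Delta version and on whether the table already exists.
spark.sql(f"ALTER TABLE {embeddings_table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")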

print(f"\nEmbeddings saved to table: {embeddings_table}")
print(f"Total records: {df_for_vector_store.count()}")

logger.info(f"Embeddings table created: {embeddings_table}")

In [None]:
# Initialize vector store manager
vector_store = VectorStoreManager(config)

# Create vector index
logger.info("Creating vector search index")
index_created = vector_store.create_index(df_for_vector_store, force_recreate=True)

if index_created:
    print("\nVector index created successfully!")
    
    # Get index statistics
    index_stats = vector_store.get_index_stats()
    print("\nVector Index Statistics:")
    print(json.dumps(index_stats, indent=2))
    
    # Log vector store metrics
    mlflow.log_metrics({
        "vector_index_created": 1.0,
        "vectors_indexed": df_for_vector_store.count()
    })
    
    logger.info("Vector index creation completed")
else:
    print("\nFailed to create vector index")
    mlflow.log_metrics({"vector_index_created": 0.0})
    logger.error("Vector index creation failed")

## 6. Pipeline Validation and Testing

In [None]:
# Test vector search functionality
if index_created:
    logger.info("Testing vector search functionality")
    
    # No query-embedding step is wired into this notebook, so this smoke test reuses
    # an embedding that is already in the index; that chunk should rank at or near the top.
    query_embedding = df_for_vector_store.select("embedding").limit(1).collect()[0].embedding
    
    # Perform search
    search_results = vector_store.search(
        query_embedding=query_embedding,
        top_k=5
    )
    
    print("\nVector Search Test Results:")
    print("Query: embedding of a chunk sampled from the indexed dataset")
    print(f"Results found: {len(search_results)}")
    
    for i, result in enumerate(search_results[:3]):
        print(f"\nResult {i+1}:")
        print(f"  Score: {result.score:.4f}")
        print(f"  Document: {result.document_name}")
        print(f"  Content: {result.content[:150]}...")
    
    # Log search test metrics
    mlflow.log_metrics({
        "search_test_results": len(search_results),
        "avg_search_score": np.mean([r.score for r in search_results]) if search_results else 0.0
    })
    
    logger.info(f"Vector search test completed: {len(search_results)} results")
else:
    print("\nSkipping search test - vector index not available")
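
Searching with a vector that is already in the index works as a smoke test because a vector is maximally similar to itself. The sketch below shows this with a brute-force cosine-similarity search over a small in-memory list. Cosine similarity is an assumption here; the metric and score scale that `vector_store.search` uses are not shown in this notebook, and `top_k_by_cosine` is a hypothetical helper.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_by_cosine(query, vectors, k):
    """Return up to k (score, index) pairs, highest similarity first."""
    scored = [(cosine_similarity(query, v), i) for i, v in enumerate(vectors)]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


indexed_vectors = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
# Query with a vector taken from the index itself, as the cell above does.
results = top_k_by_cosine(indexed_vectors[0], indexed_vectors, k=2)
for score, idx in results:
    print(f"index={idx} score={score:.4f}")
```

If the top hit for such a self-query is not the sampled chunk, or its score is far from the maximum, the index contents probably do not match the Delta table.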

In [None]:
# Generate comprehensive pipeline report
pipeline_report = {
    "pipeline_execution": {
        "timestamp": datetime.now().isoformat(),
        # The only failure this notebook tracks explicitly is index creation
        "status": "completed" if index_created else "completed_with_errors",
        "spark_version": spark.version
    },
    "data_ingestion": {
        "total_documents": total_docs,
        "duplicate_percentage": quality_report.get('duplicate_percentage', 0),
        "sources": {
            "arxiv_papers": len(arxiv_papers),
            "additional_urls": len(additional_urls)
        }
    },
    "text_processing": {
        "documents_processed": docs_after,
        "total_chunks": total_chunks,
        "avg_chunk_tokens": chunk_quality.get('average_tokens_per_chunk', 0),
        "quality_passed": chunk_quality.get('validation_passed', False)
    },
    "feature_engineering": {
        "embeddings_generated": embeddings_count,
        "embedding_dimension": config['models']['embedding']['dimension'],
        "quality_score": embedding_quality.get('quality_score', 0)
    },
    "vector_store": {
        "index_created": index_created,
        "provider": config['vector_db']['provider'],
        "index_name": config['vector_db']['index_name']
    }
}

print("\n" + "="*50)
print("PIPELINE EXECUTION REPORT")
print("="*50)
print(json.dumps(pipeline_report, indent=2))

# Save report as MLflow artifact
with open("/tmp/pipeline_report.json", "w") as f:
    json.dump(pipeline_report, f, indent=2)

mlflow.log_artifact("/tmp/pipeline_report.json", "reports")

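# Log final pipeline metrics. Elapsed time is measured from the start of the
# active MLflow run (start_time is in milliseconds since the epoch).
active_run = mlflow.active_run()
elapsed_seconds = (
    datetime.now().timestamp() - active_run.info.start_time / 1000.0
    if active_run is not None else 0.0
)
mlflow.log_metrics({
    "pipeline_success": 1.0 if index_created else 0.0,
    "total_processing_time": elapsed_seconds
})

# Close the run that the cells above have been logging to
mlflow.end_run()

logger.info("Pipeline execution completed")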

## 7. Next Steps

If every cell above ran without errors, the pipeline has completed the following steps:

1. **Data Ingestion**: Ingested documents from arXiv and additional URLs
2. **Text Processing**: Extracted, cleaned, and chunked the text content
3. **Feature Engineering**: Generated embeddings plus text, semantic, and contextual features
4. **Vector Store**: Created a searchable vector index for similarity search
5. **Quality Validation**: Validated data quality after ingestion, chunking, and embedding generation

### Ready for Next Phase:
- **RAG Pipeline**: The vector store is ready for retrieval-augmented generation
- **API Deployment**: Data is prepared for serving through the API
- **Evaluation**: Ready for quality assessment and performance testing

### Monitoring:
- All metrics have been logged to MLflow for tracking
- Quality reports are available for ongoing monitoring
- Pipeline can be re-run with new data sources

Proceed to notebook `02_rag_pipeline.ipynb` to test the complete RAG system!