# NYC Taxi Dataset Analysis for Thesis Research

## Adaptive Schema Evolution Detection and Mapping Regeneration in ETL Pipelines

This notebook demonstrates how the NYC Taxi Trip dataset can be used to evaluate adaptive schema evolution detection and mapping regeneration in ETL pipelines that rely on Retrieval-Augmented Large Language Models.

### Dataset Overview
The NYC Taxi Trip dataset is maintained by the New York City Taxi and Limousine Commission (TLC) and is a well-suited testbed for schema evolution research because it offers:
- Multiple versions with documented schema changes
- Real-world data quality challenges
- Sufficient volume for scalability testing
- Public availability for reproducibility


In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Set display options
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)

print("Libraries imported successfully!")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")


## 1. Dataset Acquisition and Schema Versions

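The NYC Taxi dataset's schema has changed several times since publication began. Rather than downloading the TLC files, this section generates three synthetic versions (1,000 rows each) that mimic typical evolution patterns: a column rename, newly added payment and surcharge columns, and the replacement of raw coordinates with location IDs. The version boundaries and column names are simplified for illustration and do not exactly reproduce the TLC data dictionaries.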


In [None]:
# Create sample data representing different schema versions
# Version 1: Early schema (2010-2014)
def create_taxi_v1(n_records=1000):
    """Create Version 1 of NYC Taxi dataset with early schema"""
    np.random.seed(42)
    data = {
        'medallion': [f'med_{i:06d}' for i in range(n_records)],
        'hack_license': [f'hack_{np.random.randint(100000, 999999)}' for _ in range(n_records)],
        'vendor_id': np.random.choice(['VTS', 'CMT', 'DDS'], n_records),
        'rate_code': np.random.choice([1, 2, 3, 4, 5, 6], n_records),
        'store_and_fwd_flag': np.random.choice(['Y', 'N'], n_records),
        'pickup_datetime': pd.date_range('2014-01-01', periods=n_records, freq='5min'),
        'dropoff_datetime': pd.date_range('2014-01-01 00:10', periods=n_records, freq='5min'),
        'passenger_count': np.random.randint(1, 6, n_records),
        'trip_time_in_secs': np.random.randint(60, 3600, n_records),
        'trip_distance': np.random.uniform(0.1, 20.0, n_records).round(2),
        'pickup_longitude': np.random.uniform(-74.05, -73.85, n_records),
        'pickup_latitude': np.random.uniform(40.6, 40.9, n_records),
        'dropoff_longitude': np.random.uniform(-74.05, -73.85, n_records),
        'dropoff_latitude': np.random.uniform(40.6, 40.9, n_records),
    }
    return pd.DataFrame(data)

# Version 2: Updated schema (2015-2016) with schema changes
def create_taxi_v2(n_records=1000):
    """Create Version 2 with schema changes: renamed columns, new columns"""
    np.random.seed(42)
    data = {
        'medallion': [f'med_{i:06d}' for i in range(n_records)],
        'hack_license': [f'hack_{np.random.randint(100000, 999999)}' for _ in range(n_records)],
        'vendor_id': np.random.choice(['VTS', 'CMT', 'DDS'], n_records),
        'rate_code': np.random.choice([1, 2, 3, 4, 5, 6], n_records),
        'store_and_fwd_flag': np.random.choice(['Y', 'N'], n_records),
        'pickup_datetime': pd.date_range('2015-01-01', periods=n_records, freq='5min'),
        'dropoff_datetime': pd.date_range('2015-01-01 00:10', periods=n_records, freq='5min'),
        'passenger_count': np.random.randint(1, 6, n_records),
        'trip_duration': np.random.randint(60, 3600, n_records),  # RENAMED from trip_time_in_secs
        'trip_distance': np.random.uniform(0.1, 20.0, n_records).round(2),
        'pickup_longitude': np.random.uniform(-74.05, -73.85, n_records),
        'pickup_latitude': np.random.uniform(40.6, 40.9, n_records),
        'dropoff_longitude': np.random.uniform(-74.05, -73.85, n_records),
        'dropoff_latitude': np.random.uniform(40.6, 40.9, n_records),
        # NEW COLUMNS ADDED
        'payment_type': np.random.choice([1, 2, 3, 4, 5, 6], n_records),
        'fare_amount': np.random.uniform(2.5, 50.0, n_records).round(2),
        'surcharge': np.random.uniform(0.0, 1.0, n_records).round(2),
        'mta_tax': np.random.choice([0.0, 0.5], n_records),
        'tip_amount': np.random.uniform(0.0, 10.0, n_records).round(2),
        'tolls_amount': np.random.uniform(0.0, 5.0, n_records).round(2),
        'total_amount': np.random.uniform(3.0, 60.0, n_records).round(2),
    }
    return pd.DataFrame(data)

# Version 3: Latest schema (2017+) with further changes
def create_taxi_v3(n_records=1000):
    """Create Version 3 with additional schema changes"""
    np.random.seed(42)
    data = {
        'medallion': [f'med_{i:06d}' for i in range(n_records)],
        'hack_license': [f'hack_{np.random.randint(100000, 999999)}' for _ in range(n_records)],
        'vendor_id': np.random.choice(['VTS', 'CMT', 'DDS'], n_records),
        'rate_code': np.random.choice([1, 2, 3, 4, 5, 6], n_records),
        'store_and_fwd_flag': np.random.choice(['Y', 'N'], n_records),
        'pickup_datetime': pd.date_range('2017-01-01', periods=n_records, freq='5min'),
        'dropoff_datetime': pd.date_range('2017-01-01 00:10', periods=n_records, freq='5min'),
        'passenger_count': np.random.randint(1, 6, n_records),
        'trip_duration': np.random.randint(60, 3600, n_records),
        'trip_distance': np.random.uniform(0.1, 20.0, n_records).round(2),
        # CHANGED: Coordinates replaced with location IDs
        'pickup_location_id': np.random.randint(1, 265, n_records),
        'dropoff_location_id': np.random.randint(1, 265, n_records),
        'payment_type': np.random.choice([1, 2, 3, 4, 5, 6], n_records),
        'fare_amount': np.random.uniform(2.5, 50.0, n_records).round(2),
        'surcharge': np.random.uniform(0.0, 1.0, n_records).round(2),
        'mta_tax': np.random.choice([0.0, 0.5], n_records),
        'tip_amount': np.random.uniform(0.0, 10.0, n_records).round(2),
        'tolls_amount': np.random.uniform(0.0, 5.0, n_records).round(2),
        'total_amount': np.random.uniform(3.0, 60.0, n_records).round(2),
        # NEW COLUMNS ADDED
        'improvement_surcharge': np.random.uniform(0.0, 0.3, n_records).round(2),
        'congestion_surcharge': np.random.uniform(0.0, 2.5, n_records).round(2),
    }
    return pd.DataFrame(data)

# Create datasets
print("Creating dataset versions...")
taxi_v1 = create_taxi_v1(1000)
taxi_v2 = create_taxi_v2(1000)
taxi_v3 = create_taxi_v3(1000)

print(f"✓ Version 1 created: {len(taxi_v1)} records, {len(taxi_v1.columns)} columns")
print(f"✓ Version 2 created: {len(taxi_v2)} records, {len(taxi_v2.columns)} columns")
print(f"✓ Version 3 created: {len(taxi_v3)} records, {len(taxi_v3.columns)} columns")


## 2. Schema Comparison and Evolution Detection

This section extracts a schema from each version and compares consecutive versions to detect added, removed, retyped, and likely renamed columns. This detection step is central to the thesis research.


In [None]:
# Schema extraction function
def extract_schema(df, version_name):
    """Extract column-level schema information from a DataFrame"""
    schema = {
        'version': version_name,
        'columns': {},
        'column_count': len(df.columns),
        'row_count': len(df),
        'dtypes': {}
    }
    
    for col in df.columns:
        non_null = df[col].dropna()
        schema['columns'][col] = {
            'dtype': str(df[col].dtype),
            # Observed nulls in this sample, not a declared constraint.
            # bool() converts numpy.bool_ so json.dump writes true/false instead of "False"
            'nullable': bool(df[col].isna().any()),
            'unique_count': int(df[col].nunique()),
            # head(3) on an empty Series already returns [], so no extra check is needed
            'sample_values': non_null.head(3).tolist()
        }
        schema['dtypes'][col] = str(df[col].dtype)
    
    return schema

# Extract schemas
schema_v1 = extract_schema(taxi_v1, 'v1')
schema_v2 = extract_schema(taxi_v2, 'v2')
schema_v3 = extract_schema(taxi_v3, 'v3')

print("Schema Extraction Results:")
print("=" * 60)
for label, schema in [("Version 1", schema_v1), ("Version 2", schema_v2), ("Version 3", schema_v3)]:
    print(f"\n{label} Schema:")
    print(f"  Columns: {schema['column_count']}")
    print(f"  Column names: {list(schema['columns'].keys())}")

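Once schemas are extracted, a pipeline may want a cheap check for whether an incoming batch changed shape at all before running a full comparison. The sketch below hashes a sorted `{column: dtype}` mapping (the same shape as `schema_v1['dtypes']`). The helper name `schema_fingerprint` is hypothetical, and ignoring column order is an assumption that suits header-based formats but not positional loaders.

```python
import hashlib
import json

def schema_fingerprint(dtypes):
    """Return a SHA-256 hex digest of a {column: dtype} mapping.

    Items are sorted first, so reordering columns leaves the fingerprint
    unchanged, while an added, removed, renamed, or retyped column changes it.
    """
    canonical = json.dumps(sorted(dtypes.items()))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

v1 = {"trip_time_in_secs": "int64", "trip_distance": "float64"}
v1_reordered = {"trip_distance": "float64", "trip_time_in_secs": "int64"}
v2 = {"trip_duration": "int64", "trip_distance": "float64"}

print(schema_fingerprint(v1) == schema_fingerprint(v1_reordered))  # True
print(schema_fingerprint(v1) == schema_fingerprint(v2))            # False
```

Only when two fingerprints differ would the pipeline need to run the more expensive `compare_schemas` diff.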

In [None]:
# Schema comparison function
from difflib import SequenceMatcher

def compare_schemas(old_schema, new_schema, rename_threshold=0.5):
    """Compare two schemas and detect added, removed, retyped, and likely renamed columns"""
    old_cols = set(old_schema['columns'].keys())
    new_cols = set(new_schema['columns'].keys())
    removed = old_cols - new_cols
    added = new_cols - old_cols
    
    changes = {
        # sorted() keeps the output (and the saved JSON) stable across runs;
        # raw set iteration order can differ between Python processes
        'added_columns': sorted(added),
        'removed_columns': sorted(removed),
        'common_columns': sorted(old_cols & new_cols),
        'type_changes': [],
        'renamed_columns': []
    }
    
    # Check for type changes in common columns
    for col in changes['common_columns']:
        old_type = old_schema['columns'][col]['dtype']
        new_type = new_schema['columns'][col]['dtype']
        if old_type != new_type:
            changes['type_changes'].append({
                'column': col,
                'old_type': old_type,
                'new_type': new_type
            })
    
    # Detect potential renames with a name-similarity heuristic.
    # Renamed columns are still listed under added/removed as well.
    # A threshold of 0.6 would miss trip_time_in_secs -> trip_duration,
    # whose SequenceMatcher ratio is about 0.53.
    for old_col in sorted(removed):
        for new_col in sorted(added):
            similarity = SequenceMatcher(None, old_col.lower(), new_col.lower()).ratio()
            if similarity >= rename_threshold:
                changes['renamed_columns'].append({
                    'old_name': old_col,
                    'new_name': new_col,
                    'similarity': round(similarity, 3)
                })
    
    return changes

# Compare schemas
print("Schema Comparison: V1 → V2")
print("=" * 60)
changes_v1_v2 = compare_schemas(schema_v1, schema_v2)
print(f"Added columns: {changes_v1_v2['added_columns']}")
print(f"Removed columns: {changes_v1_v2['removed_columns']}")
print(f"Renamed columns: {changes_v1_v2['renamed_columns']}")
print(f"Type changes: {changes_v1_v2['type_changes']}")

print("\nSchema Comparison: V2 → V3")
print("=" * 60)
changes_v2_v3 = compare_schemas(schema_v2, schema_v3)
print(f"Added columns: {changes_v2_v3['added_columns']}")
print(f"Removed columns: {changes_v2_v3['removed_columns']}")
print(f"Renamed columns: {changes_v2_v3['renamed_columns']}")
print(f"Type changes: {changes_v2_v3['type_changes']}")
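`compare_schemas` pairs each removed column with every added column above the threshold, so one new column can appear in several candidate renames (in V2 → V3, for example, both pickup coordinate columns score against `pickup_location_id`). A greedy one-to-one assignment is one common refinement. The sketch below, using a hypothetical helper `match_renames_one_to_one`, keeps only the highest-scoring pair for each column.

```python
from difflib import SequenceMatcher

def match_renames_one_to_one(removed, added, threshold=0.5):
    """Greedily pair removed and added column names, each used at most once.

    All candidate pairs are scored with SequenceMatcher.ratio(), sorted by
    descending similarity, and accepted only if neither name is already paired.
    """
    candidates = []
    for old in removed:
        for new in added:
            score = SequenceMatcher(None, old.lower(), new.lower()).ratio()
            if score >= threshold:
                candidates.append((score, old, new))
    # Highest similarity first; names break ties deterministically
    candidates.sort(key=lambda c: (-c[0], c[1], c[2]))

    used_old, used_new, pairs = set(), set(), []
    for score, old, new in candidates:
        if old not in used_old and new not in used_new:
            pairs.append({"old_name": old, "new_name": new, "similarity": round(score, 3)})
            used_old.add(old)
            used_new.add(new)
    return pairs

renames = match_renames_one_to_one(["trip_time_in_secs"], ["trip_duration", "tip_amount"])
print([(p["old_name"], p["new_name"]) for p in renames])  # [('trip_time_in_secs', 'trip_duration')]

coords = match_renames_one_to_one(["pickup_latitude", "pickup_longitude"], ["pickup_location_id"])
print(len(coords))  # 1: only one coordinate column can claim the new ID column
```

The second call shows the limitation: for a many-to-one structural change such as coordinates → location IDs, the greedy pass keeps one coordinate column and drops the other, so name similarity alone cannot describe that change. This is the kind of case where retrieved examples and an LLM are expected to help.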


## 3. Data Analysis and Characteristics

Understanding the data characteristics is crucial for ETL pipeline design and schema evolution handling.


In [None]:
# Basic statistics
print("Dataset Statistics")
print("=" * 60)

for name, df in [("Version 1", taxi_v1), ("Version 2", taxi_v2), ("Version 3", taxi_v3)]:
    print(f"\n{name}:")
    print(f"  Shape: {df.shape}")
    print(f"  Memory usage: {df.memory_usage(deep=True).sum() / 1024:.2f} KB")
    print(f"  Missing values: {df.isnull().sum().sum()}")
    print(f"  Duplicate rows: {df.duplicated().sum()}")

# Statistical summary for numeric columns
print("\n\nNumeric Column Statistics (Version 2):")
print("=" * 60)
numeric_cols = taxi_v2.select_dtypes(include=[np.number]).columns
print(taxi_v2[numeric_cols].describe())
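Name and dtype comparisons cannot see semantic changes, such as a column that keeps its name and integer type but switches from seconds to minutes. Summary statistics like the ones above suggest a simple profile-based check: the sketch flags shared numeric columns whose median moves by more than a relative tolerance. The helper `range_drift` and its 50% default tolerance are arbitrary assumptions for illustration.

```python
import pandas as pd

def range_drift(old_df, new_df, tolerance=0.5):
    """Return shared numeric columns whose median shifts by more than `tolerance` (relative)."""
    flagged = []
    for col in old_df.columns.intersection(new_df.columns):
        old_col, new_col = old_df[col], new_df[col]
        if not (pd.api.types.is_numeric_dtype(old_col) and pd.api.types.is_numeric_dtype(new_col)):
            continue  # skip strings, datetimes, and other non-numeric columns
        old_med, new_med = old_col.median(), new_col.median()
        # Relative change is undefined for a zero baseline, so those columns are skipped
        if old_med != 0 and abs(new_med - old_med) / abs(old_med) > tolerance:
            flagged.append(col)
    return flagged

# Same column name and integer dtype, but durations switch from seconds to minutes
old = pd.DataFrame({"trip_duration": [600, 900, 1200], "trip_distance": [1.0, 2.5, 4.0]})
new = pd.DataFrame({"trip_duration": [10, 15, 20], "trip_distance": [1.1, 2.4, 4.2]})
print(range_drift(old, new))  # ['trip_duration']
```

A median-based test is deliberately coarse; it catches unit changes but not subtler distribution shifts, which would need a proper statistical test.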


## 4. Relevance to Thesis Research

### How This Dataset Supports the Research

1. **Schema Evolution Patterns**: The simulated versions mirror the kinds of schema evolution seen in the real dataset, including:
   - Column additions (payment fields, surcharges)
   - Column renames (trip_time_in_secs → trip_duration)
   - Structural changes (coordinates → location IDs)
   - Type changes and precision modifications (not reproduced in the simulated versions, where every shared column keeps its dtype)

2. **ETL Pipeline Scenarios**: The dataset represents typical source data requiring:
   - Extraction from multiple versions
   - Transformation to handle schema changes
   - Loading into data warehouses with consistent schemas

3. **RAG-Enhanced LLM Application**: 
   - Historical schema mappings can be retrieved as examples
   - Retrieved examples let the LLM apply patterns from previous schema evolutions in context, without retraining
   - Context-aware mapping generation using retrieved examples

4. **Evaluation Metrics**:
   - Detection accuracy for different change types
   - Mapping correctness for various schema transformations
   - Performance impact of adaptive updates
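The retrieval step in item 3 can be prototyped before choosing an embedding model or vector store. The sketch below ranks a small library of past schema changes by Jaccard overlap of snake_case name tokens. `EXAMPLE_LIBRARY`, `retrieve_examples`, and the example entries are hypothetical, and a production RAG system would more likely use embedding similarity.

```python
def tokenize(columns):
    """Split snake_case column names into a set of lowercase tokens."""
    return {token for col in columns for token in col.lower().split("_") if token}

def jaccard(a, b):
    """Jaccard similarity of two sets (0.0 when both are empty)."""
    return len(a & b) / len(a | b) if (a | b) else 0.0

def retrieve_examples(change, library, k=2):
    """Return up to k library entries whose column tokens overlap the change most."""
    query = tokenize(change["added_columns"] + change["removed_columns"])
    scored = []
    for example in library:
        tokens = tokenize(example["added_columns"] + example["removed_columns"])
        scored.append((jaccard(query, tokens), example))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [example for score, example in scored[:k] if score > 0]

# Hypothetical library of previously resolved schema changes
EXAMPLE_LIBRARY = [
    {"id": "ride_duration_rename",
     "added_columns": ["ride_duration"], "removed_columns": ["ride_time_in_secs"],
     "mapping": {"renames": {"ride_time_in_secs": "ride_duration"}}},
    {"id": "coords_to_zone",
     "added_columns": ["start_zone_id"], "removed_columns": ["start_latitude", "start_longitude"],
     "mapping": "point-in-polygon lookup against zone boundaries"},
]

current = {"added_columns": ["trip_duration"], "removed_columns": ["trip_time_in_secs"]}
print([ex["id"] for ex in retrieve_examples(current, EXAMPLE_LIBRARY)])  # ['ride_duration_rename']
```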


In [None]:
# Save schemas for use in thesis research
import os
import json

# Create schemas directory (relative to this notebook) if it doesn't exist
schemas_dir = '../schemas'
os.makedirs(schemas_dir, exist_ok=True)

outputs = {
    'taxi_schema_v1.json': schema_v1,
    'taxi_schema_v2.json': schema_v2,
    'taxi_schema_v3.json': schema_v3,
    'taxi_changes_v1_v2.json': changes_v1_v2,
    'taxi_changes_v2_v3.json': changes_v2_v3,
}

# default=str serializes values json cannot handle natively (e.g., Timestamps)
for filename, payload in outputs.items():
    with open(os.path.join(schemas_dir, filename), 'w') as f:
        json.dump(payload, f, indent=2, default=str)

print(f"✓ Schemas and change detection results saved to {os.path.abspath(schemas_dir)}")
print("\nThese files can be used for:")
print("  - Schema evolution detection evaluation")
print("  - RAG retrieval examples")
print("  - LLM mapping generation testing")
print("  - ETL pipeline validation")


## 5. Integration with ETL Pipeline

The detected schema changes can be used to automatically regenerate ETL mappings using RAG-enhanced LLMs.
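One way the detected changes could be handed to an LLM is as a structured prompt that includes retrieved examples. The sketch below only builds the prompt string and calls no LLM API; the function name `build_mapping_prompt` and the requested response shape (`renames` and `defaults` keys) are assumptions for illustration.

```python
import json

def build_mapping_prompt(changes, examples, source_version, target_version):
    """Assemble a plain-text prompt asking an LLM for an ETL column mapping.

    `changes` uses the keys returned by compare_schemas in this notebook;
    `examples` is a list of retrieved past evolutions (JSON-serializable dicts).
    """
    summary = {key: changes.get(key, []) for key in
               ("added_columns", "removed_columns", "renamed_columns", "type_changes")}
    parts = [
        f"Update the ETL mapping from schema {source_version} to {target_version}.",
        "Detected changes:",
        json.dumps(summary, indent=2),
        "Similar past schema evolutions and their mappings:",
    ]
    # Fall back to a placeholder when retrieval returned nothing
    parts += [json.dumps(example, indent=2) for example in examples] or ["(none retrieved)"]
    parts.append(
        "Respond with a JSON object with keys 'renames' (old name -> new name) "
        "and 'defaults' (new column -> default value)."
    )
    return "\n\n".join(parts)

changes = {
    "added_columns": ["fare_amount", "trip_duration"],
    "removed_columns": ["trip_time_in_secs"],
    "renamed_columns": [{"old_name": "trip_time_in_secs", "new_name": "trip_duration"}],
    "type_changes": [],
}
prompt = build_mapping_prompt(changes, examples=[], source_version="v1", target_version="v2")
print(prompt)
```

Asking for a fixed JSON shape makes the LLM's answer machine-checkable in the validation step.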


In [None]:
# Example: How schema changes would trigger ETL mapping regeneration
print("ETL Mapping Regeneration Scenario")
print("=" * 60)
print("\nWhen schema changes are detected:")
print("1. Schema detection identifies changes (as shown above)")
print("2. RAG system retrieves similar schema evolution examples")
print("3. LLM generates new ETL transform mappings")
print("4. Generated mappings are validated")
print("5. ETL pipeline is updated automatically")

print("\n\nExample Mapping Requirements (V1 → V2):")
print("- Map 'trip_time_in_secs' → 'trip_duration'")
print("- Populate the new payment and fare columns (payment_type through total_amount) with defaults or nulls for V1 records")
print("- Maintain existing business logic")

print("\n\nExample Mapping Requirements (V2 → V3):")
print("- Derive location IDs from coordinates (a spatial lookup against taxi zone boundaries, not a simple rename)")
print("- Add the new surcharge fields (improvement_surcharge, congestion_surcharge)")
print("- Handle the shift from float coordinate columns to integer location IDs")
print("- Preserve existing transformations")

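The V1 → V2 requirements can be expressed as a small mapping spec and applied with pandas, followed by a basic check corresponding to step 4 of the scenario ("Generated mappings are validated"). This is a sketch: the spec format (`renames`, `defaults`), the helper names `apply_mapping` and `validate_mapped`, and the all-null check are assumptions, not an established format.

```python
import pandas as pd

def apply_mapping(df, mapping, target_columns):
    """Rename columns, add defaulted columns, then conform to the target column order.

    Target columns still missing after renames/defaults are created as all-NaN
    by reindex; columns not in target_columns are dropped.
    """
    out = df.rename(columns=mapping.get("renames", {}))
    for col, value in mapping.get("defaults", {}).items():
        if col not in out.columns:
            out[col] = value
    return out.reindex(columns=target_columns)

def validate_mapped(df, target_columns):
    """Return a list of problems; an empty list means the output passed these checks."""
    problems = []
    if list(df.columns) != list(target_columns):
        problems.append("column list does not match the target schema")
    problems += [f"no values for column: {col}" for col in df.columns if df[col].isna().all()]
    return problems

# Toy V1 rows and a hand-written mapping for illustration
v1_rows = pd.DataFrame({"trip_time_in_secs": [300, 600], "trip_distance": [1.2, 3.4]})
target = ["trip_duration", "trip_distance", "fare_amount", "tip_amount"]
mapping = {"renames": {"trip_time_in_secs": "trip_duration"}, "defaults": {"fare_amount": 0.0}}

mapped = apply_mapping(v1_rows, mapping, target)
print(validate_mapped(mapped, target))  # ['no values for column: tip_amount']
```

Whether a missing fare should default to `0.0` or stay null is a business decision; leaving a column null, as happens for `tip_amount` here, lets the validator surface the gap instead of hiding it.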

## 6. Summary and Next Steps

This notebook has demonstrated:

1. ✅ Synthetic dataset creation with schema evolution patterns
2. ✅ Schema extraction and comparison
3. ✅ Change detection (additions, removals, renames, type changes)
4. ✅ Data analysis and characteristics
5. ✅ Relevance to thesis research objectives

### Next Steps for Thesis Research:

1. **Implement RAG System**: Build retrieval system for schema evolution examples
2. **LLM Integration**: Connect with LLM APIs for mapping generation
3. **Evaluation Framework**: Develop metrics for detection accuracy and mapping correctness
4. **ETL Pipeline Integration**: Integrate with Apache Airflow for automated updates
5. **Experiments**: Run comprehensive experiments on multiple schema evolution scenarios
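For the evaluation framework, rename detection can be scored against a hand-labelled ground truth with standard precision, recall, and F1 over `(old_name, new_name)` pairs. The helper name `precision_recall` and the convention of returning 0.0 when a denominator is zero are assumptions; the detected pairs below are made up for illustration.

```python
def precision_recall(predicted, expected):
    """Precision, recall, and F1 for sets of (old_name, new_name) rename pairs.

    Returns 0.0 for a metric whose denominator is zero (an assumed convention).
    """
    predicted, expected = set(predicted), set(expected)
    true_positives = len(predicted & expected)
    precision = true_positives / len(predicted) if predicted else 0.0
    recall = true_positives / len(expected) if expected else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}

ground_truth = {("trip_time_in_secs", "trip_duration")}
# Made-up detector output: one correct rename and one false positive
detected = {("trip_time_in_secs", "trip_duration"), ("trip_distance", "trip_duration")}
print(precision_recall(detected, ground_truth))  # precision 0.5, recall 1.0, F1 about 0.67
```

With this notebook's output, the detected pairs could be built as `{(r['old_name'], r['new_name']) for r in changes_v1_v2['renamed_columns']}`.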
