In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import json
import os
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Plot defaults: matplotlib's stock style with seaborn's "husl" colour cycle
plt.style.use('default')
sns.set_palette("husl")

# Data locations, relative to the notebook's working directory
RAW_DATA_DIR = Path("..", "data", "raw")
PROCESSED_DIR = Path("..", "data", "processed")
WEATHER_DIR = RAW_DATA_DIR  # weather JSON snapshots sit directly in raw/
RETAIL_FILE = RAW_DATA_DIR.joinpath("retail_sales_dataset.csv")
HEADLINES_DIR = RAW_DATA_DIR.joinpath("web")

print("Raw data directory: {}".format(RAW_DATA_DIR.absolute()))
print("Processed data directory: {}".format(PROCESSED_DIR.absolute()))

## 1. Load Raw Data

In [None]:
# Load weather data
# Column order of the frame returned by load_weather_data(). It is fixed so that
# an empty directory still yields a frame the downstream transforms can index.
_WEATHER_COLUMNS = [
    'timestamp', 'date', 'city', 'country', 'temperature', 'feels_like',
    'humidity', 'pressure', 'weather_main', 'weather_description',
    'wind_speed', 'wind_direction', 'clouds', 'source_file',
]


def _as_dict(value):
    """Return ``value`` when it is a dict, otherwise an empty dict.

    JSON payloads can carry ``null`` (or another type) where an object is
    expected. ``data.get(key, {})`` does not guard against that, because the
    default only applies when the key is absent.
    """
    return value if isinstance(value, dict) else {}


def _weather_record(data, source_file):
    """Flatten one weather JSON object into a row dict keyed by ``_WEATHER_COLUMNS``.

    Raises:
        TypeError: if ``data`` is not a JSON object.
    """
    if not isinstance(data, dict):
        raise TypeError(f"expected a JSON object, got {type(data).__name__}")

    main = _as_dict(data.get('main'))
    wind = _as_dict(data.get('wind'))
    conditions = data.get('weather')
    # 'weather' is a list of condition objects, and only the first is kept.
    # An empty list used to raise IndexError and drop the whole file.
    first_condition = (
        _as_dict(conditions[0]) if isinstance(conditions, list) and conditions else {}
    )
    dt = data.get('dt')

    return {
        'timestamp': dt,
        # `is not None` rather than truthiness, so dt == 0 (the epoch) still parses
        'date': pd.to_datetime(dt, unit='s').normalize() if dt is not None else None,
        'city': data.get('name'),
        'country': _as_dict(data.get('sys')).get('country'),
        'temperature': main.get('temp'),
        'feels_like': main.get('feels_like'),
        'humidity': main.get('humidity'),
        'pressure': main.get('pressure'),
        'weather_main': first_condition.get('main'),
        'weather_description': first_condition.get('description'),
        'wind_speed': wind.get('speed'),
        'wind_direction': wind.get('deg'),
        'clouds': _as_dict(data.get('clouds')).get('all'),
        'source_file': source_file,
    }


def load_weather_data(weather_dir=None):
    """Load weather JSON snapshots into a flat DataFrame, one row per file.

    Every ``*_weather_*.json`` file in ``weather_dir`` is parsed as a single JSON
    object. Files that cannot be read or parsed are reported and skipped, so one
    bad file does not abort the load.

    Args:
        weather_dir: Directory to scan. Defaults to ``WEATHER_DIR``.

    Returns:
        DataFrame with the columns in ``_WEATHER_COLUMNS``. It is empty, but still
        has those columns, when no file could be loaded.
    """
    weather_dir = WEATHER_DIR if weather_dir is None else Path(weather_dir)
    # sorted() makes the row order reproducible across filesystems and re-runs
    weather_files = sorted(weather_dir.glob("*_weather_*.json"))
    weather_records = []

    for file_path in weather_files:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            weather_records.append(_weather_record(data, file_path.name))
        except Exception as e:
            print(f"Error processing {file_path.name}: {e}")

    return pd.DataFrame(weather_records, columns=_WEATHER_COLUMNS)

weather_df = load_weather_data()
print("Weather data loaded: {} records".format(len(weather_df)))
display(weather_df.head(n=5))

In [None]:
# Load retail data
def load_retail_data(path=None):
    """Read the retail sales CSV into a DataFrame.

    Loading is best-effort: any read/parse failure is printed and an empty
    DataFrame is returned, so the rest of the notebook can run without retail
    data (later cells check ``retail_df.empty``).

    Args:
        path: CSV file to read. Defaults to ``RETAIL_FILE``.

    Returns:
        The parsed DataFrame, or an empty DataFrame on failure.
    """
    csv_path = RETAIL_FILE if path is None else path
    try:
        df = pd.read_csv(csv_path)
    except Exception as e:
        print(f"Error loading retail data: {e}")
        return pd.DataFrame()
    print(f"Retail data loaded: {len(df)} records")
    return df

# Preview only when the CSV was read (the loader returns an empty frame on failure)
retail_df = load_retail_data()
if not retail_df.empty:
    display(retail_df.head(n=5))

In [None]:
# Load headlines data
# Column order of the frame returned by load_headlines_data(). It is fixed so
# that an empty directory still yields a frame with the columns the transform expects.
_HEADLINE_COLUMNS = [
    'title', 'description', 'url', 'published_at', 'source',
    'author', 'content', 'source_file',
]


def _headline_record(item, source_file):
    """Flatten one article object into a row dict keyed by ``_HEADLINE_COLUMNS``."""
    source = item.get('source')
    return {
        'title': item.get('title'),
        'description': item.get('description'),
        'url': item.get('url'),
        'published_at': item.get('publishedAt'),
        # 'source' is normally {'name': ...}. A null or bare-string value used to
        # raise AttributeError and discard the remainder of the file.
        'source': source.get('name') if isinstance(source, dict) else source,
        'author': item.get('author'),
        'content': item.get('content'),
        'source_file': source_file,
    }


def _headline_items(data):
    """Return the article objects contained in one parsed JSON document.

    Raises:
        TypeError: if ``data`` is neither a JSON array nor a JSON object.
    """
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        articles = data.get('articles')
        # NOTE(review): an envelope holding an 'articles' list (NewsAPI-style)
        # used to become one all-empty row. Confirm this matches the scraped files.
        if isinstance(articles, list):
            return articles
        return [data]
    raise TypeError(f"expected a JSON object or array, got {type(data).__name__}")


def load_headlines_data(headlines_dir=None):
    """Load headline JSON files into a flat DataFrame, one row per article.

    A file may hold a list of article objects, a single article object, or an
    object with an ``articles`` list. Unreadable files are reported and skipped,
    and so are non-object entries inside a list.

    Args:
        headlines_dir: Directory to scan for ``*.json``. Defaults to ``HEADLINES_DIR``.

    Returns:
        DataFrame with the columns in ``_HEADLINE_COLUMNS``. It is empty, but still
        has those columns, when nothing could be loaded.
    """
    headlines_dir = HEADLINES_DIR if headlines_dir is None else Path(headlines_dir)
    headlines_records = []

    # sorted() makes the row order reproducible across filesystems and re-runs
    for file_path in sorted(headlines_dir.glob("*.json")):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            for item in _headline_items(data):
                if isinstance(item, dict):
                    headlines_records.append(_headline_record(item, file_path.name))
                else:
                    print(f"Skipping non-object entry in {file_path.name}: {item!r}")
        except Exception as e:
            print(f"Error processing {file_path.name}: {e}")

    return pd.DataFrame(headlines_records, columns=_HEADLINE_COLUMNS)

headlines_df = load_headlines_data()
print("Headlines data loaded: {} records".format(len(headlines_df)))
display(headlines_df.head(n=5))

## 2. Weather Data Transformations

In [None]:
# Weather data transformation prototype
def transform_weather_data(df):
    """Clean raw weather rows and add temperature/humidity category bands.

    Steps:
    - derive ``date`` (midnight) from the Unix-seconds ``timestamp``;
    - coerce numeric columns to numbers and fill gaps with the column mean;
    - fill missing ``weather_main``, ``weather_description`` and ``city`` with ``'Unknown'``;
    - bin ``temperature`` and ``humidity`` into labelled categories.

    Args:
        df: Frame with at least ``timestamp``, ``city``, ``temperature``,
            ``humidity``, ``pressure``, ``weather_main``,
            ``weather_description`` and ``wind_speed`` columns.

    Returns:
        New DataFrame with the selected final columns. Rows whose timestamp
        could not be parsed are dropped.
    """
    transformed = df.copy()

    # Unparsable timestamps become NaT and are dropped at the end
    transformed['date'] = pd.to_datetime(transformed['timestamp'], unit='s', errors='coerce').dt.normalize()

    numeric_cols = ['temperature', 'humidity', 'pressure', 'wind_speed', 'clouds']
    for col in numeric_cols:
        if col in transformed.columns:
            # Coerce first: a column that is entirely None arrives with object
            # dtype, which breaks mean() here and pd.cut() below.
            values = pd.to_numeric(transformed[col], errors='coerce')
            transformed[col] = values.fillna(values.mean())

    for col in ['weather_main', 'weather_description', 'city']:
        transformed[col] = transformed[col].fillna('Unknown')

    # NOTE(review): these bins assume degrees Celsius. OpenWeather returns Kelvin
    # unless metric units were requested, so confirm the unit of the raw data.
    # include_lowest=True keeps the bottom edge (-50) inside the first bin.
    transformed['temp_category'] = pd.cut(
        transformed['temperature'],
        bins=[-50, 0, 15, 25, 35, 50],
        labels=['Freezing', 'Cold', 'Cool', 'Warm', 'Hot'],
        include_lowest=True,
    )

    # include_lowest=True: 0% humidity is 'Dry', not NaN
    transformed['humidity_category'] = pd.cut(
        transformed['humidity'],
        bins=[0, 30, 60, 80, 100],
        labels=['Dry', 'Normal', 'Humid', 'Very Humid'],
        include_lowest=True,
    )

    final_columns = [
        'date', 'city', 'temperature', 'humidity', 'pressure',
        'weather_main', 'weather_description', 'wind_speed',
        'temp_category', 'humidity_category'
    ]

    return transformed[final_columns].dropna(subset=['date'])

# Run the weather transform and preview the result
weather_transformed = transform_weather_data(df=weather_df)
print("Weather data transformed: {} records".format(len(weather_transformed)))
print("Transformed columns:", weather_transformed.columns.tolist())
display(weather_transformed.head(n=5))

In [None]:
# Aggregate weather data to daily level (prototype)
def aggregate_weather_daily(df):
    """Collapse weather rows to one row per (date, city).

    Temperature gets mean/min/max. Humidity, pressure and wind speed get their
    means. The weather condition gets its most frequent value, or ``'Unknown'``
    when a group has no non-null condition. Numeric results are rounded to 2 dp,
    and ``date``/``city`` are returned as ordinary columns.
    """
    def most_common(conditions):
        modes = conditions.mode()
        return 'Unknown' if modes.empty else modes.iloc[0]

    # Named aggregation gives the output columns their final names directly,
    # so no positional renaming of a MultiIndex is needed.
    summary = df.groupby(['date', 'city']).agg(
        temp_mean=('temperature', 'mean'),
        temp_min=('temperature', 'min'),
        temp_max=('temperature', 'max'),
        humidity_mean=('humidity', 'mean'),
        pressure_mean=('pressure', 'mean'),
        wind_speed_mean=('wind_speed', 'mean'),
        weather_main_mode=('weather_main', most_common),
    )
    return summary.round(2).reset_index()

# Roll the cleaned weather rows up to one row per date and city
weather_daily = aggregate_weather_daily(df=weather_transformed)
print("Weather data aggregated to daily: {} records".format(len(weather_daily)))
display(weather_daily.head(n=5))

## 3. Retail Data Transformations

In [None]:
# Retail data transformation prototype
def transform_retail_data(df, min_parse_ratio=0.5):
    """Clean the retail frame: parse date columns, fill gaps, normalise names.

    Args:
        df: Raw retail DataFrame, as returned by ``load_retail_data``.
        min_parse_ratio: A non-numeric column whose name contains "date" is
            replaced by its parsed datetime version only when at least this
            fraction of its non-null values parse. This guards against names
            such as "Candidate" or "Validated", which merely contain the substring.

    Returns:
        ``df`` itself when it is empty. Otherwise a new DataFrame in which:
        - name-detected date columns are converted to datetime where they parse;
        - numeric gaps are filled with the column median;
        - object gaps are filled with ``'Unknown'``;
        - column names are lower-cased, with spaces replaced by underscores.
    """
    if df.empty:
        return df

    transformed = df.copy()

    date_cols = [col for col in transformed.columns if 'date' in str(col).lower()]
    for col in date_cols:
        values = transformed[col]
        # Numbers would be read as nanoseconds since the epoch, which used to
        # turn e.g. an "Update Count" column into 1970 timestamps.
        if pd.api.types.is_numeric_dtype(values):
            print(f"Skipped {col}: numeric column is not parsed as dates")
            continue
        try:
            parsed = pd.to_datetime(values, errors='coerce')
        except (TypeError, ValueError):
            print(f"Could not convert {col} to datetime")
            continue
        # errors='coerce' never raises on bad strings; it yields NaT. Reject the
        # conversion when most present values failed, instead of wiping the column.
        n_present = values.notna().sum()
        if n_present and parsed.notna().sum() < min_parse_ratio * n_present:
            print(f"Could not convert {col} to datetime")
            continue
        transformed[col] = parsed
        print(f"Converted {col} to datetime")

    numeric_cols = transformed.select_dtypes(include=['number']).columns
    for col in numeric_cols:
        transformed[col] = transformed[col].fillna(transformed[col].median())

    categorical_cols = transformed.select_dtypes(include=['object']).columns
    for col in categorical_cols:
        transformed[col] = transformed[col].fillna('Unknown')

    # Standardize column names (lowercase, replace spaces with underscores)
    transformed.columns = transformed.columns.str.lower().str.replace(' ', '_')

    return transformed

# Run the retail transform when data was loaded; otherwise carry an empty frame forward
if retail_df.empty:
    retail_transformed = pd.DataFrame()
    print("No retail data to transform")
else:
    retail_transformed = transform_retail_data(df=retail_df)
    print("Retail data transformed: {} records".format(len(retail_transformed)))
    print("Transformed columns:", retail_transformed.columns.tolist())
    display(retail_transformed.head(n=5))

## 4. Headlines Data Transformations

In [None]:
# Headlines data transformation prototype
def transform_headlines_data(df):
    """Clean headline rows and derive a calendar ``date`` plus simple text features.

    Steps:
    - parse ``published_at`` (any UTC offset) to UTC, and derive ``date`` as the
      tz-naive UTC midnight, matching the epoch-derived weather dates;
    - fill and strip ``title``, ``description`` and ``content``, creating any
      missing column as empty strings;
    - fill missing ``source`` with ``'Unknown'``;
    - add ``title_length`` and ``has_description``;
    - drop rows with no parsable date or a blank title.

    Args:
        df: Frame with at least ``published_at``, ``source`` and ``url`` columns.

    Returns:
        New DataFrame with columns ``date, title, description, source, url,
        title_length, has_description``.
    """
    transformed = df.copy()

    # utc=True: without it, mixed offsets leave an object column (so .dt fails),
    # and 'Z'-suffixed strings give tz-aware dates that merge_asof cannot match
    # against the tz-naive weather/retail dates.
    transformed['published_at'] = pd.to_datetime(transformed['published_at'], errors='coerce', utc=True)
    transformed['date'] = transformed['published_at'].dt.tz_localize(None).dt.normalize()

    for col in ['title', 'description', 'content']:
        if col not in transformed.columns:
            transformed[col] = ''
        # astype(str) stops non-string cells from turning into NaN under .str
        transformed[col] = transformed[col].fillna('').astype(str).str.strip()

    transformed['source'] = transformed['source'].fillna('Unknown')

    transformed['title_length'] = transformed['title'].str.len()
    transformed['has_description'] = transformed['description'].str.len() > 0

    # Titles were filled with '' above, so dropna(subset=['title']) never removed
    # anything. Filter blank titles explicitly instead.
    is_valid = transformed['date'].notna() & transformed['title'].ne('')
    valid_records = transformed[is_valid]

    final_columns = [
        'date', 'title', 'description', 'source', 'url',
        'title_length', 'has_description'
    ]

    return valid_records[final_columns]

# Run the headlines transform and preview the result
headlines_transformed = transform_headlines_data(df=headlines_df)
print("Headlines data transformed: {} records".format(len(headlines_transformed)))
print("Transformed columns:", headlines_transformed.columns.tolist())
display(headlines_transformed.head(n=5))

## 5. Data Integration Prototype

In [None]:
# Data integration prototype
def integrate_datasets(weather_df, retail_df, headlines_df):
    """Integrate the three datasets on ``date`` using nearest-date ``merge_asof``.

    Retail rows form the base when dated retail data is available; otherwise
    weather rows do. Each base row is paired with the weather row and the
    headline row whose ``date`` is closest to its own.

    The input frames are not modified. Work happens on copies, whereas the
    earlier prototype overwrote the callers' ``date`` columns in place.

    Args:
        weather_df, retail_df, headlines_df: DataFrames expected to carry a
            ``date`` column. A frame without one is left out of the merge.

    Returns:
        The merged DataFrame, ordered by ``date``. It is an empty DataFrame when
        neither retail nor weather has a ``date`` column.
    """
    def _prepare(df):
        """Return a date-sorted copy with a tz-naive datetime64[ns] ``date`` column and undated rows dropped, or None when there is no ``date`` column."""
        if df is None or 'date' not in df.columns:
            return None
        prepared = df.copy()
        # utc=True, then dropping the zone, puts tz-aware and tz-naive dates on
        # one naive-UTC scale. merge_asof rejects keys that mix tz-aware with
        # tz-naive, or that differ in datetime resolution.
        prepared['date'] = (
            pd.to_datetime(prepared['date'], errors='coerce', utc=True)
            .dt.tz_localize(None)
            .astype('datetime64[ns]')
        )
        return prepared.dropna(subset=['date']).sort_values('date')

    def _attach_nearest(left, right, suffixes):
        """merge_asof ``right`` onto ``left`` by nearest date; ``left`` is returned as-is when ``right`` is None."""
        if right is None:
            return left
        return pd.merge_asof(left, right, on='date', direction='nearest', suffixes=suffixes)

    weather_sorted = _prepare(weather_df)
    retail_sorted = _prepare(retail_df)
    headlines_sorted = _prepare(headlines_df)

    print(f"Weather records: {0 if weather_sorted is None else len(weather_sorted)}")
    print(f"Retail records: {0 if retail_sorted is None else len(retail_sorted)}")
    print(f"Headlines records: {0 if headlines_sorted is None else len(headlines_sorted)}")

    if retail_sorted is not None and not retail_sorted.empty:
        # Retail is the base: attach the nearest weather row, then the nearest headline
        integrated = _attach_nearest(retail_sorted, weather_sorted, ('_retail', '_weather'))
        integrated = _attach_nearest(integrated, headlines_sorted, ('', '_headlines'))
    elif weather_sorted is not None:
        # No retail data: weather is the base, with headlines attached
        integrated = _attach_nearest(weather_sorted, headlines_sorted, ('', '_headlines'))
    else:
        print("No dated retail or weather data; nothing to integrate")
        integrated = pd.DataFrame()

    return integrated

# Join daily weather, cleaned retail and headlines into one frame
integrated_data = integrate_datasets(
    weather_df=weather_daily,
    retail_df=retail_transformed,
    headlines_df=headlines_transformed,
)
print("Integrated data: {} records".format(len(integrated_data)))
print("Integrated columns:", integrated_data.columns.tolist())
display(integrated_data.head(n=5))

## 6. Validation and Comparison

In [None]:
# Compare with existing processed data
def compare_with_existing(processed_file, new_data):
    """Report how a freshly built frame lines up with a saved parquet file.

    Prints the row counts and column lists of both frames, followed by the size
    of their shared column set.

    Returns:
        ``(existing_frame, shared_columns)`` on success, or ``(None, set())``
        if any step fails. The error is printed, not raised.
    """
    try:
        existing = pd.read_parquet(processed_file)
        print(f"Existing data: {len(existing)} records")
        print(f"New data: {len(new_data)} records")

        print("\nExisting data columns:", existing.columns.tolist())
        print("New data columns:", new_data.columns.tolist())

        shared = set(existing.columns).intersection(new_data.columns)
        print(f"Common columns: {len(shared)}")
    except Exception as err:
        print(f"Could not load existing data: {err}")
        return None, set()
    return existing, shared

# List earlier pipeline outputs and, if the integrated one exists, compare schemas
processed_files = list(PROCESSED_DIR.glob("*.parquet"))
if not processed_files:
    print("No existing processed files found")
else:
    print("Existing processed files:")
    for file in processed_files:
        print(f"  - {file.name}")

    integrated_file = PROCESSED_DIR / "integrated_dataset.parquet"
    if integrated_file.exists():
        existing_integrated, common_cols = compare_with_existing(
            processed_file=integrated_file,
            new_data=integrated_data,
        )

In [None]:
# Validation checks
def validate_transformations(df, name):
    """Print a short data-quality report for ``df`` under the heading ``name``.

    Reports row and column counts, total nulls and duplicate rows. When a
    ``date`` column exists, it reports how many of its values parse. It then
    gives IQR outlier counts for at most the first three numeric columns.
    Returns None.
    """
    n_rows = len(df)
    print(f"\n{name} Validation:")
    print(f"Records: {n_rows}")
    print(f"Columns: {df.shape[1]}")
    print(f"Missing values: {df.isna().sum().sum()}")
    print(f"Duplicate records: {df.duplicated().sum()}")

    if 'date' in df.columns:
        parsed_dates = pd.to_datetime(df['date'], errors='coerce')
        print(f"Valid dates: {parsed_dates.notna().sum()}/{n_rows}")

    numeric_cols = df.select_dtypes(include=['number']).columns
    if numeric_cols.empty:
        return

    print(f"Numeric columns: {len(numeric_cols)}")
    # Only the first three numeric columns are checked, to keep the report short
    for col in numeric_cols[:3]:
        print(f"  {col}: {detect_outliers_iqr(df, col).sum()} outliers")

def detect_outliers_iqr(df, column, k=1.5):
    """Flag values of ``df[column]`` that fall outside the IQR fences.

    The fences are ``[Q1 - k*IQR, Q3 + k*IQR]`` with ``IQR = Q3 - Q1``.

    Args:
        df: Source DataFrame.
        column: Column to check.
        k: Fence multiplier. The default of 1.5 matches the previous fixed value.

    Returns:
        Boolean Series aligned to ``df.index`` (True = outlier). An empty boolean
        Series is returned when ``column`` is missing or is not a numeric,
        non-boolean column.
    """
    if column not in df.columns:
        return pd.Series([], dtype=bool)

    values = df[column]
    # is_numeric_dtype also accepts int32/float32/nullable Int64 etc. The previous
    # 'int64'/'float64' whitelist silently reported 0 outliers for those dtypes.
    # Booleans are excluded because quartiles are not meaningful for them.
    if not pd.api.types.is_numeric_dtype(values) or pd.api.types.is_bool_dtype(values):
        return pd.Series([], dtype=bool)

    q1 = values.quantile(0.25)
    q3 = values.quantile(0.75)
    iqr = q3 - q1
    return (values < q1 - k * iqr) | (values > q3 + k * iqr)

# Report on every transformed dataset; retail is optional and may be empty
validate_transformations(df=weather_daily, name="Weather")
if not retail_transformed.empty:
    validate_transformations(df=retail_transformed, name="Retail")
validate_transformations(df=headlines_transformed, name="Headlines")
validate_transformations(df=integrated_data, name="Integrated")

## 7. Export Prototyped Transformations

In [None]:
# Export transformed data for testing
def export_prototypes(output_dir=None):
    """Write the prototype frames built above to parquet files.

    Reads the notebook-level ``weather_daily``, ``retail_transformed``,
    ``headlines_transformed`` and ``integrated_data``. The retail file is written
    only when ``retail_transformed`` is non-empty.

    Args:
        output_dir: Destination directory. Defaults to ``../data/test_output``,
            and is created (with any missing parents) if absent.
    """
    output_dir = Path("../data/test_output") if output_dir is None else Path(output_dir)
    # parents=True: a plain mkdir(exist_ok=True) raises FileNotFoundError when
    # ../data does not exist yet.
    output_dir.mkdir(parents=True, exist_ok=True)

    def _write(frame, filename, label):
        """Write ``frame`` to ``output_dir/filename`` and report it."""
        target = output_dir / filename
        frame.to_parquet(target, index=False)
        print(f"{label} prototype exported: {target}")

    _write(weather_daily, "weather_prototype.parquet", "Weather")
    if not retail_transformed.empty:
        _write(retail_transformed, "retail_prototype.parquet", "Retail")
    _write(headlines_transformed, "headlines_prototype.parquet", "Headlines")
    _write(integrated_data, "integrated_prototype.parquet", "Integrated")

    print(f"\nAll prototype files exported to: {output_dir.absolute()}")


# Export the prototypes
export_prototypes()

## 8. Summary and Next Steps

In [None]:
print("ETL PROTOTYPING SUMMARY")
print("=" * 40)

print(f"Weather data: {len(weather_df)} raw → {len(weather_daily)} processed")
if not retail_transformed.empty:
    print(f"Retail data: {len(retail_df)} raw → {len(retail_transformed)} processed")
else:
    print("Retail data: No data available")
print(f"Headlines data: {len(headlines_df)} raw → {len(headlines_transformed)} processed")
print(f"Integrated data: {len(integrated_data)} records")

print("\nTRANSFORMATION FEATURES TESTED:")
print("✓ Date parsing and normalization")
print("✓ Missing value handling")
print("✓ Data aggregation (weather daily)")
print("✓ Categorical encoding")
print("✓ Data integration with merge_asof")
print("✓ Derived column creation")

print("\nVALIDATION RESULTS:")
print(f"Weather missing values: {weather_daily.isnull().sum().sum()}")
if not retail_transformed.empty:
    print(f"Retail missing values: {retail_transformed.isnull().sum().sum()}")
print(f"Headlines missing values: {headlines_transformed.isnull().sum().sum()}")
print(f"Integrated missing values: {integrated_data.isnull().sum().sum()}")

print("\nNEXT STEPS:")
print("1. Review transformation logic and adjust as needed")
print("2. Test edge cases and error handling")
print("3. Compare with existing production transformations")
print("4. Implement approved changes in production scripts")
print("5. Add unit tests for transformation functions")
print("6. Document transformation rules and business logic")