# Data Ingestion Pipeline
This notebook demonstrates a simple data ingestion pipeline: loading CSV files, ingesting (simulated) API data, running basic data quality checks, and storing the CSV datasets in a database.


In [None]:
import pandas as pd
import numpy as np
import requests
import json
from sqlalchemy import create_engine, text
import os
from datetime import datetime
import logging

# Configure root logging at INFO level so the logger.info progress messages
# emitted by the ingestion helpers are shown in the notebook output.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the ingestion helpers defined in later cells.
logger = logging.getLogger(__name__)

## 1. CSV Data Ingestion
Load data from CSV files in the raw data directory.

In [None]:
def load_csv_data(data_path='../data/raw/', csv_files=None):
    """Load the expected CSV files from a raw-data directory.

    Args:
        data_path: Directory containing the CSV files. The default
            '../data/raw/' is relative, so it is resolved against the current
            working directory (where the notebook is run from).
        csv_files: File names to load. Defaults to customers.csv,
            transactions.csv, events.csv and products.csv.

    Returns:
        dict mapping dataset name (the file name without its trailing '.csv')
        to the loaded DataFrame. Files that are not present are logged as a
        warning and skipped.
    """
    if csv_files is None:
        csv_files = ['customers.csv', 'transactions.csv', 'events.csv', 'products.csv']

    datasets = {}
    for file_name in csv_files:
        file_path = os.path.join(data_path, file_name)
        # isfile (not exists): a directory with a matching name must be
        # skipped rather than handed to pd.read_csv, which would fail on it.
        if not os.path.isfile(file_path):
            logger.warning(f"File not found: {file_path}")
            continue

        # Strip only the trailing '.csv'; str.replace would also remove a
        # '.csv' occurring in the middle of the file name.
        dataset_name = file_name[:-len('.csv')] if file_name.endswith('.csv') else file_name
        datasets[dataset_name] = pd.read_csv(file_path)
        logger.info(f"Loaded {dataset_name}: {len(datasets[dataset_name])} records")

    return datasets

# Load every available raw CSV and print a short preview of each one.
datasets = load_csv_data()
for name, df in datasets.items():
    print(
        f"\n{name.upper()} Dataset:",
        f"Shape: {df.shape}",
        f"Columns: {list(df.columns)}",
        sep="\n",
    )
    print(df.head(2))

## 2. API Data Ingestion
Simulate ingesting data from external APIs using hard-coded sample market and weather records; no network requests are made.

In [None]:
def simulate_api_ingestion():
    """Simulate one fetch from external APIs using hard-coded sample records.

    No network request is made. Every record in the simulated batch carries
    the same ISO-8601 ingestion timestamp, so the batch behaves like a single
    API snapshot.

    Returns:
        tuple (market_df, weather_df): a market DataFrame with columns
        symbol/price/volume/timestamp, and a weather DataFrame with columns
        city/temperature/humidity/timestamp.
    """
    # Capture the timestamp once per batch. Calling datetime.now() for every
    # record gave rows of the same snapshot slightly different times.
    fetched_at = datetime.now().isoformat()

    api_data = {
        'market_data': [
            {'symbol': 'AAPL', 'price': 150.25, 'volume': 1000000, 'timestamp': fetched_at},
            {'symbol': 'GOOGL', 'price': 2500.75, 'volume': 800000, 'timestamp': fetched_at},
            {'symbol': 'MSFT', 'price': 300.50, 'volume': 1200000, 'timestamp': fetched_at},
        ],
        'weather_data': [
            {'city': 'New York', 'temperature': 22, 'humidity': 65, 'timestamp': fetched_at},
            {'city': 'Los Angeles', 'temperature': 28, 'humidity': 45, 'timestamp': fetched_at},
        ],
    }

    # Convert the raw record lists to DataFrames.
    market_df = pd.DataFrame(api_data['market_data'])
    weather_df = pd.DataFrame(api_data['weather_data'])

    logger.info(f"Ingested market data: {len(market_df)} records")
    logger.info(f"Ingested weather data: {len(weather_df)} records")

    return market_df, weather_df

# Fetch the simulated API snapshots and display both tables.
market_df, weather_df = simulate_api_ingestion()
print("Market Data:", market_df, sep="\n")
print("\nWeather Data:", weather_df, sep="\n")

## 3. Data Quality Checks
Run basic data quality checks (missing values, duplicate rows, column data types) on each ingested CSV dataset.

In [None]:
def perform_data_quality_checks(df, dataset_name):
    """Print a basic data quality report for one DataFrame and return a summary.

    The printed report shows record and column counts, per-column
    missing-value counts, the number of duplicated rows, and column dtypes.

    Args:
        df: DataFrame to check.
        dataset_name: Label used in the printed report header.

    Returns:
        dict with 'total_records', 'missing_values' (total missing cells over
        all columns) and 'duplicates' (rows identical to an earlier row). All
        values are plain Python ints, so the summary is JSON-serializable.
    """
    print(f"\n=== Data Quality Report for {dataset_name} ===")
    print(f"Total records: {len(df)}")
    print(f"Total columns: {len(df.columns)}")

    # Per-column missing counts; the total is computed once and reused.
    # int() converts numpy's int64 into a plain int for the summary.
    missing_per_column = df.isnull().sum()
    total_missing = int(missing_per_column.sum())
    if total_missing > 0:
        print("\nMissing Values:")
        print(missing_per_column[missing_per_column > 0])
    else:
        print("\nNo missing values found")

    # duplicated() flags each repeat of an earlier identical row.
    duplicates = int(df.duplicated().sum())
    print(f"\nDuplicate records: {duplicates}")

    print("\nData Types:")
    print(df.dtypes)

    return {
        'total_records': len(df),
        'missing_values': total_missing,
        'duplicates': duplicates,
    }

# Run the quality checks on every loaded dataset, keyed by dataset name.
quality_reports = dict()
for name, df in datasets.items():
    quality_reports[name] = perform_data_quality_checks(dataset_name=name, df=df)

## 4. Database Connection Setup
Set up a PostgreSQL connection, falling back to a local SQLite database (`dataplatform.db`) if the PostgreSQL setup fails.

In [None]:
def setup_database_connections(postgres_url=None, sqlite_url='sqlite:///dataplatform.db'):
    """Create the database engine used by the pipeline.

    PostgreSQL is tried first. If its engine cannot be created, or a test
    connection to it fails, a SQLite engine is used instead.

    Args:
        postgres_url: SQLAlchemy URL for PostgreSQL. When None, it is read
            from the POSTGRES_URL environment variable, falling back to the
            local placeholder URL used by the demo.
        sqlite_url: SQLAlchemy URL for the fallback database.

    Returns:
        dict holding either a 'postgresql' or a 'sqlite' engine.
    """
    connections = {}

    if postgres_url is None:
        # Keep real credentials out of source; the placeholder default only
        # preserves the previous local-demo behaviour.
        postgres_url = os.environ.get(
            'POSTGRES_URL',
            "postgresql://username:password@localhost:5432/dataplatform",
        )

    postgres_engine = None
    try:
        postgres_engine = create_engine(postgres_url, echo=False)
        # create_engine() is lazy and does not open a connection. Without this
        # probe, an unreachable server went undetected here, the SQLite
        # fallback never ran, and the failure surfaced later during writes.
        with postgres_engine.connect():
            pass
        connections['postgresql'] = postgres_engine
        logger.info("PostgreSQL connection established")
    except Exception as e:
        logger.warning(f"PostgreSQL connection failed: {e}")
        if postgres_engine is not None:
            # Release the pool of the engine we are abandoning.
            postgres_engine.dispose()
        # Create SQLite as fallback for demo
        sqlite_engine = create_engine(sqlite_url)
        connections['sqlite'] = sqlite_engine
        logger.info("Using SQLite as fallback database")

    return connections

# dict holding either a 'postgresql' or a 'sqlite' engine, as returned by
# setup_database_connections().
db_connections = setup_database_connections()

## 5. Data Storage
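Write each ingested CSV dataset to a `raw_<name>` table in the selected database and verify the stored row counts. The quality checks above are reported only; they do not block storage.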

In [None]:
def store_data_to_database(datasets, connections):
    """Write each dataset to a ``raw_<name>`` table and verify its row count.

    Args:
        datasets: dict mapping dataset name to DataFrame.
        connections: dict of engines as returned by
            ``setup_database_connections``. A 'postgresql' engine is preferred
            over a 'sqlite' one.

    Returns:
        True if every dataset was written and its row count read back.
        False if no usable engine was supplied, or if any write or
        verification failed. Errors are logged, and processing stops at the
        first failure.
    """
    # Prefer PostgreSQL and fall back to SQLite. If neither is present, report
    # failure instead of raising KeyError outside the error handling below.
    engine_key = next((key for key in ('postgresql', 'sqlite') if key in connections), None)
    if engine_key is None:
        logger.error("No database connection available; nothing was stored")
        return False
    engine = connections[engine_key]

    try:
        for table_name, df in datasets.items():
            target_table = f'raw_{table_name}'
            df.to_sql(target_table, engine, if_exists='replace', index=False)
            logger.info(f"Stored {table_name} to database table: {target_table}")

            # Quote the identifier (only when needed, per dialect rules).
            # Names are derived from file names, and an unquoted name with
            # spaces, dashes or capitals would break, or be case-folded in,
            # the raw SQL below.
            quoted_table = engine.dialect.identifier_preparer.quote(target_table)
            with engine.connect() as conn:
                count = conn.execute(text(f"SELECT COUNT(*) FROM {quoted_table}")).scalar()
            print(f"Verified {table_name}: {count} records in database")

    except Exception as e:
        # logger.exception keeps the traceback for diagnosing which step failed.
        logger.exception(f"Error storing data: {e}")
        return False

    return True

# Persist every loaded dataset and report the overall outcome.
storage_success = store_data_to_database(datasets, db_connections)
print("\nData storage successful:", storage_success)

## 6. Data Ingestion Summary
Generate a summary report of the ingestion process.

In [None]:
def generate_ingestion_report(datasets, quality_reports, storage_success=None):
    """Print a summary report of the ingestion run.

    Args:
        datasets: dict mapping dataset name to DataFrame.
        quality_reports: dict mapping dataset name to a summary dict with
            'missing_values' and 'duplicates' counts, as returned by
            ``perform_data_quality_checks``. A dataset without a report counts
            as 0 for both.
        storage_success: Result of the database write, if known. None means
            the caller did not report it.
    """
    print("\n" + "="*50)
    print("DATA INGESTION SUMMARY REPORT")
    print("="*50)
    print(f"Ingestion Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total Datasets Processed: {len(datasets)}")

    total_records = sum(len(df) for df in datasets.values())
    print(f"Total Records Ingested: {total_records:,}")

    quality_passed = True
    print("\nDataset Breakdown:")
    for name, df in datasets.items():
        quality = quality_reports.get(name, {})
        missing = quality.get('missing_values', 0)
        duplicates = quality.get('duplicates', 0)
        if missing or duplicates:
            quality_passed = False
        print(f"  {name.capitalize()}:")
        print(f"    - Records: {len(df):,}")
        print(f"    - Columns: {len(df.columns)}")
        print(f"    - Missing Values: {missing}")
        print(f"    - Duplicates: {duplicates}")

    # Derive both statuses from actual results. Previously they were always
    # reported as successful, even when issues were found or storage failed.
    quality_status = "✅ PASSED" if quality_passed else "⚠️ ISSUES FOUND"
    if storage_success is None:
        storage_status = "❔ NOT REPORTED"
    elif storage_success:
        storage_status = "✅ COMPLETED"
    else:
        storage_status = "❌ FAILED"

    print(f"\nData Quality Status: {quality_status}")
    print(f"Database Storage: {storage_status}")
    print("="*50)

# Print the end-of-run summary for the CSV datasets and their quality reports.
generate_ingestion_report(datasets=datasets, quality_reports=quality_reports)