# ETL/ELT Pipeline - DB-10 Marketing Intelligence Database

This notebook provides a comprehensive ETL/ELT pipeline for the Marketing Intelligence Database (db-10).

## Pipeline Overview
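1. **Extract**: Load data from U.S. Census Bureau APIs, BLS Public Data API, FTC data, Data.gov CKAN API, and retail sources (the extract cells below currently implement USAJobs.gov, the BLS Public Data API, a Data.gov CKAN dataset search, and local CSV/JSON files)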
2. **Transform**: Clean, validate, and transform marketing intelligence data
3. **Load**: Load transformed data into PostgreSQL
4. **Validate**: Verify data quality and completeness
5. **Monitor**: Track pipeline performance and errors

## Data Sources
- **U.S. Census Bureau APIs**: Monthly Retail Trade Survey (MRTS), Advance Retail Inventories, Annual Retail Trade Survey
- **BLS Public Data API**: Consumer Price Index (CPI), Producer Price Index (PPI), retail employment statistics
- **Federal Trade Commission (FTC)**: Consumer Sentinel Network data, pricing accuracy studies, retail enforcement actions
- **Data.gov CKAN API**: Retail-related datasets, economic indicator datasets
- **Other Sources**: Retailer APIs, web scrapers, manual entry

## Section 1: Setup and Configuration

In [None]:
import sys
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import logging
from typing import Dict, List, Optional
import warnings
warnings.filterwarnings('ignore')

# API and HTTP requests
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Database connections
try:
    from sqlalchemy import create_engine, text
    SQLALCHEMY_AVAILABLE = True
except ImportError:
    SQLALCHEMY_AVAILABLE = False
    print("Warning: sqlalchemy not available")

try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
    PG_AVAILABLE = True
except ImportError:
    PG_AVAILABLE = False
    print("Warning: psycopg2 not available")

try:
    from databricks import sql
    DATABRICKS_AVAILABLE = True
except ImportError:
    DATABRICKS_AVAILABLE = False
    print("Warning: databricks-sql-connector not available")

# Configure root logging once for the notebook, then grab a module logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Pandas display: show every column, cap rows at 50, let width auto-size.
for option_name, option_value in (
    ('display.max_columns', None),
    ('display.max_rows', 50),
    ('display.width', None),
):
    pd.set_option(option_name, option_value)

print("✓ Imports successful")

In [None]:
# Configuration
# NOTE: this cell was previously collapsed onto a single line beginning with
# "#", which turned every assignment into a comment; it is restored here and
# the names the extract cells reference are now defined.
import os

DB_NAME = "db-10"
DB_PATH = Path.cwd().parent

# Database connection strings (configure as needed)
# PostgreSQL, e.g. "postgresql://user:password@localhost:5432/db_10_validation"
POSTGRES_CONNECTION_STRING = os.environ.get("POSTGRES_CONNECTION_STRING")

# Databricks: each value is read from the environment variable named alongside it.
DATABRICKS_CONFIG = {
    'server_hostname': os.environ.get('DATABRICKS_SERVER_HOSTNAME'),
    'http_path': os.environ.get('DATABRICKS_HTTP_PATH'),
    'access_token': os.environ.get('DATABRICKS_TOKEN'),
}

# Source data paths
DATA_DIR = DB_PATH / "data"
SCHEMA_FILE = DATA_DIR / "schema.sql"
DATA_FILE = DATA_DIR / "data.sql"
RESEARCH_DIR = DB_PATH / "research"

# API Configuration
# U.S. Census Bureau API - no API key required
CENSUS_BASE_URL = "https://api.census.gov/data/timeseries/eits"
CENSUS_MRTS_URL = f"{CENSUS_BASE_URL}/mrts"  # Monthly Retail Trade Survey
CENSUS_MRTSADV_URL = f"{CENSUS_BASE_URL}/mrtsadv"  # Advance Retail Inventories

# BLS Public Data API - no API key required (registration optional)
BLS_BASE_URL = "https://api.bls.gov/publicAPI/v2"
BLS_REGISTRATION_KEY = os.environ.get("BLS_REGISTRATION_KEY")  # Optional

# USAJobs.gov API - requires API key from https://developer.usajobs.gov/
USAJOBS_BASE_URL = "https://data.usajobs.gov/api"
USAJOBS_API_KEY = os.environ.get("USAJOBS_API_KEY")

# Department of Labor Open Data Portal - uses Data.gov CKAN API
DATAGOV_CKAN_BASE_URL = "https://catalog.data.gov/api/3/action"
DOL_CKAN_BASE_URL = DATAGOV_CKAN_BASE_URL  # alias kept for cells that use the DOL name

# Data extraction windows (computed from a single "now" so they agree)
_extraction_now = datetime.now()
# Yearly window (last 10 years) for year-based APIs such as BLS
EXTRACTION_START_YEAR = _extraction_now.year - 10
EXTRACTION_END_YEAR = _extraction_now.year
# Daily window (last 2 weeks) for date-based APIs such as USAJobs
EXTRACTION_START_DATE = (_extraction_now - timedelta(days=14)).strftime('%Y-%m-%d')
EXTRACTION_END_DATE = _extraction_now.strftime('%Y-%m-%d')

print(f"Database: {DB_NAME}")
print(f"Data directory: {DATA_DIR}")
print(f"Schema file exists: {SCHEMA_FILE.exists()}")
print(f"Data file exists: {DATA_FILE.exists()}")

## Section 2: Extract - API Integration and Data Loading

### 2.1 USAJobs.gov API Integration

In [None]:
def create_session_with_retry(total: int = 3, backoff_factor: float = 1) -> requests.Session:
    """Create a requests session that retries transient HTTP failures.

    urllib3's default ``allowed_methods`` excludes POST. Without adding it,
    the BLS timeseries request (a POST) would never be retried on
    429/5xx responses.

    Args:
        total: Maximum number of retries per request.
        backoff_factor: Exponential backoff factor between retries.

    Returns:
        A ``requests.Session`` with the retrying adapter mounted for both
        ``http://`` and ``https://``.
    """
    session = requests.Session()
    retry_strategy = Retry(
        total=total,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
        # The endpoints queried in this notebook are read-only, so retrying
        # POST (used by BLS) is safe in addition to the default methods.
        allowed_methods=Retry.DEFAULT_ALLOWED_METHODS | {"POST"},
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

def _usajobs_days_posted(start_date: str, end_date: str, max_days: int = 60) -> int:
    """Convert a 'YYYY-MM-DD' window into the USAJobs ``DatePosted`` value.

    The USAJobs Search API takes ``DatePosted`` as a number of days (0-60),
    counted back from today, rather than a date range. The window length is
    clamped to that range. It matches the requested window when
    ``end_date`` is today.

    Raises:
        ValueError: If either date is not in 'YYYY-MM-DD' format.
    """
    start = datetime.strptime(start_date, '%Y-%m-%d')
    end = datetime.strptime(end_date, '%Y-%m-%d')
    return max(0, min(max_days, (end - start).days))


def fetch_usajobs_jobs(api_key: str, start_date: str, end_date: str, page: int = 1, results_per_page: int = 500) -> Optional[Dict]:
    """
    Fetch one page of job postings from the USAJobs.gov Search API.

    API Documentation: https://developer.usajobs.gov/API-Reference/GET-Jobs

    Args:
        api_key: USAJobs ``Authorization-Key``. When falsy, the call is skipped.
        start_date: Window start, 'YYYY-MM-DD'.
        end_date: Window end, 'YYYY-MM-DD'. The API can only look back from
            today, so see ``_usajobs_days_posted``.
        page: 1-based result page.
        results_per_page: Page size to request.

    Returns:
        The parsed JSON response. Returns None when the key is missing, the
        dates are malformed, the request fails, or the body is not valid
        JSON; each case is logged.
    """
    if not api_key:
        logger.warning("USAJobs API key not configured. Set USAJOBS_API_KEY environment variable.")
        return None

    try:
        days_posted = _usajobs_days_posted(start_date, end_date)
    except ValueError as e:
        logger.error(f"Invalid USAJobs date window {start_date!r} to {end_date!r}: {e}")
        return None

    url = f"{USAJOBS_BASE_URL}/Search"
    headers = {
        "Host": "data.usajobs.gov",
        "User-Agent": "JobMarketIntelligence/1.0 (contact@example.com)",
        "Authorization-Key": api_key
    }

    params = {
        "DatePosted": days_posted,
        "Page": page,
        "ResultsPerPage": results_per_page,
        "SortField": "DatePosted",
        "SortDirection": "Descending"
    }

    try:
        # Context manager closes the session's pooled connections.
        with create_session_with_retry() as session:
            response = session.get(url, headers=headers, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on older requests versions.
        logger.error(f"Error fetching USAJobs data: {e}")
        return None

    item_count = len(data.get('SearchResult', {}).get('SearchResultItems', []))
    logger.info(f"Fetched {item_count} jobs from USAJobs.gov (page {page})")
    return data

# Pull recent USAJobs postings; this step is skipped without an API key.
if not USAJOBS_API_KEY:
    print("⚠ USAJobs API key not configured. Skipping USAJobs.gov extraction.")
    usajobs_data = None
else:
    usajobs_data = fetch_usajobs_jobs(USAJOBS_API_KEY, EXTRACTION_START_DATE, EXTRACTION_END_DATE)
    if usajobs_data:
        fetched_items = usajobs_data.get('SearchResult', {}).get('SearchResultItems', [])
        print(f"✓ Fetched USAJobs data: {len(fetched_items)} jobs")

### 2.2 BLS Public Data API Integration

In [None]:
def _build_bls_payload(series_ids: List[str], start_year: int, end_year: int, registration_key: Optional[str] = None) -> Dict:
    """Build the JSON body for a BLS v2 timeseries request.

    ``registrationkey`` is included only when a key is supplied, instead of
    always being sent as JSON ``null``.
    """
    payload = {
        "seriesid": list(series_ids),
        "startyear": str(start_year),
        "endyear": str(end_year),
    }
    if registration_key:
        payload["registrationkey"] = registration_key
    return payload


def fetch_bls_data(series_ids: List[str], start_year: int, end_year: int, registration_key: Optional[str] = None) -> Optional[Dict]:
    """
    Fetch time series from the BLS Public Data API (v2).

    API Documentation: https://www.bls.gov/developers/api_signature_v2.htm
    No API key is required, but rate limits apply. A registration key
    increases them.

    Args:
        series_ids: BLS series identifiers to request.
        start_year: First year of data (inclusive).
        end_year: Last year of data (inclusive).
        registration_key: Optional BLS registration key.

    Returns:
        The parsed response when its ``status`` is ``REQUEST_SUCCEEDED``.
        Otherwise returns None and logs the API status/messages or the
        transport/JSON error.
    """
    # The documented v2 endpoint ends with a trailing slash.
    url = f"{BLS_BASE_URL}/timeseries/data/"
    payload = _build_bls_payload(series_ids, start_year, end_year, registration_key)

    try:
        with create_session_with_retry() as session:
            response = session.post(url, json=payload, timeout=30)
            response.raise_for_status()
            data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logger.error(f"Error fetching BLS data: {e}")
        return None

    if data.get('status') == 'REQUEST_SUCCEEDED':
        logger.info(f"Fetched BLS data for {len(series_ids)} series")
        return data
    logger.warning(f"BLS API returned status: {data.get('status')} {data.get('message', [])}")
    return None

# BLS series IDs requested by this notebook:
#   CES0000000001 - Total nonfarm employment (CES)
#   LEU0254555900 - a CPS weekly-earnings series ("LE" survey prefix).
#                   It is NOT the unemployment rate; the seasonally adjusted
#                   unemployment rate is LNS14000000.
#   CES0500000003 - Average hourly earnings of all employees, total private (CES)
bls_series_ids = ["CES0000000001", "LEU0254555900", "CES0500000003"]
current_year = datetime.now().year

# Request the previous and current calendar years.
bls_data = fetch_bls_data(bls_series_ids, current_year - 1, current_year)
if bls_data:
    print(f"✓ Fetched BLS data for {len(bls_series_ids)} series")
else:
    print("⚠ BLS data fetch failed or skipped")

### 2.3 Department of Labor Open Data Portal Integration

In [None]:
def search_dol_datasets(query: str = "employment", organization: str = "dol-gov", limit: int = 20) -> Optional[Dict]:
    """
    Search Department of Labor datasets on catalog.data.gov via the CKAN API.

    API Documentation: https://catalog.data.gov/api/3/action/package_search

    Args:
        query: Free-text CKAN search query (``q``).
        organization: CKAN organization slug used in the ``fq`` filter. DOL is
            listed on catalog.data.gov as ``dol-gov``. A slug that matches no
            organization silently returns zero results, so verify it at
            https://catalog.data.gov/organization/ if counts look wrong.
        limit: Maximum number of datasets to return (``rows``).

    Returns:
        ``{'count': total_matches, 'datasets': [...]}``, or None when the
        request fails, the body is not JSON, or CKAN reports ``success=false``.
    """
    url = f"{DATAGOV_CKAN_BASE_URL}/package_search"

    params = {
        "q": query,
        "fq": f"organization:{organization}",
        "rows": limit
    }

    try:
        with create_session_with_retry() as session:
            response = session.get(url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logger.error(f"Error searching DOL datasets: {e}")
        return None

    if not data.get('success'):
        logger.warning(f"CKAN package_search reported failure: {data.get('error')}")
        return None

    result = data.get('result', {})
    count = result.get('count', 0)
    datasets = result.get('results', [])
    logger.info(f"Found {count} DOL datasets matching '{query}'")
    return {'count': count, 'datasets': datasets}

# Search for employment-related datasets
dol_datasets = search_dol_datasets(query="employment statistics", limit=10)
if dol_datasets:
    print(f"✓ Found {dol_datasets['count']} DOL datasets")
    # Guard against a missing/None 'title' and fall back to the dataset slug,
    # so one malformed record cannot abort the cell.
    sample_titles = [
        str(d.get('title') or d.get('name') or '')[:50]
        for d in dol_datasets['datasets'][:3]
    ]
    print(f"  Sample datasets: {', '.join(sample_titles)}")
else:
    print("⚠ DOL dataset search failed or skipped")

### 2.4 Load Local Data Files

In [None]:
def load_schema_file(schema_path: Path) -> Optional[str]:
    """Load the database schema DDL from a SQL file.

    Args:
        schema_path: Path to the schema ``.sql`` file.

    Returns:
        The file text. Returns None when the file is missing (logged as a
        warning) or cannot be read/decoded (logged as an error).
    """
    try:
        if not schema_path.exists():
            logger.warning(f"Schema file not found: {schema_path}")
            return None
        # Decode explicitly as UTF-8 rather than the platform's locale default.
        return schema_path.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"Error loading schema: {e}")
        return None

def load_data_file(data_path: Path) -> Optional[str]:
    """Load seed data statements from a SQL file.

    Args:
        data_path: Path to the data ``.sql`` file.

    Returns:
        The file text. Returns None when the file is missing (logged as a
        warning) or cannot be read/decoded (logged as an error).
    """
    try:
        if not data_path.exists():
            logger.warning(f"Data file not found: {data_path}")
            return None
        # Decode explicitly as UTF-8 rather than the platform's locale default.
        return data_path.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"Error loading data: {e}")
        return None

def extract_from_csv(csv_path: Path) -> Optional[pd.DataFrame]:
    """Read a CSV file into a DataFrame.

    Returns None, without raising, when the path does not exist or when
    reading/parsing fails. Parse failures are logged as errors.
    """
    try:
        if not csv_path.exists():
            return None
        frame = pd.read_csv(csv_path)
        logger.info(f"Loaded {len(frame)} rows from {csv_path.name}")
        return frame
    except Exception as exc:
        logger.error(f"Error loading CSV {csv_path}: {exc}")
        return None

def extract_from_json(json_path: Path) -> Optional[Dict]:
    """Parse a JSON file.

    Returns:
        The decoded JSON value. Despite the ``Dict`` annotation, this is
        whatever the file holds, e.g. a list. Returns None when the file is
        missing or cannot be read/decoded; read/decode failures are logged.
    """
    try:
        if not json_path.exists():
            return None
        # JSON text is UTF-8 (RFC 8259); don't depend on the locale encoding.
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        logger.info(f"Loaded JSON from {json_path.name}")
        return data
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.error(f"Error loading JSON {json_path}: {e}")
        return None

# Read the SQL schema/data scripts and discover local data files.
schema_sql = load_schema_file(SCHEMA_FILE)
data_sql = load_data_file(DATA_FILE)

csv_files = list(DATA_DIR.glob("*.csv"))
json_files = list(DATA_DIR.glob("*.json"))

extracted_data = {}

# API payloads go in first under fixed keys; falsy payloads are skipped.
for source_key, payload in (
    ('usajobs', usajobs_data),
    ('bls', bls_data),
    ('dol_datasets', dol_datasets),
):
    if payload:
        extracted_data[source_key] = payload

# Local files next, keyed by file stem. Because JSON files are processed after
# CSVs, a JSON file with the same stem as a CSV replaces that entry.
for loader, paths in ((extract_from_csv, csv_files), (extract_from_json, json_files)):
    for path in paths:
        loaded = loader(path)
        if loaded is not None:
            extracted_data[path.stem] = loaded

for label, sql_text in (("Schema", schema_sql), ("Data", data_sql)):
    if sql_text:
        print(f"✓ {label} loaded ({len(sql_text)} characters)")
print(f"✓ Extracted {len(extracted_data)} data sources")

## Section 3: Transform - Data Cleaning and Transformation

### 3.1 Transform USAJobs Data

In [None]:
def transform_usajobs_to_dataframe(usajobs_data: Dict) -> Optional[pd.DataFrame]:
    """Transform a USAJobs Search response into a job_postings-shaped DataFrame.

    List-valued API fields are flattened to strings, and salary bounds are
    converted to floats. This keeps every cell hashable, which matters
    because ``DataFrame.drop_duplicates`` raises ``TypeError`` on list
    cells, and keeps every cell loadable by ``DataFrame.to_sql``.

    Args:
        usajobs_data: Parsed JSON returned by ``fetch_usajobs_jobs``.

    Returns:
        One row per ``SearchResultItems`` entry. Returns None when the input
        is empty or has no items.
    """
    if not usajobs_data:
        return None

    def _as_text(value) -> str:
        # Flatten list fields into "a, b". The USAJobs reference documents
        # PositionSchedule as a list of {"Name", "Code"} objects; dict entries
        # contribute their 'Name'.
        if value is None:
            return ''
        if isinstance(value, list):
            parts = [str(v.get('Name', '')) if isinstance(v, dict) else str(v) for v in value]
            return ', '.join(p for p in parts if p)
        return str(value)

    def _as_float(value) -> Optional[float]:
        # Remuneration bounds may arrive as numeric strings; unparsable -> None.
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    created_at = datetime.now().isoformat()
    jobs = []
    for item in usajobs_data.get('SearchResult', {}).get('SearchResultItems', []):
        matched_object = item.get('MatchedObjectDescriptor', {})
        details = (matched_object.get('UserArea') or {}).get('Details') or {}

        # Only the first listed major duty is kept as the description.
        duties = details.get('MajorDuties') or []
        if isinstance(duties, list):
            description = _as_text(duties[0]) if duties else ''
        else:
            description = _as_text(duties)

        remuneration = matched_object.get('PositionRemuneration') or []
        salary_info = remuneration[0] if remuneration else {}

        jobs.append({
            'job_id': matched_object.get('PositionID', ''),
            'title': matched_object.get('PositionTitle', ''),
            'company_id': None,  # Will need to match/create company records
            'location': matched_object.get('PositionLocationDisplay', ''),
            'job_type': _as_text(matched_object.get('PositionSchedule', '')),
            'salary_min': _as_float(salary_info.get('MinimumRange')),
            'salary_max': _as_float(salary_info.get('MaximumRange')),
            'description': description,
            'requirements': _as_text(details.get('Requirements', '')),
            'posted_date': matched_object.get('PositionStartDate', ''),
            'closing_date': matched_object.get('PositionEndDate', ''),
            'source': 'USAJobs.gov',
            'source_url': matched_object.get('PositionURI', ''),
            'is_active': True,
            'created_at': created_at,
        })

    if not jobs:
        return None
    df = pd.DataFrame(jobs)
    logger.info(f"Transformed {len(df)} USAJobs postings to DataFrame")
    return df

# Convert the raw USAJobs payload (if any) into a job_postings DataFrame.
if not usajobs_data:
    usajobs_df = None
else:
    usajobs_df = transform_usajobs_to_dataframe(usajobs_data)
    if usajobs_df is None:
        print("⚠ USAJobs transformation returned no data")
    else:
        print(f"✓ Transformed {len(usajobs_df)} USAJobs postings")
        print(f"  Columns: {', '.join(usajobs_df.columns)}")

### 3.2 Transform BLS Data

In [None]:
def transform_bls_to_dataframe(bls_data: Dict) -> Optional[pd.DataFrame]:
    """Transform a BLS v2 timeseries response into a market-trends DataFrame.

    Args:
        bls_data: Parsed JSON returned by ``fetch_bls_data``.

    Returns:
        One row per (series, year, period) observation. Returns None when
        the response did not succeed or has no observations. Values that are
        missing or non-numeric (e.g. "null", "-") become None/NaN instead of
        raising ``ValueError`` or defaulting to 0.0.
    """
    if not bls_data or bls_data.get('status') != 'REQUEST_SUCCEEDED':
        return None

    def _parse_value(raw) -> Optional[float]:
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    created_at = datetime.now().isoformat()
    trends = []
    for series in bls_data.get('Results', {}).get('series', []):
        series_id = series.get('seriesID', '')
        for point in series.get('data', []):
            trends.append({
                'trend_id': f"{series_id}_{point.get('year')}_{point.get('period')}",
                'metric_name': series_id,
                'metric_value': _parse_value(point.get('value')),
                'period': point.get('period', ''),
                'year': int(point.get('year', 0)),
                'source': 'BLS Public Data API',
                'created_at': created_at,
            })

    if not trends:
        return None
    df = pd.DataFrame(trends)
    logger.info(f"Transformed {len(df)} BLS data points to DataFrame")
    return df

# Convert the raw BLS payload (if any) into a market_trends DataFrame.
if not bls_data:
    bls_df = None
else:
    bls_df = transform_bls_to_dataframe(bls_data)
    if bls_df is None:
        print("⚠ BLS transformation returned no data")
    else:
        print(f"✓ Transformed {len(bls_df)} BLS data points")
        print(f"  Unique metrics: {bls_df['metric_name'].nunique()}")

### 3.3 Data Cleaning Functions

In [None]:
def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Drop exact duplicate rows and fill missing values.

    Numeric columns are filled with their median, and object (text) columns
    with the empty string. None and empty frames are returned unchanged.
    """
    if df is None or df.empty:
        return df

    rows_in = len(df)
    df = df.drop_duplicates()
    dropped = rows_in - len(df)

    nulls_before = df.isnull().sum().sum()
    for column in df.select_dtypes(include=[np.number]).columns:
        df[column] = df[column].fillna(df[column].median())
    for column in df.select_dtypes(include=['object']).columns:
        df[column] = df[column].fillna('')
    nulls_after = df.isnull().sum().sum()

    logger.info(f"Cleaned data: removed {dropped} duplicates, filled {nulls_before - nulls_after} missing values")
    return df

# Clean the API-derived frames under their target table names, then every
# DataFrame loaded from local files (always on copies of the inputs).
cleaned_data = {}
for target_table, frame in (('job_postings', usajobs_df), ('market_trends', bls_df)):
    if frame is not None:
        cleaned_data[target_table] = clean_dataframe(frame.copy())

for name, data in extracted_data.items():
    if not isinstance(data, pd.DataFrame):
        continue
    cleaned_data[name] = clean_dataframe(data.copy())

frame_count = sum(isinstance(d, pd.DataFrame) for d in cleaned_data.values())
print(f"✓ Cleaned {frame_count} dataframes")

### 3.4 Data Validation

In [None]:
def validate_dataframe(df: pd.DataFrame, required_columns: Optional[List[str]] = None) -> Dict:
    """Validate DataFrame structure and basic data quality.

    Args:
        df: Frame to validate. None and empty frames are reported as invalid.
        required_columns: Column names that must be present.

    Returns:
        Dict with 'valid', 'row_count', 'column_count', 'missing_values',
        'duplicate_rows' and 'errors'. The same keys are returned for
        None/empty input, so callers can index them unconditionally. Counts
        are plain ints, so the result serializes cleanly to JSON.
    """
    if df is None or df.empty:
        return {
            'valid': False,
            'row_count': 0 if df is None else len(df),
            'column_count': 0 if df is None else len(df.columns),
            'missing_values': {},
            'duplicate_rows': 0,
            'errors': ['DataFrame is empty or None'],
        }

    validation_results = {
        'valid': True,
        'row_count': len(df),
        'column_count': len(df.columns),
        'missing_values': {col: int(n) for col, n in df.isnull().sum().items()},
        'duplicate_rows': int(df.duplicated().sum()),
        'errors': []
    }

    # Check required columns
    if required_columns:
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            validation_results['valid'] = False
            validation_results['errors'].append(f"Missing required columns: {missing_cols}")

    return validation_results

# Validate cleaned data
validation_results = {
    name: validate_dataframe(data)
    for name, data in cleaned_data.items()
    if isinstance(data, pd.DataFrame)
}

# Display validation results. .get() keeps this loop safe for results that
# omit counts (e.g. an empty DataFrame's result).
for name, results in validation_results.items():
    status = "✓" if results.get('valid') else "✗"
    print(f"{status} {name}: {results.get('row_count', 0)} rows, {results.get('column_count', 0)} columns")
    for error in results.get('errors', []):
        print(f"  Error: {error}")

## Section 4: Load - Data Loading to Target Database

In [None]:
def load_to_postgresql(df: pd.DataFrame, table_name: str, connection_string: str, if_exists: str = 'append') -> bool:
    """Load a DataFrame into a PostgreSQL table via SQLAlchemy.

    Rows are inserted with multi-row INSERTs in chunks of 1000.

    Args:
        df: Rows to write.
        table_name: Target table name.
        connection_string: SQLAlchemy URL. When empty/None, the load is skipped.
        if_exists: Passed to ``DataFrame.to_sql`` ('append', 'replace', 'fail').

    Returns:
        True on success. False when SQLAlchemy or the connection string is
        unavailable, or the write fails; failures are logged.
    """
    if not SQLALCHEMY_AVAILABLE or not connection_string:
        logger.warning("PostgreSQL connection not available")
        return False

    engine = None
    try:
        engine = create_engine(connection_string)
        df.to_sql(table_name, engine, if_exists=if_exists, index=False, method='multi', chunksize=1000)
        logger.info(f"Loaded {len(df)} rows to PostgreSQL table {table_name}")
        return True
    except Exception as e:
        logger.error(f"Error loading to PostgreSQL: {e}")
        return False
    finally:
        # A fresh engine is built per call; dispose it so its connection pool
        # does not leak one open connection per loaded table.
        if engine is not None:
            engine.dispose()

def load_to_databricks(df: pd.DataFrame, table_name: str, config: Dict) -> bool:
    """Check Databricks connectivity for ``table_name``; row upload is not implemented.

    A connection and cursor are opened and closed to verify the configuration,
    but no rows are written yet (in production, use a Spark DataFrame write).
    The function therefore returns False, so the pipeline does not count the
    dataset as loaded.

    Args:
        df: Rows that would be written.
        table_name: Target table name.
        config: Dict with 'server_hostname', 'http_path' and 'access_token'.

    Returns:
        False in every case until an actual write path is implemented.
    """
    if not DATABRICKS_AVAILABLE or not all([config.get('server_hostname'), config.get('http_path'), config.get('access_token')]):
        logger.warning("Databricks connection not available")
        return False

    conn = None
    try:
        conn = sql.connect(
            server_hostname=config['server_hostname'],
            http_path=config['http_path'],
            access_token=config['access_token']
        )
        cursor = conn.cursor()
        cursor.close()
        logger.warning(
            f"Prepared {len(df)} rows for Databricks table {table_name}, "
            "but row upload is not implemented; nothing was written"
        )
        return False
    except Exception as e:
        logger.error(f"Error loading to Databricks: {e}")
        return False
    finally:
        # Close the connection even when connect/cursor setup fails part-way.
        if conn is not None:
            conn.close()

# Push every non-empty cleaned DataFrame to each configured target.
load_results = {}

for name, data in cleaned_data.items():
    if not isinstance(data, pd.DataFrame) or data.empty:
        continue
    table_name = name.lower().replace(' ', '_')

    if POSTGRES_CONNECTION_STRING:
        load_results[f"{name}_postgres"] = load_to_postgresql(data, table_name, POSTGRES_CONNECTION_STRING)

    # Databricks is attempted only when all three connection settings are present.
    databricks_ready = all(
        DATABRICKS_CONFIG.get(setting)
        for setting in ('server_hostname', 'http_path', 'access_token')
    )
    if databricks_ready:
        load_results[f"{name}_databricks"] = load_to_databricks(data, table_name, DATABRICKS_CONFIG)

successful_loads = sum(load_results.values())
print(f"✓ Loaded {successful_loads} datasets to target databases")

# ETL/ELT Pipeline - DB-1

This notebook provides a comprehensive ETL/ELT pipeline for database db-1.

## Pipeline Overview
1. **Extract**: Load data from source systems
2. **Transform**: Clean, validate, and transform data
3. **Load**: Load transformed data into target database
4. **Validate**: Verify data quality and completeness
5. **Monitor**: Track pipeline performance and errors

## Section 1: Setup and Configuration

In [None]:
import sys
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import logging
from typing import Dict, List, Optional
import warnings
warnings.filterwarnings('ignore')

# Database connections
try:
    from sqlalchemy import create_engine, text
    SQLALCHEMY_AVAILABLE = True
except ImportError:
    SQLALCHEMY_AVAILABLE = False
    print("Warning: sqlalchemy not available")

# Configure root logging once for the notebook, then grab a module logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Pandas display: show every column, cap rows at 50, let width auto-size.
for option_name, option_value in (
    ('display.max_columns', None),
    ('display.max_rows', 50),
    ('display.width', None),
):
    pd.set_option(option_name, option_value)

print("✓ Imports successful")

In [None]:
# Configuration
# NOTE(review): the notebook title says "DB-1" but DB_NAME is "db-8";
# confirm which database this pipeline is meant to target.
DB_NAME = "db-8"
DB_PATH = Path.cwd().parent

# Target connection strings; a None value means that target is skipped.
POSTGRES_CONNECTION_STRING = None  # e.g. "postgresql://user:password@localhost:5432/dbname"
DATABRICKS_CONNECTION_STRING = None  # not consumed by the load step in this notebook
SNOWFLAKE_CONNECTION_STRING = None  # SQLAlchemy URL for Snowflake

# Source data paths, all under <parent>/data
DATA_DIR = DB_PATH / "data"
SCHEMA_FILE, DATA_FILE = DATA_DIR / "schema.sql", DATA_DIR / "data.sql"

for label, value in (
    ("Database", DB_NAME),
    ("Data directory", DATA_DIR),
    ("Schema file exists", SCHEMA_FILE.exists()),
    ("Data file exists", DATA_FILE.exists()),
):
    print(f"{label}: {value}")

## Section 2: Extract - Data Loading

In [None]:
def load_schema_file(schema_path: Path) -> Optional[str]:
    """Load the database schema DDL from a SQL file.

    Args:
        schema_path: Path to the schema ``.sql`` file.

    Returns:
        The file text. Returns None when the file is missing (logged as a
        warning) or cannot be read/decoded (logged as an error).
    """
    try:
        if not schema_path.exists():
            logger.warning(f"Schema file not found: {schema_path}")
            return None
        # Decode explicitly as UTF-8 rather than the platform's locale default.
        return schema_path.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"Error loading schema: {e}")
        return None

def load_data_file(data_path: Path) -> Optional[str]:
    """Load seed data statements from a SQL file.

    Args:
        data_path: Path to the data ``.sql`` file.

    Returns:
        The file text. Returns None when the file is missing (logged as a
        warning) or cannot be read/decoded (logged as an error).
    """
    try:
        if not data_path.exists():
            logger.warning(f"Data file not found: {data_path}")
            return None
        # Decode explicitly as UTF-8 rather than the platform's locale default.
        return data_path.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"Error loading data: {e}")
        return None

# Read the SQL schema/data scripts and report their sizes.
schema_sql = load_schema_file(SCHEMA_FILE)
data_sql = load_data_file(DATA_FILE)

for label, sql_text in (("Schema", schema_sql), ("Data", data_sql)):
    if sql_text:
        print(f"✓ {label} loaded ({len(sql_text)} characters)")

In [None]:
def extract_from_csv(csv_path: Path) -> Optional[pd.DataFrame]:
    """Read a CSV file into a DataFrame.

    Returns None, without raising, when the path does not exist or when
    reading/parsing fails. Parse failures are logged as errors.
    """
    try:
        if not csv_path.exists():
            return None
        frame = pd.read_csv(csv_path)
        logger.info(f"Loaded {len(frame)} rows from {csv_path.name}")
        return frame
    except Exception as exc:
        logger.error(f"Error loading CSV {csv_path}: {exc}")
        return None

def extract_from_json(json_path: Path) -> Optional[Dict]:
    """Parse a JSON file.

    Returns:
        The decoded JSON value. Despite the ``Dict`` annotation, this is
        whatever the file holds, e.g. a list. Returns None when the file is
        missing or cannot be read/decoded; read/decode failures are logged.
    """
    try:
        if not json_path.exists():
            return None
        # JSON text is UTF-8 (RFC 8259); don't depend on the locale encoding.
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        logger.info(f"Loaded JSON from {json_path.name}")
        return data
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.error(f"Error loading JSON {json_path}: {e}")
        return None

# Discover local CSV/JSON files and load each one, keyed by file stem.
csv_files = list(DATA_DIR.glob("*.csv"))
json_files = list(DATA_DIR.glob("*.json"))

extracted_data = {}
# JSON files are processed after CSVs, so a JSON file whose stem matches a
# CSV replaces that entry.
for loader, paths in ((extract_from_csv, csv_files), (extract_from_json, json_files)):
    for path in paths:
        loaded = loader(path)
        if loaded is not None:
            extracted_data[path.stem] = loaded

print(f"✓ Extracted {len(extracted_data)} data sources")

## Section 3: Transform - Data Cleaning and Transformation

In [None]:
def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Drop exact duplicate rows and impute missing values.

    Numeric columns are filled with their median. Object (text) columns are
    filled with their first mode, or '' when the column has no non-null value.
    None and empty frames are returned unchanged.
    """
    if df is None or df.empty:
        return df

    rows_in = len(df)
    df = df.drop_duplicates()
    dropped = rows_in - len(df)

    nulls_before = df.isnull().sum().sum()
    for column in df.select_dtypes(include=[np.number]).columns:
        df[column] = df[column].fillna(df[column].median())
    for column in df.select_dtypes(include=['object']).columns:
        modes = df[column].mode()
        fill_value = modes[0] if len(modes) > 0 else ''
        df[column] = df[column].fillna(fill_value)
    nulls_after = df.isnull().sum().sum()

    logger.info(f"Cleaned data: removed {dropped} duplicates, filled {nulls_before - nulls_after} missing values")
    return df

# Clean every DataFrame source; non-DataFrame sources (e.g. JSON) pass through.
cleaned_data = {
    name: clean_dataframe(data) if isinstance(data, pd.DataFrame) else data
    for name, data in extracted_data.items()
}

print(f"✓ Cleaned {len(cleaned_data)} data sources")

In [None]:
def validate_dataframe(df: pd.DataFrame, required_columns: Optional[List[str]] = None) -> Dict:
    """Validate DataFrame structure and basic data quality.

    Args:
        df: Frame to validate. None and empty frames are reported as invalid.
        required_columns: Column names that must be present.

    Returns:
        Dict with 'valid', 'row_count', 'column_count', 'missing_values',
        'duplicate_rows' and 'errors'. The same keys are returned for
        None/empty input, so callers can index them unconditionally. Counts
        are plain ints, so the result serializes cleanly to JSON.
    """
    if df is None or df.empty:
        return {
            'valid': False,
            'row_count': 0 if df is None else len(df),
            'column_count': 0 if df is None else len(df.columns),
            'missing_values': {},
            'duplicate_rows': 0,
            'errors': ['DataFrame is empty or None'],
        }

    validation_results = {
        'valid': True,
        'row_count': len(df),
        'column_count': len(df.columns),
        'missing_values': {col: int(n) for col, n in df.isnull().sum().items()},
        'duplicate_rows': int(df.duplicated().sum()),
        'errors': []
    }

    # Check required columns
    if required_columns:
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            validation_results['valid'] = False
            validation_results['errors'].append(f"Missing required columns: {missing_cols}")

    return validation_results

# Validate cleaned data
validation_results = {
    name: validate_dataframe(data)
    for name, data in cleaned_data.items()
    if isinstance(data, pd.DataFrame)
}

# Display validation results. .get() keeps this loop safe for results that
# omit counts (e.g. an empty DataFrame's result).
for name, results in validation_results.items():
    status = "✓" if results.get('valid') else "✗"
    print(f"{status} {name}: {results.get('row_count', 0)} rows, {results.get('column_count', 0)} columns")
    for error in results.get('errors', []):
        print(f"  Error: {error}")

## Section 4: Load - Data Loading to Target Database

In [None]:
def load_to_postgresql(df: pd.DataFrame, table_name: str, connection_string: str, if_exists: str = 'replace') -> bool:
    """Load a DataFrame into a PostgreSQL table via SQLAlchemy.

    Args:
        df: Rows to write.
        table_name: Target table name.
        connection_string: SQLAlchemy URL. When empty/None, the load is skipped.
        if_exists: Passed to ``DataFrame.to_sql``; defaults to replacing the table.

    Returns:
        True on success. False when SQLAlchemy or the connection string is
        unavailable, or the write fails; failures are logged.
    """
    if not SQLALCHEMY_AVAILABLE or not connection_string:
        logger.warning("PostgreSQL connection not available")
        return False

    engine = None
    try:
        engine = create_engine(connection_string)
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
        logger.info(f"Loaded {len(df)} rows to PostgreSQL table {table_name}")
        return True
    except Exception as e:
        logger.error(f"Error loading to PostgreSQL: {e}")
        return False
    finally:
        # A fresh engine is built per call; dispose it so its pool doesn't leak.
        if engine is not None:
            engine.dispose()

def load_to_snowflake(df: pd.DataFrame, table_name: str, connection_string: str) -> bool:
    """Load a DataFrame into a Snowflake table via SQLAlchemy, replacing it.

    Args:
        df: Rows to write.
        table_name: Target table name (replaced if it exists).
        connection_string: SQLAlchemy URL. When empty/None, the load is skipped.

    Returns:
        True on success. False when SQLAlchemy or the connection string is
        unavailable, or the write fails; failures are logged.
    """
    if not SQLALCHEMY_AVAILABLE or not connection_string:
        logger.warning("Snowflake connection not available")
        return False

    engine = None
    try:
        engine = create_engine(connection_string)
        df.to_sql(table_name, engine, if_exists='replace', index=False)
        logger.info(f"Loaded {len(df)} rows to Snowflake table {table_name}")
        return True
    except Exception as e:
        logger.error(f"Error loading to Snowflake: {e}")
        return False
    finally:
        # A fresh engine is built per call; dispose it so its pool doesn't leak.
        if engine is not None:
            engine.dispose()

# Push every non-empty cleaned DataFrame to each configured target.
load_results = {}

for name, data in cleaned_data.items():
    if not isinstance(data, pd.DataFrame) or data.empty:
        continue
    table_name = name.lower().replace(' ', '_')

    # A target is attempted only when its connection string is set.
    for target_label, target_url, loader in (
        ("postgres", POSTGRES_CONNECTION_STRING, load_to_postgresql),
        ("snowflake", SNOWFLAKE_CONNECTION_STRING, load_to_snowflake),
    ):
        if target_url:
            load_results[f"{name}_{target_label}"] = loader(data, table_name, target_url)

print(f"✓ Loaded {sum(load_results.values())} datasets to target databases")

## Section 5: Validate - Data Quality Checks

In [None]:
def generate_data_quality_report(df: pd.DataFrame, table_name: str) -> Dict:
    """Generate a data quality report for one table.

    Args:
        df: Frame to profile. None/empty frames yield a report with
            ``status='empty'`` and zeroed counts.
        table_name: Name recorded in the report.

    Returns:
        Dict with 'table', 'status', 'row_count', 'column_count',
        'missing_values', 'missing_percentage', 'duplicate_rows',
        'data_types', 'numeric_stats' and 'timestamp'. The same keys are
        present for empty input, so display code can index them directly.
    """
    if df is None or df.empty:
        return {
            'table': table_name,
            'status': 'empty',
            'row_count': 0 if df is None else len(df),
            'column_count': 0 if df is None else len(df.columns),
            'missing_values': 0,
            'missing_percentage': 0.0,
            'duplicate_rows': 0,
            'data_types': {} if df is None else df.dtypes.astype(str).to_dict(),
            'numeric_stats': {},
            'timestamp': datetime.now().isoformat(),
        }

    total_missing = int(df.isnull().sum().sum())
    report = {
        'table': table_name,
        'status': 'ok',
        'row_count': len(df),
        'column_count': len(df.columns),
        'missing_values': total_missing,
        # Non-empty guarantees rows and columns are both > 0, so no div-by-zero.
        'missing_percentage': float(total_missing / (len(df) * len(df.columns)) * 100),
        'duplicate_rows': int(df.duplicated().sum()),
        'data_types': df.dtypes.astype(str).to_dict(),
        'numeric_stats': {},
        'timestamp': datetime.now().isoformat()
    }

    # Add statistics for numeric columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) > 0:
        report['numeric_stats'] = df[numeric_cols].describe().to_dict()

    return report

# Generate quality reports
quality_reports = {
    name: generate_data_quality_report(data, name)
    for name, data in cleaned_data.items()
    if isinstance(data, pd.DataFrame)
}

# Display quality reports. Empty tables may produce a report without counts,
# so they are summarized instead of indexed.
for name, report in quality_reports.items():
    print(f"\n=== {name} ===")
    if report.get('status') == 'empty':
        print("Status: empty")
        continue
    print(f"Rows: {report['row_count']}")
    print(f"Columns: {report['column_count']}")
    print(f"Missing values: {report['missing_values']} ({report['missing_percentage']:.2f}%)")
    print(f"Duplicate rows: {report['duplicate_rows']}")

## Section 6: Monitor - Pipeline Monitoring and Logging

In [None]:
# Save pipeline execution metadata.
# The run status is derived from the results instead of always being
# 'completed': any failed load or invalid validation result marks the run
# 'completed_with_errors'.
all_loads_ok = all(load_results.values())
all_valid = all(result.get('valid', False) for result in validation_results.values())
pipeline_status = 'completed' if all_loads_ok and all_valid else 'completed_with_errors'

pipeline_metadata = {
    'database': DB_NAME,
    'execution_timestamp': datetime.now().isoformat(),
    'data_sources': list(extracted_data.keys()),
    'extracted_count': len(extracted_data),
    'cleaned_count': len(cleaned_data),
    'validation_results': validation_results,
    'load_results': load_results,
    'quality_reports': quality_reports,
    'status': pipeline_status
}

# Save metadata to JSON
metadata_file = DB_PATH / "metadata" / "pipeline_metadata.json"
metadata_file.parent.mkdir(parents=True, exist_ok=True)

# default=str stringifies values json cannot encode natively (e.g. numpy
# scalar counts that validation/quality results may contain).
with open(metadata_file, 'w', encoding='utf-8') as f:
    json.dump(pipeline_metadata, f, indent=2, default=str)

print(f"✓ Pipeline metadata saved to {metadata_file}")

# Display summary
print("\n" + "="*80)
print("PIPELINE EXECUTION SUMMARY")
print("="*80)
print(f"Database: {pipeline_metadata['database']}")
print(f"Execution time: {pipeline_metadata['execution_timestamp']}")
print(f"Data sources extracted: {pipeline_metadata['extracted_count']}")
print(f"Datasets cleaned: {pipeline_metadata['cleaned_count']}")
print(f"Successful loads: {sum(pipeline_metadata['load_results'].values())}")
print(f"Status: {pipeline_metadata['status']}")