---
## UK Tourism Data – Ingestion and Cleaning Pipeline

This notebook processes the official ONS tourism statistics, handling the 2024 methodological break and producing clean, ready-for-analysis data sets.

---

## 1. Configuration and Imports

In [None]:
import os
import sys
from pathlib import Path
import logging

import pandas as pd
import numpy as np
from openpyxl import load_workbook

# Add src directory to path for utility functions
SRC_DIR = Path.cwd().parent / "src"
sys.path.append(str(SRC_DIR))

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

## 2. Project Set-up

In [None]:
# Define project paths
ROOT_DIR = Path.cwd().parent
DATA_RAW = ROOT_DIR / "data" / "raw"
DATA_PROCESSED = ROOT_DIR / "data" / "processed"

# Ensure directories exist
DATA_PROCESSED.mkdir(parents=True, exist_ok=True)

# Data file
DATA_FILE = DATA_RAW / "overseas-visitors-to-britain-2024.xlsx"

logger.info(f"Project root: {ROOT_DIR}")
logger.info(f"Data file: {DATA_FILE}")

## 3. Data Inspection

In [None]:
# First, inspect the Excel file structure
def inspect_excel_file(file_path: Path):
    """Inspect Excel file structure without loading full data."""
    wb = load_workbook(file_path, read_only=True)
    sheets = wb.sheetnames
    logger.info(f"Available sheets: {sheets}")
    
    # Sample first few rows of each sheet
    for sheet in sheets:
        ws = wb[sheet]
        headers = [cell.value for cell in next(ws.iter_rows(max_row=1))]
        logger.info(f"Sheet '{sheet}' - Columns: {headers[:5]}...")  # First 5 columns
    wb.close()
    return sheets

sheets = inspect_excel_file(DATA_FILE)

## 4. Load and Validate Data

In [None]:
# Load the main data sheet
try:
    df_raw = pd.read_excel(
        DATA_FILE,
        sheet_name=0,  # Assuming first sheet contains main data
        engine='openpyxl'
    )
    logger.info(f"Successfully loaded data: {df_raw.shape[0]} rows, {df_raw.shape[1]} columns")
except Exception as e:
    logger.error(f"Failed to load data: {e}")
    raise

# Initial data quality check
def initial_validation(df: pd.DataFrame):
    """Perform initial data validation."""
    validation_results = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'missing_values': df.isnull().sum().sum(),
        'duplicate_rows': df.duplicated().sum()
    }
    
    # Check for expected columns
    expected_cols = ['Quarter', 'Visits', 'Spending', 'Nights stayed']
    found_cols = [col for col in expected_cols if col in df.columns]
    validation_results['expected_columns_found'] = len(found_cols)
    
    logger.info("Initial validation results:")
    for key, value in validation_results.items():
        logger.info(f"  {key}: {value}")
    
    return validation_results

validation = initial_validation(df_raw)

## 5. Data Cleaning Pipeline

In [None]:
def clean_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Standardise column names."""
    df_clean = df.copy()
    
    # Create mapping for column renaming
    column_mapping = {
        'Quarter': 'quarter',
        'Visits': 'num_visits',
        'Spending': 'spend_gbp_millions', 
        'Nights stayed': 'num_nights',
        # Add other column mappings as needed
    }
    
    # Rename columns
    df_clean = df_clean.rename(columns=column_mapping)
    
    # Clean any remaining columns
    def standardise_name(name: str) -> str:
        if name in column_mapping.values():
            return name
        return (
            str(name)
            .strip()
            .lower()
            .replace(' ', '_')
            .replace('-', '_')
            .replace('/', '_')
        )
    
    df_clean.columns = [standardise_name(col) for col in df_clean.columns]
    return df_clean

df_clean = clean_column_names(df_raw)

## 6. Parse Temporal Data

In [None]:
def parse_quarter_data(df: pd.DataFrame) -> pd.DataFrame:
    """Parse quarter information into a proper datetime period index."""
    df_temp = df.copy()
    
    # Handle different quarter formats
    if 'quarter' in df_temp.columns:
        # Extract year and quarter (assumes formats like 'Q1 2024' or '1 2024')
        quarter_data = df_temp['quarter'].astype(str).str.extract(r'Q?(\d)\s*(\d{4})')
        
        if not quarter_data.empty:
            df_temp['year'] = quarter_data[1].astype(int)
            df_temp['quarter_num'] = quarter_data[0].astype(int)
            
            # Create period index as quarterly periods
            df_temp['period'] = df_temp.apply(
                lambda x: pd.Period(f"{x['year']}Q{x['quarter_num']}", freq='Q'),
                axis=1
            )
            df_temp = df_temp.set_index('period')
            
            # Add scope flag for the methodological break
            df_temp['geographic_scope'] = df_temp.index.to_series().apply(
                lambda p: 'GB' if p.year >= 2024 else 'UK'
            )
    
    return df_temp

df_processed = parse_quarter_data(df_clean)
logger.info(f"Data after temporal processing: {len(df_processed)} rows")

## 7. Handle Data Types and Quality

In [None]:
def enforce_data_types(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure proper data types and handle missing values."""
    df_typed = df.copy()
    
    # Numeric columns with robust error handling
    numeric_columns = ['num_visits', 'spend_gbp_millions', 'num_nights']
    
    for col in numeric_columns:
        if col in df_typed.columns:
            original_non_null = df_typed[col].notna().sum()
            df_typed[col] = pd.to_numeric(df_typed[col], errors='coerce')
            new_non_null = df_typed[col].notna().sum()
            
            if new_non_null < original_non_null:
                logger.warning(f"Coerced {original_non_null - new_non_null} non-numeric values to NaN in {col}")
    
    return df_typed

df_final = enforce_data_types(df_processed)

# Final data quality report
def generate_quality_report(df: pd.DataFrame) -> dict:
    """Generate a comprehensive data quality report."""
    report = {
        'final_row_count': len(df),
        'final_column_count': len(df.columns),
        'date_range': f"{df.index.min()} to {df.index.max()}" if hasattr(df.index, 'min') else 'N/A',
        'scope_breakdown': df['geographic_scope'].value_counts().to_dict() if 'geographic_scope' in df.columns else 'N/A'
    }
    
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        report[f'{col}_stats'] = {
            'min': df[col].min(),
            'max': df[col].max(), 
            'mean': df[col].mean(),
            'null_count': df[col].isna().sum()
        }
    
    logger.info("Final data quality report:")
    for key, value in report.items():
        logger.info(f"  {key}: {value}")
    
    return report

quality_report = generate_quality_report(df_final)

## 8. Export Clean Data

In [None]:
# Export to multiple formats for different use cases
output_files = {}

# Parquet for analysis
parquet_path = DATA_PROCESSED / "uk_tourism_clean.parquet"
df_final.to_parquet(parquet_path, index=True)
output_files['parquet'] = parquet_path

# CSV for compatibility
csv_path = DATA_PROCESSED / "uk_tourism_clean.csv"
df_final.to_csv(csv_path, index=True)
output_files['csv'] = csv_path

logger.info("Clean data exported to:")
for format_type, file_path in output_files.items():
    logger.info(f"  {format_type.upper()}: {file_path}")

# Create data dictionary
data_dictionary = {
    'quarter': 'Quarter period (Pandas Period object)',
    'num_visits': 'Number of overseas visits (thousands)',
    'spend_gbp_millions': 'Spending in GBP millions',
    'num_nights': 'Number of nights stayed (millions)',
    'geographic_scope': 'Geographical coverage: UK (2019–2023) or GB (2024+)' 
}

dict_path = DATA_PROCESSED / "data_dictionary.json"
import json
with open(dict_path, 'w') as f:
    json.dump(data_dictionary, f, indent=2)

logger.info(f"Data dictionary saved to: {dict_path}")

## 9. Summary

In [None]:
logger.info("🎉 Data ingestion and cleaning completed successfully!")
logger.info(f"📊 Final dataset: {len(df_final)} rows, {len(df_final.columns)} columns")
logger.info(f"📅 Date range: {df_final.index.min()} to {df_final.index.max()}")
logger.info("✅ Ready for exploratory analysis!")