# 📊 Data Quality Analysis - Financial Transaction Data

## 🎯 Overview
This notebook demonstrates the complete data quality analysis workflow for financial transaction data using our Data Quality Framework.

**Objectives:**
- Load and validate financial transaction data
- Run comprehensive data quality checks
- Analyze validation results and patterns
- Generate detailed quality reports
- Provide actionable insights for data governance

**Author:** Srinivasan V  
**Date:** July 2025  
**Framework Version:** 1.0

# Data Quality Framework for Financial Transactions

## Framework Overview
This notebook demonstrates a comprehensive data quality framework for financial transaction data using PySpark (with a Pandas fallback). It includes validation checks for mandatory fields, amount ranges, approved currency codes (a reference-list check), duplicate transaction IDs, and timestamp formats, along with automated reporting capabilities.

## Key Features
- ✅ Modular, reusable data quality check functions
- ✅ Config-driven validation rules
- ✅ Failed records logging and inspection
- ✅ Weekly data quality reports
- ✅ PySpark for scalable data processing

## Data Schema
- **transaction_id**: Unique identifier for each transaction
- **account_id**: Account identifier
- **amount**: Transaction amount (must lie within the configured range; defaults to 0.01–1,000,000)
- **currency**: Currency code (must be in approved list)
- **timestamp**: Transaction timestamp in `YYYY-MM-DD HH:MM:SS` format

In [None]:
# Import Required Libraries and Set Up Spark Session
import json
import os
import sys
import warnings
from datetime import datetime, timedelta
from functools import reduce

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import seaborn as sns
from plotly.subplots import make_subplots

warnings.filterwarnings('ignore')

# Add parent directory to path to import our modules
sys.path.append('../')
from src.data_quality_framework import DataQualityFramework
from src.validators import *
from src.utils import load_config
from src.report_generator import DataQualityReportGenerator

# PySpark imports
# Import only the Spark SQL helpers this notebook actually uses. A wildcard
# `from pyspark.sql.functions import *` would shadow Python builtins such as
# sum, min, max, abs and round, breaking later plain-Python code (for example
# the overall summary, which sums per-check failure counts).
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_timestamp
    from pyspark.sql.types import *
    PYSPARK_AVAILABLE = True
    print("✓ PySpark is available")
except ImportError:
    PYSPARK_AVAILABLE = False
    print("⚠️ PySpark not available, falling back to Pandas")

# Set up plotting style
try:
    plt.style.use('seaborn-v0_8')
except OSError:
    # Matplotlib < 3.6 ships this style under its old name.
    plt.style.use('seaborn')
sns.set_palette("husl")
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

# Initialize Spark Session (if available)
if PYSPARK_AVAILABLE:
    spark = SparkSession.builder \
        .appName("DataQualityFramework") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

    print("✓ Spark Session created successfully")
    print(f"Spark Version: {spark.version}")
else:
    spark = None
    print("Using Pandas for data processing")

print("✅ All libraries imported successfully!")
print(f"📅 Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [None]:
# Load Configuration Parameters
config_base_path = "../config"

# Load data quality configuration.
# NOTE: this definition shadows the `load_config` imported from src.utils in
# the first cell; every cell below uses this local version.
def load_config(config_path):
    """Load configuration from a JSON file.

    Args:
        config_path: Path to the JSON configuration file.

    Returns:
        The parsed JSON content. If the file does not exist, prints a warning
        and returns an empty dict.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # platform's default locale encoding.
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"⚠️ Config file not found: {config_path}")
        return {}

# Load main configuration
dq_config = load_config(os.path.join(config_base_path, "data_quality_config.json"))
if dq_config:
    print("✓ Data Quality Configuration loaded")
else:
    # load_config returns {} for a missing file. Report that instead of
    # claiming success, because every rule below then falls back to a default.
    print("⚠️ Data Quality Configuration is empty or missing; using built-in defaults")

# Load approved currencies
currencies_config = load_config(os.path.join(config_base_path, "currencies.json"))
approved_currencies = currencies_config.get("approved_currencies", [])
print(f"✓ Approved currencies loaded: {len(approved_currencies)} currencies")
if not approved_currencies:
    # With an empty approved list, the currency check fails every record.
    print("⚠️ No approved currencies configured; every currency code will fail validation")

# Display configuration
_validation_rules_cfg = dq_config.get('validation_rules', {})
_thresholds_cfg = dq_config.get('data_quality_thresholds', {})
print("\n📋 Configuration Summary:")
print(f"Mandatory Fields: {_validation_rules_cfg.get('mandatory_fields', [])}")
print(f"Amount Range: {_validation_rules_cfg.get('amount_validation', {})}")
print(f"Sample Approved Currencies: {approved_currencies[:10]}...")
print(f"Critical Pass Rate Threshold: {_thresholds_cfg.get('critical_pass_rate', 0.95)}")
print(f"Warning Pass Rate Threshold: {_thresholds_cfg.get('warning_pass_rate', 0.90)}")

In [None]:
# Load Financial Transactions Data
data_path = "../data/sample_transactions.csv"

# Load data using appropriate method
if PYSPARK_AVAILABLE and spark is not None:
    # Load using PySpark
    print("📊 Loading data with PySpark...")
    df_spark = spark.read.option("header", "true").option("inferSchema", "true").csv(data_path)

    # Convert to Pandas for easier visualization and analysis
    df = df_spark.toPandas()

    print("✓ Data loaded successfully using PySpark")
    # len(df) reuses the rows already collected by toPandas(); calling
    # df_spark.count() here would launch another Spark job over the CSV.
    print(f"Dataset shape: {len(df)} rows × {len(df_spark.columns)} columns")
else:
    # Load using Pandas
    print("📊 Loading data with Pandas...")
    df = pd.read_csv(data_path)
    df_spark = None

    print("✓ Data loaded successfully using Pandas")
    print(f"Dataset shape: {df.shape}")

# Display basic information about the dataset
print("\n📈 Dataset Overview:")
print(f"Total Records: {len(df):,}")
print(f"Columns: {list(df.columns)}")
print(f"Memory Usage: {df.memory_usage(deep=True).sum() / 1024 / 1024:.2f} MB")

# Display first few rows
print("\n📋 Sample Data:")
display(df.head())

# Display data types and missing values
print("\n🔍 Data Types and Missing Values:")
null_counts = df.isnull().sum()
info_df = pd.DataFrame({
    'Column': df.columns,
    'Data Type': df.dtypes,
    'Non-Null Count': df.count(),
    'Null Count': null_counts,
    'Null Percentage': (null_counts / len(df) * 100).round(2)
})
display(info_df)

In [None]:
# Define Data Quality Check Functions

def check_mandatory_fields(df, mandatory_fields):
    """
    Check for non-null values in mandatory fields.

    A record fails when any of ``mandatory_fields`` is null (NaN/None in
    Pandas, NULL in Spark).

    Args:
        df: Input DataFrame (Pandas or PySpark)
        mandatory_fields: Iterable of mandatory field names. An empty iterable
            means no record can fail.

    Returns:
        Dictionary with check results: check_name, total_records,
        passed_count, failed_count, pass_rate and failed_records (a DataFrame
        of the same kind as ``df``).
    """
    # Imported locally: the global name `reduce` is not reliably bound here
    # (a pyspark wildcard import can replace it with Spark's array reduce).
    import functools
    import operator

    # A list, not a tuple: pandas reads df[("a", "b")] as one column key.
    mandatory_fields = list(mandatory_fields)

    if isinstance(df, pd.DataFrame):
        # Pandas implementation
        total_records = len(df)
        mask = df[mandatory_fields].isnull().any(axis=1)
        failed_count = int(mask.sum())
        failed_records = df[mask]
    else:
        # PySpark implementation
        total_records = df.count()
        if mandatory_fields:
            any_null = functools.reduce(
                operator.or_, [col(field).isNull() for field in mandatory_fields]
            )
            failed_records = df.filter(any_null)
            failed_count = failed_records.count()
        else:
            # Nothing is mandatory; limit(0) keeps the schema with no rows.
            failed_records = df.limit(0)
            failed_count = 0

    passed_count = total_records - failed_count
    pass_rate = passed_count / total_records if total_records > 0 else 0

    return {
        'check_name': 'mandatory_fields',
        'total_records': total_records,
        'passed_count': passed_count,
        'failed_count': failed_count,
        'pass_rate': pass_rate,
        'failed_records': failed_records
    }

def check_amount_range(df, min_amount=0.01, max_amount=1000000.00):
    """
    Check if amounts are within the inclusive range [min_amount, max_amount].

    A record fails when ``amount`` is null, below ``min_amount`` or above
    ``max_amount``. In Pandas, values that cannot be parsed as numbers are
    treated as null and therefore fail.

    Args:
        df: Input DataFrame (Pandas or PySpark) with an ``amount`` column
        min_amount: Minimum valid amount (inclusive)
        max_amount: Maximum valid amount (inclusive)

    Returns:
        Dictionary with check results, including the bounds that were applied.
    """
    if isinstance(df, pd.DataFrame):
        # Pandas implementation. Coerce so malformed strings count as failures
        # instead of raising a TypeError on comparison.
        total_records = len(df)
        amounts = pd.to_numeric(df['amount'], errors='coerce')
        mask = amounts.isnull() | (amounts < min_amount) | (amounts > max_amount)
        failed_count = int(mask.sum())
        failed_records = df[mask]
    else:
        # PySpark implementation
        total_records = df.count()
        amount = col("amount")
        failed_records = df.filter(
            amount.isNull() | (amount < min_amount) | (amount > max_amount)
        )
        failed_count = failed_records.count()

    passed_count = total_records - failed_count
    pass_rate = passed_count / total_records if total_records > 0 else 0

    return {
        'check_name': 'amount_range',
        'total_records': total_records,
        'passed_count': passed_count,
        'failed_count': failed_count,
        'pass_rate': pass_rate,
        'failed_records': failed_records,
        'min_amount': min_amount,
        'max_amount': max_amount
    }

def check_currency_codes(df, approved_currencies):
    """
    Check if currency codes are in the approved list.

    A record fails when ``currency`` is null or is not an exact
    (case-sensitive) member of ``approved_currencies``.

    Args:
        df: Input DataFrame (Pandas or PySpark) with a ``currency`` column
        approved_currencies: Collection of approved currency codes

    Returns:
        Dictionary with check results, including how many codes were approved.
    """
    if isinstance(df, pd.DataFrame):
        # Pandas implementation
        total_records = len(df)
        mask = (~df['currency'].isin(approved_currencies)) | df['currency'].isnull()
        failed_count = int(mask.sum())
        failed_records = df[mask]
    else:
        # PySpark implementation. isNull() is needed because NOT IN on a NULL
        # value yields NULL, which filter() would otherwise drop.
        total_records = df.count()
        failed_records = df.filter(
            (~col("currency").isin(list(approved_currencies))) | col("currency").isNull()
        )
        failed_count = failed_records.count()

    passed_count = total_records - failed_count
    pass_rate = passed_count / total_records if total_records > 0 else 0

    return {
        'check_name': 'currency_codes',
        'total_records': total_records,
        'passed_count': passed_count,
        'failed_count': failed_count,
        'pass_rate': pass_rate,
        'failed_records': failed_records,
        'approved_currencies_count': len(approved_currencies)
    }

def check_duplicate_transactions(df, id_column='transaction_id'):
    """
    Check for duplicate transaction IDs.

    Every occurrence of a repeated ID fails, not only the second and later
    ones, so two rows sharing an ID give failed_count == 2.

    NOTE(review): Pandas treats null IDs as equal to each other (so several
    nulls are flagged), while a Spark equi-join never matches NULL keys (so
    they are not). Confirm which behaviour is wanted.

    Args:
        df: Input DataFrame (Pandas or PySpark)
        id_column: Column name for transaction ID

    Returns:
        Dictionary with check results. failed_records has the same columns,
        in the same order, as ``df``.
    """
    if isinstance(df, pd.DataFrame):
        # Pandas implementation
        total_records = len(df)
        duplicates_mask = df.duplicated(subset=[id_column], keep=False)
        failed_count = int(duplicates_mask.sum())
        failed_records = df[duplicates_mask]
    else:
        # PySpark implementation: find IDs occurring more than once, then pull
        # back every row with such an ID.
        total_records = df.count()
        duplicate_ids = (
            df.groupBy(id_column).count()
            .filter(col("count") > 1)
            .select(id_column)
        )
        # Re-select the input columns so the helper `count` column is not
        # carried into the failed-records output and column order is kept.
        failed_records = (
            df.join(duplicate_ids, on=id_column, how="inner")
            .select(*df.columns)
        )
        failed_count = failed_records.count()

    passed_count = total_records - failed_count
    pass_rate = passed_count / total_records if total_records > 0 else 0

    return {
        'check_name': 'duplicate_transactions',
        'total_records': total_records,
        'passed_count': passed_count,
        'failed_count': failed_count,
        'pass_rate': pass_rate,
        'failed_records': failed_records
    }

def check_timestamp_format(df, timestamp_column='timestamp', date_format='yyyy-MM-dd HH:mm:ss',
                           python_format='%Y-%m-%d %H:%M:%S'):
    """
    Check timestamp format and validity.

    A record fails when the timestamp is null or cannot be parsed with the
    expected format (this includes impossible dates such as Feb 30).

    Args:
        df: Input DataFrame (Pandas or PySpark)
        timestamp_column: Column name for timestamp
        date_format: Expected format as a Spark datetime pattern (Spark path)
        python_format: The same format as a strftime pattern (Pandas path).
            Keep it consistent with ``date_format``.

    Returns:
        Dictionary with check results, including ``expected_format``
        (``date_format``).
    """
    if isinstance(df, pd.DataFrame):
        # Pandas implementation
        total_records = len(df)
        try:
            parsed_timestamps = pd.to_datetime(
                df[timestamp_column], format=python_format, errors='coerce'
            )
            mask = parsed_timestamps.isnull() | df[timestamp_column].isnull()
            failed_count = int(mask.sum())
            failed_records = df[mask]
        except (KeyError, TypeError, ValueError):
            # Missing column or an input type to_datetime cannot handle:
            # treat every record as failing this check.
            failed_count = total_records
            failed_records = df.copy()
    else:
        # PySpark implementation. Filter on the parsed expression directly so
        # failed_records keeps exactly the input columns.
        # NOTE(review): with spark.sql.ansi.enabled=true, to_timestamp raises on
        # malformed input instead of returning NULL; consider try_to_timestamp.
        total_records = df.count()
        parsed_timestamp = to_timestamp(col(timestamp_column), date_format)
        failed_records = df.filter(
            parsed_timestamp.isNull() | col(timestamp_column).isNull()
        )
        failed_count = failed_records.count()

    passed_count = total_records - failed_count
    pass_rate = passed_count / total_records if total_records > 0 else 0

    return {
        'check_name': 'timestamp_format',
        'total_records': total_records,
        'passed_count': passed_count,
        'failed_count': failed_count,
        'pass_rate': pass_rate,
        'failed_records': failed_records,
        'expected_format': date_format
    }

print("✓ Data quality check functions defined successfully")

In [None]:
# Apply Data Quality Checks
import builtins  # explicit builtins: a pyspark.sql.functions wildcard import can shadow sum/min/all

print("🔍 Running Data Quality Checks...")
print("=" * 50)

# Determine which DataFrame to use for checks. Test against None explicitly
# instead of relying on the truthiness of a Spark DataFrame object.
working_df = df_spark if PYSPARK_AVAILABLE and df_spark is not None else df

# Initialize results storage
validation_results = {}
failed_records_collection = {}


def _record_check(check_key, result, detail_lines=()):
    """Store one check result, keep its failed rows, and print its summary.

    Args:
        check_key: Key under which the result is stored.
        result: Dictionary returned by one of the check_* functions.
        detail_lines: Extra lines printed before the pass/fail counts.
    """
    validation_results[check_key] = result
    if result['failed_count'] > 0:
        failed_records_collection[check_key] = result['failed_records']
    for line in detail_lines:
        print(line)
    print(f"   ✓ Pass Rate: {result['pass_rate']:.2%}")
    print(f"   ✓ Passed: {result['passed_count']:,}")
    print(f"   ✗ Failed: {result['failed_count']:,}")


# 1. Check Mandatory Fields
print("1️⃣ Checking mandatory fields...")
mandatory_fields = dq_config.get('validation_rules', {}).get('mandatory_fields', [])
result_mandatory = check_mandatory_fields(working_df, mandatory_fields)
_record_check('mandatory_fields', result_mandatory)

# 2. Check Amount Range
print("\n2️⃣ Checking amount range...")
amount_config = dq_config.get('validation_rules', {}).get('amount_validation', {})
min_amount = amount_config.get('min_value', 0.01)
max_amount = amount_config.get('max_value', 1000000.00)
result_amount = check_amount_range(working_df, min_amount, max_amount)
_record_check('amount_range', result_amount,
              [f"   ✓ Valid Range: ${min_amount:,.2f} - ${max_amount:,.2f}"])

# 3. Check Currency Codes
print("\n3️⃣ Checking currency codes...")
result_currency = check_currency_codes(working_df, approved_currencies)
_record_check('currency_codes', result_currency,
              [f"   ✓ Approved Currencies: {len(approved_currencies)}"])

# 4. Check Duplicate Transactions
print("\n4️⃣ Checking for duplicate transactions...")
result_duplicates = check_duplicate_transactions(working_df)
_record_check('duplicate_transactions', result_duplicates)

# 5. Check Timestamp Format
print("\n5️⃣ Checking timestamp format...")
result_timestamp = check_timestamp_format(working_df)
_record_check('timestamp_format', result_timestamp,
              ["   ✓ Expected Format: YYYY-MM-DD HH:MM:SS"])

print("\n" + "=" * 50)
print("✅ Data Quality Checks Completed!")

# Calculate overall statistics.
# One record can fail several checks (a NULL currency fails both the
# mandatory-field and the currency check), so summing per-check failures would
# double count and can even exceed the number of records. Count each failing
# record once instead.
total_records = len(df)
failed_frames = [result['failed_records'] for result in validation_results.values()]
if builtins.all(isinstance(frame, pd.DataFrame) for frame in failed_frames):
    # Pandas checks keep the original row index, so the union of indices
    # identifies the distinct failing records.
    failed_row_ids = set()
    for frame in failed_frames:
        failed_row_ids.update(frame.index)
    total_failed = len(failed_row_ids)
else:
    # Spark results share no row identifier; fall back to the per-check total,
    # capped so the derived counts and rates stay within [0, total_records].
    total_failed = builtins.min(
        builtins.sum(result['failed_count'] for result in validation_results.values()),
        total_records,
    )
overall_pass_rate = (total_records - total_failed) / total_records if total_records > 0 else 0

print("\n📊 Overall Summary:")
print(f"   📈 Total Records: {total_records:,}")
print(f"   ✅ Total Passed: {total_records - total_failed:,}")
print(f"   ❌ Total Failed: {total_failed:,}")
print(f"   📈 Overall Pass Rate: {overall_pass_rate:.2%}")

# Determine quality status
critical_threshold = dq_config.get('data_quality_thresholds', {}).get('critical_pass_rate', 0.95)
warning_threshold = dq_config.get('data_quality_thresholds', {}).get('warning_pass_rate', 0.90)

if overall_pass_rate >= critical_threshold:
    status = "🟢 EXCELLENT"
elif overall_pass_rate >= warning_threshold:
    status = "🟡 WARNING"
else:
    status = "🔴 CRITICAL"

print(f"   🏆 Quality Status: {status}")

In [None]:
# Store Failed Records for Inspection

print("💾 Storing Failed Records for Inspection...")
print("=" * 45)

# Create directory for failed records if it doesn't exist
failed_records_dir = "../data/failed_records"
os.makedirs(failed_records_dir, exist_ok=True)

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
saved_files = []

# Convert every failed-records frame to Pandas exactly once. Spark's
# toPandas() re-executes the query plan, so converting in both loops below
# would run each Spark check's filter twice.
failed_pandas_by_check = {}
for check_name, failed_df in failed_records_collection.items():
    if isinstance(failed_df, pd.DataFrame):
        failed_pandas_by_check[check_name] = failed_df.copy()
    else:
        failed_pandas_by_check[check_name] = failed_df.toPandas()

# Store each type of failed records, with per-check metadata columns.
for check_name, failed_pandas in failed_pandas_by_check.items():
    if len(failed_pandas) > 0:
        export_df = failed_pandas.assign(
            validation_check=check_name,
            validation_timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            failure_reason=f"Failed {check_name.replace('_', ' ')} validation",
        )

        # Save to CSV
        filename = f"failed_{check_name}_{timestamp}.csv"
        filepath = os.path.join(failed_records_dir, filename)
        export_df.to_csv(filepath, index=False)

        saved_files.append(filepath)
        print(f"✓ Saved {len(export_df)} failed records: {filename}")

# Create a comprehensive failed records summary
if saved_files:
    print(f"\n📁 Failed records saved to: {failed_records_dir}")
    print(f"📄 Files created: {len(saved_files)}")

    # The combined file only tags each row with the check that flagged it.
    all_failed_records = [
        frame.assign(validation_check=check_name)
        for check_name, frame in failed_pandas_by_check.items()
        if len(frame) > 0
    ]

    if all_failed_records:
        combined_failed = pd.concat(all_failed_records, ignore_index=True)
        combined_filename = f"all_failed_records_{timestamp}.csv"
        combined_filepath = os.path.join(failed_records_dir, combined_filename)
        combined_failed.to_csv(combined_filepath, index=False)

        print(f"✓ Combined failed records saved: {combined_filename}")
        print(f"📊 Total failed records across all checks: {len(combined_failed)}")

        # Display sample of failed records
        print("\n🔍 Sample Failed Records:")
        display(combined_failed.head(10))

        # Failed records by validation check
        failed_by_check = combined_failed['validation_check'].value_counts()
        print("\n📊 Failed Records by Validation Check:")
        for check, count in failed_by_check.items():
            print(f"   {check.replace('_', ' ').title()}: {count:,}")

else:
    print("🎉 No failed records found! All data passed validation checks.")

print("\n" + "=" * 45)

In [None]:
# Aggregate Data Quality Results and Visualizations

print("📊 Creating Data Quality Visualizations...")
print("=" * 45)


def _check_status(pass_rate):
    """Map a per-check pass rate to a status label.

    Uses the configured critical_threshold / warning_threshold (the same values
    that drive the overall quality status) instead of hard-coded 95% / 90%.
    """
    if pass_rate >= critical_threshold:
        return '✅ PASS'
    if pass_rate >= warning_threshold:
        return '⚠️ WARNING'
    return '❌ FAIL'


# Create summary DataFrame for easier analysis
summary_columns = ['Validation_Check', 'Total_Records', 'Passed_Count', 'Failed_Count',
                   'Pass_Rate', 'Pass_Rate_Percent', 'Status']
summary_data = [
    {
        'Validation_Check': check_name.replace('_', ' ').title(),
        'Total_Records': result['total_records'],
        'Passed_Count': result['passed_count'],
        'Failed_Count': result['failed_count'],
        'Pass_Rate': result['pass_rate'],
        'Pass_Rate_Percent': f"{result['pass_rate']:.2%}",
        'Status': _check_status(result['pass_rate']),
    }
    for check_name, result in validation_results.items()
]

# Explicit columns keep the table's shape even if no checks produced results.
summary_df = pd.DataFrame(summary_data, columns=summary_columns)
print("📋 Data Quality Summary Table:")
display(summary_df)

# Create visualizations
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))

# Colours follow the configured thresholds (the same ones used for the overall
# quality status) rather than hard-coded 95% / 90%. Rates here are percentages.
critical_pct = critical_threshold * 100
warning_pct = warning_threshold * 100

# 1. Pass Rates by Validation Check
checks = summary_df['Validation_Check']
pass_rates = summary_df['Pass_Rate'] * 100

colors = ['green' if rate >= critical_pct else 'orange' if rate >= warning_pct else 'red'
          for rate in pass_rates]
bars1 = ax1.bar(checks, pass_rates, color=colors, alpha=0.7)
ax1.set_title('Pass Rates by Validation Check', fontsize=14, fontweight='bold')
ax1.set_ylabel('Pass Rate (%)')
# Headroom above 100% so labels drawn at bar height + 1 are not clipped.
ax1.set_ylim(0, 110)
ax1.tick_params(axis='x', rotation=45)

# Add value labels on bars
for bar, rate in zip(bars1, pass_rates):
    ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
             f'{rate:.1f}%', ha='center', va='bottom', fontweight='bold')

# 2. Failed Records Count
failed_counts = summary_df['Failed_Count']
bars2 = ax2.bar(checks, failed_counts, color='red', alpha=0.7)
ax2.set_title('Failed Records by Validation Check', fontsize=14, fontweight='bold')
ax2.set_ylabel('Number of Failed Records')
ax2.tick_params(axis='x', rotation=45)

# Add value labels
for bar, count in zip(bars2, failed_counts):
    if count > 0:
        ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
                 f'{count}', ha='center', va='bottom', fontweight='bold')

# 3. Overall Quality Distribution Pie Chart
# total_failed can exceed total_records when it is a sum of per-check failures.
# Clamp so matplotlib never receives a negative wedge size (which raises).
total_passed = total_records - total_failed
if total_passed < 0:
    total_passed = 0
sizes = [total_passed, total_failed]
labels = ['Passed Records', 'Failed Records']
colors_pie = ['green', 'red']

if total_failed > 0:
    wedges, texts, autotexts = ax3.pie(sizes, labels=labels, autopct='%1.1f%%',
                                       colors=colors_pie, startangle=90)
    for autotext in autotexts:
        autotext.set_color('white')
        autotext.set_fontweight('bold')
else:
    ax3.pie([100], labels=['All Records Passed'], colors=['green'], autopct='%1.1f%%')

ax3.set_title('Overall Data Quality Distribution', fontsize=14, fontweight='bold')

# 4. Quality Score Gauge
overall_score = overall_pass_rate * 100
gauge_color = 'green' if overall_score >= critical_pct else 'orange' if overall_score >= warning_pct else 'red'
ax4.barh(['Overall Quality Score'], [overall_score], color=gauge_color, alpha=0.7)
ax4.set_xlim(0, 100)
ax4.set_xlabel('Quality Score (%)')
ax4.set_title(f'Overall Quality Score: {overall_score:.1f}%', fontsize=14, fontweight='bold')

# Add score text
ax4.text(overall_score/2, 0, f'{overall_score:.1f}%',
         ha='center', va='center', fontweight='bold', fontsize=12, color='white')

plt.tight_layout()
plt.show()

# Additional analysis: Currency distribution
print("\n💱 Currency Distribution Analysis:")
if 'currency' in df.columns:
    currency_counts = df['currency'].value_counts()
    print("Top 10 currencies by transaction count:")
    display(currency_counts.head(10))

    # Check which currencies are not in approved list
    invalid_currencies = df[~df['currency'].isin(approved_currencies)]['currency'].value_counts()
    if len(invalid_currencies) > 0:
        print(f"\n⚠️ Invalid currencies found ({len(invalid_currencies)} unique):")
        display(invalid_currencies.head(10))

# Amount distribution analysis
print("\n💰 Amount Distribution Analysis:")
if 'amount' in df.columns:
    # Coerce to numeric: malformed amounts (exactly what this notebook hunts
    # for) would otherwise make the column object-typed and break hist/mean.
    # Unparseable values become NaN and are skipped by the statistics.
    amount_values = pd.to_numeric(df['amount'], errors='coerce')
    amount_stats = amount_values.describe()
    print("Amount statistics:")
    display(amount_stats)

    # Plot amount distribution
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

    # Histogram
    amount_values.hist(bins=50, ax=ax1, alpha=0.7, color='blue')
    mean_amount = amount_values.mean()
    ax1.set_title('Amount Distribution')
    ax1.set_xlabel('Amount')
    ax1.set_ylabel('Frequency')
    ax1.axvline(mean_amount, color='red', linestyle='--', label=f'Mean: ${mean_amount:.2f}')
    ax1.legend()

    # Box plot
    amount_values.to_frame('amount').boxplot(column='amount', ax=ax2)
    ax2.set_title('Amount Box Plot')
    ax2.set_ylabel('Amount')

    plt.tight_layout()
    plt.show()

In [None]:
# Generate Weekly Data Quality Report
# Imported here so this cell does not depend on `timedelta` already being in scope.
from datetime import timedelta

print("📋 Generating Weekly Data Quality Report...")
print("=" * 50)

# Create report data structure. Read the clock once so the week boundaries and
# the generation timestamp all refer to the same instant.
report_now = datetime.now()
week_start = report_now - timedelta(days=report_now.weekday())  # Monday of this week
week_end = week_start + timedelta(days=6)
report_timestamp = report_now.strftime("%Y-%m-%d %H:%M:%S")

# Executive Summary
executive_summary = {
    'Report_Period': f"{week_start.strftime('%Y-%m-%d')} to {week_end.strftime('%Y-%m-%d')}",
    'Report_Generated': report_timestamp,
    'Total_Records_Processed': total_records,
    'Total_Passed_Records': total_records - total_failed,
    'Total_Failed_Records': total_failed,
    'Overall_Pass_Rate': f"{overall_pass_rate:.2%}",
    'Quality_Status': status.split()[-1],  # Extract just the status word
    'Data_Quality_Score': f"{overall_pass_rate * 100:.1f}/100"
}

print("📊 Executive Summary:")
for key, value in executive_summary.items():
    print(f"   {key.replace('_', ' ')}: {value}")

# Detailed Results Table
print("\n📋 Detailed Validation Results:")
detailed_results = summary_df[['Validation_Check', 'Total_Records', 'Passed_Count',
                               'Failed_Count', 'Pass_Rate_Percent', 'Status']].copy()
display(detailed_results)

# Failed Records Summary.
# Always define failed_summary: the Excel export further down tests it, and it
# previously did not exist when no check had failures.
failed_summary = []
if failed_records_collection:
    print("\n❌ Failed Records Summary:")
    for check_name in failed_records_collection:
        # Reuse the count computed by the check instead of re-counting the
        # failed rows (which re-executes the query for Spark DataFrames).
        count = validation_results[check_name]['failed_count']

        if count > 0:
            share = count / total_records * 100 if total_records > 0 else 0.0
            failed_summary.append({
                'Validation_Check': check_name.replace('_', ' ').title(),
                'Failed_Count': count,
                'Percentage_of_Total': f"{share:.2f}%"
            })

    if failed_summary:
        failed_summary_df = pd.DataFrame(failed_summary)
        display(failed_summary_df)

# Recommendations based on results.
# Thresholds come from the configuration (critical_threshold /
# warning_threshold) so the advice agrees with the overall quality status.
print("\n💡 Recommendations:")

# Action text for checks that fall below the warning threshold.
_critical_actions = {
    'mandatory_fields': "Immediate action required. Review data ingestion process for mandatory field validation.",
    'amount_range': "Review transaction amount validation rules and data sources.",
    'currency_codes': "Update currency validation list or review data sources for invalid currencies.",
    'duplicate_transactions': "Investigate transaction ID generation process to prevent duplicates.",
    'timestamp_format': "Standardize timestamp format across all data sources.",
}

recommendations = []
for check_name, result in validation_results.items():
    pass_rate = result['pass_rate']
    check_display_name = check_name.replace('_', ' ').title()

    if pass_rate < warning_threshold:
        action = _critical_actions.get(check_name)
        if action is not None:
            recommendations.append(f"🔴 {check_display_name}: {action}")
    elif pass_rate < critical_threshold:
        recommendations.append(f"🟡 {check_display_name}: Monitor closely. Pass rate below excellent threshold.")
    else:
        recommendations.append(f"🟢 {check_display_name}: Excellent quality. Maintain current standards.")

for rec in recommendations:
    print(f"   {rec}")

# Export report to files
reports_dir = "../data/reports"
os.makedirs(reports_dir, exist_ok=True)

# Excel Report
report_timestamp_file = datetime.now().strftime("%Y%m%d_%H%M%S")
excel_filename = f"Weekly_DQ_Report_{week_start.strftime('%Y%m%d')}_{report_timestamp_file}.xlsx"
excel_filepath = os.path.join(reports_dir, excel_filename)

try:
    with pd.ExcelWriter(excel_filepath, engine='openpyxl') as writer:
        # Executive Summary Sheet
        summary_sheet_data = pd.DataFrame(list(executive_summary.items()),
                                          columns=['Metric', 'Value'])
        summary_sheet_data.to_excel(writer, sheet_name='Executive_Summary', index=False)

        # Detailed Results Sheet
        detailed_results.to_excel(writer, sheet_name='Validation_Details', index=False)

        # Failed Records Summary. The failed-summary cell only creates
        # failed_summary / failed_summary_df when failed_records_collection is
        # non-empty, so test that first; otherwise a NameError aborted the
        # whole export whenever every check passed.
        if failed_records_collection and failed_summary:
            failed_summary_df.to_excel(writer, sheet_name='Failed_Records_Summary', index=False)

        # Raw Data Sample
        df.head(100).to_excel(writer, sheet_name='Data_Sample', index=False)

    print(f"\n✅ Excel report saved: {excel_filename}")

except Exception as e:
    # Best-effort export (e.g. openpyxl not installed): report and continue.
    print(f"⚠️ Warning: Could not create Excel report: {e}")

# HTML Report
# Dynamic text is HTML-escaped before it is embedded in the page.
from html import escape as _escape_html

html_filename = f"Weekly_DQ_Report_{week_start.strftime('%Y%m%d')}.html"
html_filepath = os.path.join(reports_dir, html_filename)

# The page is written as UTF-8 and contains emoji, so declare the charset;
# without it browsers may guess a legacy encoding and show mojibake.
html_parts = [f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Weekly Data Quality Report</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; line-height: 1.6; }}
        .header {{ background-color: #f0f8ff; padding: 20px; border-radius: 10px; margin-bottom: 20px; }}
        .summary {{ background-color: #f9f9f9; padding: 15px; border-radius: 5px; margin: 20px 0; }}
        .excellent {{ color: #28a745; }}
        .warning {{ color: #ffc107; }}
        .critical {{ color: #dc3545; }}
        table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
        th, td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
        th {{ background-color: #f2f2f2; font-weight: bold; }}
        .metric {{ font-size: 1.2em; margin: 10px 0; }}
        .status-excellent {{ background-color: #d4edda; color: #155724; }}
        .status-warning {{ background-color: #fff3cd; color: #856404; }}
        .status-critical {{ background-color: #f8d7da; color: #721c24; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>📊 Weekly Data Quality Report</h1>
        <p><strong>Report Period:</strong> {_escape_html(str(executive_summary['Report_Period']))}</p>
        <p><strong>Generated:</strong> {_escape_html(str(executive_summary['Report_Generated']))}</p>
    </div>
    
    <div class="summary">
        <h2>🎯 Executive Summary</h2>
        <div class="metric">📈 Total Records: <strong>{executive_summary['Total_Records_Processed']:,}</strong></div>
        <div class="metric">✅ Passed Records: <strong>{executive_summary['Total_Passed_Records']:,}</strong></div>
        <div class="metric">❌ Failed Records: <strong>{executive_summary['Total_Failed_Records']:,}</strong></div>
        <div class="metric">📊 Pass Rate: <strong>{_escape_html(str(executive_summary['Overall_Pass_Rate']))}</strong></div>
        <div class="metric">🏆 Quality Status: <strong class="{_escape_html(executive_summary['Quality_Status'].lower())}">{_escape_html(executive_summary['Quality_Status'])}</strong></div>
        <div class="metric">📈 Quality Score: <strong>{_escape_html(str(executive_summary['Data_Quality_Score']))}</strong></div>
    </div>
    
    <h2>📋 Detailed Validation Results</h2>
    <table>
        <tr>
            <th>Validation Check</th>
            <th>Total Records</th>
            <th>Passed</th>
            <th>Failed</th>
            <th>Pass Rate</th>
            <th>Status</th>
        </tr>
"""]

for _, row in detailed_results.iterrows():
    # Map the status emoji to the matching CSS class.
    status_class = 'excellent' if '✅' in row['Status'] else 'warning' if '⚠️' in row['Status'] else 'critical'
    html_parts.append(f"""
        <tr>
            <td>{_escape_html(str(row['Validation_Check']))}</td>
            <td>{row['Total_Records']:,}</td>
            <td>{row['Passed_Count']:,}</td>
            <td>{row['Failed_Count']:,}</td>
            <td>{_escape_html(str(row['Pass_Rate_Percent']))}</td>
            <td class="status-{status_class}">{_escape_html(str(row['Status']))}</td>
        </tr>
    """)

html_parts.append("""
    </table>
    
    <h2>💡 Key Recommendations</h2>
    <ul>
""")

html_parts.extend(f"<li>{_escape_html(rec)}</li>" for rec in recommendations)

html_parts.append("""
    </ul>
    
    <div class="summary" style="margin-top: 40px;">
        <p><strong>Note:</strong> This report was automatically generated by the Data Quality Framework.</p>
        <p>For detailed failed records analysis, please refer to the failed records CSV files in the data/failed_records directory.</p>
    </div>
</body>
</html>
""")

# Join once instead of repeated string concatenation.
html_content = "".join(html_parts)

# Persist the HTML report; a write failure is reported but does not stop the notebook.
try:
    with open(html_filepath, 'w', encoding='utf-8') as report_file:
        report_file.write(html_content)
except Exception as err:
    print(f"⚠️ Warning: Could not create HTML report: {err}")
else:
    print(f"✅ HTML report saved: {html_filename}")

# Closing summary and the follow-up checklist.
print(f"\n📁 Reports saved to: {reports_dir}")
print("📄 Available formats: Excel, HTML")
print("\n🎯 Next Steps:")
next_steps = [
    "Review failed records in: ../data/failed_records/",
    "Open reports in: ../data/reports/",
    "Implement recommended actions",
    "Schedule weekly report generation",
]
for step_no, step_text in enumerate(next_steps, start=1):
    print(f"   {step_no}. {step_text}")

print("\n✅ Weekly Data Quality Report Generation Completed!")