# Complete Data Quality Workflow

This notebook demonstrates a comprehensive data quality workflow using:
- Table profiling
- NULL value analysis
- Duplicate detection
- Data quality checks
- Quality scorecard generation

## Setup

In [None]:
import subprocess
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from google.cloud import bigquery
from pathlib import Path
from datetime import datetime
import sys

# Plot defaults: seaborn's 'whitegrid' theme and a 12x6 default figure size
sns.set_style('whitegrid')
plt.rcParams.update({'figure.figsize': (12, 6)})

# BigQuery client created with default settings (no explicit arguments)
client = bigquery.Client()

# Put the sibling scripts directory first on the import search path
sys.path.insert(0, str(Path('..', 'scripts').resolve()))

# Directory holding the command-line utilities, relative to the working directory
UTILS_DIR = Path('..') / 'bin' / 'data-utils'

print('Setup complete!')

## Configuration

Define the table to analyze and quality thresholds.

In [None]:
# Fully-qualified id of the table the workflow inspects
TABLE_ID = 'bigquery-public-data.usa_names.usa_1910_current'

# Limits the checks below compare each column against
THRESHOLDS = dict(
    max_null_percentage=10.0,       # Maximum acceptable NULL percentage
    min_uniqueness=0.01,            # Minimum uniqueness ratio
    max_duplicate_percentage=5.0,   # Maximum acceptable duplicate percentage
    min_completeness=90.0,          # Minimum data completeness percentage
)

print(f'Analyzing table: {TABLE_ID}')
print(f'Quality thresholds: {json.dumps(THRESHOLDS, indent=2)}')

## 1. Profile the Table

Generate a comprehensive profile of the table.

In [None]:
def run_util(util_name, args, parse_json=True):
    """Run a bq-* utility from ``UTILS_DIR`` and return its output.

    Args:
        util_name: File name of the utility inside ``UTILS_DIR``.
        args: Iterable of command-line arguments passed to the utility.
        parse_json: When true, decode the utility's stdout as JSON.

    Returns:
        The decoded JSON value when ``parse_json`` is true and stdout is
        valid JSON. The raw stdout text when ``parse_json`` is false, or
        when decoding fails (a warning is printed in that case). ``None``
        when the utility cannot be started or exits with a non-zero status
        (the error is printed).
    """
    cmd = [str(UTILS_DIR / util_name), *args]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # A missing or non-executable utility makes subprocess.run raise
        # before any process exists; report it like any other failed run so
        # callers only have to handle the None result.
        print(f'Error: {exc}')
        return None

    if result.returncode != 0:
        print(f'Error: {result.stderr}')
        return None

    if not parse_json:
        return result.stdout

    try:
        return json.loads(result.stdout)
    except json.JSONDecodeError:
        # Best effort: hand back the raw text so the caller can inspect it.
        print('Warning: Could not parse JSON output')
        return result.stdout

print('Profiling table...')
profile = run_util('bq-profile', [TABLE_ID, '--format=json'])

# run_util hands back the raw stdout text when the output is not valid JSON.
# The cells below index `profile` like a dict behind an `if profile:` guard,
# so a non-dict result is discarded here instead of failing later with a
# TypeError.
if profile is not None and not isinstance(profile, dict):
    print('Error: bq-profile did not return a JSON object; skipping analysis')
    profile = None

if profile:
    meta = profile['table_overview']
    print(f"\nTable Overview:")
    print(f"  Rows: {meta['num_rows']:,}")
    print(f"  Size: {meta['num_bytes']:,} bytes")
    print(f"  Columns: {meta['num_columns']}")
    print(f"  Created: {meta['created']}")
    print(f"  Modified: {meta['modified']}")

## 2. NULL Value Analysis

Analyze NULL values across all columns.

In [None]:
if profile:
    col_stats = profile['column_statistics']
    null_limit = THRESHOLDS['max_null_percentage']

    # One summary row per column; a column passes while its NULL share
    # stays at or below the configured maximum.
    null_rows = [
        {
            'Column': name,
            'NULL %': col.get('null_percentage', 0),
            'NULL Count': col.get('null_count', 0),
            'Total Count': col.get('total_count', 0),
            'Pass': col.get('null_percentage', 0) <= null_limit,
        }
        for name, col in col_stats.items()
    ]

    df_nulls = pd.DataFrame(null_rows).sort_values(by='NULL %', ascending=False)

    # Horizontal bars (green = pass, red = fail) with the threshold drawn
    # as a dashed reference line.
    bar_colors = ['green' if ok else 'red' for ok in df_nulls['Pass']]
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.barh(df_nulls['Column'], df_nulls['NULL %'], color=bar_colors, alpha=0.7)
    ax.axvline(x=null_limit, color='red', linestyle='--',
               label=f"Threshold: {null_limit}%")
    ax.set_xlabel('NULL Percentage')
    ax.set_title('NULL Value Analysis by Column')
    ax.legend()
    fig.tight_layout()
    plt.show()

    print('\nNULL Value Summary:')
    display(df_nulls)

    # Report the columns that went over the limit, if any
    over_limit = df_nulls.loc[~df_nulls['Pass']]
    if over_limit.empty:
        print('\n✓ All columns pass NULL value check')
    else:
        print(f'\n⚠ {len(over_limit)} column(s) exceed NULL threshold:')
        display(over_limit[['Column', 'NULL %']])

## 3. Uniqueness and Duplicate Analysis

In [None]:
if profile:
    min_unique = THRESHOLDS['min_uniqueness']

    # One row per column with its uniqueness ratio and supporting counts
    unique_rows = [
        {
            'Column': name,
            'Uniqueness': col.get('uniqueness_ratio', 0),
            'Distinct Count': col.get('distinct_count', 0),
            'Total Count': col.get('total_count', 0),
            'Type': col.get('data_type', 'UNKNOWN'),
        }
        for name, col in col_stats.items()
    ]

    df_unique = pd.DataFrame(unique_rows).sort_values(by='Uniqueness', ascending=False)

    # Bars per column, with reference lines at a ratio of 1.0 and at the
    # configured minimum.
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.barh(df_unique['Column'], df_unique['Uniqueness'], alpha=0.7, color='steelblue')
    ax.axvline(x=1.0, color='green', linestyle='--', label='Perfect Uniqueness')
    ax.axvline(x=min_unique, color='orange', linestyle='--',
               label=f"Min Threshold: {min_unique}")
    ax.set_xlabel('Uniqueness Ratio')
    ax.set_title('Column Uniqueness Analysis')
    ax.legend()
    fig.tight_layout()
    plt.show()

    print('\nUniqueness Summary:')
    display(df_unique)

    # Columns whose ratio is at least 0.99 are listed as key candidates
    pk_candidates = df_unique.loc[df_unique['Uniqueness'] >= 0.99]
    if pk_candidates.empty:
        print('\n⚠ No columns with high uniqueness found (may need composite key)')
    else:
        print('\nPotential Primary Key Candidates (uniqueness >= 0.99):')
        display(pk_candidates[['Column', 'Uniqueness', 'Distinct Count']])

## 4. Data Completeness Check

Calculate overall data completeness.

In [None]:
if profile:
    min_complete = THRESHOLDS['min_completeness']

    # Completeness per column = non-NULL count as a percentage of the
    # total count; columns reporting a total of 0 are scored as 0.
    complete_rows = []
    for name, col in col_stats.items():
        row_total = col.get('total_count', 0)
        filled = col.get('non_null_count', 0)
        if row_total > 0:
            pct = filled / row_total * 100
        else:
            pct = 0

        complete_rows.append({
            'Column': name,
            'Completeness %': pct,
            'Non-NULL Count': filled,
            'Total Count': row_total,
            'Pass': pct >= min_complete,
        })

    df_completeness = pd.DataFrame(complete_rows).sort_values(by='Completeness %')

    # Unweighted mean of the per-column percentages
    overall_completeness = df_completeness['Completeness %'].mean()

    # Bars per column (green = pass, red = fail) with the threshold and
    # the average marked as vertical lines.
    bar_colors = ['green' if ok else 'red' for ok in df_completeness['Pass']]
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.barh(df_completeness['Column'], df_completeness['Completeness %'],
            color=bar_colors, alpha=0.7)
    ax.axvline(x=min_complete, color='red', linestyle='--',
               label=f"Threshold: {min_complete}%")
    ax.axvline(x=overall_completeness, color='blue', linestyle=':',
               label=f"Average: {overall_completeness:.1f}%")
    ax.set_xlabel('Completeness %')
    ax.set_title('Data Completeness by Column')
    ax.legend()
    fig.tight_layout()
    plt.show()

    print(f'\nOverall Completeness: {overall_completeness:.2f}%')
    print('\nCompleteness Summary:')
    display(df_completeness)

    # Report the columns that fell short of the threshold, if any
    below_limit = df_completeness.loc[~df_completeness['Pass']]
    if below_limit.empty:
        print('\n✓ All columns meet completeness threshold')
    else:
        print(f'\n⚠ {len(below_limit)} column(s) below completeness threshold:')
        display(below_limit[['Column', 'Completeness %']])

## 5. Data Type Distribution

In [None]:
if profile:
    type_dist = profile['data_type_distribution']
    type_labels = list(type_dist.keys())
    type_counts = list(type_dist.values())

    # Pie chart with one slice per data type
    fig, ax = plt.subplots(figsize=(10, 8))
    ax.pie(type_counts, labels=type_labels, autopct='%1.1f%%',
           colors=sns.color_palette('Set3', len(type_dist)), startangle=90)
    ax.set_title('Data Type Distribution')
    ax.axis('equal')
    plt.show()

    print('\nData Type Summary:')
    # Largest count first; each share is taken against the sum of all counts
    grand_total = sum(type_counts)
    ranked_types = sorted(type_dist.items(), key=lambda item: item[1], reverse=True)
    df_types = pd.DataFrame([
        {'Type': name, 'Count': count, 'Percentage': f"{count/grand_total*100:.1f}%"}
        for name, count in ranked_types
    ])
    display(df_types)

## 6. Run Custom Data Quality Checks

In [None]:
# Run the checks from the data_quality module when it can be imported
try:
    from data_quality import DataQualityCheck, run_all_checks

    print('Running custom data quality checks...')
    results, all_passed = run_all_checks()

    verdict = "All Passed" if all_passed else "Some Failed"
    print(f'\nCustom Checks: {verdict}')

    # One line per check: a tick or cross, then the check's name and message
    for check in results:
        marker = '✓' if check.passed else '✗'
        print(f'{marker} {check.name}: {check.message}')

except ImportError:
    print('Data quality script not available. Skipping custom checks.')
except Exception as e:
    print(f'Error running custom checks: {e}')

## 7. Generate Quality Scorecard

Create a comprehensive quality scorecard.

In [None]:
# Quality scorecard: condenses the NULL, completeness and uniqueness results
# from the earlier cells into per-metric scores, an overall score, and two
# charts (per-metric bars and a half-circle gauge).

def _pass_rate(passed, total):
    """Return ``passed / total`` as a percentage, or 0.0 when ``total`` is 0."""
    return passed / total * 100 if total else 0.0


def _score_band(score, good, fair, poor):
    """Return ``good`` for score >= 80, ``fair`` for score >= 60, else ``poor``."""
    if score >= 80:
        return good
    if score >= 60:
        return fair
    return poor


if profile:
    # Number of columns passing each check (computed once, reused below)
    null_passed = int(df_nulls['Pass'].sum())
    complete_passed = int(df_completeness['Pass'].sum())
    unique_passed = int(
        (df_unique['Uniqueness'] >= THRESHOLDS['min_uniqueness']).sum()
    )

    # NULL and uniqueness scores are pass rates across columns; the
    # completeness score is the mean per-column completeness.
    scores = {
        'NULL Values': {
            'score': _pass_rate(null_passed, len(df_nulls)),
            'passed': null_passed,
            'total': len(df_nulls),
            'threshold': THRESHOLDS['max_null_percentage']
        },
        'Completeness': {
            'score': overall_completeness,
            'passed': complete_passed,
            'total': len(df_completeness),
            'threshold': THRESHOLDS['min_completeness']
        },
        'Uniqueness': {
            'score': _pass_rate(unique_passed, len(df_unique)),
            'passed': unique_passed,
            'total': len(df_unique),
            'threshold': THRESHOLDS['min_uniqueness']
        }
    }

    # Create scorecard DataFrame
    scorecard_data = [
        {
            'Quality Metric': metric,
            'Score': f"{data['score']:.1f}%",
            'Passed': data['passed'],
            'Total': data['total'],
            'Status': _score_band(data['score'], '✓ Pass', '⚠ Warning', '✗ Fail')
        }
        for metric, data in scores.items()
    ]

    df_scorecard = pd.DataFrame(scorecard_data)

    # Overall quality score: unweighted mean of the metric scores
    overall_score = sum(s['score'] for s in scores.values()) / len(scores)

    # Display scorecard
    print('\n' + '='*80)
    print(f'DATA QUALITY SCORECARD - {TABLE_ID}')
    print(f'Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
    print('='*80)
    print(f'\nOverall Quality Score: {overall_score:.1f}%')
    print()
    display(df_scorecard)

    # Visualize scorecard
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))

    # Score breakdown. Colours come from the unrounded scores so a bar can
    # never disagree with the Status column (parsing the one-decimal display
    # string back would turn 79.96 into 80.0 and colour it as a pass).
    metric_names = list(scores)
    metric_scores = [data['score'] for data in scores.values()]
    colors_bar = [_score_band(s, 'green', 'orange', 'red') for s in metric_scores]

    axes[0].barh(metric_names, metric_scores, color=colors_bar, alpha=0.7)
    axes[0].set_xlabel('Score %')
    axes[0].set_title('Quality Metrics Breakdown')
    axes[0].axvline(x=80, color='green', linestyle='--', alpha=0.5, label='Pass Threshold')
    axes[0].axvline(x=60, color='orange', linestyle='--', alpha=0.5, label='Warning Threshold')
    axes[0].legend()

    # Overall gauge: a half circle where the angle is measured from the
    # positive x-axis, so 0% sits on the right and 100% on the left.
    theta = np.linspace(0, np.pi, 100)
    score_rad = (overall_score / 100) * np.pi
    gauge_color = _score_band(overall_score, 'green', 'orange', 'red')

    axes[1].plot(np.cos(theta), np.sin(theta), 'k-', linewidth=2)

    # Shade the sector between angle 0 and the needle. The polygon starts at
    # the origin and follows the arc up to score_rad, so the shaded area
    # always ends exactly at the needle.
    wedge_theta = np.linspace(0, score_rad, 100)
    axes[1].fill(
        np.concatenate(([0.0], np.cos(wedge_theta))),
        np.concatenate(([0.0], np.sin(wedge_theta))),
        color=gauge_color,
        alpha=0.6
    )
    axes[1].plot([0, np.cos(score_rad)], [0, np.sin(score_rad)], 'r-', linewidth=3)
    axes[1].text(0, -0.3, f'{overall_score:.1f}%', ha='center', fontsize=24, weight='bold')
    axes[1].text(0, -0.5, 'Overall Quality', ha='center', fontsize=12)
    axes[1].set_xlim(-1.2, 1.2)
    axes[1].set_ylim(-0.7, 1.2)
    axes[1].axis('off')
    axes[1].set_title('Overall Quality Score')

    plt.tight_layout()
    plt.show()

    # Summary
    print('\n' + '='*80)
    print(_score_band(
        overall_score,
        '✓ EXCELLENT: Data quality meets all standards',
        '⚠ WARNING: Data quality needs improvement',
        '✗ CRITICAL: Data quality requires immediate attention',
    ))
    print('='*80)

## 8. Recommendations

Generate actionable recommendations based on quality analysis.

In [None]:
if profile:
    recommendations = []

    # Columns whose NULL share is above the configured maximum
    null_mask = df_nulls['NULL %'] > THRESHOLDS['max_null_percentage']
    high_null_cols = df_nulls[null_mask]
    if not high_null_cols.empty:
        null_names = ', '.join(high_null_cols['Column'])
        recommendations.append({
            'Priority': 'HIGH',
            'Category': 'NULL Values',
            'Issue': f"{len(high_null_cols)} column(s) with high NULL percentages",
            'Action': f"Investigate columns: {null_names}",
        })

    # Columns whose completeness is under the configured minimum
    incomplete_mask = df_completeness['Completeness %'] < THRESHOLDS['min_completeness']
    low_completeness_cols = df_completeness[incomplete_mask]
    if not low_completeness_cols.empty:
        incomplete_names = ', '.join(low_completeness_cols['Column'])
        recommendations.append({
            'Priority': 'HIGH',
            'Category': 'Completeness',
            'Issue': f"{len(low_completeness_cols)} column(s) below completeness threshold",
            'Action': f"Improve data collection for: {incomplete_names}",
        })

    # No key candidate came out of the uniqueness analysis
    if pk_candidates.empty:
        recommendations.append({
            'Priority': 'MEDIUM',
            'Category': 'Schema',
            'Issue': 'No unique identifier column found',
            'Action': 'Consider adding a primary key or using a composite key',
        })

    # Print the banner, then either the table of findings or an all-clear
    banner = '=' * 80
    print('\n' + banner)
    print('RECOMMENDATIONS')
    print(banner)

    if not recommendations:
        print('\n✓ No critical issues found. Data quality is good!')
    else:
        df_recommendations = pd.DataFrame(recommendations)
        display(df_recommendations)

    print(banner)

## Summary

This notebook demonstrated a complete data quality workflow:

1. **Table Profiling** - Comprehensive analysis of table structure and statistics
2. **NULL Analysis** - Identification of columns with excessive NULL values
3. **Uniqueness Analysis** - Per-column uniqueness ratios, used to flag potential primary keys and heavily duplicated columns
4. **Completeness Check** - Assessment of data completeness across columns
5. **Data Type Distribution** - Overview of column types
6. **Custom Quality Checks** - Extensible framework for domain-specific checks
7. **Quality Scorecard** - Comprehensive quality scoring and visualization
8. **Recommendations** - Actionable insights for data quality improvement

### Next Steps

- Automate this workflow with scheduled notebooks or Airflow DAGs
- Set up alerts for quality score drops
- Integrate with data catalog systems
- Create quality dashboards for stakeholders
- Implement automated remediation for common issues