# Data Quality Analysis - Portal da Transparência

This notebook analyzes the quality of the data returned by the Portal da Transparência API.

## Objectives:
1. Assess data completeness
2. Identify missing values and patterns
3. Detect data anomalies
4. Evaluate data consistency
5. Generate data quality report

In [None]:
# Setup
import os
import sys
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from pathlib import Path
from datetime import datetime
import json
from typing import Dict, List, Any, Tuple

# Put the directory two levels above the current working directory at the
# front of sys.path so the `src` package imported below can be resolved.
# NOTE(review): assumes the notebook is launched from a folder two levels
# below the project root — confirm.
sys.path.insert(0, str(Path().absolute().parent.parent))

# Must stay after the sys.path change above.
from src.api.client import TransparenciaAPIClient

# Create the API client with its default constructor arguments; the data
# collection cell calls its get_* methods.
client = TransparenciaAPIClient()
print("API Client initialized successfully")

# Show every column and up to 100 rows when pandas renders a DataFrame.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

## 1. Data Collection for Quality Analysis

In [None]:
# Fetch the first page of every endpoint and keep it as a DataFrame per name
data_samples = {}
sample_size = 100  # Number of records to analyze per endpoint


def _first_page(method_name, **extra_params):
    """Build a zero-argument callable that requests page 1 of one endpoint.

    The client method is looked up, and ``sample_size`` is read, only when the
    returned callable runs, so nothing is requested while the list is built.
    """
    def fetch():
        return getattr(client, method_name)(
            **extra_params, pagina=1, quantidade=sample_size
        )
    return fetch


endpoints_to_analyze = [
    ("orgaos", _first_page("get_orgaos", sistema="siafi")),
    ("contratos", _first_page("get_contratos")),
    ("fornecedores", _first_page("get_fornecedores")),
    ("licitacoes", _first_page("get_licitacoes")),
    ("pagamentos", _first_page("get_pagamentos")),
]

print("Collecting data samples for quality analysis...\n")

for endpoint_name, fetch in endpoints_to_analyze:
    # One failing endpoint must not stop the others, so every failure is
    # reported (truncated to 50 characters) and the loop moves on.
    try:
        print(f"Fetching {endpoint_name}...", end=" ")
        records = fetch()
        if not records:
            print("⚠ No data returned")
            continue
        data_samples[endpoint_name] = pd.DataFrame(records)
        print(f"✓ Success ({len(records)} records)")
    except Exception as error:
        print(f"✗ Error: {str(error)[:50]}...")

print(f"\nCollected data from {len(data_samples)} endpoints")

## 2. Data Completeness Analysis

In [None]:
def analyze_completeness(df: pd.DataFrame, name: str) -> Dict[str, Any]:
    """Summarize how complete a DataFrame is, using JSON-friendly values.

    Every count is converted to a built-in ``int``/``float``: pandas
    reductions return NumPy scalars, and ``json`` cannot encode ``np.int64``,
    which would break saving the result in a JSON report.

    Args:
        df: Records to inspect; a cell is missing when ``isna()`` is true.
        name: Label stored under the ``'endpoint'`` key of the result.

    Returns:
        A dict with the keys ``endpoint``, ``total_records``,
        ``total_fields``, ``total_cells``, ``missing_cells``,
        ``completeness_rate`` (percentage; 0 when the frame has no cells),
        ``missing_by_column`` (column -> missing count),
        ``complete_records`` (rows without any missing cell) and
        ``partial_records`` (rows with at least one missing cell).
    """
    # Compute the missing-value mask once and derive every metric from it.
    missing_mask = df.isna()
    missing_per_column = missing_mask.sum()

    total_records = len(df)
    total_cells = int(df.size)
    missing_cells = int(missing_per_column.sum())
    # A record is complete when none of its cells is missing.
    complete_records = int((~missing_mask.any(axis=1)).sum())

    if total_cells > 0:
        completeness_rate = float((1 - missing_cells / total_cells) * 100)
    else:
        completeness_rate = 0.0

    return {
        'endpoint': name,
        'total_records': total_records,
        'total_fields': len(df.columns),
        'total_cells': total_cells,
        'missing_cells': missing_cells,
        'completeness_rate': completeness_rate,
        'missing_by_column': {
            column: int(count) for column, count in missing_per_column.items()
        },
        'complete_records': complete_records,
        'partial_records': total_records - complete_records,
    }

# Run the completeness analysis on every collected sample and print a summary
completeness_results = []

for endpoint_name, frame in data_samples.items():
    summary = analyze_completeness(frame, endpoint_name)
    completeness_results.append(summary)

    # Assemble the whole summary first and emit it with a single print call.
    report_lines = [
        f"\n{endpoint_name.upper()} Completeness:",
        f"  Total records: {summary['total_records']}",
        f"  Total fields: {summary['total_fields']}",
        f"  Completeness rate: {summary['completeness_rate']:.2f}%",
        f"  Complete records: {summary['complete_records']}",
        f"  Partial records: {summary['partial_records']}",
    ]
    print("\n".join(report_lines))

In [None]:
# Visualize completeness rates (one bar per endpoint)
completeness_df = pd.DataFrame(
    [
        {'Endpoint': r['endpoint'], 'Completeness Rate': r['completeness_rate']}
        for r in completeness_results
    ],
    # Fix the columns explicitly so the frame is well-formed even with no rows.
    columns=['Endpoint', 'Completeness Rate'],
)

if completeness_df.empty:
    # Nothing was collected (for example, every API call failed), so there
    # is nothing meaningful to draw.
    print("No completeness results to plot - no endpoint returned data.")
else:
    fig = px.bar(
        completeness_df,
        x='Endpoint',
        y='Completeness Rate',
        title='Data Completeness by Endpoint',
        text='Completeness Rate',
        color='Completeness Rate',
        color_continuous_scale='RdYlGn'
    )

    # Label each bar with its rate, formatted to one decimal place.
    fig.update_traces(texttemplate='%{text:.1f}%', textposition='outside')
    fig.update_layout(yaxis_title='Completeness Rate (%)')
    fig.show()

## 3. Missing Value Pattern Analysis

In [None]:
# Chart and summarize, per endpoint, the share of missing values in each field
for endpoint_name, frame in data_samples.items():
    row_count = len(frame)
    if row_count == 0:
        continue

    banner = '=' * 60
    print(f"\n{banner}")
    print(f"Missing Value Analysis: {endpoint_name.upper()}")
    print(banner)

    # Percentage of missing cells per field, highest first, gaps only
    pct_by_field = (frame.isna().sum() / row_count * 100).sort_values(ascending=False)
    fields_with_gaps = pct_by_field[pct_by_field > 0]
    gap_count = len(fields_with_gaps)

    if gap_count == 0:
        print("✓ No missing values found!")
        continue

    # Horizontal bars, one per field that has at least one missing value
    fig = px.bar(
        x=fields_with_gaps.values,
        y=fields_with_gaps.index,
        orientation='h',
        title=f'Missing Values by Field - {endpoint_name}',
        labels={'x': 'Missing %', 'y': 'Field'},
        text=fields_with_gaps.values
    )
    fig.update_traces(texttemplate='%{text:.1f}%', textposition='outside')
    # Grow the figure with the number of bars, never below 300 px
    fig.update_layout(height=max(300, gap_count * 25))
    fig.show()

    print(f"\nFields with missing values: {gap_count}")
    print("\nTop fields with missing data:")
    for field_name, share in fields_with_gaps.head(5).items():
        print(f"  - {field_name}: {share:.1f}% missing")

## 4. Data Type Analysis

In [None]:
def analyze_data_types(df: pd.DataFrame, name: str) -> pd.DataFrame:
    """Profile each non-empty column of ``df`` and flag likely type problems.

    Args:
        df: Records to inspect. Columns whose values are all null are skipped.
        name: Endpoint label; accepted for interface compatibility, not used.

    Returns:
        One row per profiled column with the fields ``column``, ``dtype``,
        ``unique_values``, ``null_count``, ``sample_values`` (first three
        non-null values), ``potential_date`` (the column name contains
        "data" or "date") and ``potential_numeric`` (a text column where more
        than 5 of the first 10 non-null values parse as numbers). All seven
        columns are always present, even when no row is produced, so callers
        can index them without checking for their existence.
    """
    result_columns = [
        'column', 'dtype', 'unique_values', 'null_count',
        'sample_values', 'potential_date', 'potential_numeric',
    ]
    type_analysis = []

    for col in df.columns:
        series = df[col]
        col_data = series.dropna()

        if col_data.empty:
            continue

        try:
            unique_values = int(col_data.nunique())
        except TypeError:
            # Nested JSON values (dict/list cells) are unhashable, which makes
            # nunique() raise; count distinct string representations instead.
            unique_values = int(col_data.astype(str).nunique())

        col_label = str(col).lower()

        # Only text-like columns can hold numbers that were stored as strings.
        potential_numeric = False
        if pd.api.types.is_object_dtype(series) or pd.api.types.is_string_dtype(series):
            try:
                parsed = pd.to_numeric(col_data.head(10), errors='coerce')
            except (TypeError, ValueError):
                # Best effort: values that cannot even be coerced are not numeric.
                parsed = None
            if parsed is not None:
                potential_numeric = int(parsed.notna().sum()) > 5

        type_analysis.append({
            'column': col,
            'dtype': str(series.dtype),
            'unique_values': unique_values,
            'null_count': int(series.isna().sum()),
            'sample_values': list(col_data.head(3)),
            'potential_date': 'data' in col_label or 'date' in col_label,
            'potential_numeric': potential_numeric,
        })

    return pd.DataFrame(type_analysis, columns=result_columns)

def _flagged_columns(type_df, flag):
    """Return the names of profiled columns whose ``flag`` cell is True.

    The flag column may be missing altogether, or hold NaN where the flag was
    never set; both cases count as "not flagged".
    """
    if flag not in type_df.columns:
        return []
    return type_df.loc[type_df[flag].eq(True), 'column'].tolist()


# Analyze data types for each endpoint
for name, df in data_samples.items():
    if len(df) == 0:
        continue

    print(f"\nData Type Analysis: {name.upper()}")
    print("-" * 60)

    type_df = analyze_data_types(df, name)

    if type_df.empty:
        # Every column was entirely null, so there is no profile to report
        # (and the frame may not even have a 'dtype' column).
        print("No non-empty columns to analyze")
        continue

    # Show potential issues
    date_fields = _flagged_columns(type_df, 'potential_date')
    if date_fields:
        print(f"Potential date fields: {', '.join(map(str, date_fields))}")

    numeric_fields = _flagged_columns(type_df, 'potential_numeric')
    if numeric_fields:
        print(f"Potential numeric fields stored as text: {', '.join(map(str, numeric_fields))}")

    # Show data type distribution
    dtype_counts = type_df['dtype'].value_counts()
    print("\nData type distribution:")
    for dtype, count in dtype_counts.items():
        print(f"  {dtype}: {count} columns")

## 5. Value Distribution Analysis

In [None]:
# Analyze value distributions for numeric columns
for name, df in data_samples.items():
    if len(df) == 0:
        continue

    # Label -> numeric Series. Converted columns live only in this mapping, so
    # the shared frames in data_samples are not modified and later cells do
    # not see (and double-count) extra "*_numeric" columns.
    numeric_series = {
        col: df[col] for col in df.select_dtypes(include=[np.number]).columns
    }

    # Also try to identify amount-like columns stored as strings
    for col in df.select_dtypes(include=['object']).columns:
        col_key = col.lower()
        if not any(token in col_key for token in ('valor', 'value', 'preco')):
            continue
        try:
            converted = pd.to_numeric(df[col], errors='coerce')
        except (TypeError, ValueError):
            # Best effort: a column that cannot be coerced is simply not numeric.
            continue
        # Keep the column only when more than half of the rows parsed as numbers.
        if converted.notna().sum() > len(df) * 0.5:
            numeric_series[f"{col}_numeric"] = converted

    if not numeric_series:
        continue

    print(f"\nNumeric Value Analysis: {name.upper()}")
    print("-" * 60)

    for col, series in list(numeric_series.items())[:5]:  # Analyze up to 5 numeric columns
        col_data = series.dropna()
        if col_data.empty:
            continue

        print(f"\n{col}:")
        print(f"  Count: {len(col_data)}")
        print(f"  Mean: {col_data.mean():.2f}")
        print(f"  Median: {col_data.median():.2f}")
        print(f"  Std Dev: {col_data.std():.2f}")
        print(f"  Min: {col_data.min():.2f}")
        print(f"  Max: {col_data.max():.2f}")

        # Flag values more than 1.5 * IQR outside the quartiles as outliers
        Q1 = col_data.quantile(0.25)
        Q3 = col_data.quantile(0.75)
        IQR = Q3 - Q1
        outliers = int(((col_data < (Q1 - 1.5 * IQR)) | (col_data > (Q3 + 1.5 * IQR))).sum())
        print(f"  Potential outliers: {outliers} ({outliers/len(col_data)*100:.1f}%)")

## 6. Duplicate Detection

In [None]:
def _count_duplicates(frame, subset=None):
    """Count duplicated rows as a built-in int, tolerating dict/list cells."""
    try:
        return int(frame.duplicated(subset=subset).sum())
    except TypeError:
        # duplicated() hashes cell values; nested JSON values (dict/list) are
        # unhashable, so fall back to comparing their string representation.
        return int(frame.astype(str).duplicated(subset=subset).sum())


# Check for duplicates in each dataset
duplicate_analysis = []

for name, df in data_samples.items():
    if len(df) == 0:
        continue

    # Rows that are identical across every column
    complete_dups = _count_duplicates(df)

    # Try to identify ID columns. This is a plain substring match, so any
    # column whose name merely contains "id", "codigo" or "numero" qualifies.
    id_columns = [
        col for col in df.columns
        if any(token in col.lower() for token in ('id', 'codigo', 'numero'))
    ]

    # Only the first candidate ID column is used for the subset check
    subset_dups = _count_duplicates(df, subset=id_columns[:1]) if id_columns else 0

    duplicate_analysis.append({
        'endpoint': name,
        'total_records': len(df),
        'complete_duplicates': complete_dups,
        'id_columns': ', '.join(id_columns[:3]) if id_columns else 'None found',
        'id_duplicates': subset_dups
    })

# Display duplicate analysis. The columns are fixed explicitly so the frame
# can be indexed below even when no endpoint produced any data.
dup_df = pd.DataFrame(
    duplicate_analysis,
    columns=['endpoint', 'total_records', 'complete_duplicates', 'id_columns', 'id_duplicates'],
)
print("Duplicate Analysis Summary:")
if dup_df.empty:
    print("No endpoint data available for duplicate analysis")
else:
    print(dup_df.to_string(index=False))

# Visualize if duplicates found
if not dup_df.empty and (dup_df['complete_duplicates'].sum() > 0 or dup_df['id_duplicates'].sum() > 0):
    fig = go.Figure()

    fig.add_trace(go.Bar(
        name='Complete Duplicates',
        x=dup_df['endpoint'],
        y=dup_df['complete_duplicates']
    ))

    fig.add_trace(go.Bar(
        name='ID Duplicates',
        x=dup_df['endpoint'],
        y=dup_df['id_duplicates']
    ))

    fig.update_layout(
        title='Duplicate Records by Endpoint',
        xaxis_title='Endpoint',
        yaxis_title='Number of Duplicates',
        barmode='group'
    )

    fig.show()

## 7. Data Consistency Checks

In [None]:
# Perform consistency checks
consistency_issues = {}

# Errors that mean "this column cannot be parsed"; anything else is a real
# bug and is allowed to surface instead of being silently swallowed.
_PARSE_ERRORS = (TypeError, ValueError, OverflowError)

# Timezone-aware reference instant. Dates are parsed with utc=True below, so
# the comparison is valid for both naive and offset-carrying source values.
_now_utc = pd.Timestamp.now(tz='UTC')

for name, df in data_samples.items():
    if len(df) == 0:
        continue

    issues = []

    # Check for date consistency
    date_cols = [col for col in df.columns if 'data' in col.lower()]
    for col in date_cols:
        try:
            # Unparseable values become NaT instead of raising
            dates = pd.to_datetime(df[col], errors='coerce', utc=True)
        except _PARSE_ERRORS as exc:
            print(f"  (date check skipped for {name}.{col}: {exc})")
            continue

        # Values that were present but could not be parsed as dates
        invalid_dates = int(dates.isna().sum() - df[col].isna().sum())
        if invalid_dates > 0:
            issues.append(f"Invalid date format in '{col}': {invalid_dates} records")

        # Check for future dates
        future_dates = int((dates > _now_utc).sum())
        if future_dates > 0:
            issues.append(f"Future dates found in '{col}': {future_dates} records")

    # Check for negative values in amount fields
    amount_cols = [
        col for col in df.columns
        if any(token in col.lower() for token in ('valor', 'preco', 'amount'))
    ]

    for col in amount_cols:
        try:
            values = pd.to_numeric(df[col], errors='coerce')
        except _PARSE_ERRORS as exc:
            print(f"  (amount check skipped for {name}.{col}: {exc})")
            continue

        negative_values = int((values < 0).sum())
        if negative_values > 0:
            issues.append(f"Negative values in '{col}': {negative_values} records")

    # Check for unusually long strings (more than 1000 characters)
    string_cols = df.select_dtypes(include=['object']).columns
    for col in string_cols:
        max_length = df[col].astype(str).str.len().max()
        if max_length > 1000:
            issues.append(f"Very long strings in '{col}': max length {max_length}")

    if issues:
        consistency_issues[name] = issues

# Display consistency issues
if consistency_issues:
    print("Data Consistency Issues Found:")
    print("=" * 60)
    for endpoint, issues in consistency_issues.items():
        print(f"\n{endpoint.upper()}:")
        for issue in issues:
            print(f"  ⚠ {issue}")
else:
    print("✓ No major consistency issues found!")

## 8. Generate Data Quality Report

In [None]:
def _json_default(value):
    """Convert NumPy scalars/arrays, which ``json`` cannot encode natively.

    pandas/NumPy reductions such as ``.sum()`` return ``np.int64`` values;
    without this hook ``json`` raises ``TypeError`` on them.
    """
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


# Generate comprehensive data quality report
_generated_at = datetime.now()  # one timestamp for both metadata and file name
_completeness_rates = [r['completeness_rate'] for r in completeness_results]

quality_report = {
    "report_metadata": {
        "generated_at": _generated_at.isoformat(),
        "total_endpoints_analyzed": len(data_samples),
        "sample_size_per_endpoint": sample_size
    },
    "summary": {
        # None (JSON null) when nothing was analyzed; np.mean([]) would give
        # NaN, which is not valid JSON.
        "average_completeness": float(np.mean(_completeness_rates)) if _completeness_rates else None,
        "endpoints_with_duplicates": sum(1 for d in duplicate_analysis if d['complete_duplicates'] > 0),
        "endpoints_with_issues": len(consistency_issues)
    },
    "detailed_analysis": {}
}

# Add detailed analysis for each endpoint
for name in data_samples:
    quality_report["detailed_analysis"][name] = {
        "completeness": next((r for r in completeness_results if r['endpoint'] == name), None),
        "duplicates": next((d for d in duplicate_analysis if d['endpoint'] == name), None),
        "consistency_issues": consistency_issues.get(name, [])
    }

# Serialize before touching the file system, so a serialization error cannot
# leave a truncated report file behind.
report_text = json.dumps(quality_report, ensure_ascii=False, indent=2, default=_json_default)

# Save report
reports_dir = Path().absolute().parent.parent / "reports"
reports_dir.mkdir(parents=True, exist_ok=True)

report_filename = f"data_quality_report_{_generated_at.strftime('%Y%m%d_%H%M%S')}.json"
with open(reports_dir / report_filename, "w", encoding="utf-8") as f:
    f.write(report_text)

print(f"\nData quality report saved to: reports/{report_filename}")

## 9. Data Quality Score Card

In [None]:
def _letter_grade(score):
    """Map a 0-100 quality score to a letter: A >= 90, B >= 80, C >= 70, D >= 60, else F."""
    for threshold, grade in ((90, 'A'), (80, 'B'), (70, 'C'), (60, 'D')):
        if score >= threshold:
            return grade
    return 'F'


# Create a visual scorecard
scores = []

for name in data_samples:
    # Calculate quality score (0-100)
    completeness = next((r['completeness_rate'] for r in completeness_results if r['endpoint'] == name), 0)
    has_duplicates = next((d['complete_duplicates'] > 0 for d in duplicate_analysis if d['endpoint'] == name), False)

    # Simple scoring: completeness minus 10 points when complete duplicates
    # exist and 5 points per consistency issue
    score = completeness
    if has_duplicates:
        score -= 10
    score -= 5 * len(consistency_issues.get(name, []))

    score = max(0, min(100, score))  # Ensure score is between 0-100

    scores.append({
        'Endpoint': name,
        'Quality Score': score,
        'Grade': _letter_grade(score)
    })

# The columns are fixed explicitly so the frame is well-formed with no rows
scores_df = pd.DataFrame(scores, columns=['Endpoint', 'Quality Score', 'Grade'])

if scores_df.empty:
    # Nothing was collected (for example, every API call failed)
    print("No endpoint data available - skipping the quality scorecard.")
else:
    # Create scorecard visualization
    fig = go.Figure()

    # Bar color by score band: green >= 80, yellow >= 60, red below
    colors = ['green' if s >= 80 else 'yellow' if s >= 60 else 'red' for s in scores_df['Quality Score']]

    fig.add_trace(go.Bar(
        x=scores_df['Endpoint'],
        y=scores_df['Quality Score'],
        text=[f"{s:.1f}<br>Grade: {g}" for s, g in zip(scores_df['Quality Score'], scores_df['Grade'])],
        textposition='outside',
        marker_color=colors
    ))

    fig.update_layout(
        title='Data Quality Scorecard',
        xaxis_title='Endpoint',
        yaxis_title='Quality Score',
        yaxis_range=[0, 110],  # headroom for the labels drawn above the bars
        showlegend=False
    )

    # Add threshold lines at the grade boundaries
    fig.add_hline(y=90, line_dash="dash", line_color="green", annotation_text="Excellent (A)")
    fig.add_hline(y=80, line_dash="dash", line_color="blue", annotation_text="Good (B)")
    fig.add_hline(y=70, line_dash="dash", line_color="orange", annotation_text="Fair (C)")
    fig.add_hline(y=60, line_dash="dash", line_color="red", annotation_text="Poor (D)")

    fig.show()

    # Print summary
    print("\nData Quality Summary:")
    print(scores_df.to_string(index=False))
    print(f"\nAverage Quality Score: {scores_df['Quality Score'].mean():.1f}")

## 10. Recommendations

In [None]:
# Print recommendations: data-driven items first, then fixed guidance lists
print("DATA QUALITY RECOMMENDATIONS")
print("=" * 80)

print("\n1. IMMEDIATE ACTIONS:")

# Endpoints whose completeness rate is below 80%
low_completeness = [
    entry['endpoint']
    for entry in completeness_results
    if entry['completeness_rate'] < 80
]
if low_completeness:
    print(f"   - Investigate low completeness in: {', '.join(low_completeness)}")
    print("     Consider if missing values are expected or indicate data issues")

# Endpoints that contain fully duplicated records
has_duplicates = [
    entry['endpoint']
    for entry in duplicate_analysis
    if entry['complete_duplicates'] > 0
]
if has_duplicates:
    print(f"   - Remove duplicates from: {', '.join(has_duplicates)}")
    print("     Implement deduplication logic in data pipeline")

# Endpoints with at least one recorded consistency issue
if consistency_issues:
    print(f"   - Fix consistency issues in: {', '.join(consistency_issues)}")
    print("     Implement data validation rules")

for line in (
    "\n2. DATA PIPELINE IMPROVEMENTS:",
    "   - Implement data type validation during ingestion",
    "   - Add automated quality checks before storing data",
    "   - Create data quality monitoring dashboard",
    "   - Set up alerts for quality degradation",
    "\n3. DATA GOVERNANCE:",
    "   - Document expected data formats and ranges",
    "   - Establish data quality SLAs with API provider",
    "   - Create data dictionary with business rules",
    "   - Implement regular quality audits",
    "\n4. SPECIFIC FIELD RECOMMENDATIONS:",
):
    print(line)

# Name the "data" columns of every endpoint where any of them has a missing value
for name, df in data_samples.items():
    date_fields = [col for col in df.columns if 'data' in col.lower()]
    if not date_fields:
        continue
    if df[date_fields].isna().any().any():
        print(f"   - {name}: Standardize date formats in {', '.join(date_fields)}")

for line in (
    "\n5. NEXT STEPS:",
    "   - Create automated data quality checks",
    "   - Build data cleaning pipelines",
    "   - Implement incremental data updates",
    "   - Design quality metrics tracking",
):
    print(line)