# ETL Pipeline: SIAFEM and SIGEO Data Processing

This notebook demonstrates how to run the ETL pipeline in Google Colab.
It handles authentication, data extraction (demonstrated here with generated sample data), transformation, and loading to BigQuery.

**Requirements:**
- Google Cloud Project with BigQuery enabled
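- Service account credentials (JSON key) — optional; by default the notebook uses your Google user credentials
- IAM permissions to create datasets and write tables in BigQuery (for example, the BigQuery Data Editor and BigQuery Job User roles)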

## 1. Setup and Installation

In [None]:
# Install required packages
!pip install pandas google-cloud-bigquery db-dtypes requests -q

In [None]:
# Standard library
import json
import os
from datetime import datetime

# Third-party
import numpy as np
import pandas as pd
import requests
from google.cloud import bigquery

# Colab-only helpers: `auth` for sign-in, `files` for browser upload/download
from google.colab import auth, files

print('All packages imported successfully!')

## 2. GCP Authentication

In [None]:
# Authenticate with Google Cloud.
# Signs this Colab runtime in with your Google user account; the BigQuery client
# created in the Configuration cell relies on these credentials unless a
# service-account key is configured instead.
auth.authenticate_user()
print('Authenticated with Google Cloud!')

## 3. Upload Service Account Credentials (if needed)

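If you prefer to use a service account instead of user authentication, run the cell below before the Configuration section so the BigQuery client is created with the uploaded key: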

In [None]:
# Optional: authenticate with a service-account JSON key instead of user credentials.
# This must run before the Configuration cell: the BigQuery client reads
# GOOGLE_APPLICATION_CREDENTIALS when it is constructed.
USE_SERVICE_ACCOUNT = False  # Set to True to upload and use a service-account key

if USE_SERVICE_ACCOUNT:
    print('Upload your service account JSON key:')
    uploaded = files.upload()
    if not uploaded:
        raise RuntimeError('No credentials file was uploaded.')
    # Use the first uploaded file; store an absolute path so a later change of
    # working directory does not break credential lookup.
    credentials_file = os.path.abspath(next(iter(uploaded)))
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_file
    print(f'Credentials file set: {credentials_file}')

## 4. Configuration

In [None]:
# Set your GCP project configuration.
# PROJECT_ID may also be supplied via the GOOGLE_CLOUD_PROJECT environment
# variable instead of editing this cell.
PROJECT_ID = os.environ.get('GOOGLE_CLOUD_PROJECT', 'your-gcp-project-id')  # Change this
DATASET_ID = 'financial_metrics'
TABLE_ID = 'budget_execution'

# Optional API endpoints (not referenced elsewhere in this notebook)
SIAFEM_API_URL = None  # Set if you have SIAFEM API access
SIGEO_API_URL = None   # Set if you have SIGEO API access

# Catch the most common setup mistake early instead of at load time.
if PROJECT_ID == 'your-gcp-project-id':
    print('WARNING: PROJECT_ID is still the placeholder; set it before loading to BigQuery.')

# BigQuery configuration
client = bigquery.Client(project=PROJECT_ID)

print(f'Configured for project: {PROJECT_ID}')

## 5. Sample Data Creation

Create sample SIAFEM/SIGEO-like data for demonstration:

In [None]:
# Create sample financial data (SIAFEM/SIGEO format).
# municipio_codigo is the 7-digit IBGE municipality code matching municipio_nome
# (Sao Paulo 3550308, Rio de Janeiro 3304557, Recife 2611606).
sample_data = pd.DataFrame({
    'municipio_codigo': ['3550308', '3550308', '3304557', '3304557', '2611606'],
    'municipio_nome': ['sao paulo', 'sao paulo', 'rio de janeiro', 'rio de janeiro', 'recife'],
    'fonte_dados': ['SIAFEM', 'SIAFEM', 'SIGEO', 'SIGEO', 'SIAFEM'],
    'orcamento_previsto': [1000000.0, 1500000.0, 2000000.0, 2500000.0, 1200000.0],
    'orcamento_executado': [850000.0, 1200000.0, 1800000.0, 2300000.0, 950000.0],
    'valor_despesa': [500000.0, 600000.0, 750000.0, 800000.0, 450000.0],
    'programa_codigo': ['001', '002', '001', '003', '002'],
    'programa_nome': ['educacao', 'saude', 'educacao', 'infraestrutura', 'saude'],
    'despesa_por_funcao': ['EDUCACAO', 'SAUDE', 'EDUCACAO', 'INFRAESTRUTURA', 'SAUDE'],
    'rubrica_orcamentaria': ['3190.39', '3190.39', '3190.39', '4490.39', '3190.39'],
    'periodo_referencia': ['2024-01-31', '2024-02-29', '2024-01-31', '2024-02-29', '2024-01-31']
})

print(f'Sample data shape: {sample_data.shape}')
print('\nFirst 5 rows:')
sample_data.head()

## 6. Data Cleaning and Transformation

In [None]:
def clean_financial_data(df):
    """Clean and standardize raw SIAFEM/SIGEO financial records.

    Returns a new DataFrame; ``df`` itself is not modified and no rows are
    added or dropped.

    Steps:
      * ``orcamento_previsto``, ``orcamento_executado`` and ``valor_despesa``
        are converted to numbers; unparseable values become NaN.
      * ``municipio_nome``, ``despesa_por_funcao``, ``programa_nome`` and
        ``fonte_dados`` are stripped of surrounding whitespace and upper-cased.
      * ``periodo_referencia`` is parsed into ``datetime.date`` values;
        unparseable dates become NaT instead of raising, matching the numeric
        columns' coercion.
      * Metadata columns are appended: ``data_coleta`` and
        ``processed_timestamp`` (one timezone-aware UTC timestamp shared by
        every row of a call), and ``validacao_status`` -- 'VALID', or
        'INVALID' when any numeric column or the reference date is missing
        after conversion.

    Args:
        df: DataFrame containing at least the columns listed above. The text
            columns must hold strings (the ``.str`` accessor is used).

    Returns:
        The cleaned copy of ``df``.
    """
    df_cleaned = df.copy()

    # Convert numeric columns
    numeric_cols = ['orcamento_previsto', 'orcamento_executado', 'valor_despesa']
    for col in numeric_cols:
        df_cleaned[col] = pd.to_numeric(df_cleaned[col], errors='coerce')

    # Standardize text fields
    text_cols = ['municipio_nome', 'despesa_por_funcao', 'programa_nome', 'fonte_dados']
    for col in text_cols:
        df_cleaned[col] = df_cleaned[col].str.strip().str.upper()

    # Convert date; keep the parsed datetimes to detect NaT below.
    periodo = pd.to_datetime(df_cleaned['periodo_referencia'], errors='coerce')
    df_cleaned['periodo_referencia'] = periodo.dt.date

    # One tz-aware timestamp for the whole run: datetime.utcnow() is deprecated,
    # returns naive values, and two separate calls would differ slightly.
    run_timestamp = pd.Timestamp.now(tz='UTC')

    # A row is only valid if every required value survived conversion.
    has_missing = df_cleaned[numeric_cols].isna().any(axis=1) | periodo.isna()

    # Add metadata (same column order as before)
    df_cleaned['data_coleta'] = run_timestamp
    df_cleaned['validacao_status'] = has_missing.map({True: 'INVALID', False: 'VALID'})
    df_cleaned['processed_timestamp'] = run_timestamp

    return df_cleaned

# Clean the sample data
df_cleaned = clean_financial_data(sample_data)

# Cleaning only rewrites columns, so every input row must still be present.
assert len(df_cleaned) == len(sample_data), 'clean_financial_data changed the row count'

print('Data cleaned successfully!')
print(f'Cleaned data shape: {df_cleaned.shape}')
df_cleaned.head()

## 7. Calculate Financial Metrics

In [None]:
def calculate_execution_rate(df):
    """Return a copy of ``df`` with a ``taxa_execucao`` column appended.

    ``taxa_execucao`` = ``orcamento_executado / orcamento_previsto * 100``.
    Rows whose ``orcamento_previsto`` is exactly 0 get 0.0 instead of a
    division-by-zero result; NaN inputs propagate as NaN. ``df`` is not modified.
    """
    previsto = df['orcamento_previsto']
    executado = df['orcamento_executado']

    # Compute every ratio, then overwrite the zero-budget rows with 0.0.
    taxa = (executado / previsto * 100).where(previsto != 0, 0.0)

    result = df.copy()
    result['taxa_execucao'] = taxa
    return result

df_metrics = calculate_execution_rate(df_cleaned)

print('Budget Execution Rates:')
# Show the relevant columns as a rich table (last expression) instead of a
# plain-text dump; the rate is rounded to match the summary report's precision.
df_metrics[['municipio_nome', 'orcamento_previsto', 'orcamento_executado', 'taxa_execucao']].round({'taxa_execucao': 2})

## 8. Data Analysis and Visualization

In [None]:
import matplotlib.pyplot as plt

# Mean execution rate per municipality. sort=False keeps municipalities in
# order of first appearance, like Series.unique() would.
rate_by_municipio = df_metrics.groupby('municipio_nome', sort=False)['taxa_execucao'].mean()

# Execution rate by municipality
fig, ax = plt.subplots(figsize=(12, 5))

ax.bar(rate_by_municipio.index, rate_by_municipio.values, color='steelblue', alpha=0.8)
ax.set_ylabel('Execution Rate (%)', fontsize=12)
ax.set_xlabel('Municipality', fontsize=12)
ax.set_title('Budget Execution Rate by Municipality', fontsize=14, fontweight='bold')

# Default to 0-120% so the 100% reference line is visible, but extend the axis
# when a municipality over-executes so its bar is not clipped.
peak_rate = rate_by_municipio.max()
y_max = 120.0 if pd.isna(peak_rate) else max(120.0, float(peak_rate) * 1.1)
ax.set_ylim([0, y_max])

ax.axhline(y=100, color='red', linestyle='--', linewidth=2, label='100% (Full Execution)')
ax.legend()
ax.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()

print('\nExecution Rate Statistics:')
print(df_metrics['taxa_execucao'].describe())

## 9. Prepare Data for BigQuery

In [None]:
# Define BigQuery schema as (column name, BigQuery type) pairs; every column
# is loaded as NULLABLE.
schema_columns = [
    ('data_coleta', 'TIMESTAMP'),
    ('fonte_dados', 'STRING'),
    ('municipio_codigo', 'STRING'),
    ('municipio_nome', 'STRING'),
    ('orcamento_previsto', 'FLOAT64'),
    ('orcamento_executado', 'FLOAT64'),
    ('taxa_execucao', 'FLOAT64'),
    ('valor_despesa', 'FLOAT64'),
    ('programa_codigo', 'STRING'),
    ('programa_nome', 'STRING'),
    ('despesa_por_funcao', 'STRING'),
    ('rubrica_orcamentaria', 'STRING'),
    ('periodo_referencia', 'DATE'),
    ('validacao_status', 'STRING'),
    ('processed_timestamp', 'TIMESTAMP'),
]
schema = [
    bigquery.SchemaField(column_name, column_type, mode='NULLABLE')
    for column_name, column_type in schema_columns
]

print('BigQuery schema defined:')
print('\n'.join(f'  - {sf.name}: {sf.field_type}' for sf in schema))

## 10. Load Data to BigQuery (Optional)

**Note:** This requires BigQuery permissions and a valid project ID. The load uses `WRITE_TRUNCATE`, so it replaces any existing contents of the target table.

In [None]:
# Set to True to load `df_metrics` into BigQuery. Requires a real PROJECT_ID and
# permission to create datasets and write tables in that project.
LOAD_TO_BIGQUERY = False

if LOAD_TO_BIGQUERY:
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    # Create the dataset on first run; exists_ok makes this a no-op afterwards.
    client.create_dataset(f"{PROJECT_ID}.{DATASET_ID}", exists_ok=True)
    # WRITE_TRUNCATE replaces the table's existing contents on every run.
    job_config = bigquery.LoadJobConfig(schema=schema, write_disposition='WRITE_TRUNCATE')
    job = client.load_table_from_dataframe(df_metrics, table_id, job_config=job_config)
    job.result()  # Wait for the load job to finish; raises if it failed.
    print(f'Successfully loaded {job.output_rows} rows to {table_id}')
else:
    print('Skipping BigQuery load (set LOAD_TO_BIGQUERY = True to enable).')

## 11. Export Results

In [None]:
# Write the processed frame to a local CSV (without the index column), then
# hand the file to the browser via Colab's download helper.
output_file = 'financial_data_processed.csv'
df_metrics.to_csv(path_or_buf=output_file, index=False)
print('Data exported to ' + output_file)

files.download(output_file)
print('File ready for download!')

## 12. Summary Report

In [None]:
total_previsto = df_metrics["orcamento_previsto"].sum()
total_executado = df_metrics["orcamento_executado"].sum()
# Budget-weighted rate across all rows. Unlike the mean of per-row rates, large
# budgets count proportionally. It is undefined (NaN) when nothing was budgeted.
overall_rate = (total_executado / total_previsto * 100) if total_previsto else float('nan')

print('='*60)
print('ETL PIPELINE SUMMARY REPORT')
print('='*60)
print(f'Total records processed: {len(df_metrics)}')
print(f'Date range: {df_metrics["periodo_referencia"].min()} to {df_metrics["periodo_referencia"].max()}')
print(f'Data sources: {df_metrics["fonte_dados"].unique().tolist()}')
print(f'Municipalities: {df_metrics["municipio_nome"].nunique()}')
print(f'Programs: {df_metrics["programa_nome"].nunique()}')
print()
print('BUDGET STATISTICS:')
print(f'Total budgeted: R$ {total_previsto:,.2f}')
print(f'Total executed: R$ {total_executado:,.2f}')
print(f'Average execution rate: {df_metrics["taxa_execucao"].mean():.2f}%')
print(f'Overall execution rate (total executed / total budgeted): {overall_rate:.2f}%')
print()
print('VALIDATION:')
print(f'Valid records: {(df_metrics["validacao_status"] == "VALID").sum()}')
print(f'Processing timestamp: {df_metrics["processed_timestamp"].max()}')
print('='*60)