In [6]:
# complete_pipeline_no_json.py
import pandas as pd
import os
from datetime import datetime

print("COMPLETE PIPELINE")
print("=" * 60)

# 1. Create necessary folders
os.makedirs('data_standardized', exist_ok=True)

# 2. Define mappings IN CODE
MAPPINGS = {
    'partner_a': {
        'partner_id': 'partner_id',
        'region': {'source': 'region', 'mapping': {
            'Northern': 'Northern',
            'Central': 'Central',
            'Southern': 'Southern'
        }},
        'reporting_month': 'month',
        'households_supported': 'hh_supported',
        'amount_disbursed_usd': 'amount_disbursed_usd',
        'submission_timestamp': 'submitted_at'
    },
    'partner_b': {
        'partner_id': 'implementer',
        'region': {'source': 'location', 'mapping': {
            'North': 'Northern',
            'Central': 'Central',
            'South': 'Southern',
            'Eastern': 'Eastern',
            'Western': 'Western'
        }},
        'reporting_month': 'reporting_period',
        'households_supported': 'households',
        'amount_disbursed_usd': 'disbursed_amount',
        'submission_timestamp': 'submission_time'
    }
}

# 3. Process partner A
print("Processing partner A")
try:
    df_a = pd.read_csv('01_data_raw/partner_A.csv')
    map_a = MAPPINGS['partner_a']
    
    df_a_std = pd.DataFrame()
    df_a_std['partner_id'] = df_a[map_a['partner_id']]
    df_a_std['region'] = df_a[map_a['region']['source']].map(map_a['region']['mapping'])
    df_a_std['reporting_month'] = pd.to_datetime(df_a[map_a['reporting_month']]).dt.strftime('%Y-%m-%d')
    df_a_std['households_supported'] = df_a[map_a['households_supported']]
    df_a_std['amount_disbursed_usd'] = df_a[map_a['amount_disbursed_usd']].round(2)
    df_a_std['submission_timestamp'] = df_a[map_a['submission_timestamp']]
    df_a_std['source_file'] = 'partner_A_2024_01.csv'
    df_a_std['processed_at'] = datetime.now().isoformat()
    
    df_a_std.to_csv('data_standardized/partner_A_standardized.csv', index=False)
    print(f" Saved: {len(df_a_std)} records")
    
except Exception as e:
    print(f"Error with partner A: {e}")
    print("Available columns:", list(df_a.columns) if 'df_a' in locals() else "No data loaded")

# 4. Process partner B
print("\n Processing partner B")
try:
    df_b = pd.read_csv('01_data_raw/partner_B.csv')
    map_b = MAPPINGS['partner_b']
    
    df_b_std = pd.DataFrame()
    df_b_std['partner_id'] = df_b[map_b['partner_id']]
    df_b_std['region'] = df_b[map_b['region']['source']].map(map_b['region']['mapping'])
    # Convert "Jan-2024" to "2024-01-01"
    df_b_std['reporting_month'] = pd.to_datetime(df_b[map_b['reporting_month']], format='%b-%Y').dt.strftime('%Y-%m-%d')
    df_b_std['households_supported'] = df_b[map_b['households_supported']]
    df_b_std['amount_disbursed_usd'] = df_b[map_b['amount_disbursed_usd']].round(2)
    df_b_std['submission_timestamp'] = df_b[map_b['submission_timestamp']]
    df_b_std['source_file'] = 'partner_B_2024_01.csv'
    df_b_std['processed_at'] = datetime.now().isoformat()
    
    df_b_std.to_csv('data_standardized/partner_B_standardized.csv', index=False)
    print(f" Saved: {len(df_b_std)} records")
    
except Exception as e:
    print(f" Error with partner B: {e}")
    print("Available columns:", list(df_b.columns) if 'df_b' in locals() else "No data loaded")

# 5. Combine and show results
print("\n" + "=" * 60)
print("FINAL RESULTS")

if 'df_a_std' in locals() and 'df_b_std' in locals():
    combined = pd.concat([df_a_std, df_b_std], ignore_index=True)
    combined.to_csv('data_standardized/all_standardized.csv', index=False)
    
    print(f" Total standardized records: {len(combined)}")
    print(f" Output saved to: data_standardized/all_standardized.csv")
    
    print("\n SAMPLE DATA:")
    print(combined[['partner_id', 'region', 'reporting_month', 
                    'households_supported', 'amount_disbursed_usd']].head())
    
    print("\n AUDIT TRAIL:")
    print(f"• Source files: {combined['source_file'].unique().tolist()}")
    print(f"• Processing completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
elif 'df_a_std' in locals():
    print(f" Only partner A processed: {len(df_a_std)} records")
elif 'df_b_std' in locals():
    print(f" Only partner B processed: {len(df_b_std)} records")
else:
    print(" No data was processed successfully")

print("\n Layer 2: Standardization attempted")
print("   Next: Layer 3 - Validation Rules")

COMPLETE PIPELINE
Processing partner A
 Saved: 10 records

 Processing partner B
 Saved: 10 records

FINAL RESULTS
 Total standardized records: 20
 Output saved to: data_standardized/all_standardized.csv

 SAMPLE DATA:
  partner_id    region reporting_month  households_supported  \
0       A001  Northern      2024-01-01                   150   
1       A001  Northern      2024-02-01                   165   
2       A001   Central      2024-01-01                   200   
3       A002  Southern      2024-01-01                    90   
4       A002  Southern      2024-02-01                   110   

   amount_disbursed_usd  
0                7500.0  
1                8250.0  
2               10000.0  
3                4500.0  
4                5500.0  

 AUDIT TRAIL:
• Source files: ['partner_A_2024_01.csv', 'partner_B_2024_01.csv']
• Processing completed: 2026-01-26 21:39:26

 Layer 2: Standardization attempted
   Next: Layer 3 - Validation Rules
