# Mexican federal budget pre-processing pipeline

## Instructions

To run the notebook:

1. choose a unique `ITERATION_LABEL` for each pipeline run
2. specify your input folder and files (`INPUT_FOLDER`, `INPUT_FILES`)
3. make sure your column mapping (`COLUMN_ALIASES`) is correct
4. run the whole notebook by clicking on __Kernel > Restart & Run All__

## Settings

Choose a unique iteration label for each pipeline run.

In [1]:
ITERATION_LABEL = 'test-run-7'

Specify your input folder here.

In [2]:
INPUT_FOLDER = 'pipeline.in'

Put your input files inside the folder and describe them here.

In [3]:
INPUT_FILES = {
    2010: {'name': 'Cuenta_Publica_2010.csv', 'encoding': 'windows-1252'},
    2011: {'name': 'Cuenta_Publica_2011.csv', 'encoding': 'windows-1252'},
    2012: {'name': 'Cuenta_Publica_2012.csv', 'encoding': 'windows-1252'},
    2013: {'name': 'Cuenta_Publica_2013.csv', 'encoding': 'windows-1252'},
    2014: {'name': 'Cuenta_Publica_2014.csv', 'encoding': 'windows-1252'},
    2015: {'name': 'Cuenta_Publica_2015.csv', 'encoding': 'windows-1252'},
    2016: {'name': 'PEF2016_AC01.csv', 'encoding': 'cp850'}
}

If your input files don't all have the same column names, define your mapping here. 

In [4]:
COLUMN_ALIASES = {
    'Actividad Institucional': ['AI'],
    'Adefas': ['ADEFAS'],
    'Aprobado': [
        'PEF_2016',
        'Importe Presupuesto de Egresos de la Federación',
        'Importe Presupuesto de Egresos de la Federación (PEF)'
    ],
    'Ciclo': None,
    'Clave de cartera': ['CLAVE_CARTERA'],
    'Descripción de Fuente de Financiamiento': ['FUENTE_FINAN_DESCRIPCION'],
    'Descripción de Función': ['FUNCIONL_DESCRIPCION'],
    'Descripción de Grupo Funcional': [
        'Descripción de Finalidad',
        'GRUPO_FUN_DESCRIPCION',
        'Descripción de Grupo Funcional'
    ],
    'Descripción de Objeto del Gasto': ['CONCEPTO_DESCRIPCION'],
    'Descripción de Programa Presupuestario': ['PROGR_PRES_DESCRIPCION'],
    'Descripción de Ramo': ['RAMO_DESCRIPCION'],
    'Descripción de Reasignacion': ['REASIGNACION_DESCRIPCION'],
    'Descripción de Subfunción': ['SUBFUNCIONL_DESCRIPCION'],
    'Descripción de Tipo de Gasto': ['TIPO_GASTO_DESCRIPCION'],
    'Descripción de Unidad Responsable': ['UNIDAD_DESCRIPCION'],
    'Descripción de la Actividad Institucional': [
        'ACTIVIDAD_INST_DESCRIPCION',
        'Descripción de Actividad Institucional'
    ],
    'Descripción de la entidad federativa': ['ENTIDAD_FED_DESCRIPCION'],
    'Descripción de la modalidad del programa presupuestario': [
        'MODALIDAD_DESCRIPCION',
        'Descripción del Identificador del Programa Presupuestario',
        'Descripción del Identificador de Programa Presupuestario'
    ],
    'Devengado': None,
    'Ejercicio': None,
    'Ejercido': None,
    'Entidad Federativa': ['EF'],
    'Fuente de Financiamiento': ['FF'],
    'Función': ['FN'],
    'Grupo Funcional': [
        'Finalidad', 'GF', 'Grupo Funcional'
    ],
    'Modalidad del Programa presupuestario': [
        'MOD',
        'Identificador de Programa Presupuestario',
        'Identificador del Programa Presupuestario'
    ],
    'Modificado': None,
    'Objeto del Gasto': ['CONCEPTO'],
    'Pagado': None,
    'Programa Presupuestario': ['PP'],
    'Ramo': None,
    'Reasignacion': ['RA'],
    'Subfunción': ['SF'],
    'Tipo de Gasto': ['TG'],
    'Unidad Responsable': ['UNIDAD']
}
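
Since the instructions ask you to make sure the mapping is correct, a quick sanity check may help before running everything. The sketch below is not part of the original pipeline: `find_conflicting_aliases` is a hypothetical helper, and the sample mapping contains a deliberate, made-up conflict. It reports any alias that is listed under more than one reference column.

```python
from collections import defaultdict


def find_conflicting_aliases(column_aliases):
    """Return {alias: [reference columns]} for aliases claimed more than once."""
    owners = defaultdict(set)
    for reference, aliases in column_aliases.items():
        # A value of None means the column has no alternative names
        for alias in aliases or []:
            if alias != reference:
                owners[alias].add(reference)
    return {alias: sorted(refs) for alias, refs in owners.items() if len(refs) > 1}


# Hypothetical sample mapping with a deliberate conflict on 'GF'
sample_aliases = {
    'Grupo Funcional': ['Finalidad', 'GF', 'Grupo Funcional'],
    'Función': ['FN', 'GF'],
    'Ramo': None,
}

conflicts = find_conflicting_aliases(sample_aliases)
print(conflicts)
```

An empty result means every alias points to exactly one reference column.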

That's it. Now just run the notebook from beginning to end.

## Imports

In [5]:
from sys import stdout
from pandas import read_csv, concat, DataFrame
from numpy import nan
from os.path import join, isdir
from os import mkdir
from json import dumps
from pprint import pprint

## Configuration

In [6]:
BASENAME = 'mexican_federal_budget'
OUTPUT_FOLDER = 'pipeline.out'
ITERATION_FOLDER = join(OUTPUT_FOLDER, ITERATION_LABEL)
MERGED_FILE = join(ITERATION_FOLDER, BASENAME + '.merged.csv')

In [7]:
if isdir(ITERATION_FOLDER):
    raise ValueError('Please enter a unique iteration label')
    
mkdir(ITERATION_FOLDER)
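
Note that `mkdir` fails if `OUTPUT_FOLDER` itself does not exist yet. A possible alternative, shown here as a sketch rather than as the notebook's own method, is `os.makedirs` with `exist_ok=False`, which creates the parent folder and still rejects a reused label. The helper name `create_iteration_folder` is hypothetical, and the demo runs in a temporary directory.

```python
import os
import tempfile


def create_iteration_folder(output_folder, iteration_label):
    """Create output_folder/iteration_label, refusing to reuse an existing label."""
    iteration_folder = os.path.join(output_folder, iteration_label)
    try:
        # makedirs also creates output_folder if it does not exist yet
        os.makedirs(iteration_folder, exist_ok=False)
    except FileExistsError:
        raise ValueError('Please enter a unique iteration label') from None
    return iteration_folder


with tempfile.TemporaryDirectory() as scratch:
    output_folder = os.path.join(scratch, 'pipeline.out')
    first = create_iteration_folder(output_folder, 'test-run-7')
    first_exists = os.path.isdir(first)
    try:
        create_iteration_folder(output_folder, 'test-run-7')
        second_rejected = False
    except ValueError:
        second_rejected = True

print(first_exists, second_rejected)
```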

## Encoding inspection

Detect the encodings of the input files using the `cChardet` utility library. __Warning:__ detection is not always accurate and is meant as an indication only. In the end, encodings are taken from `INPUT_FILES`.

In [8]:
def detect_encodings():
    """Detect CSV file encoding with the cChardet library"""

    try:
        import cchardet as chardet
    except ImportError:
        cChardet = 'https://github.com/PyYoshi/cChardet'
        print('Encoding inspection skipped: install %s' % cChardet)
        return

    results = {}
    results_file = join(OUTPUT_FOLDER, ITERATION_LABEL, 'encodings.detected.json')
    
    for year, file in sorted(INPUT_FILES.items()):
        datafile = join(INPUT_FOLDER, file['name'])
        
        with open(datafile, 'rb') as f:
            text = f.read()
            
        result = chardet.detect(text)
        results.update({year: result})
        print(year, 'Inspected', file['name'], result)
    
    with open(results_file, 'w+') as json:
        json.write(dumps(results))
        print('\nSaved encoding detection report to', results_file)
        
detect_encodings()

2010 Inspected Cuenta_Publica_2010.csv {'encoding': 'WINDOWS-1252', 'confidence': 0.9900000095367432}
2011 Inspected Cuenta_Publica_2011.csv {'encoding': 'WINDOWS-1252', 'confidence': 0.9900000095367432}
2012 Inspected Cuenta_Publica_2012.csv {'encoding': 'WINDOWS-1252', 'confidence': 0.9900000095367432}
2013 Inspected Cuenta_Publica_2013.csv {'encoding': 'WINDOWS-1252', 'confidence': 0.9900000095367432}
2014 Inspected Cuenta_Publica_2014.csv {'encoding': 'WINDOWS-1252', 'confidence': 0.9900000095367432}
2015 Inspected Cuenta_Publica_2015.csv {'encoding': 'WINDOWS-1252', 'confidence': 0.9900000095367432}
2016 Inspected PEF2016_AC01.csv {'encoding': 'WINDOWS-1252', 'confidence': 0.9900000095367432}

Saved encoding detection report to pipeline.out/test-run-7/encodings.detected.json
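
The detector reports `WINDOWS-1252` for every file, while `INPUT_FILES` declares `cp850` for the 2016 file, and the pipeline follows `INPUT_FILES`. The small illustration below (not taken from the real data) shows why the declared encoding matters: the same bytes decode to different text under the two encodings.

```python
# 'Descripción' as it would be stored in a cp850-encoded file
raw_bytes = 'Descripción'.encode('cp850')

decoded_as_cp850 = raw_bytes.decode('cp850')
decoded_as_cp1252 = raw_bytes.decode('windows-1252')

print(decoded_as_cp850)
print(decoded_as_cp1252)
```

Neither decode raises an error, so a wrong choice only shows up as garbled accented characters in column names and descriptions.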


## Load files

In [9]:
def read_columns(file, encoding):
    """Return clean CSV file headers"""
    
    with open(file, encoding=encoding) as csv:
        header = csv.readline()
        return header.replace('\n', '').split(',')

In [10]:
def force_strings(columns):
    """Return string enforcement for each column of a CSV file"""
    
    for column in columns:
        yield column, str

In [11]:
def load_csv_files():
    """Load raw data (CSV) files"""
    
    batch = {}
    
    for year, file in sorted(INPUT_FILES.items()):
        filepath = join(INPUT_FOLDER, file['name'])
        column_names = read_columns(filepath, file['encoding'])
        column_types = dict(force_strings(column_names))
        
        batch[year] = read_csv(filepath, encoding=file['encoding'], dtype=column_types)
        print('Loaded', file['name'], 'with encoding', file['encoding'])
        stdout.flush()
            
    return batch
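
To see what forcing strings buys, here is a minimal sketch on a made-up two-column sample (the values are hypothetical, including the leading zero). It uses `dtype=str`, which pandas accepts as a single type for all columns; the notebook builds an explicit per-column dict instead.

```python
from io import StringIO

from pandas import read_csv

# Hypothetical sample; codes with leading zeros are the point
sample_csv = 'Ramo,Aprobado\n09,"1,500"\n16,-\n'

inferred = read_csv(StringIO(sample_csv))
as_strings = read_csv(StringIO(sample_csv), dtype=str)

print(list(inferred['Ramo']))    # type inference turns the codes into integers
print(list(as_strings['Ramo']))  # codes are kept exactly as written
```

Keeping everything as text also leaves the amount columns untouched until they are cleaned explicitly later on.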

## Clean the data

In [12]:
def strip_cell_padding(batch):
    for year in sorted(batch.keys()):
        for column in batch[year].columns:
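            clean_name = column.strip()
            batch[year].rename(columns={column: clean_name}, inplace=True)
            # Use the stripped name: the old one no longer exists after the rename
            batch[year][clean_name] = batch[year][clean_name].apply(lambda x: x.strip() if isinstance(x, str) else x)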
        print(year, 'stripped cell paddings')
        stdout.flush()

In [13]:
def delete_empty_columns(batch):
    for year in batch.keys():
        for column in batch[year].columns:
            if 'Unnamed:' in column:
                try:
                    del batch[year][column]
                    print(year, column, 'deleted')
                    stdout.flush()
                except KeyError:
                    pass  

In [14]:
def count_missing_values(batch):
    table = []

    for column in get_union_of_columns(batch):
        row = {'Column': column}
        
        for year in batch.keys():
            if column in batch[year].columns:
                nb_empty_cells = batch[year][column].isnull().sum()
            else:
                nb_empty_cells = nan
                
            row.update({year: nb_empty_cells})
            if nb_empty_cells not in (nan, 0):
                print(year, 'found', nb_empty_cells, 'missing values in', column)

        table.append(row)
        
    ordered_columns = ['Column']
    ordered_columns.extend(sorted(batch.keys()))
    
    return DataFrame(table).reindex(columns=ordered_columns)

In [15]:
def count_duplicates(batch):
    for year, df in sorted(batch.items()):
        nb_duplicate_lines = df.duplicated().sum()
        print(year, 'found', nb_duplicate_lines, 'duplicate lines')
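
As a small worked example of what these two counts mean, the sketch below uses made-up rows. It relies on the pandas built-ins `isna()` and `duplicated()`; note that `duplicated()` flags only the repeats, not the first occurrence of a row.

```python
from numpy import nan
from pandas import DataFrame

# Made-up rows: the second and third are identical, one cell is missing
sample = DataFrame({
    'Ramo': ['09', '16', '16', '11'],
    'Aprobado': ['17850', '0', '0', nan],
})

missing_per_column = sample.isna().sum().to_dict()
nb_duplicate_lines = int(sample.duplicated().sum())

print(missing_per_column)
print(nb_duplicate_lines)
```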

## Alias column names

In [16]:
def get_union_of_columns(batch):
    union = set()
    for year in batch.keys():
        union = union | set(batch[year].columns)
    return union

In [17]:
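from yaml import safe_load

def load_aliases(file):
    """Load column aliases from a YAML file"""
    with open(file, encoding='utf-8') as yaml:
        # safe_load parses plain data only and needs no explicit Loader
        return safe_load(yaml)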

In [18]:
def map_columns_to_aliases(batch, list_of_aliases):
    for year in sorted(batch.keys()):
        for column in sorted(batch[year].columns):
            if column not in list_of_aliases:
                for reference, aliases in list_of_aliases.items():
                    if aliases:
                        if column in aliases:
                            batch[year].rename(columns={column: reference}, inplace=True)
                            print(year, column, 'replaced with', reference)
                            stdout.flush()
                            break  
                else:
                    print(year, 'NO ALIAS: ', column)
                    stdout.flush()
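
The nested loops above search every alias list for each column. An equivalent lookup can be sketched by inverting the mapping once; this is an alternative formulation, not the notebook's code. `build_alias_lookup` is a hypothetical helper, the mapping is a short excerpt of `COLUMN_ALIASES`, and `'Comentario'` is a made-up column with no alias.

```python
def build_alias_lookup(column_aliases):
    """Invert {reference: [aliases]} into {alias: reference}."""
    lookup = {}
    for reference, aliases in column_aliases.items():
        for alias in aliases or []:
            lookup[alias] = reference
    return lookup


# Short excerpt of COLUMN_ALIASES from the settings
aliases_excerpt = {
    'Aprobado': ['PEF_2016', 'Importe Presupuesto de Egresos de la Federación'],
    'Ramo': None,
    'Unidad Responsable': ['UNIDAD'],
}

lookup = build_alias_lookup(aliases_excerpt)
raw_columns = ['Ramo', 'UNIDAD', 'PEF_2016', 'Comentario']

# Columns without an alias keep their name
renamed = [lookup.get(column, column) for column in raw_columns]
# Columns that are neither a reference name nor a known alias
unmapped = [c for c in raw_columns if c not in lookup and c not in aliases_excerpt]

print(renamed)
print(unmapped)
```

The same dictionary could be passed to `DataFrame.rename(columns=lookup)`, which ignores keys that are not present in the frame.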

In [19]:
def build_overview(batch):
    table = []
    
    for column in get_union_of_columns(batch):
        row = {'Column': column}
        for year in batch.keys():
            row.update({year: column in batch[year].columns})
        table.append(row)
        
    ordered_columns = ['Column']
    ordered_columns.extend(sorted(batch.keys()))
    
    overview = DataFrame(table).reindex(columns=ordered_columns)
    return overview

## Check expenditure sums

There's a little cleaning to do on the amount columns: zeros are represented by a dash, and thousands are assumed to be separated by a comma.

In [20]:
EXPENDITURE_COLUMNS = [
    'Ejercido', 
    'Devengado', 
    'Aprobado', 
    'Pagado', 
    'Modificado', 
    'Adefas', 
    'Ejercicio'
]

def clean_expenditure_columns(batch):
    check_sums = []

    for column in EXPENDITURE_COLUMNS:
        row = {'Column': column}
        
        for year in sorted(batch.keys()):
            try:
                series = batch[year][column]
                
                # I'm assuming '-' represents zero
                series = series.apply(lambda x: '0' if x == '-' else x)
                series = series.apply(lambda x: x.replace(',', '') if x is not nan else x)                
                batch[year][column] = series.astype(float)
                check_sum = batch[year][column].sum()
                
                print(year, 'cleaned and summed', column)
                
            except KeyError:
                check_sum = nan
                
            row.update({year: check_sum})
        
        check_sums.append(row)

    ordered_columns = ['Column']
    ordered_columns.extend(sorted(batch.keys()))
    return DataFrame(check_sums).reindex(columns=ordered_columns)
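
The cell-level rule applied above can be sketched as a single function. `parse_amount` is a hypothetical name and the sample cells are made up; the two assumptions are the ones stated for this section: a lone dash means zero, and commas are thousands separators.

```python
import math


def parse_amount(raw):
    """Convert a raw amount cell to float; missing cells stay NaN."""
    if not isinstance(raw, str):
        # Columns are loaded as strings, so anything else is a missing cell
        return math.nan
    text = raw.strip()
    if text == '-':
        return 0.0  # assumption: a dash stands for zero
    return float(text.replace(',', ''))  # assumption: commas separate thousands


amounts = [parse_amount(cell) for cell in ['17,850', '-', '26,866,731.23', None]]
print(amounts)
```

A function like this could be passed to `Series.apply` in place of the two lambdas.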

## Pipeline

In [21]:
def do_pipeline():

    def echo_section(section):
        print('\n', section, '\n')

    echo_section('Loading files')
    datasets = load_csv_files()
    
    echo_section('Delete empty columns')
    delete_empty_columns(datasets)

    echo_section('Stripping padding from cells')
    strip_cell_padding(datasets)
    
    echo_section('Counting duplicate lines (NOT de-duplicating)')
    count_duplicates(datasets)
    
    echo_section('Mapping columns to aliases')
    map_columns_to_aliases(datasets, COLUMN_ALIASES)

    echo_section('Counting missing values')
    missing_values_report = count_missing_values(datasets)
    
    echo_section('Building column mapping overview')
    column_mapping_report = build_overview(datasets)
    
    echo_section('Cleaning expenditure columns')
    sums_report = clean_expenditure_columns(datasets)
    
    echo_section('Merging datasets')
    merged_dataset = concat(list(datasets.values()))
    
    missing_file = join(ITERATION_FOLDER, BASENAME + '.missing.tsv')
    columns_file = join(ITERATION_FOLDER, BASENAME + '.columns.tsv')
    sums_file = join(ITERATION_FOLDER, BASENAME + '.sums.tsv')
    aliases_file = join(ITERATION_FOLDER, BASENAME + '.aliases.json')
    inputs_file = join(ITERATION_FOLDER, BASENAME + '.inputs.json')

    merged_dataset.to_csv(MERGED_FILE, encoding='utf-8', index=False)
    missing_values_report.to_csv(missing_file, encoding='utf-8', index=False, sep='\t')
    column_mapping_report.to_csv(columns_file, encoding='utf-8', index=False, sep='\t')
    sums_report.to_csv(sums_file, encoding='utf-8', index=False, sep='\t')
    
    with open(aliases_file, 'w+') as json:
        json.write(dumps(COLUMN_ALIASES, indent=4))
        
    with open(inputs_file, 'w+') as json:
        json.write(dumps(INPUT_FILES, indent=4))
    
    print('Saved merged datasets to', MERGED_FILE)    
    print('Saved input configuration to', inputs_file)    
    print('Saved aliases configuration to', aliases_file)    
    print('Saved missing values report to', missing_file)    
    print('Saved column mapping report to', columns_file)    
    print('Saved check sums report to', sums_file)    

    echo_section('Pipeline run "%s" done and saved to %s' % (ITERATION_LABEL, ITERATION_FOLDER))

    return merged_dataset, column_mapping_report, missing_values_report, sums_report, datasets
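
The merge step relies on `concat` aligning columns by name: a column missing from a given year is filled with NaN for that year's rows, which is why the check sums table has gaps. A minimal sketch with made-up frames follows. `ignore_index=True` and `sort=True` are additions not present in the pipeline call; `sort=True` is passed here on the assumption that an alphabetical column order, like the one listed under Quality control, is wanted.

```python
from pandas import DataFrame, concat

# Made-up yearly frames with partly different columns
datasets = {
    2013: DataFrame({'Ciclo': ['2013'], 'Ejercido': [10.0]}),
    2014: DataFrame({'Ciclo': ['2014'], 'Devengado': [12.0]}),
}

merged = concat(list(datasets.values()), ignore_index=True, sort=True)

print(list(merged.columns))
print(merged.isna().sum().to_dict())
```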

## Run the pipeline

In [22]:
merged_budget, column_mapping, missing_values, sums, raw_data = do_pipeline()


 Loading files 

Loaded Cuenta_Publica_2010.csv with encoding windows-1252
Loaded Cuenta_Publica_2011.csv with encoding windows-1252
Loaded Cuenta_Publica_2012.csv with encoding windows-1252
Loaded Cuenta_Publica_2013.csv with encoding windows-1252
Loaded Cuenta_Publica_2014.csv with encoding windows-1252
Loaded Cuenta_Publica_2015.csv with encoding windows-1252
Loaded PEF2016_AC01.csv with encoding cp850

 Delete empty columns 

2011 Unnamed: 25 deleted
2011 Unnamed: 26 deleted
2011 Unnamed: 27 deleted
2011 Unnamed: 28 deleted
2011 Unnamed: 29 deleted
2011 Unnamed: 30 deleted
2011 Unnamed: 31 deleted
2011 Unnamed: 32 deleted
2011 Unnamed: 33 deleted
2011 Unnamed: 34 deleted
2011 Unnamed: 35 deleted
2011 Unnamed: 36 deleted
2011 Unnamed: 37 deleted
2011 Unnamed: 38 deleted
2011 Unnamed: 39 deleted
2011 Unnamed: 40 deleted
2011 Unnamed: 41 deleted

 Stripping padding from cells 

2010 stripped cell paddings
2011 stripped cell paddings
2012 stripped cell paddings
2013 stripped cell padd

## Quality control

In [23]:
list(merged_budget.columns)

['Actividad Institucional',
 'Adefas',
 'Aprobado',
 'Ciclo',
 'Clave de cartera',
 'Descripción de Fuente de Financiamiento',
 'Descripción de Función',
 'Descripción de Grupo Funcional',
 'Descripción de Objeto del Gasto',
 'Descripción de Programa Presupuestario',
 'Descripción de Ramo',
 'Descripción de Reasignacion',
 'Descripción de Subfunción',
 'Descripción de Tipo de Gasto',
 'Descripción de Unidad Responsable',
 'Descripción de la Actividad Institucional',
 'Descripción de la entidad federativa',
 'Descripción de la modalidad del programa presupuestario',
 'Devengado',
 'Ejercicio',
 'Ejercido',
 'Entidad Federativa',
 'Fuente de Financiamiento',
 'Función',
 'Grupo Funcional',
 'Modalidad del Programa presupuestario',
 'Modificado',
 'Objeto del Gasto',
 'Pagado',
 'Programa Presupuestario',
 'Ramo',
 'Reasignacion',
 'Subfunción',
 'Tipo de Gasto',
 'Unidad Responsable']

In [24]:
merged_budget.sample(n=10)

Unnamed: 0,Actividad Institucional,Adefas,Aprobado,Ciclo,Clave de cartera,Descripción de Fuente de Financiamiento,Descripción de Función,Descripción de Grupo Funcional,Descripción de Objeto del Gasto,Descripción de Programa Presupuestario,...,Modalidad del Programa presupuestario,Modificado,Objeto del Gasto,Pagado,Programa Presupuestario,Ramo,Reasignacion,Subfunción,Tipo de Gasto,Unidad Responsable
90853,2,,17850.0,2011,,Recursos fiscales,Desarrollo Sustentable,Desarrollo Económico,Servicio de telefonía celular,Actividades de apoyo administrativo,...,M,,31501,,1,16,,2,1,135
106856,9,,713600.0,2013,0.0,Recursos fiscales,"Recreación, Cultura y Otras Manifestaciones So...",Desarrollo Social,Subsidios a Entidades Federativas y Municipios,Deporte,...,S,,43801,,205,11,,1,1,L6I
127515,18,0.0,22532172.0,2015,0.0,Recursos fiscales,Salud,Desarrollo Social,Asignaciones adicionales al sueldo,Prestación de servicios en los diferentes nive...,...,E,26866731.23,15403,26866731.23,23,12,,2,1,NBB
165547,5,,3499442.0,2012,0.0,Recursos fiscales,"Ciencia, Tecnología e Innovación",Desarrollo Económico,Aguinaldo o gratificación de fin de año,Investigación científica y tecnológica,...,E,,13202,,9,16,,1,1,D00
197411,1,,0.0,2012,0.0,Recursos fiscales,Coordinación de la Política de Gobierno,Gobierno,Arrendamiento de maquinaria y equipo,"Planeación, concertación y control",...,R,,32601,,1,22,,6,1,103
35024,10,,13500.0,2011,,Recursos fiscales,Comunicaciones y Transportes,Desarrollo Económico,Arrendamiento de edificios y locales,Proyectos de infraestructura económica de carr...,...,K,,32201,,31,9,,1,3,633
141823,4,3239184.0,32664.0,2015,0.0,Recursos fiscales,"Agropecuaria, Silvicultura, Pesca y Caza",Desarrollo Económico,Fletes y maniobras,Implementación de políticas enfocadas al medio...,...,P,3270418.0,34701,31234.0,1,15,,1,1,B00
160968,3,0.0,0.0,2014,0.0,Recursos fiscales,Protección Ambiental,Desarrollo Social,Servicios bancarios y financieros,Administración Sustentable del Agua,...,G,203.0,34101,203.0,1,16,,2,1,B00
94327,10,0.0,78400.0,2015,14096520012.0,Recursos fiscales,Transporte,Desarrollo Económico,Servicios de supervisión de obras,Proyectos de infraestructura económica de carr...,...,K,84296.92,62903,84296.92,31,9,,1,3,652
87438,1,,0.0,2010,,Recursos fiscales,Administración Pública,Gobierno,Utensilios para el servicio de alimentación,Actividades de apoyo a la función pública y bu...,...,O,,2303,,1,16,,3,1,B23


In [25]:
sums

Unnamed: 0,Column,2010,2011,2012,2013,2014,2015,2016
0,Ejercido,2474100000000.0,2695930000000.0,2896331000000.0,3134797000000.0,,,
1,Devengado,,,,3135015000000.0,3426242000000.0,3761997000000.0,
2,Aprobado,2376915000000.0,2538282000000.0,2754868000000.0,2943495000000.0,3334259000000.0,3508463000000.0,5297126000000.0
3,Pagado,,,,,3386609000000.0,3728056000000.0,
4,Modificado,,,,,3427172000000.0,3763467000000.0,
5,Adefas,,,,,36941610000.0,31122650000.0,
6,Ejercicio,,,,,3424774000000.0,3760422000000.0,


In [26]:
column_mapping

Unnamed: 0,Column,2010,2011,2012,2013,2014,2015,2016
0,Descripción de la modalidad del programa presu...,True,True,True,True,True,True,True
1,Descripción de Reasignacion,False,False,False,False,False,False,True
2,Función,True,True,True,True,True,True,True
3,Ramo,True,True,True,True,True,True,True
4,Descripción de la Actividad Institucional,True,True,True,True,True,True,True
5,Descripción de Programa Presupuestario,True,True,True,True,True,True,True
6,Reasignacion,False,False,False,False,False,False,True
7,Modalidad del Programa presupuestario,True,True,True,True,True,True,True
8,Ejercicio,False,False,False,False,True,True,False
9,Descripción de Objeto del Gasto,True,True,True,True,True,True,True


In [27]:
missing_values

Unnamed: 0,Column,2010,2011,2012,2013,2014,2015,2016
0,Descripción de la modalidad del programa presu...,0.0,1.0,0.0,0.0,0.0,0.0,0.0
1,Descripción de Reasignacion,,,,,,,0.0
2,Función,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,Ramo,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,Descripción de la Actividad Institucional,0.0,1.0,0.0,0.0,0.0,0.0,0.0
5,Descripción de Programa Presupuestario,0.0,1.0,0.0,0.0,0.0,0.0,0.0
6,Reasignacion,,,,,,,0.0
7,Modalidad del Programa presupuestario,0.0,1.0,0.0,0.0,0.0,0.0,0.0
8,Ejercicio,,,,,0.0,0.0,
9,Descripción de Objeto del Gasto,0.0,1.0,28.0,0.0,0.0,0.0,0.0


In [28]:
with open(MERGED_FILE, encoding='utf-8') as file:
    for n in range(10):
        print(file.readline())

Actividad Institucional,Adefas,Aprobado,Ciclo,Clave de cartera,Descripción de Fuente de Financiamiento,Descripción de Función,Descripción de Grupo Funcional,Descripción de Objeto del Gasto,Descripción de Programa Presupuestario,Descripción de Ramo,Descripción de Reasignacion,Descripción de Subfunción,Descripción de Tipo de Gasto,Descripción de Unidad Responsable,Descripción de la Actividad Institucional,Descripción de la entidad federativa,Descripción de la modalidad del programa presupuestario,Devengado,Ejercicio,Ejercido,Entidad Federativa,Fuente de Financiamiento,Función,Grupo Funcional,Modalidad del Programa presupuestario,Modificado,Objeto del Gasto,Pagado,Programa Presupuestario,Ramo,Reasignacion,Subfunción,Tipo de Gasto,Unidad Responsable

4,,99305000.0,2016,0,Recursos fiscales,Legislación,Gobierno,Obra pública en bienes propios,Mantenimiento de Infraestructura,Poder Legislativo,Otros,Legislación,Gasto de obra pública,H. Cámara de Diputados,Llevar a cabo el proceso Legislativo,D