# Pipelines workshop

In this workshop we are going to build a pipeline that reads in some external data and processes it so it can be used for further data science work.

We will be using data on household incomes and household expenditures, both provided by the Dutch national statistics bureau CBS. We are going to try to link the two.

In this workshop we will be using two libraries you may not have seen before:
- loguru (for logging)
- cbsodata (for interfacing with the CBS Statline service)


In [1]:
import cbsodata
import pandas as pd
from lib.utils import *
from loguru import logger
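
The notebook imports `timestamp()` from `lib.utils`, which is not shown in this workshop. Judging by how it is used (in file names and in a logfile), it returns the current time as a string. A minimal stand-in, assuming a sortable `YYYYMMDD-HHMMSS` format (the real helper may well use a different one), could look like this:

```python
from datetime import datetime


def timestamp() -> str:
    """Hypothetical stand-in for lib.utils.timestamp; the real helper is not shown.

    Returns the current local time as e.g. '20240131-154502': no spaces or
    colons, so it is safe in file names, and it sorts chronologically as text.
    """
    return datetime.now().strftime("%Y%m%d-%H%M%S")
```

Because every field is zero-padded, sorting file names alphabetically also sorts them by time, which comes in handy when you need to find the newest file.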



## Reading in external data and storing it

First we are going to read some data from the internet. The first data set contains information on household incomes. 

When we read data from the internet, we should expect things to go wrong (there may be no internet connection, the CBS server may be down, etc.). A robust pipeline detects such failures and complains loudly: things should never fail silently.

For each potential error, you should decide whether the pipeline can continue after the error has occurred or not.

In this case, the CBS household income data is crucial to our data science project, so if we can't read it in, we should abort the pipeline.
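
One way to make that per-error decision explicit is to wrap each step in a small helper that takes a `fatal` flag. The sketch below is an illustration, not part of the workshop code: `run_step` and `PipelineAbort` are hypothetical names, it uses the standard library's `logging` module in place of loguru so it runs without extra installs, and it raises an exception instead of calling `exit(0)` (as the cells below do) so that it can be tested.

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("pipeline")


class PipelineAbort(Exception):
    """Raised when a crucial step fails and the pipeline must stop."""


def run_step(name, func, fatal=True):
    """Run one pipeline step and decide what a failure means for the pipeline.

    fatal=True:  log at CRITICAL level and raise PipelineAbort (stop everything).
    fatal=False: log at WARNING level and return None so the pipeline continues.
    """
    try:
        return func()
    except Exception as e:
        if fatal:
            log.critical("Step %r failed: %s. Aborting", name, e)
            raise PipelineAbort(name) from e
        log.warning("Step %r failed: %s. Continuing without it", name, e)
        return None
```

In the notebook, a crucial step would look like `run_step("read income", lambda: cbsodata.get_data('84493NED'))`, while an optional one would pass `fatal=False`.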

In [2]:
# First read in CBS household income data.
cbs_code_household_income = '84493NED'

try:
    hhi_raw = cbsodata.get_data(cbs_code_household_income)
except Exception as e:
    logger.critical(f"Unable to read CBS household income data. Error was: {e}. Aborting")
    exit(0) # Exit kills the entire kernel, which in this case is what we want.


Once we have read in the data, we want to convert it to a Pandas dataframe and then store it in a CSV file.

Since the data comes from the internet, it may not actually be what we expect it to be. The CBS server may be compromised, CBS may have changed its data format, etc.

This means we should also wrap converting the data in a try / except block. If the conversion fails, we dump the raw data to a file so we can inspect it later, and then stop the kernel instead of continuing.

In [16]:
try:
    df_hhi = pd.DataFrame(hhi_raw)
    # If you want to see what happens when things go wrong, raise an exception yourself
    # raise Exception("Exception to test exception handling.")
except Exception as e:
    logger.critical(f"Unable to parse CBS household income data. Exception was: {e}")
    # At this point we should dump the incorrect data so we can look at it to find out what went wrong.
    dump_file_name = "cbs_household_income_" + timestamp() + ".dump"
    # Of course writing the dump file itself could also go wrong (disk full, wrong filename, etc)
    with open(dump_file_name, "w") as f:
        f.write(str(hhi_raw))
    exit(0) # Without a usable dataframe there is no point in continuing.
df_hhi

| | ID | Inkomensbestanddelen | KenmerkenVanHuishoudens | Perioden | ParticuliereHuishoudens_1 | ParticuliereHuishoudensRelatief_2 | TotaalInkomen_3 | GemiddeldInkomen_4 | MediaanInkomen_5 | AandeelVanBrutoInkomen_6 |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 1 Inkomen als werknemer | Particuliere huishoudens | 2011 | 4847.2 | 66.0 | 290670.0 | 60.0 | 51.7 | 65.5 |
| 1 | 1 | 1 Inkomen als werknemer | Particuliere huishoudens | 2012 | 4850.8 | 65.4 | 295084.0 | 60.8 | 52.2 | 65.2 |
| 2 | 2 | 1 Inkomen als werknemer | Particuliere huishoudens | 2013 | 4817.2 | 64.5 | 296314.0 | 61.5 | 52.4 | 64.3 |
| 3 | 3 | 1 Inkomen als werknemer | Particuliere huishoudens | 2014 | 4819.7 | 64.3 | 300571.0 | 62.4 | 52.8 | 62.5 |
| 4 | 4 | 1 Inkomen als werknemer | Particuliere huishoudens | 2015 | 4839.3 | 63.9 | 300893.0 | 62.2 | 52.7 | 63.3 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 77527 | 77527 | 14 BESTEEDBAAR INKOMEN | Vermogen: 10e 10%-groep | 2019 | 782.7 | 100.0 | 71988.0 | 92.0 | 61.9 | 62.5 |
| 77528 | 77528 | 14 BESTEEDBAAR INKOMEN | Vermogen: 10e 10%-groep | 2020 | 789.4 | 100.0 | 67064.0 | 85.0 | 63.4 | 61.5 |
| 77529 | 77529 | 14 BESTEEDBAAR INKOMEN | Vermogen: 10e 10%-groep | 2021 | 795.1 | 100.0 | 71383.0 | 89.8 | 67.7 | 61.7 |
| 77530 | 77530 | 14 BESTEEDBAAR INKOMEN | Vermogen: 10e 10%-groep | 2022 | 804.1 | 100.0 | 77624.0 | 96.5 | 72.0 | 61.1 |
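
Note that `pd.DataFrame` happily builds a frame from almost any list of dicts, so a successful conversion does not prove that the data has the columns the rest of the pipeline relies on. A minimal sketch of an explicit check (the names `EXPECTED_HHI_COLUMNS` and `missing_columns` are hypothetical; the column names are the ones in the output above):

```python
import pandas as pd

# Columns the rest of this notebook relies on, taken from the output shown above.
EXPECTED_HHI_COLUMNS = [
    "Inkomensbestanddelen",
    "KenmerkenVanHuishoudens",
    "Perioden",
    "ParticuliereHuishoudens_1",
    "TotaalInkomen_3",
    "GemiddeldInkomen_4",
    "MediaanInkomen_5",
]


def missing_columns(df, expected):
    """Return the expected column names that are absent from df (empty list if none)."""
    return [col for col in expected if col not in df.columns]


# Possible use in the notebook:
# missing = missing_columns(df_hhi, EXPECTED_HHI_COLUMNS)
# if missing:
#     logger.critical(f"CBS income data lacks columns {missing}. Aborting")
#     exit(0)
```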


We now have a dataframe we can use in the rest of our pipeline. We store it in its original form in the data/raw directory. Storing data as you received it, without further processing, makes your work more transparent and more easily reproducible.

In [None]:
df_hhi.to_csv("data/raw/cbs_hhi_" + timestamp() + ".csv", index=False)
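
`to_csv` raises an `OSError` when the target directory does not exist, for example on a fresh checkout without a `data/raw` folder. A minimal sketch of a helper that creates the directory first (`store_raw` is a hypothetical name; in the notebook, `stamp` would be the output of `timestamp()`):

```python
from pathlib import Path

import pandas as pd


def store_raw(df, directory, prefix, stamp):
    """Write df unmodified to <directory>/<prefix><stamp>.csv and return the path.

    The directory (and any missing parents) is created first, so a missing
    data/raw folder does not make the pipeline fail.
    """
    target_dir = Path(directory)
    target_dir.mkdir(parents=True, exist_ok=True)
    path = target_dir / f"{prefix}{stamp}.csv"
    df.to_csv(path, index=False)
    return path
```

In the notebook this would be called as `store_raw(df_hhi, "data/raw", "cbs_hhi_", timestamp())`.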

### Now do it yourself

Read in the data on household expenditure, also from CBS. The code for this data set is 83676NED. Make sure to use try / except blocks to log errors if they occur.

In [None]:
cbs_code_household_expenditure = '83676NED'

try:
    hhe_raw = cbsodata.get_data(cbs_code_household_expenditure)
except Exception as e:
    logger.critical(f"Unable to read CBS household expenditure data. Error was: {e}. Aborting")
    exit(0) # Exit kills the entire kernel, which in this case is what we want.

try:
    df_hhe = pd.DataFrame(hhe_raw)
    # If you want to see what happens when things go wrong, raise an exception yourself
    # raise Exception("Exception to test exception handling.")
except Exception as e:
    logger.critical(f"Unable to parse CBS household expenditure data. Exception was: {e}")
    # At this point we should dump the incorrect data so we can look at it to find out what went wrong.
    dump_file_name = "cbs_household_expenditure_" + timestamp() + ".dump"
    # Of course writing the dump file itself could also go wrong (disk full, wrong filename, etc)
    with open(dump_file_name, "w") as f:
        f.write(str(hhe_raw))
    exit(0) # Without a usable dataframe there is no point in continuing.
df_hhe.to_csv("data/raw/cbs_hhe_" + timestamp() + ".csv", index=False)
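
The income and expenditure cells repeat the same read / convert / dump steps almost line for line, and copy-pasting such a pattern makes it easy to dump or parse the wrong variable. One option is to move the pattern into a function. A sketch, assuming `fetch` behaves like `cbsodata.get_data` (code in, list of dicts out); `load_table` is a hypothetical name, and it raises `RuntimeError` so that the caller decides how to log and abort:

```python
import pandas as pd


def load_table(fetch, code, label, stamp):
    """Fetch a table with fetch(code), convert it to a DataFrame, and dump it on failure.

    Returns the DataFrame, or raises RuntimeError describing what went wrong.
    """
    try:
        raw = fetch(code)
    except Exception as e:
        raise RuntimeError(f"Unable to read CBS {label} data: {e}") from e

    try:
        return pd.DataFrame(raw)
    except Exception as e:
        # Dump exactly what this call received, so the right data ends up in the dump.
        dump_file_name = f"cbs_{label.replace(' ', '_')}_{stamp}.dump"
        try:
            with open(dump_file_name, "w") as f:
                f.write(str(raw))
        except OSError as dump_error:
            raise RuntimeError(f"Unable to parse CBS {label} data ({e}) "
                               f"and unable to write dump ({dump_error})") from e
        raise RuntimeError(f"Unable to parse CBS {label} data: {e}. "
                           f"Raw data dumped to {dump_file_name}") from e
```

In the notebook, `df_hhe = load_table(cbsodata.get_data, '83676NED', 'household expenditure', timestamp())` would go inside a `try` whose `except RuntimeError` branch calls `logger.critical` and `exit(0)`.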






Requirements for the pipeline as a whole:

Every step should alert and exit on failure (but make one step that merely alerts).

- read in source data (store it in data/raw)
- process source data
- store processed data in data/processed
- place substeps in separate notebooks under process/


In [None]:
# Your code goes here

## Data preparation

It is rare that a data file, especially one sourced from elsewhere, is in a form that you want. You will almost always need to massage and reshape data before you can use it.

This is the case here, too. 


### Preparing the income data file

Let's start with the household income data. For one, this data file contains much, much more information than we need. Also, the data is in a shape that is inconvenient for us.

First, let's filter the data so we have only the records that we need.

Note that this, too, is done inside a try / except block. If the dataframe does not contain the columns that we expect, we want to log an error and abort processing. Also, we expect the filtered dataframe to contain at least, say, 5 useful rows; if it does not, something is clearly wrong.


In [49]:
try:
    df_hhi_relevant = df_hhi[(df_hhi['Inkomensbestanddelen'] == '14 BESTEEDBAAR INKOMEN') 
                                 & (df_hhi['KenmerkenVanHuishoudens'] == 'Particuliere huishoudens')]
except Exception as e:
    logger.critical(f"Unable to filter income data file: {e}")
    exit(0)

min_num_rows = 5
if len(df_hhi_relevant) < min_num_rows:
    logger.critical(f"Not enough rows in income data file. Expected at least {min_num_rows} but got {len(df_hhi_relevant)}")
    exit(0)
df_hhi_relevant



| | ID | Inkomensbestanddelen | KenmerkenVanHuishoudens | Perioden | ParticuliereHuishoudens_1 | ParticuliereHuishoudensRelatief_2 | TotaalInkomen_3 | GemiddeldInkomen_4 | MediaanInkomen_5 | AandeelVanBrutoInkomen_6 |
|---|---|---|---|---|---|---|---|---|---|---|
| 76609 | 76609 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2011 | 7347.6 | 100.0 | 269012.0 | 36.6 | 31.3 | 60.6 |
| 76610 | 76610 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2012 | 7412.1 | 100.0 | 272458.0 | 36.8 | 31.4 | 60.2 |
| 76611 | 76611 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2013 | 7467.8 | 100.0 | 275998.0 | 37.0 | 31.4 | 59.9 |
| 76612 | 76612 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2014 | 7496.4 | 100.0 | 292108.0 | 39.0 | 32.2 | 60.7 |
| 76613 | 76613 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2015 | 7568.5 | 100.0 | 292094.0 | 38.6 | 32.5 | 61.4 |
| 76614 | 76614 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2016 | 7623.2 | 100.0 | 306084.0 | 40.2 | 33.4 | 62.1 |
| 76615 | 76615 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2017 | 7694.9 | 100.0 | 319738.0 | 41.6 | 34.1 | 61.5 |
| 76616 | 76616 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2018 | 7760.6 | 100.0 | 330023.0 | 42.5 | 35.0 | 61.5 |
| 76617 | 76617 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2019 | 7827.4 | 100.0 | 357843.0 | 45.7 | 36.7 | 62.0 |
| 76618 | 76618 | 14 BESTEEDBAAR INKOMEN | Particuliere huishoudens | 2020 | 7894.5 | 100.0 | 366657.0 | 46.4 | 38.1 | 62.2 |


After filtering out the rows we don't need, we're still left with a lot of columns we don't need either.

In fact, we only need five columns:
- "Perioden" (year)
- "ParticuliereHuishoudens_1" (number of households, in thousands)
- "TotaalInkomen_3" (total household income, in millions €)
- "GemiddeldInkomen_4" (mean household income, in thousands €)
- "MediaanInkomen_5" (median household income, in thousands €)

Let's create a new data frame containing only these columns. Also, let's give the columns better names.

And, again, remember this is data we obtained from the internet, so we should expect it to contain errors. The columns we need may not actually exist.

In [50]:
try:
    # Formatting lists like this is a good habit to get into as it allows you to easily add or remove items.
    df_hhi_relevant_cols = df_hhi_relevant[['Perioden',
                                            'ParticuliereHuishoudens_1',
                                            'TotaalInkomen_3',
                                            'GemiddeldInkomen_4',
                                            'MediaanInkomen_5']].copy()
except Exception as e:
    logger.critical(f"Unable to select relevant columns from income file: {e}")
    exit(0)

# Translation table for column names
df_hhi_column_map = {
    'Perioden': 'year',
    'ParticuliereHuishoudens_1': 'num_hh',
    'TotaalInkomen_3': 'total_inc_mill',
    'GemiddeldInkomen_4': 'mean_inc_k',
    'MediaanInkomen_5': 'median_inc_k'
}
df_hhi_relevant_cols.rename(columns=df_hhi_column_map, inplace=True)
df_hhi_relevant_cols

| | year | num_hh | total_inc_mill | mean_inc_k | median_inc_k |
|---|---|---|---|---|---|
| 76609 | 2011 | 7347.6 | 269012.0 | 36.6 | 31.3 |
| 76610 | 2012 | 7412.1 | 272458.0 | 36.8 | 31.4 |
| 76611 | 2013 | 7467.8 | 275998.0 | 37.0 | 31.4 |
| 76612 | 2014 | 7496.4 | 292108.0 | 39.0 | 32.2 |
| 76613 | 2015 | 7568.5 | 292094.0 | 38.6 | 32.5 |
| 76614 | 2016 | 7623.2 | 306084.0 | 40.2 | 33.4 |
| 76615 | 2017 | 7694.9 | 319738.0 | 41.6 | 34.1 |
| 76616 | 2018 | 7760.6 | 330023.0 | 42.5 | 35.0 |
| 76617 | 2019 | 7827.4 | 357843.0 | 45.7 | 36.7 |
| 76618 | 2020 | 7894.5 | 366657.0 | 46.4 | 38.1 |
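
The magnitudes in this table suggest that `num_hh` counts households in thousands, so `num_hh * mean_inc_k` (thousands times thousands of euros) should come out close to `total_inc_mill` (millions of euros). Checking that relation is a cheap way to catch a unit or column mix-up. A sketch using the first three rows shown above; `total_matches_mean` is a hypothetical name and the 1% tolerance is an arbitrary choice that absorbs the rounding of the mean:

```python
import pandas as pd

# First three rows of the renamed income table shown above.
df = pd.DataFrame({
    "num_hh": [7347.6, 7412.1, 7467.8],
    "total_inc_mill": [269012.0, 272458.0, 275998.0],
    "mean_inc_k": [36.6, 36.8, 37.0],
})


def total_matches_mean(df, rel_tol=0.01):
    """Check total_inc_mill against num_hh * mean_inc_k for every row.

    Assumes num_hh is in thousands and mean_inc_k in thousands of euros,
    so their product is in millions of euros, like total_inc_mill.
    """
    implied_total = df["num_hh"] * df["mean_inc_k"]
    rel_error = (implied_total - df["total_inc_mill"]).abs() / df["total_inc_mill"]
    return bool((rel_error <= rel_tol).all())
```

For these rows the implied totals differ from `total_inc_mill` by roughly 0.1%, well within the tolerance; a column multiplied by 1000 somewhere upstream would fail the check.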


We're done processing the household income data file and we can write the results to the data/processed directory.

Before we do this, however, we need to think about how we are going to log when this processed file was made. We have two options:

1. Add a timestamp to the name of the processed file. This works, and is transparent, but the drawback is that any further processing steps need to do extra work to determine what the latest file is.
2. Store a timestamp, possibly with extra information, in a logfile so that when something goes wrong, we have a record of what happened and when.

Both options have advantages and drawbacks. In this case we're going to choose option 2.
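
For comparison, this is roughly the extra work that option 1 would require from every downstream step: finding the newest of several timestamped files. A sketch, assuming timestamps that sort chronologically as text (such as `YYYYMMDD-HHMMSS`); `latest_file` is a hypothetical name:

```python
from pathlib import Path


def latest_file(directory, pattern):
    """Return the newest file in directory matching pattern, or None if there is none.

    Relies on the timestamp in the file name sorting chronologically as text,
    which holds for zero-padded formats such as YYYYMMDD-HHMMSS.
    """
    matches = sorted(Path(directory).glob(pattern))
    return matches[-1] if matches else None
```

For example, `latest_file("data/raw", "cbs_hhi_*.csv")` would return the most recent raw income file.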

loguru can also write to a logfile instead of to the notebook output, as we have been doing so far, and in a professional setting doing so would be a good idea. For this workshop, however, we simply append the timestamp to a logfile that sits next to the CSV file.

In [None]:
csv_logfile = 'data/processed/income.csv.log'
# Again, writing files can fail.
try:
    df_hhi_relevant_cols.to_csv('data/processed/income.csv', index=False)
    with open(csv_logfile, "a") as f:
        f.write(timestamp() + "\n")  # one timestamp per line
except Exception as e:
    logger.critical(f"Unable to write processed income csv file: {e}")
    exit(0)
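
A downstream step can then find out when `income.csv` was last written by reading the last line of `income.csv.log`. A sketch, assuming each run appends one timestamp followed by a newline; `last_logged_timestamp` is a hypothetical name:

```python
from pathlib import Path


def last_logged_timestamp(logfile):
    """Return the last non-empty line of logfile, or None if it is missing or empty.

    Assumes one timestamp per line, i.e. that every write appends a newline.
    """
    path = Path(logfile)
    if not path.exists():
        return None
    lines = [line.strip() for line in path.read_text().splitlines() if line.strip()]
    return lines[-1] if lines else None
```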

### Preparing the expenditure data file

The expenditure data file also contains many records we don't care about, and its shape is inconvenient for us as well.

The "Bestedingscategorieen" column contains many categories of expenditures that we are not actually interested in. These we need to filter out.

Then, we want to turn the remaining category values into columns, so that we end up with one row per year and one column per expenditure category.

Filtering rows you already know how to do. Retain only those rows where the "Bestedingscategorieen" column has one of the following values:

- 000000 Alle bestedingen
- 010000 Voedingsmidd. en alc.vrije drank
- 020000 Alcoholhoudende dranken en tabak
- 030000 Kleding en schoenen
- 040000 Huisvesting, water en energie
- 045000 Energie
- 070000 Vervoer
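
Both steps map onto standard pandas operations: `Series.isin` keeps the rows whose value appears in a list, and `DataFrame.pivot` turns the values of one column into separate columns. The sketch below uses a tiny made-up frame: the numbers are not CBS figures, "999999 Placeholder" is an invented category, and the value column is called `value` only because the real column names in 83676NED are not shown here.

```python
import pandas as pd

# Made-up long-format data: one row per (year, category) pair.
# The values are invented and "value" is a placeholder column name.
df_long = pd.DataFrame({
    "Perioden": ["2015", "2015", "2015", "2016", "2016", "2016"],
    "Bestedingscategorieen": [
        "000000 Alle bestedingen", "070000 Vervoer", "999999 Placeholder",
        "000000 Alle bestedingen", "070000 Vervoer", "999999 Placeholder",
    ],
    "value": [100.0, 10.0, 5.0, 110.0, 12.0, 6.0],
})

wanted_categories = ["000000 Alle bestedingen", "070000 Vervoer"]

# Step 1: keep only the rows whose category is in the list.
df_filtered = df_long[df_long["Bestedingscategorieen"].isin(wanted_categories)]

# Step 2: one row per year, one column per remaining category.
df_wide = df_filtered.pivot(index="Perioden", columns="Bestedingscategorieen", values="value")
```

`pivot` raises a `ValueError` if a year/category pair occurs more than once, so if the real data has other dimensions, each of them has to be filtered down to a single value first.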


In [None]:
# Your code goes here