# How to use the etl module

To demonstrate the features of the module, we run a simplified ETL pipeline: we extract raw data from a CSV file, convert the order dates, remove duplicate rows, clean the postcodes, and store the transformed dataset in a CSV file.

In [1]:
import pandas as pd
import numpy as np

### Import etl module

In [2]:
# add ../src to the module search path so the project's etl module can be imported

import os
import sys

current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, os.pardir))

# later cells build file paths from this name; os.sep is "\\" on Windows
# and "/" elsewhere, matching the previous manual os.name check
path_separator = os.sep

module_path = os.path.join(parent_dir, "src")
# avoid piling up duplicate sys.path entries when this cell is re-run
if module_path not in sys.path:
    sys.path.append(module_path)

# import custom module
from etl import DataPreprocessing

In [3]:
# raw input file, relative to the notebook's working directory;
# os.path.join inserts the platform's separator
file_path = os.path.join(os.pardir, 'data', 'raw', 'interview_signup.csv')

# class instantiation
Pipeline_1 = DataPreprocessing(file_path)

In [4]:
# load raw data from csv: comma-separated, first row is the header;
# every column except total_bonus is read as a string
sep = ','
header = 0
dtype = dict(
    original_product_name=str,
    postcode=str,
    bundesland=str,
    total_bonus='float64',
    order_date=str,
)

# NOTE(review): the product names contain 'E.ON STROM Ã–KO', which looks like
# mis-decoded text in the raw file; consider checking the file's encoding
Pipeline_1.load_data_from_csv(sep=sep, header=header, dtype=dtype)
Pipeline_1.df.shape

(318345, 5)

In [5]:
# column names of the loaded raw data
Pipeline_1.df.columns

Index(['original_product_name', 'postcode', 'bundesland', 'total_bonus',
       'order_date'],
      dtype='object')

Check data types

In [6]:
# dtypes as loaded; order_date is still a string (object) column at this point
Pipeline_1.df.dtypes

original_product_name     object
postcode                  object
bundesland                object
total_bonus              float64
order_date                object
dtype: object

In [7]:
# distinct product names, in order of first appearance; note the spelling
# variants of the same products (repeated "24" suffixes, "ÖO", "Ã–KO")
pd.unique(Pipeline_1.df['original_product_name'])

array(['E.ON STROM', 'E.ON STROM ÖKO', 'E.ON STROM ÖKO 24',
       'E.ON STROM 24', 'E.ON STROM PUR', 'E.ON STROM Ã–KO',
       'E.ON STROM 24 24 24', 'E.ON STROM 24 24', 'E.ON STROM ÖO',
       'E.ON STROM 24 24 24 24 24 24 24'], dtype=object)

In [8]:
# distinct federal states (bundesland); nan marks rows without a state
pd.unique(Pipeline_1.df['bundesland'])

array(['Nordrhein-Westfalen', 'Baden-Württemberg', 'Hessen', 'Berlin',
       'Schleswig-Holstein', 'Niedersachsen', nan, 'Bayern',
       'Rheinland-Pfalz', 'Sachsen', 'Bremen', 'Brandenburg', 'Thüringen',
       'Saarland', 'Mecklenburg-Vorpommern', 'Hamburg', 'Sachsen-Anhalt'],
      dtype=object)

In [9]:
# which Python types are stored in order_date before conversion
Pipeline_1.df['order_date'].map(type).unique()


array([<class 'str'>], dtype=object)

In [10]:
# order_date contains only strings (see the type check in the previous cell);
# parse them as ISO dates and keep plain datetime.date objects instead of Timestamps
Pipeline_1.df['order_date'] = pd.to_datetime(
    Pipeline_1.df['order_date'], format="%Y-%m-%d"
).dt.date

In [11]:
# check conversion status: order_date now holds datetime.date objects,
# which pandas still reports with dtype object
Pipeline_1.df.info()  # alt.: Pipeline_1.df.dtypes

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 318345 entries, 0 to 318344
Data columns (total 5 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   original_product_name  318345 non-null  object 
 1   postcode               318345 non-null  object 
 2   bundesland             288813 non-null  object 
 3   total_bonus            318345 non-null  float64
 4   order_date             318345 non-null  object 
dtypes: float64(1), object(4)
memory usage: 12.1+ MB


Remove duplicate rows:

In [12]:
# delete duplicate rows (done by DataPreprocessing.remove_duplicate_rows)
# and show the resulting shape
Pipeline_1.remove_duplicate_rows()
Pipeline_1.df.shape

(318175, 5)

Check missing data

In [13]:
# check number of missing values for each column;
# missing_values appears to return a boolean mask, so .sum() counts per column
columns = list(Pipeline_1.df.columns)
Pipeline_1.missing_values(columns).sum()

# alt.: Pipeline_1.df.info() shows the complementary non-null counts

original_product_name        0
postcode                     0
bundesland               29521
total_bonus                  0
order_date                   0
dtype: int64

In [14]:
# logical index (boolean mask) of all rows with a missing state
idx_missing_state = Pipeline_1.missing_values('bundesland')

# sample of rows whose state is filled in; fixed random_state keeps it reproducible
Pipeline_1.df.loc[~idx_missing_state, 'bundesland'].sample(10, random_state=0)

54536           Niedersachsen
254382     Schleswig-Holstein
203006                 Bayern
308543                 Hessen
55258                  Bayern
192078                 Bayern
242230    Nordrhein-Westfalen
3017      Nordrhein-Westfalen
35160     Nordrhein-Westfalen
159255    Nordrhein-Westfalen
Name: bundesland, dtype: object

In [15]:
# count number of valid German states (validate_state's result is summed, i.e. used as a mask);
# the count equals the non-null bundesland rows after deduplication, so missing
# states are presumably counted as invalid
Pipeline_1.validate_state('bundesland').sum()

288654

Inspect the invalid postcode cases:

In [16]:
# a valid postcode consists of exactly five digits
idx_valid_postcodes = Pipeline_1.validate_postcode("postcode", r"^[0-9]{5}$")
print("Number of valid postcodes: ", idx_valid_postcodes.sum())

# sample of 20 invalid postcodes; fixed random_state keeps it reproducible
Pipeline_1.df.loc[~idx_valid_postcodes, 'postcode'].sample(20, random_state=0)

Number of valid postcodes:  226865


266562    22399.0
59115        4347
80220     45130.0
317829    96170.0
209581    97845.0
316405    70190.0
205290    73457.0
238221    46236.0
42644        1917
125886     4317.0
19548     14612.0
144482    63477.0
15685     63579.0
231209       4329
157384    66359.0
99071     24628.0
2906      82547.0
248083    86865.0
152724    92249.0
230718     4178.0
Name: postcode, dtype: object

In [17]:
# Remove decimals (the invalid sample shows float-like values such as '22399.0')
# and check again
Pipeline_1.remove_decimals('postcode')

# remaining invalid postcodes (validate_postcode called with its default pattern)
idx_valid_postcodes = Pipeline_1.validate_postcode('postcode')
print("Remaining invalid postcodes: ", Pipeline_1.df.shape[0] - idx_valid_postcodes.sum())

# sample of 20 remaining invalid cases; fixed random_state keeps it reproducible
Pipeline_1.df.loc[~idx_valid_postcodes, 'postcode'].sample(20, random_state=0)

78598 entries were changed.

Remaining invalid postcodes:  16610


118949    4179
39825     6258
175001    4229
149986    6722
135447    6449
118929    6849
60027     4425
212094    9599
229339    6184
316487    4821
200987    6484
120038    1705
286222    8459
195293    8297
204875    6231
268463    9496
152907    9456
75701     1662
164115    4442
91513     1108
Name: postcode, dtype: object

In [18]:
# flag postcodes whose string length is below 4 or above 5 characters
# (4-character codes are kept; they are zero-padded further below)
postcode_len = Pipeline_1.df['postcode'].str.len()
idx_postcode_unequal_5 = (postcode_len < 4) | (postcode_len > 5)

Pipeline_1.df.loc[idx_postcode_unequal_5, 'postcode']

266922    92696JAVAS
Name: postcode, dtype: object

In [19]:
# drop records whose postcode length is below 4 or above 5 characters
# (the previous cell found a single one, '92696JAVAS'). Deriving the labels
# from a mask instead of hard-coding index 266922 keeps the cell correct if the
# data changes and makes it safe to re-run (a second run drops nothing).
postcode_len = Pipeline_1.df['postcode'].str.len()
idx_bad_postcode_len = (postcode_len < 4) | (postcode_len > 5)
Pipeline_1.df.drop(index=Pipeline_1.df.index[idx_bad_postcode_len], inplace=True)

# remaining invalid postcodes
idx_valid_postcodes = Pipeline_1.validate_postcode('postcode')
print("Remaining invalid postcodes: ", Pipeline_1.df.shape[0] - idx_valid_postcodes.sum())

Remaining invalid postcodes:  16609


In [20]:
# pad postcodes with zeros from the left up to a length of n=5
# (intended for the remaining 4-digit codes such as '4179')
Pipeline_1.zero_padding('postcode', side='left', n=5)

16609 entries were changed.



In [21]:
# remaining invalid postcodes (expected to be 0 after zero padding)
idx_valid_postcodes = Pipeline_1.validate_postcode('postcode')
print("Remaining invalid postcodes: ", Pipeline_1.df.shape[0] - idx_valid_postcodes.sum())

# sample of cleaned records; fixed random_state keeps it reproducible
Pipeline_1.df.loc[idx_valid_postcodes].sample(20, random_state=0)

Remaining invalid postcodes:  0


Unnamed: 0,original_product_name,postcode,bundesland,total_bonus,order_date
56802,E.ON STROM,74599,Baden-Württemberg,116.0,2018-03-06
273857,E.ON STROM ÖKO,53227,Nordrhein-Westfalen,225.0,2018-11-11
51697,E.ON STROM ÖKO,64739,,152.0,2018-03-01
70451,E.ON STROM ÖKO,28217,Bremen,99.0,2018-03-22
7470,E.ON STROM,13088,Berlin,83.0,2018-01-09
291436,E.ON STROM,95138,Bayern,140.0,2018-12-01
218723,E.ON STROM,60316,,123.0,2018-09-09
237109,E.ON STROM 24,16547,Brandenburg,152.0,2018-09-30
114887,E.ON STROM,33619,Nordrhein-Westfalen,212.0,2018-05-13
263483,E.ON STROM,96489,Bayern,350.0,2018-10-30


In [22]:
# store preprocessed csv in folder data/processed, creating the folder if needed
file_path_processed = os.path.join(
    os.pardir, 'data', 'processed', 'demo_etl_module_processed.csv'
)
os.makedirs(os.path.dirname(file_path_processed), exist_ok=True)
Pipeline_1.save_data_to_csv(file_path_processed, index=False)