In [1]:
import pandas as pd

# Simple ETL Pipeline

## Extract Data

In [2]:
# Read the raw countries table from the input folder and display it.
countries_csv = '../data/input/Countries.csv'
countries = pd.read_csv(countries_csv)
countries

Unnamed: 0,Countries,Population
0,Austria,V:9006398
1,Bolivia,V:11673021
2,China,V:1439323776
3,Denmark,V:5792202
4,Egypt,V:102334404
5,Ethiopia,V:114963588
6,Finland,V:5540720
7,France,V:65273511
8,Germany,V:83783942
9,Greece,V:10423054


In [3]:
# Read the raw metadata table from the input folder and display it.
countries_metadata_csv = '../data/input/Countries_metadata.csv'
countries_metadata = pd.read_csv(countries_metadata_csv)
countries_metadata

Unnamed: 0,country_names,Land_Area
0,Greece,V:128900
1,China,V:9388211
2,Denmark,V:42430
3,Ethiopia,V:1000000
4,Egypt,V:995450
5,Bolivia,V:1083300
6,Austria,V:82409
7,France,V:547557
8,Germany,V:348560
9,Finland,V:303890


In [4]:
# Inspect the column dtypes of the raw metadata table: Land_Area is still
# `object` here because its values are text carrying a "V:" prefix.
countries_metadata.dtypes

country_names    object
Land_Area        object
dtype: object

## Transform 1: Strip the `V:` prefix and convert to integers

In [6]:
# Drop the marker in front of the first ':' (e.g. "V:9006398" -> "9006398")
# and store the remainder as int64.
# `.astype(str)` makes the cell safe to re-run once the column is already
# numeric; the previous per-element `x.split(':')` raised AttributeError on ints.
countries['Population'] = (
    countries['Population']
    .astype(str)
    .str.split(':', n=1)
    .str[-1]
    .astype('int64')
)
countries

Unnamed: 0,Countries,Population
0,Austria,9006398
1,Bolivia,11673021
2,China,1439323776
3,Denmark,5792202
4,Egypt,102334404
5,Ethiopia,114963588
6,Finland,5540720
7,France,65273511
8,Germany,83783942
9,Greece,10423054


In [7]:
# Check the column dtypes after the cast: Population should now be int64.
countries.dtypes

Countries     object
Population     int64
dtype: object

In [8]:
# Drop the marker in front of the first ':' (e.g. "V:128900" -> "128900")
# and store the remainder as int64.
# `.astype(str)` makes the cell safe to re-run once the column is already
# numeric; the previous per-element `x.split(':')` raised AttributeError on ints.
countries_metadata['Land_Area'] = (
    countries_metadata['Land_Area']
    .astype(str)
    .str.split(':', n=1)
    .str[-1]
    .astype('int64')
)
countries_metadata

Unnamed: 0,country_names,Land_Area
0,Greece,128900
1,China,9388211
2,Denmark,42430
3,Ethiopia,1000000
4,Egypt,995450
5,Bolivia,1083300
6,Austria,82409
7,France,547557
8,Germany,348560
9,Finland,303890


## Transform 2: Rescale values to thousands

In [9]:
# Divide the population by 1000 and rename the column to reflect the new scale.
countries = (
    countries
    .assign(Population=lambda frame: frame['Population'] / 1000)
    .rename(columns={'Population': 'Population_per_k'})
)
countries

Unnamed: 0,Countries,Population_per_k
0,Austria,9006.398
1,Bolivia,11673.021
2,China,1439323.776
3,Denmark,5792.202
4,Egypt,102334.404
5,Ethiopia,114963.588
6,Finland,5540.72
7,France,65273.511
8,Germany,83783.942
9,Greece,10423.054


In [10]:
# Divide the land area by 1000 and rename the column to reflect the new scale.
countries_metadata = (
    countries_metadata
    .assign(Land_Area=lambda frame: frame['Land_Area'] / 1000)
    .rename(columns={'Land_Area': 'Land_Area_per_k'})
)
countries_metadata

Unnamed: 0,country_names,Land_Area_per_k
0,Greece,128.9
1,China,9388.211
2,Denmark,42.43
3,Ethiopia,1000.0
4,Egypt,995.45
5,Bolivia,1083.3
6,Austria,82.409
7,France,547.557
8,Germany,348.56
9,Finland,303.89


## Load Data

In [None]:
# Persist the transformed tables. `index=False` keeps the default row index out
# of the files, so reading them back does not yield a spurious "Unnamed: 0"
# column.
countries.to_csv('../data/countries.csv', index=False)
countries_metadata.to_csv('../data/countries_metadata.csv', index=False)

# Create `etl_pipeline.py`

In [None]:
import pandas as pd
import numpy as np
import os


class DataPreprocessor:
    """Small ETL helper for the ``Countries`` and ``Countries_metadata`` CSV tables.

    All files live in three sub-folders of ``path_folder``:

    * ``input/``          -- raw CSV files whose numeric columns are stored as
      text with a marker in front of a colon (e.g. ``"V:9006398"``),
    * ``input_modified/`` -- the same tables with those columns parsed to
      ``int64``,
    * ``output/``         -- the preprocessed tables (values divided by 1000
      and columns renamed to ``*_per_k``).

    The loaded tables are kept on the instance as ``self.countries`` and
    ``self.countries_metadata``; these attributes only exist after one of the
    ``read_*`` methods has been called.
    """

    def __init__(self, path_folder="path/to/root directory"):
        """Derive every file path from ``path_folder`` and create the output folders.

        Args:
            path_folder: Root directory that contains the ``input`` folder.
                The default value is a placeholder, not a usable location.
        """
        self.path_folder = path_folder

        # Path to input (the trailing '' keeps the folder paths ending in a separator)
        self.path_input_folder = os.path.join(path_folder, 'input', '')
        self.path_input_countries = os.path.join(self.path_input_folder, 'Countries.csv')
        self.path_input_countries_metadata = os.path.join(self.path_input_folder, 'Countries_metadata.csv')

        # Path to modified input
        self.path_input_modified_folder = os.path.join(path_folder, 'input_modified', '')
        self.path_input_modified_countries = os.path.join(self.path_input_modified_folder, 'Countries.csv')
        self.path_input_modified_countries_metadata = os.path.join(
            self.path_input_modified_folder, 'Countries_metadata.csv')

        # Path on which output tables are saved
        self.path_output_folder = os.path.join(path_folder, 'output', '')
        self.path_output_countries = os.path.join(self.path_output_folder, 'Countries.csv')
        self.path_output_countries_metadata = os.path.join(self.path_output_folder, 'Countries_metadata.csv')

        # dtypes passed to pd.read_csv: the name columns are read as categoricals
        self.read_dtypes_countries = {'Countries': 'category'}
        self.read_dtypes_countries_metadata = {'country_names': 'category'}

        # Create the folders that this class writes to. exist_ok avoids the
        # check-then-create race of os.path.exists() followed by os.makedirs().
        for folder in (self.path_input_modified_folder, self.path_output_folder):
            os.makedirs(folder, exist_ok=True)

    @staticmethod
    def _parse_prefixed_int(column):
        """Return ``column`` as int64 after dropping everything up to the first ':'.

        Values without a ':' are converted as they are, so an already clean
        column passes through unchanged.
        """
        return column.astype(str).str.split(':', n=1).str[-1].astype('int64')

    @staticmethod
    def _save_table(table, path, name):
        """Write ``table`` to ``path`` as CSV (without the index), logging start and finish."""
        print("Start:\tSave {} Dataset to disc".format(name))
        table.to_csv(path, index=False)
        print("Finish:\tSave {} Dataset to disc".format(name))

    def read_data_from_raw_input(self, save_countries=True, save_countries_metadata=True):
        """Load the raw CSV files and parse their prefixed numeric columns.

        Args:
            save_countries: If true, write the parsed countries table to the
                ``input_modified`` folder.
            save_countries_metadata: If true, write the parsed metadata table
                to the ``input_modified`` folder.
        """
        print("Start:\tRead in countries Dataset")
        self.countries = pd.read_csv(self.path_input_countries, dtype=self.read_dtypes_countries)
        self.countries['Population'] = self._parse_prefixed_int(self.countries['Population'])
        print("Finish:\tRead in countries Dataset")

        print("Start:\tRead in countries_metadata Dataset")
        self.countries_metadata = pd.read_csv(
            self.path_input_countries_metadata, dtype=self.read_dtypes_countries_metadata)
        self.countries_metadata['Land_Area'] = self._parse_prefixed_int(self.countries_metadata['Land_Area'])
        print("Finish:\tRead in countries_metadata Dataset")

        if save_countries:
            self._save_table(self.countries, self.path_input_modified_countries, 'countries')

        if save_countries_metadata:
            self._save_table(
                self.countries_metadata, self.path_input_modified_countries_metadata, 'countries_metadata')

    def read_data_from_modified_input(self):
        """Load both tables from the ``input_modified`` folder.

        The files must exist, i.e. ``read_data_from_raw_input`` has to have
        saved them beforehand.
        """
        self.countries = pd.read_csv(self.path_input_modified_countries, dtype=self.read_dtypes_countries)
        self.countries_metadata = pd.read_csv(
            self.path_input_modified_countries_metadata, dtype=self.read_dtypes_countries_metadata)

    def preprocess_data(self, save_preprocess_countries=True, save_preprocess_countries_metadata=True):
        """Preprocess both loaded tables and optionally write them to the ``output`` folder.

        Args:
            save_preprocess_countries: If true, save the preprocessed countries table.
            save_preprocess_countries_metadata: If true, save the preprocessed
                metadata table.

        Returns:
            Tuple ``(countries, countries_metadata)`` of the preprocessed tables.
        """
        print("Start:\tPreprocessing countries Dataset")
        self.preprocess_countries()
        print("Finish:\tPreprocessing countries Dataset")

        print("Start:\tPreprocessing countries_metadata Dataset")
        self.preprocess_countries_metadata()
        print("Finish:\tPreprocessing countries_metadata Dataset")

        if save_preprocess_countries:
            self._save_table(self.countries, self.path_output_countries, 'countries')

        if save_preprocess_countries_metadata:
            self._save_table(self.countries_metadata, self.path_output_countries_metadata, 'countries_metadata')

        return self.countries, self.countries_metadata

    def preprocess_countries(self):
        """Divide ``Population`` by 1000 and rename it to ``Population_per_k``.

        Expects a ``Population`` column, so it cannot be applied twice to the
        same table (the second call raises ``KeyError``).
        """
        self.countries['Population'] = self.countries['Population'] / 1000
        self.countries = self.countries.rename(columns={'Population': 'Population_per_k'})

    def preprocess_countries_metadata(self):
        """Divide ``Land_Area`` by 1000 and rename it to ``Land_Area_per_k``.

        Expects a ``Land_Area`` column, so it cannot be applied twice to the
        same table (the second call raises ``KeyError``).
        """
        self.countries_metadata['Land_Area'] = self.countries_metadata['Land_Area'] / 1000
        self.countries_metadata = self.countries_metadata.rename(columns={'Land_Area': 'Land_Area_per_k'})

    def read_preprocessed_tables(self):
        """Load the preprocessed tables from the ``output`` folder.

        Returns:
            Tuple ``(countries, countries_metadata)`` as read from disc.
        """
        print("Start:\tRead in modified countries Dataset")
        self.countries = pd.read_csv(self.path_output_countries, dtype=self.read_dtypes_countries)
        print("Finish:\tRead in modified countries Dataset")

        print("Start:\tRead in modified countries_metadata Dataset")
        self.countries_metadata = pd.read_csv(
            self.path_output_countries_metadata, dtype=self.read_dtypes_countries_metadata)
        print("Finish:\tRead in modified countries_metadata Dataset")

        return self.countries, self.countries_metadata


def main(path_folder=None):
    """Run the complete ETL: parse the raw input, reload it, preprocess and save.

    Args:
        path_folder: Root data directory handed to ``DataPreprocessor``. When
            omitted, ``DataPreprocessor`` is created with its own default
            path, which is the previous behaviour of this function.
    """
    preprocessor = DataPreprocessor() if path_folder is None else DataPreprocessor(path_folder)
    preprocessor.read_data_from_raw_input()
    # Reload the tables from the intermediate files that the previous step saved.
    preprocessor.read_data_from_modified_input()
    preprocessor.preprocess_data()
    print('ETL has been successfully completed !!')

#if __name__ == '__main__':
#    main()


