# ETL Pipeline

__Bernardo Di Chiara__

_March 6<sup>th</sup>, 2020_

## Table of contents
[1. Exploratory Data Analysis](#1.)<br>
<font color='white'>....</font>[1.1. Importing the Needed Packages](#1.1.)<br>
<font color='white'>....</font>[1.2. Defining the Used Functions](#1.2.)<br>
<font color='white'>....</font>[1.3. Examining the Input Data](#1.3.)<br>
<font color='white'>....</font>[1.4. Fetching Some Aggregate Values Useful for Testing](#1.4.)<br>
<font color='white'>....</font>[1.5. Conclusions](#1.5.)<br>

## 1. Exploratory Data Analysis <a name="1."></a>

### 1.1. Importing the Needed Packages <a name="1.1."></a>

In [1]:
import datetime
import pandas as pd
import numpy as np

### 1.2. Defining the Used Functions <a name="1.2."></a>

In [2]:
def CSV_to_Pandas(filename, dfname, consolle=False):
    # This function reads data from a CSV file, converts the data into a
    # dataframe and returns the dataframe.
    # Input parameters: the file name (string), the name of the target
    # dataframe (string, only used as a label at the call site) and,
    # optionally, whether to print console feedback (boolean).
    
    import pandas as pd
    
    # Importing data from a CSV file and converting it into a Pandas dataframe
    if consolle:
        print("Loading data ... (it might take a while)\n")
    # The dfname argument is not used by read_csv; the loaded dataframe is returned
    return pd.read_csv(filename)

In [3]:
def df_basic_data(dfname, showcontent=False):
    # This function prints basic information about a given dataframe.
    # Input parameters: the dataframe and, optionally, whether to show
    # the first rows of the dataframe (boolean).
    
    # Printing dataframe basic data
    print("Dataframe length:", len(dfname), "\n")
    print("Dataframe columns names and data types:")
    print(dfname.dtypes.to_string(), "\n")
    if showcontent:
        print("Visualizing the content of the first 5 rows:")
        print(dfname.head(), "\n")
    # Checking the number of NaN entries for each column
    print("Amount of NaN entries for each column:")
    print(dfname.isna().sum().to_string(), "\n")

In [4]:
def show_value_counts(dfname, *omitted_column):
    # This function shows the number of occurrences of each distinct value
    # in each column of a given dataframe.
    # The function is useful for dataframes which have categorical variables.
    # The dataframe is a mandatory parameter.
    # One or more column names can be passed to omit those columns from the calculation.

    for column in dfname.columns:
        if column not in omitted_column:
            print("column name: ", column)
            print("column datatype: ", dfname[column].dtypes)
            print("values count:")
            print(dfname[column].value_counts().to_string(), "\n")

In [5]:
def visualize_df(dfname, batch_size=50):
    # This function shows the full content of a dataframe in chunks,
    # asking the user whether to continue after each chunk.
    # The dataframe is a mandatory input parameter.
    # The batch_size is 50 entries by default and can be changed when calling the function.

    i = 0
    while i < len(dfname):
        print("\nHandling entries from {} to {}".format(i, i+batch_size))
        # Slicing the dataframe passed as argument (not the global complete_df)
        vis_df = dfname[i:i+batch_size]
        i += batch_size
        print(vis_df)
        try:
            userid = int(input("Enter '1' if you want to see more entries\n"
                               "Type any other key (or no key at all) and then press Enter to quit\n"))
            if userid != 1:
                break
        except ValueError:
            # Any non-numeric input (including an empty one) ends the loop
            break

In [6]:
def date_generator(start_date, end_date):
    # This function takes a start date and an end date in string format
    # and returns the list of dates from the start date to the end date (both included).
    # The input dates shall be in the format dd-mm-yyyy;
    # the returned dates are strings in the format yyyy-mm-dd.
    
    import datetime

    start_d = datetime.datetime.strptime(start_date, "%d-%m-%Y")
    end_d = datetime.datetime.strptime(end_date, "%d-%m-%Y")
    # Creating a list of dates from the start day to the end day
    gener_dates = [start_d + datetime.timedelta(days=x) for x in range(0, (end_d-start_d).days+1)] 
    # Extracting the dates
    dates = []
    for date in gener_dates:
        dates.append(date.strftime("%Y-%m-%d"))
    return dates
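
As a cross-check of `date_generator`, the same list of dates can also be built with `pd.date_range`. This is only a minimal sketch and not part of the original notebook; the variable name `alt_dates` is illustrative.

```python
import pandas as pd

# Daily dates from July 1st to December 12th 2018, both ends included,
# formatted as yyyy-mm-dd strings like the output of date_generator
alt_dates = (
    pd.date_range(start="2018-07-01", end="2018-12-12", freq="D")
    .strftime("%Y-%m-%d")
    .tolist()
)

print(len(alt_dates))
print(alt_dates[0], alt_dates[-1])
```

Comparing `alt_dates` with the list returned by `date_generator('01-07-2018', '12-12-2018')` is a quick way to test the helper.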

### 1.3. Examining the Input Data <a name="1.3."></a>

The testing data consists of one CSV file for each day from July 1st to December 12th, 2018 (165 files). The file names are in the format yyyy-mm-dd.csv (where yyyy is 2018, mm is the month and dd is the day). No file is missing, and the file names are correct and consistent.
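
The claim that no file is missing can be verified programmatically. The sketch below is an assumption about how such a check might look, not the check actually used here: the helper name `missing_daily_files` is hypothetical, and the files are assumed to live in a local `data` directory, as in the cells that follow.

```python
import datetime
from pathlib import Path


def missing_daily_files(data_dir, first_day, last_day):
    """Return the expected yyyy-mm-dd.csv names that are absent from data_dir."""
    n_days = (last_day - first_day).days + 1
    expected = [
        (first_day + datetime.timedelta(days=offset)).strftime("%Y-%m-%d") + ".csv"
        for offset in range(n_days)
    ]
    return [name for name in expected if not (Path(data_dir) / name).is_file()]


first_day = datetime.date(2018, 7, 1)
last_day = datetime.date(2018, 12, 12)

# Number of days in the range, both ends included
print((last_day - first_day).days + 1)
# An empty list means that every expected file is present
print(missing_daily_files("data", first_day, last_day))
```

The day count also confirms the figure of 165 files: 31 + 31 + 30 + 31 + 30 + 12.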

In [7]:
# Fetching the first CSV file
df_2018_07_01 = CSV_to_Pandas('data/2018-07-01.csv', 'df_2018_07_01', True)

Loading data ... (it might take a while)



In [8]:
# Visualizing the content
df_basic_data(df_2018_07_01)
df_2018_07_01

Dataframe length: 10 

Dataframe columns names and data types:
temperature      object
skipped_beat    float64
at_risk           int64 

Amount of NaN entries for each column:
temperature     0
skipped_beat    1
at_risk         0 



  temperature  skipped_beat  at_risk
0         LOW           NaN        1
1        HIGH           1.0        1
2        HIGH           2.0        1
3         LOW           1.0        1
4        HIGH           0.0        0
5        HIGH           0.0        0
6         LOW           1.0        0
7         LOW           0.0        0
8         LOW           1.0        1
9      MEDIUM           2.0        1


In [9]:
# Fetching another CSV file and visualizing basic dataframe characteristics
df_2018_08_31 = CSV_to_Pandas('data/2018-08-31.csv', 'df_2018_08_31')
df_basic_data(df_2018_08_31)

Dataframe length: 10 

Dataframe columns names and data types:
temperature      object
at_risk           int64
skipped_beat    float64
price           float64 

Amount of NaN entries for each column:
temperature     0
at_risk         0
skipped_beat    2
price           0 



In [10]:
# Visualizing the whole content
df_2018_08_31

  temperature  at_risk  skipped_beat          price
0      MEDIUM        1           0.0    9158.897271
1        HIGH        1           0.0   13119.380152
2         LOW        1           1.0   39751.189729
3        HIGH        1           2.0   35010.426548
4         LOW        1           1.0   21199.899382
5         LOW        0           NaN    8633.319370
6         LOW        1           0.0  206014.060966
7         LOW        0           NaN  204228.367452
8         LOW        1           2.0   28772.568787
9         LOW        0           0.0    2203.230279


In [11]:
# Fetching the rest of the data

# Generating a list of dates covering the full list of available test CSV files
dates = date_generator('01-07-2018', '12-12-2018')
# Initializing an empty dataframe to contain the result
complete_df = pd.DataFrame()
# Iterating through all the CSV files
for date in dates:
    # Reading the CSV file and copying it into a dataframe
    df = CSV_to_Pandas("data/{}.csv".format(date), 'df')
    # Appending the dataframe to the result dataframe
    # 'sort=True' is necessary since the columns are not always in the same order
    complete_df = complete_df.append(df, sort=True, ignore_index = True)
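
`DataFrame.append` was deprecated in pandas 1.4 and removed in pandas 2.0, so the loop above only runs on older releases such as the pandas 0.25.2 used here. A possible replacement is to collect the daily dataframes in a list and combine them once with `pd.concat`. The sketch below assumes two tiny in-memory stand-ins for the daily files (rows copied from the outputs above) instead of the real CSV files; the names `csv_a`, `csv_b`, `frames` and `combined` are illustrative.

```python
import io

import pandas as pd

# In-memory stand-ins for two daily files; the second one has an extra
# column and a different column order, as observed in the real data
csv_a = io.StringIO("temperature,skipped_beat,at_risk\nLOW,,1\nHIGH,1.0,1\n")
csv_b = io.StringIO("temperature,at_risk,skipped_beat,price\nMEDIUM,1,0.0,9158.897271\n")

# Collecting the dataframes first and concatenating once
frames = [pd.read_csv(csv_a), pd.read_csv(csv_b)]
# 'sort=True' orders the columns alphabetically, as in the append-based loop
combined = pd.concat(frames, sort=True, ignore_index=True)

print(combined)
```

Concatenating once at the end also avoids copying the growing dataframe at every iteration.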

In [12]:
# Visualizing basic dataframe characteristics of the complete dataframe
df_basic_data(complete_df, True)

Dataframe length: 1650 

Dataframe columns names and data types:
at_risk           int64
price           float64
skipped_beat    float64
temperature      object 

Visualizing the content of the first 5 rows:
   at_risk  price  skipped_beat temperature
0        1    NaN           NaN         LOW
1        1    NaN           1.0        HIGH
2        1    NaN           2.0        HIGH
3        1    NaN           1.0         LOW
4        0    NaN           0.0        HIGH 

Amount of NaN entries for each column:
at_risk           0
price           610
skipped_beat     98
temperature       0 



In [13]:
# Changing the variables' datatypes
complete_df['temperature'] = complete_df['temperature'].astype('category')
complete_df['at_risk'] = complete_df['at_risk'].astype('bool')
df_basic_data(complete_df, False)

Dataframe length: 1650 

Dataframe columns names and data types:
at_risk             bool
price            float64
skipped_beat     float64
temperature     category 

Amount of NaN entries for each column:
at_risk           0
price           610
skipped_beat     98
temperature       0 



In [14]:
# Checking the occurrences of each value for each column except for 'price'
show_value_counts(complete_df, 'price')

column name:  at_risk
column datatype:  bool
values count:
True     826
False    824 

column name:  skipped_beat
column datatype:  float64
values count:
1.0    475
0.0    401
2.0    321
3.0    188
4.0    100
5.0     45
6.0     20
7.0      2 

column name:  temperature
column datatype:  category
values count:
LOW       574
MEDIUM    540
HIGH      536 



In [15]:
# Running a quick visual check on the whole dataset
visualize_df(complete_df)


Handling entries from 0 to 50
    at_risk  price  skipped_beat temperature
0      True    NaN           NaN         LOW
1      True    NaN           1.0        HIGH
2      True    NaN           2.0        HIGH
3      True    NaN           1.0         LOW
4     False    NaN           0.0        HIGH
5     False    NaN           0.0        HIGH
6     False    NaN           1.0         LOW
7     False    NaN           0.0         LOW
8      True    NaN           1.0         LOW
9      True    NaN           2.0      MEDIUM
10    False    NaN           3.0         LOW
11     True    NaN           0.0        HIGH
12     True    NaN           1.0         LOW
13    False    NaN           0.0      MEDIUM
14     True    NaN           2.0        HIGH
15    False    NaN           3.0      MEDIUM
16     True    NaN           3.0      MEDIUM
17     True    NaN           1.0         LOW
18     True    NaN           2.0         LOW
19    False    NaN           2.0        HIGH
20     True    NaN      

In [16]:
# Changing 'NaN' into 'None' for compatibility with MySQL
sql_ready_df = complete_df.where((pd.notnull(complete_df)), None)
df_basic_data(sql_ready_df, True)

Dataframe length: 1650 

Dataframe columns names and data types:
at_risk             bool
price             object
skipped_beat      object
temperature     category 

Visualizing the content of the first 5 rows:
   at_risk price skipped_beat temperature
0     True  None         None         LOW
1     True  None            1        HIGH
2     True  None            2        HIGH
3     True  None            1         LOW
4    False  None            0        HIGH 

Amount of NaN entries for each column:
at_risk           0
price           610
skipped_beat     98
temperature       0 
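
The NaN counts above are unchanged because `isna()` also treats `None` as a missing value. On more recent pandas releases, calling `where(..., None)` directly on a float column may leave `NaN` in place; a commonly used workaround is to cast to `object` first. The sketch below illustrates this on a toy dataframe (the names `toy_df` and `toy_sql_ready` are hypothetical) and is not the code that produced the output above.

```python
import numpy as np
import pandas as pd

# Toy dataframe with missing values in float columns
toy_df = pd.DataFrame({
    "price": [np.nan, 9158.897271],
    "skipped_beat": [np.nan, 1.0],
})

# Casting to object first lets the columns hold None instead of NaN
toy_sql_ready = toy_df.astype(object).where(toy_df.notna(), None)

print(toy_sql_ready)
# None is still counted as missing by isna()
print(toy_sql_ready.isna().sum().to_string())
```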



### 1.4. Fetching Some Aggregate Values Useful for Testing <a name="1.4."></a>

In [17]:
# Eliminating entries having null prices
clean_df = complete_df.loc[:, ['temperature', 'price']].dropna()
df_basic_data(clean_df , True)

Dataframe length: 1040 

Dataframe columns names and data types:
temperature    category
price           float64 

Visualizing the content of the first 5 rows:
    temperature         price
610      MEDIUM   9158.897271
611        HIGH  13119.380152
612         LOW  39751.189729
613        HIGH  35010.426548
614         LOW  21199.899382 

Amount of NaN entries for each column:
temperature    0
price          0 



In [18]:
# Selecting the last 300 entries
# Grouping by temperature and calculating the mean price
# Showing it in the column mean_price
aggregate_df = clean_df.tail(300).groupby(['temperature']).mean().rename(columns={'price': 'mean_price'})
aggregate_df

               mean_price
temperature
HIGH         37697.317426
LOW          37569.445257
MEDIUM       29441.747247
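
Note that `tail(300)` selects rows by position, so these reference values depend on the order in which the daily files were appended. The mechanics can be illustrated on a small example; the data and the names `toy_df` and `toy_aggregate` below are hypothetical and only meant as a sketch.

```python
import pandas as pd

# Hypothetical toy data, only meant to show the mechanics
toy_df = pd.DataFrame({
    "temperature": ["LOW", "HIGH", "LOW", "HIGH", "LOW"],
    "price": [10.0, 20.0, 30.0, 40.0, 60.0],
})

# Keeping the last 3 rows by position, then averaging the price per temperature
toy_aggregate = (
    toy_df.tail(3)
    .groupby("temperature")["price"]
    .mean()
    .rename("mean_price")
    .to_frame()
)

print(toy_aggregate)
```

Only the last three rows (LOW 30, HIGH 40, LOW 60) contribute, so the first two prices do not affect the means.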


In [19]:
# Fetching a subset of the data covering all the days till September 2nd

# Generating a list of dates covering the days from July 1st to September 2nd
dates = date_generator('01-07-2018', '02-09-2018')
# Initializing an empty dataframe to contain the result
partial_df = pd.DataFrame()
# Iterating through all the CSV files
for date in dates:
    # Reading the CSV file and copying it into a dataframe
    df = CSV_to_Pandas("data/{}.csv".format(date), 'df')
    # Appending the dataframe to the result dataframe
    # 'sort=True' is necessary since the columns are not always in the same order
    partial_df = partial_df.append(df, sort=True, ignore_index = True)
# Visualizing basic dataframe characteristics
df_basic_data(partial_df, False)

Dataframe length: 640 

Dataframe columns names and data types:
at_risk           int64
price           float64
skipped_beat    float64
temperature      object 

Amount of NaN entries for each column:
at_risk           0
price           610
skipped_beat     46
temperature       0 



In [20]:
# Eliminating entries having null prices
clean_part_df = partial_df.loc[:, ['temperature', 'price']].dropna()
# Selecting the last 300 entries
# Grouping by temperature and calculating the mean price
# Showing it in the column mean_price
aggr_part_df = clean_part_df.tail(300).groupby(['temperature']).mean().rename(columns={'price': 'mean_price'})
aggr_part_df

                mean_price
temperature
HIGH          20359.376878
LOW          107378.899714
MEDIUM        38763.760778


### 1.5. Conclusions <a name="1.5."></a>

Each file contains 10 entries.

The column temperature contains categorical data (LOW, MEDIUM, HIGH), read as text and to be converted into the category datatype.

The column skipped_beat contains integer-valued data from 0 to 7, stored as float64 because of the missing values.
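
If integer semantics are wanted for skipped_beat despite the missing values, one option is the nullable `Int64` extension datatype of pandas. This is only a sketch of that option, not something done in this notebook; the series below is a hypothetical sample.

```python
import numpy as np
import pandas as pd

# Hypothetical sample of skipped_beat values, including a missing one
skipped_beat = pd.Series([np.nan, 1.0, 2.0, 0.0], name="skipped_beat")

# 'Int64' (capital I) is the nullable integer datatype
as_nullable_int = skipped_beat.astype("Int64")

print(as_nullable_int.dtype)
print(as_nullable_int.isna().sum())
```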

The column at_risk contains only the integers 0 and 1 (it can be converted to boolean).

The column price contains decimal data.

The columns are not always in the same order in the source files, and some files lack a column (for example, price).

Altogether there are 610 entries with a missing price value and 98 entries with a missing skipped_beat value.

There seems to be no other need for data cleaning.

---
<sub>Linux Ubuntu 18.04</sub><br>
<sub>Jupyter Notebook server 6.0.1</sub><br>
<sub>Python 3.6.8</sub><br>
<sub>numpy 1.17.3</sub><br>
<sub>pandas 0.25.2</sub><br>