
## ETL

The ETL (extract, transform, load) step consists of understanding the data structure and data types, defining a schema for the data, and saving the data in a structured format so that it can easily be reused in the next steps of the project.

Very often you will receive new data: either an updated version of the same data or the next batch (for temporal data). It is therefore best to keep the code clean and reusable, so that the next batch is easy to process.




The data is organized as follows:

* raw: to store the raw data
* staging: intermediary step, to explore data
* transformed: data ready to be used in a model
* predicted: predicted data
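A minimal sketch of this layout, assuming the four folders sit under a single project root. The `make_layers` helper is hypothetical and not part of the original notebook; it only creates the folders if they are missing and returns their paths.

```python
from pathlib import Path

# The four data layers described above
LAYERS = ("raw", "staging", "transformed", "predicted")


def make_layers(root):
    """Create the layer folders under `root` (hypothetical helper) and return their paths."""
    root = Path(root)
    paths = {}
    for layer in LAYERS:
        path = root / layer
        path.mkdir(parents=True, exist_ok=True)  # no error if the folder already exists
        paths[layer] = path
    return paths


# Example (path taken from this notebook):
# paths = make_layers("/content/drive/My Drive/Data/Dataroots_nmbs")
```

Because `exist_ok=True` is used, the helper can safely be run again every time a new batch arrives.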




In [0]:
import pandas as pd
import numpy as np
import re
import os

In [0]:
from google.colab import drive
drive.mount('/content/drive/')

In [0]:
import os
list_files = os.listdir("/content/drive/My Drive/Data/Dataroots_nmbs/raw")
for file in sorted(list_files):
  print(file)

## Weather data
![OpenWeatherMap weather icon](https://openweathermap.org/img/w/10d.png)



From the file listing, we notice that the weather files (whose names contain `owm`) are JSON Lines files, with one JSON object per line. We can verify the content of one of them.




In [0]:
list_weather_files = ["/content/drive/My Drive/Data/Dataroots_nmbs/raw/{}".format(file)
                      for file in list_files if "owm" in file]
print(list_weather_files)
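To check that each line really is a standalone JSON object, we can parse only the first line and look at its top-level keys. A minimal sketch: `peek_first_record` is a hypothetical helper, and the sample records are made up for illustration (they only mimic the nested `weather` list seen in these files).

```python
import io
import json


def peek_first_record(f):
    """Parse the first line of a JSON Lines stream and return it as a dict."""
    first_line = f.readline()
    return json.loads(first_line)


# Made-up sample in JSON Lines format: one JSON object per line
sample = io.StringIO(
    '{"id": 1, "name": "Brussels", "weather": [{"id": 500, "main": "Rain"}]}\n'
    '{"id": 1, "name": "Brussels", "weather": [{"id": 800, "main": "Clear"}]}\n'
)
record = peek_first_record(sample)
print(sorted(record))  # ['id', 'name', 'weather']
```

On a real file the same helper would be called as `with open(list_weather_files[0]) as f: print(peek_first_record(f))`.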



We need to parse each JSON line and normalise it, i.e. flatten the nested fields into columns. Let's try with one example:

In [0]:
import json
import pandas as pd

# pd.json_normalize replaces the deprecated pandas.io.json.json_normalize
# import, which recent pandas versions no longer provide
df_weather = pd.DataFrame()
with open(list_weather_files[0]) as f:
  for line in f:
    line_ = json.loads(line)
    df_weather = pd.concat([df_weather, pd.json_normalize(line_)])

df_weather.head()
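Calling `pd.concat` inside the loop copies the growing DataFrame at every line. A common alternative, sketched here under the assumption that one file fits in memory, is to parse all lines first and call `pd.json_normalize` once on the list of records. `read_json_lines` is a hypothetical helper and the sample lines are made up.

```python
import json

import pandas as pd


def read_json_lines(lines):
    """Parse an iterable of JSON lines and flatten them into one DataFrame."""
    records = [json.loads(line) for line in lines if line.strip()]  # skip blank lines
    # Nested dicts become dotted columns, e.g. {"main": {"temp": ...}} -> "main.temp"
    return pd.json_normalize(records)


lines = [
    '{"id": 1, "main": {"temp": 280.1}}',
    '{"id": 1, "main": {"temp": 281.4}}',
]
df = read_json_lines(lines)
print(df.columns.tolist())  # ['id', 'main.temp']
```

Since iterating over a file object yields its lines, the same helper works on a file: `with open(list_weather_files[0]) as f: df = read_json_lines(f)`.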


We also see that the `weather` field is a nested list, and that there are two `id` fields: one for the weather and one for the other, general data. Hence we should rename the weather `id` before joining it with the other data.

In [0]:
import json
import pandas as pd

df_weather = pd.DataFrame()
with open(list_weather_files[0]) as f:
  for line in f:
    line_ = json.loads(line)
    # flatten the nested "weather" list and rename its id to avoid a clash
    temp_weather = pd.json_normalize(line_, "weather")
    temp_weather.rename({"id": "id_weather"}, axis=1, inplace=True)
    # flatten the other, general fields
    temp_other = pd.json_normalize(line_)
    # join both parts on a constant key (a cross join)
    temp_weather["key"] = 1
    temp_other["key"] = 1
    df_weather = pd.concat([df_weather, temp_weather.merge(temp_other, on="key")])

df_weather.head()
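`pd.json_normalize` can also do the flattening and the join in a single call: `record_path` expands the `weather` list into rows, `meta` copies top-level fields onto each of those rows, and `record_prefix` renames the weather columns so that the two `id` fields no longer clash (without a prefix, pandas raises a `ValueError` about the conflicting `id`). A sketch on a made-up record; which fields to list in `meta` is an assumption to adapt to the real files.

```python
import pandas as pd

# Made-up record mimicking the structure seen above: a top-level "id" and a
# nested "weather" list whose items also carry an "id"
records = [
    {
        "id": 2800866,
        "name": "Brussels",
        "main": {"temp": 280.1},
        "weather": [{"id": 500, "main": "Rain", "description": "light rain"}],
    }
]

df = pd.json_normalize(
    records,
    record_path="weather",                  # one row per item of the "weather" list
    meta=["id", "name", ["main", "temp"]],  # top-level fields repeated on each row
    record_prefix="weather_",               # weather "id" becomes "weather_id"
)
print(df.columns.tolist())
# ['weather_id', 'weather_main', 'weather_description', 'id', 'name', 'main.temp']
```

This avoids the helper `key` column, at the cost of having to list the top-level fields explicitly.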

We now need to drop the `weather` column, which still holds the original nested list.

In [0]:
df_weather.drop("weather", axis=1, inplace=True)

This is also the moment to rename the columns: replace spaces, dots and other special characters with an underscore, and lowercase the names.

In [0]:
import re
print("columns", df_weather.columns)


def clean_col(col):
  """Clean a string.

  In our case, the column names of the dataframe. After stripping trailing
  whitespace, every run of characters that is neither a digit (0-9) nor a
  letter (a-z or A-Z) is replaced by a single underscore _, and the result
  is lowercased.
  :param col: string - string to clean
  """
  return re.sub(r"[^0-9a-zA-Z]+", "_", col.rstrip()).lower()


df_weather.columns = [clean_col(col) for col in df_weather.columns]
print("after renaming", df_weather.columns)
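A few examples of what `clean_col` produces; the column names below are illustrative, not taken from the data. Note that a special character at the end of a name becomes a trailing underscore, since only whitespace is stripped before the substitution.

```python
import re


def clean_col(col):
    """Replace runs of non-alphanumeric characters with "_" and lowercase (same as above)."""
    return re.sub(r"[^0-9a-zA-Z]+", "_", col.rstrip()).lower()


print(clean_col("main.temp"))    # main_temp
print(clean_col("Wind Speed "))  # wind_speed  (trailing space stripped first)
print(clean_col("rain.3h"))      # rain_3h
print(clean_col("temp (K)"))     # temp_k_
```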

#### Schema

We need to make sure that our data always has the same types: when we receive an update of the data, it should have the same data types as before. If the data is malformed and a column comes with a different type, the rest of the code risks failing.


In [0]:
print(df_weather.dtypes)
# let us save the schema and use it for the other datasets
df_weather.dtypes.reset_index().to_csv("/content/drive/My Drive/Data/Dataroots_nmbs/staging/schema_weather.csv",
                                       header=["variable", "type"], index=False)
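Saving the schema pays off when new batches are checked against it. A minimal sketch of such a check, assuming the schema CSV has the `variable` and `type` columns written above; `check_schema` is a hypothetical helper that lists the problems up front instead of letting the code fail later on. The data is made up.

```python
import pandas as pd


def check_schema(df, schema):
    """Return a list of human-readable mismatches between df and the schema."""
    problems = []
    for variable, expected in zip(schema["variable"], schema["type"]):
        if variable not in df.columns:
            problems.append(f"missing column: {variable}")
        elif str(df[variable].dtype) != expected:
            problems.append(f"{variable}: expected {expected}, got {df[variable].dtype}")
    return problems


schema = pd.DataFrame({"variable": ["temp", "name"], "type": ["float64", "object"]})
batch = pd.DataFrame({"temp": ["280.1", "n/a"]})  # malformed: temp as strings, no name
problems = check_schema(batch, schema)
print(problems)  # two problems: temp has the wrong type, name is missing
```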


In [0]:
import re
import datetime as dt

print(list_weather_files[0])

# the date is exactly 8 digits and the time 6 digits, so we can extract them
# with a regular expression
list_time_date = re.findall(r"([0-9]{8})_([0-9]{6})", list_weather_files[0])
date_time = ':'.join(list_time_date[0])
print(date_time)

# we just need to parse it in the right date format; the time in the file
# name is UK time, one hour earlier than Brussels time, so we add one hour
date = dt.datetime.strptime(date_time, "%Y%m%d:%H%M%S") + dt.timedelta(hours=1)
print(date)


def get_date(string, hour):
  """Extract the YYYYMMDD_HHMMSS timestamp from a string and shift it by `hour` hours."""
  list_time_date = re.findall(r"([0-9]{8})_([0-9]{6})", string)
  date_time = ':'.join(list_time_date[0])
  # the shift must be on the same line: on a separate line, "+ dt.timedelta(...)"
  # is a standalone expression whose result is discarded
  date = dt.datetime.strptime(date_time, "%Y%m%d:%H%M%S") + dt.timedelta(hours=hour)
  return date


df_weather["date"] = get_date(list_weather_files[0], 1)
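Adding a fixed hour works as long as the file timestamps are UK local time, since the UK and Belgium switch to and from summer time on the same dates. A more explicit alternative, sketched with the standard-library `zoneinfo` module (Python 3.9+), attaches the source time zone and converts it. That the timestamps are `Europe/London` time is the notebook's own assumption; `get_local_date` and the file name below are hypothetical.

```python
import datetime as dt
import re
from zoneinfo import ZoneInfo


def get_local_date(file_name, source_tz="Europe/London", target_tz="Europe/Brussels"):
    """Extract the YYYYMMDD_HHMMSS stamp from a file name and convert it between time zones."""
    match = re.search(r"([0-9]{8})_([0-9]{6})", file_name)
    if match is None:
        raise ValueError(f"no timestamp found in {file_name!r}")
    naive = dt.datetime.strptime("_".join(match.groups()), "%Y%m%d_%H%M%S")
    # Interpret the stamp in the source time zone, then express it in the target one
    aware = naive.replace(tzinfo=ZoneInfo(source_tz))
    return aware.astimezone(ZoneInfo(target_tz))


print(get_local_date("owm_20190301_101500.json"))  # 2019-03-01 11:15:00+01:00
```

The result is time-zone aware; calling `.replace(tzinfo=None)` on it gives a naive timestamp comparable to the one produced by `get_date`.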


In [0]:

df_weather.dtypes

In [0]:
df_weather.dtypes.reset_index().to_csv("/content/drive/My Drive/Data/Dataroots_nmbs/staging/schema_owm.csv",
                                       header=["variable", "type"], index=False)
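The saved schema file can be turned back into a `{column: dtype}` dictionary and applied in a single `DataFrame.astype` call. A sketch using an in-memory CSV instead of the Drive path; the column names and values are illustrative.

```python
import io

import pandas as pd

df = pd.DataFrame({"temp": [280.1, 281.4], "humidity": [81, 77]})

# Write the schema as above: one row per column with its dtype name
buffer = io.StringIO()
df.dtypes.reset_index().to_csv(buffer, header=["variable", "type"], index=False)

# Read it back and build a {column: dtype} mapping
buffer.seek(0)
schema = pd.read_csv(buffer)
dtypes = dict(zip(schema["variable"], schema["type"]))
print(dtypes)  # {'temp': 'float64', 'humidity': 'int64'}

# A new batch where humidity arrived as floats is cast back to the saved types
batch = pd.DataFrame({"temp": [279.0], "humidity": [80.0]})
batch = batch.astype(dtypes)
```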

We combine all the above steps into a function that is applied to a single file and returns a DataFrame:

In [0]:
def get_weather_data(file_name, schema):
  """Read one OWM JSON Lines file and return a cleaned DataFrame matching `schema`."""
  df_weather = pd.DataFrame()
  with open(file_name) as f:
    for line in f:
      line_ = json.loads(line)
      # flatten the weather data
      temp_weather = pd.json_normalize(line_, "weather")
      temp_weather.rename({"id": "id_weather"}, axis=1, inplace=True)
      temp_weather["key"] = 1
      # normalize the other data
      temp_other = pd.json_normalize(line_)
      temp_other["key"] = 1
      # join the weather and other data
      df_weather = pd.concat([df_weather, temp_weather.merge(temp_other, on="key")])
  # remove the weather data
  df_weather_final = df_weather.drop("weather", axis=1)
  # cleaning the column names
  df_weather_final.columns = [clean_col(col) for col in df_weather_final.columns]
  # add the datetime
  df_weather_final["date"] = get_date(file_name, 1)
  # enforce the schema: cast every column to its saved type
  for col, col_type in zip(schema.variable, schema.type):
    df_weather_final[col] = df_weather_final[col].astype(col_type)
  return df_weather_final


Now that we have the function, we can apply it to all the files and concatenate the results into one clean dataset.


In [0]:
df_weather = pd.DataFrame()
schema = pd.read_csv("/content/drive/My Drive/Data/Dataroots_nmbs/staging/schema_owm.csv")
for file in list_weather_files:
  print(file)
  df_weather = pd.concat([df_weather, get_weather_data(file, schema)])

df_weather.head()
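Since new batches keep arriving, it can help to process only the files that have not been seen yet. A sketch under that assumption: `load_new_files` is hypothetical, and `loader` stands for a function such as `get_weather_data` with its schema already bound. Frames are collected in a list and concatenated once.

```python
import pandas as pd


def load_new_files(files, processed, loader):
    """Load only the files not in `processed`; return (new_data, updated processed set)."""
    new_files = [f for f in sorted(files) if f not in processed]
    if not new_files:
        return pd.DataFrame(), set(processed)
    # Build all frames first, then concatenate once with a fresh index
    frames = [loader(f) for f in new_files]
    return pd.concat(frames, ignore_index=True), set(processed) | set(new_files)


def toy_loader(file_name):
    """Toy stand-in for get_weather_data(file, schema)."""
    return pd.DataFrame({"source": [file_name]})


new_data, done = load_new_files(["b.json", "a.json"], {"a.json"}, toy_loader)
print(new_data)
```

With the real function this would read `load_new_files(list_weather_files, processed, lambda f: get_weather_data(f, schema))`, where `processed` would have to be persisted between runs, for instance in the staging folder.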

## NMBS data




In [0]:
list_nmbs_files = ["/content/drive/My Drive/Data/Dataroots_nmbs/raw/{}".format(file)
                   for file in list_files if "nmbs" in file]
print(list_nmbs_files)



In [0]:
df_nmbs = pd.read_csv(list_nmbs_files[0])
df_nmbs.head()


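A frequent mistake when using pandas is to save a DataFrame to CSV together with its default index (`to_csv` writes the index unless `index=False` is passed). When the file is read back, pandas creates an additional column named `Unnamed: 0`, which should be dropped. We also drop a few columns that we will not use.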

In [0]:
df_nmbs.drop(['Unnamed: 0', 'stop_name', 'route_id', 'service_id', 'route_long_name' ], axis=1, inplace=True)
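The problem is easy to reproduce with an in-memory CSV: writing the index and reading the file back yields the extra column, while writing with `index=False` or reading with `index_col=0` avoids it. A small sketch; `round_trip` is a hypothetical helper and the data is made up, not the NMBS columns.

```python
import io

import pandas as pd


def round_trip(df, write_index, index_col=None):
    """Write df to an in-memory CSV and read it back."""
    buffer = io.StringIO()
    df.to_csv(buffer, index=write_index)
    buffer.seek(0)
    return pd.read_csv(buffer, index_col=index_col)


df = pd.DataFrame({"value": [1, 2], "label": ["a", "b"]})

print(round_trip(df, write_index=True).columns.tolist())
# ['Unnamed: 0', 'value', 'label']  <- the saved index comes back as a column
print(round_trip(df, write_index=False).columns.tolist())
# ['value', 'label']                <- fix 1: do not write the index
print(round_trip(df, write_index=True, index_col=0).columns.tolist())
# ['value', 'label']                <- fix 2: read the first column as the index
```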

The rest of the data looks good.
As before, we first clean the column names and save the schema; the files are then concatenated, and the exploration is left for a later phase.

In [0]:
df_nmbs.columns = [clean_col(col) for col in df_nmbs.columns]

# let us save the schema and use it for the other datasets
df_nmbs.dtypes.reset_index().to_csv("/content/drive/My Drive/Data/Dataroots_nmbs/staging/schema_nmbs_.csv",
                                    header=["variable", "type"], index=False)

In [0]:
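def get_nmbs_data(file_name, schema):
  """Read one NMBS CSV file and return a cleaned DataFrame matching `schema`."""
  df_nmbs = pd.read_csv(file_name)
  # drop the saved index and the same unused columns as above, so that every
  # file ends up with the columns recorded in the schema
  df_nmbs.drop(['Unnamed: 0', 'stop_name', 'route_id', 'service_id', 'route_long_name'],
               axis=1, inplace=True)
  # cleaning the column names
  df_nmbs.columns = [clean_col(col) for col in df_nmbs.columns]
  # enforce the schema: cast every column to its saved type
  for col, col_type in zip(schema.variable, schema.type):
    df_nmbs[col] = df_nmbs[col].astype(col_type)
  # add the datetime
  df_nmbs["date"] = get_date(file_name, 1)
  return df_nmbs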
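If a later batch lacks one of the dropped columns, `drop` raises a `KeyError`. Passing `errors="ignore"` makes the drop tolerant, while an explicit check still fails loudly when a column the schema needs is missing. A sketch of that split: `DROP_COLUMNS` mirrors the list dropped earlier in this section, `prepare` is a hypothetical helper, and the `delay` column and values are made up.

```python
import pandas as pd

# Columns removed from the NMBS files earlier in this section
DROP_COLUMNS = ["Unnamed: 0", "stop_name", "route_id", "service_id", "route_long_name"]


def prepare(df, required):
    """Drop unneeded columns if present and check that all required columns exist."""
    df = df.drop(columns=DROP_COLUMNS, errors="ignore")  # no KeyError if one is absent
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise KeyError(f"missing required columns: {missing}")
    return df


# A batch without "route_id" and the other optional columns: the drop still works
batch = pd.DataFrame({"Unnamed: 0": [0], "stop_name": ["Brussel-Zuid"], "delay": [120]})
print(prepare(batch, required=["delay"]).columns.tolist())  # ['delay']
```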

In [0]:
df_nmbs = pd.DataFrame()
schema = pd.read_csv("/content/drive/My Drive/Data/Dataroots_nmbs/staging/schema_nmbs_.csv")
for file in list_nmbs_files:
  df_nmbs = pd.concat([df_nmbs, get_nmbs_data(file, schema)])
df_nmbs.head()
