Weather ETL

![bttf image](bttflogo.png)



Data is collected to understand the weather conditions during a shipment and how those conditions influence fuel consumption.

To showcase my interpretation of the solution, the transformation is broken down step by step, showing the output of each cell.

In this case the weather data arrives in a landing zone, so the first step is to glob the files.

Note: globbing this data takes about 50 seconds, and the landing zone currently uses 2.26 MB of storage.



In [None]:
import glob
import os
import json
import pandas as pd



# getting the json file names from the landing zone and appending each parsed frame to the data list
globbed_files = glob.glob(r"C:/Users/E.ALVAREZHERNANDEZ/VS Projects/caseStudy/landing_zone/weather/*.json")

data = [] 
for file in globbed_files:
    frame = pd.read_json(file, lines=True)
    data.append(frame)

data


In [None]:
#Concatenating everything into a single dataframe

df = pd.concat(data, ignore_index=True)

df
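
The load-and-concatenate step above can also be wrapped in a small helper. This is only a sketch: `load_json_lines` is a hypothetical name, and the two sample files written to a temporary folder stand in for the real landing zone, whose exact schema is not reproduced here.

```python
import tempfile
from pathlib import Path

import pandas as pd


def load_json_lines(folder: Path) -> pd.DataFrame:
    """Read every *.json file in `folder` (one JSON object per line) into one dataframe."""
    frames = [pd.read_json(path, lines=True) for path in sorted(folder.glob("*.json"))]
    return pd.concat(frames, ignore_index=True)


# Hypothetical sample files standing in for the landing zone
with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    (folder / "a.json").write_text('{"city": "Barcelona", "dt": 1656633600}\n')
    (folder / "b.json").write_text('{"city": "Madrid", "dt": 1656637200}\n')
    sample = load_json_lines(folder)

print(sample)
```

Sorting the paths keeps the row order stable between runs, which `glob` on its own does not guarantee.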

1. Flattening: 

Our first challenge is to flatten the data, since relevant values are in a struct form.

In [None]:
#First I normalize the first level, the weather column
from pandas import json_normalize


df = df.join(json_normalize(df['weather'].to_list()))\
       .drop(['weather'], axis=1)

df

In [None]:
#Normalize to enter into the second level
df = df.join(json_normalize(df['wind'].to_list()))\
       .drop(['wind'], axis=1)

df

In [None]:
#Getting into the wind information
df = df.join(json_normalize(df[0].to_list()))\
       .drop([0], axis=1)

df
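
To see the flattening idea in isolation, here is a minimal sketch on made-up records. The field names (`temp`, `wind`, `speed`, `deg`) are assumptions for illustration and the real payload has an extra list level (the column `0` above) that this toy data does not reproduce.

```python
import pandas as pd

# Hypothetical records shaped like a nested weather payload
raw = pd.DataFrame(
    {
        "city": ["Barcelona", "Madrid"],
        "weather": [
            {"temp": 25.1, "wind": {"speed": 3.2, "deg": 180}},
            {"temp": 30.4, "wind": {"speed": 1.5, "deg": 90}},
        ],
    }
)

# json_normalize expands nested dicts in one pass, joining key paths with `sep`
flat = raw.drop(columns="weather").join(
    pd.json_normalize(raw["weather"].tolist(), sep="_")
)

print(flat.columns.tolist())
```

When the nesting is made of dicts only, a single `json_normalize` call can replace the level-by-level joins.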

2. DateTime Format:

The datetime data is stored as Unix epoch seconds; to make it human-readable we need to convert it to a datetime format.

In [None]:
#Time was in Epoch format so we need to change it into datetime

import datetime

df['dt'] = pd.to_datetime(df['dt'], unit='s')

df
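
As a quick check of the conversion, the sketch below applies the same `pd.to_datetime(..., unit='s')` call to two hand-picked epoch values. The values are illustrative and not taken from the weather files.

```python
import pandas as pd

# Hypothetical epoch values in seconds (UTC)
epochs = pd.Series([0, 1656633600])

# unit="s" tells pandas the integers count seconds since 1970-01-01 00:00:00 UTC
converted = pd.to_datetime(epochs, unit="s")

print(converted.tolist())
# [Timestamp('1970-01-01 00:00:00'), Timestamp('2022-07-01 00:00:00')]
```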


3. Sinking weather into Data Lake:

The sink is written in Parquet format with Snappy compression in order to optimize performance and storage use.

In [None]:
#Sinking the clean data into the datalake, partitioned by city

df.to_parquet('C:/Users/E.ALVAREZHERNANDEZ/VS Projects/caseStudy/clean_data/weather/', compression='snappy', partition_cols=['city'])

4. Delta weather table:

Reading the data back from the data lake Parquet files improves the processing time from 56 seconds to 0.1 seconds and cuts the storage use from 2.26 MB to 320 KB, a reduction of about 86%.
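
The storage saving can be recomputed from the two sizes quoted above. This is only a back-of-the-envelope sketch; the result shifts slightly depending on whether 1 MB is taken as 1000 KB or 1024 KB.

```python
# Sizes quoted in the text
landing_zone_mb = 2.26
parquet_kb = 320

# The result depends slightly on whether 1 MB is taken as 1000 KB or 1024 KB
for kb_per_mb in (1000, 1024):
    reduction = 1 - parquet_kb / (landing_zone_mb * kb_per_mb)
    print(f"{kb_per_mb} KB per MB: {reduction:.1%} smaller")
```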

In [None]:
#Pulling clean weather temperature

cweather = pd.read_parquet('C:/Users/E.ALVAREZHERNANDEZ/VS Projects/caseStudy/clean_data/weather')

cweather

In [None]:
#In order to get the avg temperature I will subset

temperature = cweather[['dt', 'temp', 'city']]

temperature

5. Connecting with shipments DB:


In [None]:
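import os
import psycopg2
import logging

# The password is read from the environment instead of being hardcoded;
# PGPASSWORD is an assumed variable name, set it before running the notebook
conn = psycopg2.connect(host='localhost',
                        database='bttf',
                        user='postgres',
                        password=os.environ['PGPASSWORD'],
                        port='5432')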



In [None]:
pointer = conn.cursor()

pointer.execute("SELECT * FROM shipments.shipments")

rows = pointer.fetchall() #List of tuples


In [None]:
shdf = pd.DataFrame(rows, columns=['id', 'truck', 'driver', 'shipment_start_timestamp', 'shipment_end_timestamp', 'start_location', 'end_location', 'shipment_distance', 'consumed_fuel'])
shdf
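
The column names do not have to be typed by hand: any DB-API cursor exposes them through `cursor.description`. The sketch below uses the standard-library `sqlite3` module purely as a stand-in so it runs without a PostgreSQL server; the table and its three columns are made up for illustration, and the same pattern should carry over to the psycopg2 cursor.

```python
import sqlite3

import pandas as pd

# sqlite3 is used here only as a stand-in so the sketch runs without a PostgreSQL server
conn_demo = sqlite3.connect(":memory:")
conn_demo.execute("CREATE TABLE shipments (id INTEGER, truck TEXT, consumed_fuel REAL)")
conn_demo.executemany(
    "INSERT INTO shipments VALUES (?, ?, ?)",
    [(1, "T-01", 120.5), (2, "T-02", 98.0)],
)

cur = conn_demo.cursor()
cur.execute("SELECT * FROM shipments")
rows_demo = cur.fetchall()  # list of tuples

# cursor.description holds one entry per column; its first item is the column name
columns = [col[0] for col in cur.description]
shdf_demo = pd.DataFrame(rows_demo, columns=columns)

cur.close()
conn_demo.close()
print(shdf_demo)
```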

6. Fixing shipments timestamp formats:

Assumption: weather does not vary much from hour to hour, so to simplify the analysis I round the shipment timestamps to the nearest hour.

In [None]:
#Timestamps come in SQL format, so here I reformat them to second resolution to match the weather df convention

import datetime


shdf['shipment_start_timestamp'] = pd.to_datetime(pd.to_datetime(shdf['shipment_start_timestamp'] ).apply(lambda x: datetime.datetime.strftime(x, '%Y-%m-%d %H:%M:%S')))
shdf['shipment_end_timestamp'] = pd.to_datetime(pd.to_datetime(shdf['shipment_end_timestamp'] ).apply(lambda x: datetime.datetime.strftime(x, '%Y-%m-%d %H:%M:%S')))

shdf

In [None]:
shdf['shipment_start_timestamp'] = shdf['shipment_start_timestamp'].dt.round('h')
shdf['shipment_end_timestamp'] = shdf['shipment_end_timestamp'].dt.round('h')

#Calculates how many litres the trucks consume per 100 km
shdf['fConsumption']= (shdf['consumed_fuel']*100)/shdf['shipment_distance']
shdf
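
To make the rounding and the consumption formula concrete, here is a small sketch on invented values. The column names mirror the shipments table, but the numbers are illustrative only.

```python
import pandas as pd

demo = pd.DataFrame(
    {
        "shipment_start_timestamp": pd.to_datetime(
            ["2022-07-01 10:29:00", "2022-07-01 10:31:00"]
        ),
        "consumed_fuel": [30.0, 45.0],        # litres
        "shipment_distance": [100.0, 150.0],  # km
    }
)

# Round to the nearest full hour
demo["rounded"] = demo["shipment_start_timestamp"].dt.round("h")

# Litres per 100 km
demo["fConsumption"] = demo["consumed_fuel"] * 100 / demo["shipment_distance"]

print(demo[["rounded", "fConsumption"]])
```

Here 10:29 rounds down to 10:00 and 10:31 rounds up to 11:00, and both trips come out at 30 litres per 100 km.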


7. Merging data:

In this step we merge the dataframes to get the weather conditions at each shipment's location and time.

In [None]:
#Merging to get the first temperature matching the start location and the start timestamp


dfmerge = pd.merge(shdf, temperature, how='left', left_on=[shdf['start_location'].astype(str), shdf['shipment_start_timestamp'].astype(str)], right_on=[temperature['city'].astype(str), temperature['dt'].astype(str)])
dfmerge= dfmerge.rename(columns={'temp':'startTemp'}).drop(['key_0', 'key_1', 'city', 'dt'], axis=1)

dfmerge

In [None]:
#Merging to get the end temperature matching the end location and the end timestamp


dfmerge2 = pd.merge(dfmerge, temperature, how='left', left_on=[dfmerge['end_location'].astype(str), dfmerge['shipment_end_timestamp'].astype(str)], right_on=[temperature['city'].astype(str), temperature['dt'].astype(str)])
dfmerge2= dfmerge2.drop(['key_0', 'key_1', 'dt', 'city'], axis=1).rename(columns={'temp':'endTemp'})
dfmerge2
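
The same kind of left join can be sketched on toy data by merging directly on the column names, without casting the keys to strings. This is an alternative formulation rather than the cells above, and the two small frames are invented for illustration.

```python
import pandas as pd

shipments_demo = pd.DataFrame(
    {
        "id": [1, 2],
        "start_location": ["Barcelona", "Madrid"],
        "shipment_start_timestamp": pd.to_datetime(
            ["2022-07-01 10:00:00", "2022-07-01 11:00:00"]
        ),
    }
)
weather_demo = pd.DataFrame(
    {
        "city": ["Barcelona"],
        "dt": pd.to_datetime(["2022-07-01 10:00:00"]),
        "temp": [25.0],
    }
)

# Left join keeps every shipment; unmatched shipments get NaN for the temperature
merged = (
    shipments_demo.merge(
        weather_demo,
        how="left",
        left_on=["start_location", "shipment_start_timestamp"],
        right_on=["city", "dt"],
    )
    .drop(columns=["city", "dt"])
    .rename(columns={"temp": "startTemp"})
)

print(merged)
```

Because both key columns are already datetimes, no `key_0`/`key_1` helper columns are created and the timestamps keep their dtype.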

In [None]:
#Getting the average temperature for the trip, and setting the timestamp to datetime as it was required to pass them as strings for the merge

shipment = dfmerge2

shipment['avgTemp']= shipment[['startTemp','endTemp']].mean(axis=1)


shipment['shipment_start_timestamp'] = pd.to_datetime(shipment['shipment_start_timestamp'])
shipment['shipment_end_timestamp'] = pd.to_datetime(shipment['shipment_end_timestamp'])

#Duration in hours; astype('timedelta64[h]') is rejected by recent pandas versions
shipment['duration'] = (shipment['shipment_end_timestamp'] - shipment['shipment_start_timestamp']).dt.total_seconds() / 3600

shipment
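
Two details of this step are easy to miss: `mean(axis=1)` skips missing values, and a timedelta can be turned into hours through `total_seconds()`. The sketch below shows both on invented trips.

```python
import numpy as np
import pandas as pd

trip = pd.DataFrame(
    {
        "startTemp": [20.0, 18.0],
        "endTemp": [24.0, np.nan],  # second trip has no matching end reading
        "shipment_start_timestamp": pd.to_datetime(["2022-07-01 10:00", "2022-07-01 08:00"]),
        "shipment_end_timestamp": pd.to_datetime(["2022-07-01 13:00", "2022-07-02 08:00"]),
    }
)

# mean(axis=1) skips NaN by default, so a trip with one reading keeps that reading
trip["avgTemp"] = trip[["startTemp", "endTemp"]].mean(axis=1)

# Timedelta -> hours as a float
elapsed = trip["shipment_end_timestamp"] - trip["shipment_start_timestamp"]
trip["duration"] = elapsed.dt.total_seconds() / 3600

print(trip[["avgTemp", "duration"]])
```

A trip with only one temperature reading therefore still gets an `avgTemp`, which is worth keeping in mind when reading the correlations.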

In [None]:
#The correlation matrix shows some results already


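#numeric_only=True skips the text columns, which recent pandas versions no longer drop silently
shipment.corr(numeric_only=True)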

In [None]:
#Sinking the clean data into the datalake


shipment.to_parquet('C:/Users/E.ALVAREZHERNANDEZ/VS Projects/caseStudy/clean_data/DataLake/gold.parquet',compression='snappy')

8. Plotting the correlation:

Since the weather temperatures seem to be fake data (for example, Barcelona could not be at -12 °C in July), the correlation is skewed. Next, I plot the relationship to make it visual.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
analysis = pd.read_parquet('C:/Users/E.ALVAREZHERNANDEZ/VS Projects/caseStudy/clean_data/DataLake/gold.parquet')

analysis

In [None]:
analysis = analysis[analysis['start_location']=='Barcelona']
analysis= analysis[['fConsumption', 'avgTemp']]


In [None]:
analysis.plot(kind='scatter',x='avgTemp', y='fConsumption')
plt.show()



In [None]:
import numpy as np
import scipy.stats

analysis = analysis.dropna(subset=['avgTemp'])

x = analysis['avgTemp']
y = analysis['fConsumption']



scipy.stats.pearsonr(x,y)

scipy.stats.spearmanr(x,y)

scipy.stats.kendalltau(x,y)

result = scipy.stats.linregress(x,y)
result.slope
result.intercept
result.rvalue
result.pvalue
result.stderr
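
The slope, intercept and r value can be cross-checked with NumPy alone. The sketch below uses made-up points that lie exactly on a line, so the expected values are known in advance; it is a sanity check and not a replacement for `scipy.stats.linregress`.

```python
import numpy as np

# Hypothetical points lying exactly on y = 2x + 1
x_demo = np.array([0.0, 1.0, 2.0, 3.0, 4.0])
y_demo = 2.0 * x_demo + 1.0

# Least-squares line of degree 1: returns (slope, intercept)
slope_demo, intercept_demo = np.polyfit(x_demo, y_demo, deg=1)

# Pearson r is the off-diagonal entry of the 2x2 correlation matrix
r_demo = np.corrcoef(x_demo, y_demo)[0, 1]

print(f"slope={slope_demo:.2f}, intercept={intercept_demo:.2f}, r={r_demo:.2f}")
```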

In [None]:
import matplotlib.pyplot as plt

plt.style.use('ggplot')

slope, intercept, r, p, stderr = scipy.stats.linregress(x,y)

line = f'Regression line: y={intercept:.2f}{slope:+.2f}x, r={r:.2f}'

line

In [None]:
fig, ax = plt.subplots()
ax.plot(x, y, linewidth=0, marker='s', label='Data points')
ax.plot(x, intercept + slope*x, label=line)
ax.set_xlabel('Avg Temperature')
ax.set_ylabel('Fuel Consumption')
ax.legend(facecolor='white')
plt.show()