In [1]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
from typing import Dict

import pandas as pd
pd.set_option('display.max_columns', 30)

import requests

In [2]:
"""
Transform Load Lambda function
1. get data from s3
2. weather data transformation - done
3. taxi data transformation - done
4. payment type update - done
5. company update - done
6. update fact taxi trips with the ids from dim payment type and dim company
7. upload dim weather to s3
8. upload fact taxi trips
9. upload dim payment type
10. upload dim company
"""

'\nTransform Load Lambda function\n1. get data from s3\n2. weather data transformation - done\n3. taxi data transformation - done\n4. payment type update - done\n5. company update - done\n6. update fact taxi trips with the ids from dim payment type and dim company\n7. upload dim weather to s3\n8. upload fact taxi trips\n9. upload dim payment type\n10. upload dim company\n'

### base dataframe

In [3]:
# Day to load: today's date shifted two calendar months back, formatted as YYYY-MM-DD.
date_str = f"{datetime.now() - relativedelta(months = 2):%Y-%m-%d}"

In [4]:
# Query every taxi trip whose start timestamp falls on `date_str` (at most 30000 rows).
url = (f"https://data.cityofchicago.org/resource/ajtu-isnz.json?"
	   f"$where=trip_start_timestamp >= '{date_str}T00:00:00' "
	   f"AND trip_start_timestamp <= '{date_str}T23:59:59'"
	   f"&$limit=30000")
# requests has no default timeout; without one a stalled connection blocks the run forever.
response = requests.get(url, timeout = 60)
# Stop on an HTTP error instead of feeding an error payload into the dataframe below.
response.raise_for_status()
data = response.json()

In [5]:
# Raw trips dataframe built from the decoded JSON response held in `data`.
taxi_trips = pd.DataFrame(data)

## taxi transformations

In [6]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame :
	"""
	Clean the daily taxi trips and add the helper column used for the weather join.

	1. drop the census tract and centroid location columns (columns that are
	   already absent are skipped)
	2. drop every row that contains a null value
	3. rename the community area columns to ``*_id``
	4. parse ``trip_start_timestamp`` and create ``datetime_for_weather``,
	   the trip start rounded down to the hour

	The dataframe is modified IN PLACE and the same object is returned, so any
	other reference to ``taxi_trips`` sees the transformed data as well.
	Because absent columns are tolerated, calling the function again on an
	already transformed frame (e.g. re-running the notebook cell) does not fail.

	:param taxi_trips: dataframe holding the daily taxi trips
	:raises TypeError: if taxi_trips is not a dataframe
	:return: the same dataframe object, transformed
	"""
	if not isinstance(taxi_trips, pd.DataFrame) :
		raise TypeError('taxi_trips is not a valid dataframe')

	unused_columns = ['pickup_census_tract', 'dropoff_census_tract', 'pickup_centroid_location', 'dropoff_centroid_location']
	# errors = 'ignore': a column that is not present (missing from the payload or
	# removed by an earlier call) must not abort the whole transformation.
	taxi_trips.drop(columns = unused_columns, errors = 'ignore', inplace = True)
	taxi_trips.dropna(inplace = True)
	taxi_trips.rename(
			columns = {'pickup_community_area' : 'pickup_community_area_id', 'dropoff_community_area' : 'dropoff_community_area_id'},
			inplace = True)
	# Helper column for the weather data: the trip start timestamp rounded down to the hour.
	taxi_trips['trip_start_timestamp'] = pd.to_datetime(taxi_trips['trip_start_timestamp'])
	taxi_trips['datetime_for_weather'] = taxi_trips['trip_start_timestamp'].dt.floor('h')  # floor to the hour
	return taxi_trips

In [7]:
# NOTE: taxi_trips_transformations modifies its argument in place and returns it,
# so `taxi_trips_transformed` and `taxi_trips` refer to the same dataframe from here on.
taxi_trips_transformed = taxi_trips_transformations(taxi_trips)

## company and payment type transformations

In [8]:
def update_company_payment_type(taxi_trips: pd.DataFrame, dim_df: pd.DataFrame, id_col: str, value_col: str) -> pd.DataFrame :
	"""
	Extend a dimension table with the values that occur in the taxi trips but
	are not part of the dimension yet.

	New values receive consecutive ids that continue after the highest existing
	id. When the dimension holds no id yet (empty table) numbering starts at 1.
	Neither input dataframe is modified.

	:param taxi_trips: dataframe holding the daily taxi trips
	:param dim_df: dataframe with the dimension data
	:param id_col: id column of the dimension dataframe
	:param value_col: name column of the dimension dataframe containing the values
	:return: ``dim_df`` itself when there is nothing to add, otherwise a new
		dataframe with the new values appended after the existing rows
	"""
	todays_values = pd.DataFrame(taxi_trips[value_col].unique(), columns = [value_col])
	already_known = todays_values[value_col].isin(dim_df[value_col])
	# .copy() makes the filtered rows an independent frame, so adding the id
	# column below cannot trigger pandas' SettingWithCopyWarning.
	new_rows = todays_values[~already_known].copy()

	if new_rows.empty :
		return dim_df

	# max() of an empty (or all-null) id column is NaN, which cannot seed range().
	max_id = dim_df[id_col].max()
	first_new_id = 1 if pd.isna(max_id) else int(max_id) + 1
	new_rows[id_col] = range(first_new_id, first_new_id + len(new_rows))

	if dim_df.empty :
		# Nothing to append to: the new rows are the whole dimension, laid out
		# in the column order of dim_df.
		return new_rows.reindex(columns = dim_df.columns).reset_index(drop = True)

	return pd.concat([dim_df, new_rows], ignore_index = True)

## company and payment type dimension tables

In [9]:
# Seed the payment type dimension: one row per distinct payment type, ids numbered from 1.
dim_payment_type = (
	taxi_trips[['payment_type']]
	.drop_duplicates()
	.reset_index(drop = True)
)
dim_payment_type.insert(0, 'payment_type_id', range(1, len(dim_payment_type) + 1))

In [10]:
# Seed the company dimension: one row per distinct company, ids numbered from 1.
dim_company = (
	taxi_trips[['company']]
	.drop_duplicates()
	.reset_index(drop = True)
)
dim_company.insert(0, 'company_id', range(1, len(dim_company) + 1))

In [11]:
# Add any payment type / company found in the trips that the dimension does not hold yet.
dim_payment_type_updated = update_company_payment_type(
	taxi_trips,
	dim_payment_type,
	id_col = 'payment_type_id',
	value_col = 'payment_type',
)
dim_company_updated = update_company_payment_type(
	taxi_trips,
	dim_company,
	id_col = 'company_id',
	value_col = 'company',
)

## update fact taxi trips with company and payment type ids

In [12]:
def update_fact_taxi_trips_with_dimension_data(taxi_trips: pd.DataFrame, dim_payment_type: pd.DataFrame, dim_company: pd.DataFrame) -> pd.DataFrame :
	"""
	Replace the payment type and company names of the trips with the columns of
	their dimension tables.

	Both dimensions are left-joined on their name column (``payment_type`` and
	``company``), so every trip is kept; a trip whose value is missing from a
	dimension gets nulls in the joined columns. The two name columns are
	removed afterwards. The input dataframes are not modified.

	:param taxi_trips: fact table taxi trips
	:param dim_payment_type: dimension table for payment types
	:param dim_company: dimension table for company
	:return: new taxi trips dataframe carrying the payment type and company dimension columns
	"""
	return (
		taxi_trips
		.merge(dim_payment_type, on = 'payment_type', how = 'left')
		.merge(dim_company, on = 'company', how = 'left')
		.drop(columns = ['payment_type', 'company'])
	)

In [13]:
# Join the updated dimensions onto the transformed trips and drop the name columns.
taxi_trips_transformed_with_dim_ids = update_fact_taxi_trips_with_dimension_data(
	taxi_trips = taxi_trips_transformed,
	dim_payment_type = dim_payment_type_updated,
	dim_company = dim_company_updated,
)

## weather data

In [14]:
def transform_weather(weather_data: Dict) -> pd.DataFrame :
	"""
	Select the hourly weather series from the API payload and return them as a dataframe.

	:param weather_data: daily weather data from the API; must contain an
		``hourly`` mapping with the keys ``time``, ``temperature_2m``,
		``wind_speed_10m``, ``rain`` and ``precipitation``
	:raises KeyError: if ``hourly`` or one of the expected series is missing
	:return: weather dataframe with the columns ``datetime`` (parsed to datetime),
		``temperature``, ``wind_speed``, ``rain`` and ``precipitation``
	"""
	# Read from the argument - not from the notebook-level `data` variable,
	# which holds the taxi trips response.
	hourly = weather_data['hourly']
	weather_df = pd.DataFrame({
		'datetime' : hourly['time'],
		'temperature' : hourly['temperature_2m'],
		'wind_speed' : hourly['wind_speed_10m'],
		'rain' : hourly['rain'],
		'precipitation' : hourly['precipitation']})
	weather_df['datetime'] = pd.to_datetime(weather_df['datetime'])
	return weather_df