# Imports

In [215]:
import json
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta

import pandas as pd
pd.set_option("display.max_columns", 40) # to see all columns
import requests

# Steps
DONE 1. get the data from S3 <br />
DONE 2. weather data transformation <br />
DONE 3. taxi data transformation <br />
DONE 4. update payment type master <br />
DONE 5. update company master <br />
DONE 6. update taxi data with company and payment_type IDs (replace the string values with IDs from the latest master files) <br />
DONE 7. upload weather data to s3 <br />
DONE 8. upload taxi data to s3 <br />
DONE 9. upload the updated payment_type master <br />
DONE 10. upload the updated company master <br />




# 3. Taxi Data Transformations

In [7]:
# To simulate a daily data load we go back two months in time and download that
# day's data: if today is 2023-12-01, we download the trips of 2023-10-01.
# One day is about 15 MB and around 14,000 rows.
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

url = "https://data.cityofchicago.org/resource/ajtu-isnz.json"

# Passing the query clauses through `params` lets requests URL-encode them,
# instead of embedding raw spaces and a line break in a hand-built query string.
params = {
    "$where": (
        f"trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
        f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'"
    ),
    "$limit": 30000,
}
# Optional app token (not sent at the moment):
# headers = {"X-App-Token": os.environ.get("CHICAGO_API_TOKEN")}
response = requests.get(url, params=params, timeout=60)
# Stop here on an HTTP error instead of building a DataFrame from an error payload.
response.raise_for_status()
data = response.json()

In [258]:
# Load the parsed JSON response of the taxi API call into a DataFrame and preview it.
taxi_trips = pd.DataFrame(data)
taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,pickup_centroid_location,dropoff_community_area,dropoff_centroid_latitude,dropoff_centroid_longitude,dropoff_centroid_location,pickup_census_tract,dropoff_census_tract
0,0364a6791472e0a38aef0c3da63ade080f469f26,da3e1a4ae3869a45b431284672484905a694d6a7a08a05...,2024-06-25T23:45:00.000,2024-06-26T00:15:00.000,1299,10.0,5,27.5,0.0,0,0.0,27.5,Prcard,Flash Cab,41.947791586,-87.683834942,"{'type': 'Point', 'coordinates': [-87.68383494...",,,,,,
1,fe13b4ac2a8111f89d77bb8355f64983e58d2c32,3cf614bd7410ac2d0334dd9e1275a2d1c5a3ede41a98a5...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,600,5.1,76,14.75,1.0,0,12.5,28.25,Credit Card,Taxi Affiliation Services,41.980264315,-87.913624596,"{'type': 'Point', 'coordinates': [-87.91362459...",,,,,,
2,fd02bf4bac4dedb770966a454942c945d181ec28,5d58f628a7d8340722380e52d042fb43ba69ae9e42559f...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,515,4.21,76,13.0,0.0,0,4.0,17.0,Cash,5 Star Taxi,41.980264315,-87.913624596,"{'type': 'Point', 'coordinates': [-87.91362459...",10.0,41.985015101,-87.804532006,"{'type': 'Point', 'coordinates': [-87.80453200...",,
3,fbf544bddd47225e0b49386acfcce24b5fd95ba9,19ccfdbec42380e8beda815e0a69bd92aaec6bc3982643...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,180,0.0,33,80.0,0.0,0,0.0,80.0,Cash,"Taxicab Insurance Agency, LLC",41.849246754,-87.624135298,"{'type': 'Point', 'coordinates': [-87.62413529...",33.0,41.859349715,-87.617358006,"{'type': 'Point', 'coordinates': [-87.61735800...",17031841000.0,17031330100.0
4,f90bda0fd83dba1fd513628594987bc1d83419c9,ea8e6df913a36562d8eddf662abe7722f4c0dc08527e98...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,1380,9.7,56,26.75,6.45,0,5.0,38.2,Credit Card,Taxi Affiliation Services,41.79259236,-87.769615453,"{'type': 'Point', 'coordinates': [-87.76961545...",33.0,41.857183858,-87.620334624,"{'type': 'Point', 'coordinates': [-87.62033462...",,


In [259]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Perform transformations with the taxi data.

    The frame is modified IN PLACE and the very same object is returned, so the
    caller's variable is transformed as well.

    Steps:
    1. drop the census tract and centroid location columns,
    2. drop every row that still contains a null value,
    3. rename the community area columns to ``*_id``,
    4. add ``datetime_for_weather``: the trip start floored to the hour,
       used as the key to map the hourly weather data.

    Parameters:
    - taxi_trips: pd.DataFrame containing the daily taxi trips
    Return:
    - taxi_trips: pd.DataFrame containing the transformed daily taxi trips
    Raises:
    - TypeError if taxi_trips is not a pd.DataFrame
    - KeyError if one of the expected columns is missing
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("Taxi trips is not a valid DataFrame.")

    # Remove the unused columns first, so that their nulls do not cause
    # otherwise complete rows to be discarded by dropna().
    taxi_trips.drop(
        columns=[
            "pickup_census_tract",
            "dropoff_census_tract",
            "pickup_centroid_location",
            "dropoff_centroid_location",
        ],
        inplace=True,
    )
    taxi_trips.dropna(inplace=True)

    # rename columns
    taxi_trips.rename(
        columns={
            "pickup_community_area": "pickup_community_area_id",
            "dropoff_community_area": "dropoff_community_area_id",
        },
        inplace=True,
    )

    # Helper column to map weather data. "h" replaces the deprecated "H" alias,
    # which emits a FutureWarning in recent pandas versions.
    taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("h")

    return taxi_trips

In [260]:
# NOTE: taxi_trips_transformations modifies its argument in place and returns it,
# so after this cell `taxi_trips` and `transformed_taxi_trips` are the same transformed frame.
transformed_taxi_trips = taxi_trips_transformations(taxi_trips)
transformed_taxi_trips.head()

  taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("H")


Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area_id,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
2,fd02bf4bac4dedb770966a454942c945d181ec28,5d58f628a7d8340722380e52d042fb43ba69ae9e42559f...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,515,4.21,76,13.0,0.0,0,4,17.0,Cash,5 Star Taxi,41.980264315,-87.913624596,10,41.985015101,-87.804532006,2024-06-25 23:00:00
3,fbf544bddd47225e0b49386acfcce24b5fd95ba9,19ccfdbec42380e8beda815e0a69bd92aaec6bc3982643...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,180,0.0,33,80.0,0.0,0,0,80.0,Cash,"Taxicab Insurance Agency, LLC",41.849246754,-87.624135298,33,41.859349715,-87.617358006,2024-06-25 23:00:00
4,f90bda0fd83dba1fd513628594987bc1d83419c9,ea8e6df913a36562d8eddf662abe7722f4c0dc08527e98...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,1380,9.7,56,26.75,6.45,0,5,38.2,Credit Card,Taxi Affiliation Services,41.79259236,-87.769615453,33,41.857183858,-87.620334624,2024-06-25 23:00:00
5,f8a2f7fa89bfcd37231fc1791bc9d024cafa7167,78d1ae230754ef2eca7aa3e828789bc4714b8a261a35d5...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,4,0.0,8,45.0,5.0,0,0,50.5,Credit Card,City Service,41.892507781,-87.626214906,8,41.892507781,-87.626214906,2024-06-25 23:00:00
6,f2a46942fffc2added201c32b2a8a6d0b6dc8125,d1a7c7e8e9cf388f9923e529d82166c5a3baf4262ec914...,2024-06-25T23:45:00.000,2024-06-26T00:15:00.000,1140,11.7,76,29.75,10.25,0,4,44.0,Credit Card,Chicago Independents,41.980264315,-87.913624596,22,41.92276062,-87.699155343,2024-06-25 23:00:00


# 4. Update Payment Master

In [261]:
def update_payment_type_master(payment_type_master: pd.DataFrame, new_payment_types_df: pd.DataFrame) -> pd.DataFrame:
    """
    Extend the payment_type master with new payment types if there are any.

    Each payment type that is not yet in the master is appended exactly once
    (repeated values in new_payment_types_df are collapsed) and receives the
    next free ID, in order of first appearance. The inputs are not modified.

    Parameters:
    - payment_type_master is a pd.DataFrame containing the payment_type_master data
      (columns "payment_type_id" and "payment_type"),
    - new_payment_types_df is a pd.DataFrame with a "payment_type" column holding the
      candidate payment types; any other column of it is ignored.
    Returns:
    - updated_payment_type_master is a pd.DataFrame containing the updated_payment_type_master data.
    """
    # De-duplicate first: the same unseen value must not get several IDs.
    candidates = new_payment_types_df["payment_type"].drop_duplicates()
    new_payment_type_list = candidates[~candidates.isin(payment_type_master["payment_type"])].tolist()

    # An empty master has no maximum (max() is NaN); numbering then starts at 1.
    payment_type_max_id = payment_type_master["payment_type_id"].max()
    first_new_id = 1 if pd.isna(payment_type_max_id) else int(payment_type_max_id) + 1

    new_rows = pd.DataFrame({
        "payment_type_id": range(first_new_id, first_new_id + len(new_payment_type_list)),
        "payment_type": new_payment_type_list,
    })

    updated_payment_type_master = pd.concat([payment_type_master, new_rows], ignore_index=True)

    return updated_payment_type_master

In [262]:
# Initial payment type master: one row per distinct payment type found in the
# trips, numbered from 1 in order of first appearance.
payment_type_master = (
    taxi_trips[["payment_type"]]
    .drop_duplicates()
    .reset_index(drop=True)
    .assign(payment_type_id=lambda distinct: range(1, len(distinct) + 1))
    [["payment_type_id", "payment_type"]]
)
payment_type_master.tail()

Unnamed: 0,payment_type_id,payment_type
2,3,Mobile
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute


In [263]:
# Sample input for the master update: three candidate payment types with placeholder IDs.
new_payment_types_df = pd.DataFrame(
    [
        (1, "Credit Card"),
        (2, "X"),
        (3, "y"),
    ],
    columns=["payment_type_id", "payment_type"],
)

new_payment_types_df

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,X
2,3,y


In [264]:
# Extend the master with the sample payment types; only values that are not
# already in the master are appended, with IDs continuing after the current maximum.
updated_payment_type_master = update_payment_type_master(payment_type_master, new_payment_types_df)

updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,Credit Card
2,3,Mobile
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,X
8,9,y


# 5. Update Company Master

In [265]:
def update_company_master(company_master: pd.DataFrame, new_companies_df: pd.DataFrame) -> pd.DataFrame:
    """
    Extend the company master with new companies if there are any.

    Each company that is not yet in the master is appended exactly once
    (repeated values in new_companies_df are collapsed) and receives the
    next free ID, in order of first appearance. The inputs are not modified.

    Parameters:
    - company_master is a pd.DataFrame containing the company_master data
      (columns "company_id" and "company"),
    - new_companies_df is a pd.DataFrame with a "company" column holding the
      candidate companies; any other column of it is ignored.
    Returns:
    - updated_company_master is a pd.DataFrame containing the updated company_master data.
    """
    # De-duplicate first: the same unseen company must not get several IDs.
    candidates = new_companies_df["company"].drop_duplicates()
    new_companies_list = candidates[~candidates.isin(company_master["company"])].tolist()

    # An empty master has no maximum (max() is NaN); numbering then starts at 1.
    company_max_id = company_master["company_id"].max()
    first_new_id = 1 if pd.isna(company_max_id) else int(company_max_id) + 1

    new_rows = pd.DataFrame({
        "company_id": range(first_new_id, first_new_id + len(new_companies_list)),
        "company": new_companies_list,
    })

    updated_company_master = pd.concat([company_master, new_rows], ignore_index=True)

    return updated_company_master

In [266]:
# Initial company master: one row per distinct company found in the trips,
# numbered from 1 in order of first appearance.
company_master = (
    taxi_trips[["company"]]
    .drop_duplicates()
    .reset_index(drop=True)
    .assign(company_id=lambda distinct: range(1, len(distinct) + 1))
    [["company_id", "company"]]
)
company_master.tail()

Unnamed: 0,company_id,company
30,31,Star North Taxi Management Llc
31,32,2733 - 74600 Benny Jona
32,33,3556 - 36214 RC Andrews Cab
33,34,Metro Jet Taxi A.
34,35,Tac - Yellow Non Color


In [267]:
# Sample input for the master update: three candidate companies with placeholder IDs.
new_companies_df = pd.DataFrame(
    [
        (1, "Tac - Yellow Non Color"),
        (2, "X"),
        (3, "y"),
    ],
    columns=["company_id", "company"],
)

new_companies_df.dtypes

company_id     int64
company       object
dtype: object

In [268]:
# Extend the master with the sample companies; only values that are not already
# in the master are appended, with IDs continuing after the current maximum.
updated_company_master = update_company_master(company_master, new_companies_df)

updated_company_master.tail()

Unnamed: 0,company_id,company
32,33,3556 - 36214 RC Andrews Cab
33,34,Metro Jet Taxi A.
34,35,Tac - Yellow Non Color
35,36,X
36,37,y


# Generic Master Update Function

In [269]:
# Reference view of the payment type master that is fed to the generic update below.
payment_type_master.tail()

Unnamed: 0,payment_type_id,payment_type
2,3,Mobile
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute


In [270]:
# Reference view of the company master that is fed to the generic update below.
company_master.tail()

Unnamed: 0,company_id,company
30,31,Star North Taxi Management Llc
31,32,2733 - 74600 Benny Jona
32,33,3556 - 36214 RC Andrews Cab
33,34,Metro Jet Taxi A.
34,35,Tac - Yellow Non Color


In [271]:
def update_master(master: pd.DataFrame, new_entries: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """
    Extend the master with new entries if there are any.

    Each value of new_entries[value_column] that is not yet in the master is
    appended exactly once (repeated values are collapsed) and receives the next
    free ID, in order of first appearance. The inputs are not modified.

    Parameters:
    - master is a pd.DataFrame containing the master data (an ID column and a value column),
    - new_entries is a pd.DataFrame whose value_column holds the candidate values;
      any other column of it is ignored,
    - id_column is a string (str): the name of the ID column in master,
    - value_column is a string (str): the name of the value column in master and new_entries.
    Returns:
    - updated_master is a pd.DataFrame containing the updated master data with the new entries.
    """
    # De-duplicate first: the same unseen value must not get several IDs.
    candidates = new_entries[value_column].drop_duplicates()
    new_entry_list = candidates[~candidates.isin(master[value_column])].tolist()

    # An empty master has no maximum (max() is NaN); numbering then starts at 1.
    max_id = master[id_column].max()
    first_new_id = 1 if pd.isna(max_id) else int(max_id) + 1

    new_rows = pd.DataFrame({
        id_column: range(first_new_id, first_new_id + len(new_entry_list)),
        value_column: new_entry_list,
    })

    updated_master = pd.concat([master, new_rows], ignore_index=True)

    return updated_master

In [272]:
# Company master update done through the generic helper.
updated_company_master = update_master(company_master, new_companies_df, "company_id", "company")
# updated_company_master.tail()

In [273]:
# Payment type master update done through the generic helper.
updated_payment_type_master = update_master(payment_type_master, new_payment_types_df, "payment_type_id", "payment_type")
# updated_payment_type_master.tail()

# Update Taxi Trips with Masters

In [274]:
# Replace the payment type and company names with their master IDs:
# join both masters on the name, then drop the name columns.
taxi_trips_id = (
    taxi_trips
    .merge(payment_type_master, on="payment_type")
    .merge(company_master, on="company")
    .drop(columns=["payment_type", "company"])
)

taxi_trips_id.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area_id,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
0,fd02bf4bac4dedb770966a454942c945d181ec28,5d58f628a7d8340722380e52d042fb43ba69ae9e42559f...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,515,4.21,76,13.0,0.0,0,4,17.0,41.980264315,-87.913624596,10,41.985015101,-87.804532006,2024-06-25 23:00:00,1,1
1,fbf544bddd47225e0b49386acfcce24b5fd95ba9,19ccfdbec42380e8beda815e0a69bd92aaec6bc3982643...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,180,0.0,33,80.0,0.0,0,0,80.0,41.849246754,-87.624135298,33,41.859349715,-87.617358006,2024-06-25 23:00:00,1,2
2,f90bda0fd83dba1fd513628594987bc1d83419c9,ea8e6df913a36562d8eddf662abe7722f4c0dc08527e98...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,1380,9.7,56,26.75,6.45,0,5,38.2,41.79259236,-87.769615453,33,41.857183858,-87.620334624,2024-06-25 23:00:00,2,3
3,f8a2f7fa89bfcd37231fc1791bc9d024cafa7167,78d1ae230754ef2eca7aa3e828789bc4714b8a261a35d5...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,4,0.0,8,45.0,5.0,0,0,50.5,41.892507781,-87.626214906,8,41.892507781,-87.626214906,2024-06-25 23:00:00,2,4
4,f2a46942fffc2added201c32b2a8a6d0b6dc8125,d1a7c7e8e9cf388f9923e529d82166c5a3baf4262ec914...,2024-06-25T23:45:00.000,2024-06-26T00:15:00.000,1140,11.7,76,29.75,10.25,0,4,44.0,41.980264315,-87.913624596,22,41.92276062,-87.699155343,2024-06-25 23:00:00,2,5


In [277]:
def update_taxi_trips_with_master_data(transformed_taxi_trips: pd.DataFrame, updated_payment_type_master: pd.DataFrame, 
                                       updated_company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Replace the payment type and company names of the taxi trips with their master IDs.

    Both masters are joined with pandas' default (inner) merge, so a trip whose
    payment type or company is missing from the corresponding master is not part
    of the result. The input frames are left untouched.

    Parameters:
    - transformed_taxi_trips is a pd.DataFrame containing the transformed taxi trips data,
    - updated_payment_type_master is a pd.DataFrame containing the updated payment type master data,
    - updated_company_master is a pd.DataFrame containing the updated company master data.
    Return:
    - merged_taxi_trips is a pd.DataFrame containing the transformed taxi trips data with the
      master columns joined on and the "payment_type" and "company" columns removed.
    """
    trips_with_payment_ids = pd.merge(transformed_taxi_trips, updated_payment_type_master, on="payment_type")
    trips_with_all_ids = pd.merge(trips_with_payment_ids, updated_company_master, on="company")

    # The names are redundant once the IDs are attached.
    return trips_with_all_ids.drop(columns=["payment_type", "company"])

In [278]:
# NOTE: `taxi_trips` is passed here rather than `transformed_taxi_trips`. This works because
# taxi_trips_transformations modified `taxi_trips` in place, so both names hold the transformed data.
merged_taxi_trips = update_taxi_trips_with_master_data(taxi_trips, updated_payment_type_master, updated_company_master)
merged_taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area_id,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
0,fd02bf4bac4dedb770966a454942c945d181ec28,5d58f628a7d8340722380e52d042fb43ba69ae9e42559f...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,515,4.21,76,13.0,0.0,0,4,17.0,41.980264315,-87.913624596,10,41.985015101,-87.804532006,2024-06-25 23:00:00,1,1
1,fbf544bddd47225e0b49386acfcce24b5fd95ba9,19ccfdbec42380e8beda815e0a69bd92aaec6bc3982643...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,180,0.0,33,80.0,0.0,0,0,80.0,41.849246754,-87.624135298,33,41.859349715,-87.617358006,2024-06-25 23:00:00,1,2
2,f90bda0fd83dba1fd513628594987bc1d83419c9,ea8e6df913a36562d8eddf662abe7722f4c0dc08527e98...,2024-06-25T23:45:00.000,2024-06-26T00:00:00.000,1380,9.7,56,26.75,6.45,0,5,38.2,41.79259236,-87.769615453,33,41.857183858,-87.620334624,2024-06-25 23:00:00,2,3
3,f8a2f7fa89bfcd37231fc1791bc9d024cafa7167,78d1ae230754ef2eca7aa3e828789bc4714b8a261a35d5...,2024-06-25T23:45:00.000,2024-06-25T23:45:00.000,4,0.0,8,45.0,5.0,0,0,50.5,41.892507781,-87.626214906,8,41.892507781,-87.626214906,2024-06-25 23:00:00,2,4
4,f2a46942fffc2added201c32b2a8a6d0b6dc8125,d1a7c7e8e9cf388f9923e529d82166c5a3baf4262ec914...,2024-06-25T23:45:00.000,2024-06-26T00:15:00.000,1140,11.7,76,29.75,10.25,0,4,44.0,41.980264315,-87.913624596,22,41.92276062,-87.699155343,2024-06-25 23:00:00,2,5


# Weather Data Transformation

In [290]:
# Same "today minus two months" date logic as the taxi download, so that both
# data sets cover the same day.
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

url = "https://archive-api.open-meteo.com/v1/era5"

# A single day (start_date == end_date) of hourly values for one coordinate.
params = {
    'latitude': 41.85,
    'longitude': -87.65,
    'start_date': formatted_datetime,
    'end_date': formatted_datetime,
    'hourly': 'temperature_2m,wind_speed_10m,rain,precipitation'
}
response = requests.get(url, params=params, timeout=60)
# Stop here on an HTTP error instead of handing an error payload to the transformation.
response.raise_for_status()
weather_data = response.json()

In [291]:
def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """
    Make transformations on the daily weather API response.

    Picks the hourly series out of the response, renames them to short column
    names and parses the timestamps. The input dict is not modified.

    Parameters:
    - weather_data: dict, the parsed JSON response; it must contain an "hourly"
      entry with the keys "time", "temperature_2m", "wind_speed_10m", "rain"
      and "precipitation".
    Return:
    - pd.DataFrame with the columns "datetime", "temperature", "wind_speed",
      "rain" and "precipitation", one row per entry of the hourly series.
    Raises:
    - KeyError if one of the expected keys is missing from the response.
    """
    hourly = weather_data["hourly"]

    weather_df = pd.DataFrame({
        "datetime": hourly["time"],
        "temperature": hourly["temperature_2m"],
        "wind_speed": hourly["wind_speed_10m"],
        "rain": hourly["rain"],
        "precipitation": hourly["precipitation"],
    })

    # The API delivers the timestamps as strings; convert them to datetimes.
    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df

In [292]:
# NOTE(review): this rebinds `weather_data` from the raw API response to the DataFrame.
# transform_weather_data indexes its argument with ["hourly"], so re-running this cell
# without re-running the download cell first is expected to fail.
weather_data = transform_weather_data(weather_data)
weather_data.head()

Unnamed: 0,datetime,temperature,wind_speed,rain,precipitation
0,2024-06-25 00:00:00,27.3,15.2,0.0,0.0
1,2024-06-25 01:00:00,27.6,14.9,0.0,0.0
2,2024-06-25 02:00:00,27.3,19.2,0.0,0.0
3,2024-06-25 03:00:00,26.5,18.9,0.0,0.0
4,2024-06-25 04:00:00,25.9,25.3,0.0,0.0
