In [16]:
import requests
import json
import os
import pandas as pd
pd.set_option('display.max_columns', 30)
from datetime import datetime
from dateutil.relativedelta import relativedelta

In [None]:
"""
1. get the data from S3
2. weather data transformations
3. taxi trips transformations - done
4. update payment_type_master - done
5. update company_master - done
6. update taxi_trips with company and payment_type id-s (replace the string values with id-s from the latest master tables) - done
7. upload weather data to S3
8. upload taxi data to S3
9. upload newest payment_type_master and company_master to S3
"""

'\n1. get the data from S3\n2. weather data transformations\n3. taxi trips transformations\n4. update payment_type_master\n5. update company_master\n6. update taxi_trips with company and payment_type id-s (replace the string values with id-s from the latest master tables)\n7. upload weather data to S3\n8. upload taxi data to S3\n9. upload newest payment_type_master and company_master to S3\n'

## taxi_trips transformation codes

In [18]:
current_datetime = datetime.now() - relativedelta(months=2) # 2 months earlier from now
formatted_datetime = current_datetime.strftime("%Y-%m-%d") # formating the 

url = (
    f"https://data.cityofchicago.org/resource/ajtu-isnz.json?"
    f"$where=trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
    f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'&$limit=30000"
)

headers = {'X-App-Token' : os.environ.get('CHICAGO_API_TOKEN')}

response = requests.get(url)

data = response.json()

In [19]:
taxi_trips = pd.DataFrame(data)

In [None]:
taxi_trips.drop(["pickup_census_tract", "dropoff_census_tract"], axis=1, inplace=True) # lot of values are NaN values
taxi_trips.drop(["pickup_centroid_location", "dropoff_centroid_location"], axis=1, inplace=True) # duplicated data we don't need them

taxi_trips.dropna(inplace = True)

taxi_trips.rename(columns={"pickup_community_area": "pickup_community_area_id", "dropoff_community_area": "dropoff_community_area_id"}, inplace=True)

taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("H")

  taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("H")


In [21]:
taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
0,0706835645a030fb56452671b41ed76f0eabd4dc,2f8c51760f8358b29119959a9a4864d3d320e18a642b59...,2025-01-18T23:45:00.000,2025-01-18T23:45:00.000,492,0.58,8,24,9.5,2.83,0,0,12.83,Mobile,Flash Cab,41.899602111,-87.633308037,41.901206994,-87.676355989,2025-01-18 23:00:00
2,ede0ad083099d1f42ddbf58886b360842758ac93,791d74df896226a452a8e223e2ec9fa0df7d80bb7ca180...,2025-01-18T23:45:00.000,2025-01-19T00:15:00.000,1009,3.41,28,6,13.78,2.87,0,0,17.15,Mobile,Tac - Checker Cab Dispatch,41.874005383,-87.66351755,41.944226601,-87.655998182,2025-01-18 23:00:00
3,ed1a9c1f5f3f155cb91651cf759d43b7c2c6c7d0,a6dda627be96fe26d3b2b06015a2a60358b81f91ca6c3a...,2025-01-18T23:45:00.000,2025-01-19T00:00:00.000,774,2.14,28,8,9.75,0.0,0,1,10.75,Cash,5 Star Taxi,41.874005383,-87.66351755,41.899602111,-87.633308037,2025-01-18 23:00:00
4,eb62e9537f26eae72e7ebcd80ccf5f3890549a89,d786258c399e3cb198dcba48a1626465b7e02e1b6d8df2...,2025-01-18T23:45:00.000,2025-01-18T23:45:00.000,332,1.09,8,8,6.0,0.0,0,0,6.0,Cash,Taxicab Insurance Agency Llc,41.899602111,-87.633308037,41.899602111,-87.633308037,2025-01-18 23:00:00
5,e9d340c41af384fabde53eb587b7aead822b6f3a,83cdef885e81832f503b6929e7fe568508653c46268f84...,2025-01-18T23:45:00.000,2025-01-18T23:45:00.000,660,0.0,8,7,28.0,0.0,0,0,28.0,Cash,Taxi Affiliation Services,41.899602111,-87.633308037,41.922686284,-87.649488729,2025-01-18 23:00:00


### taxi_trips transformation function

In [22]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Transforms a DataFrame containing taxi trip data by performing the following steps:
    
    1. Drops unnecessary columns: "pickup_census_tract", "dropoff_census_tract", 
       "pickup_centroid_location", and "dropoff_centroid_location".
    2. Removes rows with missing values.
    3. Renames columns "pickup_community_area" and "dropoff_community_area" to 
       "pickup_community_area_id" and "dropoff_community_area_id", respectively.
    4. Adds a new column "datetime_for_weather", which is derived by rounding 
       the "trip_start_timestamp" column to the nearest hour.

    Args:
        taxi_trips (pd.DataFrame): The input DataFrame containing taxi trip data.

    Returns:
        pd.DataFrame: The transformed DataFrame with the specified modifications.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('taxi_trips is not a valid pandas DataFrame.')

    taxi_trips.drop(["pickup_census_tract", "dropoff_census_tract",
                     "pickup_centroid_location", "dropoff_centroid_location"], axis=1, inplace=True)

    taxi_trips.dropna(inplace = True)

    taxi_trips.rename(columns={"pickup_community_area": "pickup_community_area_id", "dropoff_community_area": "dropoff_community_area_id"}, inplace=True)

    taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("H")

    return taxi_trips

### company update codes

In [24]:
company_master = taxi_trips['company'].drop_duplicates().reset_index(drop = True)

company_master = pd.DataFrame(
    {
        'company_id' : range(1, len(company_master) + 1),
        'company' : company_master
    }
)

company_master.tail()

Unnamed: 0,company_id,company
27,28,5167 - 71969 5167 Taxi Inc
28,29,Koam Taxi Association
29,30,Tac - American United Dispatch
30,31,2733 - 74600 Benny Jona
31,32,Tac - Yellow Non Color


In [25]:
new_company_data = [
    {'company' : 'Taxicab Insurance Agency Llc'},
    {'company' : 'X'},
    {'company' : 'Y'}
]

new_company_mapping = pd.DataFrame(new_company_data)

new_company_mapping

Unnamed: 0,company
0,Taxicab Insurance Agency Llc
1,X
2,Y


In [26]:
company_max_id = company_master['company_id'].max()
company_max_id

32

In [None]:
# several-line alternative
new_companies_list = []

for company in new_company_mapping['company'].values:
    if company not in company_master['company'].values:
        new_companies_list.append(company)

# one-line alternative
new_companies_list = [company for company in new_company_mapping['company'].values if company not in company_master['company'].values]

new_companies_list

['X', 'Y']

In [35]:
new_companies_df = pd.DataFrame(
    {
    'company_id' : range(company_max_id + 1, company_max_id + len(new_companies_list) + 1),
    'company' : new_companies_list
    }
)

new_companies_df

Unnamed: 0,company_id,company
0,33,X
1,34,Y


In [37]:
updated_company_master = pd.concat([company_master, new_companies_df], ignore_index = True)
updated_company_master.tail()

Unnamed: 0,company_id,company
29,30,Tac - American United Dispatch
30,31,2733 - 74600 Benny Jona
31,32,Tac - Yellow Non Color
32,33,X
33,34,Y


In [46]:
def update_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Updates the company master DataFrame by adding new companies found in the taxi trips DataFrame.

    Args:
        taxi_trips (pd.DataFrame): DataFrame containing taxi trip information, including company names.
        company_master (pd.DataFrame): DataFrame containing the existing company master information 
                                       with columns 'company_id' and 'company'.

    Returns:
        pd.DataFrame: Updated company master DataFrame with new companies added. New companies are assigned
                      unique IDs starting from the maximum company_id in the existing company_master DataFrame.
    """

    company_max_id = company_master['company_id'].max()

    new_companies_list = [company for company in taxi_trips['company'].values if company not in company_master['company'].values]

    new_companies_df = pd.DataFrame({
    'company_id' : range(company_max_id + 1, company_max_id + len(new_companies_list) + 1),
    'company' : new_companies_list
    })

    updated_company_master = pd.concat([company_master, new_companies_df], ignore_index = True)

    return updated_company_master


In [47]:
taxi_trips_company_only = pd.DataFrame({
    'company_id' : [1, 2, 3],
    'company' : ['Taxicab Insurance Agency Llc', 'X', 'Y']
})

taxi_trips_company_only

Unnamed: 0,company_id,company
0,1,Taxicab Insurance Agency Llc
1,2,X
2,3,Y


In [48]:
updated_company_master = update_company_master(taxi_trips = taxi_trips_company_only, company_master = company_master)
updated_company_master.tail()

Unnamed: 0,company_id,company
29,30,Tac - American United Dispatch
30,31,2733 - 74600 Benny Jona
31,32,Tac - Yellow Non Color
32,33,X
33,34,Y


### payment_type_master codes

In [49]:
payment_type_master = taxi_trips['payment_type'].drop_duplicates().reset_index(drop = True)

payment_type_master = pd.DataFrame(
    {
        'payment_type_id' : range(1, len(payment_type_master) + 1),
        'payment_type' : payment_type_master
    }
)

payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Mobile
1,2,Cash
2,3,Credit Card
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute


In [50]:
taxi_trips_payment_type_only = pd.DataFrame({
    'payment_type_id' : [1, 2, 3],
    'payment_type' : ['Credit Card', 'X', 'Y']
})

taxi_trips_payment_type_only

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,X
2,3,Y


In [None]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """
    Updates the payment type master DataFrame by adding new payment types found in the taxi trips DataFrame.

    Args:
        taxi_trips (pd.DataFrame): DataFrame containing taxi trip information, including payment types.
        payment_type_master (pd.DataFrame): DataFrame containing the existing payment type master information 
                                            with columns 'payment_type_id' and 'payment_type'.

    Returns:
        pd.DataFrame: Updated payment type master DataFrame with new payment types added. 
                      New payment types are assigned unique IDs starting from the maximum payment_type_id 
                      in the existing payment_type_master DataFrame.
    """

    payment_type_max_id = payment_type_master['payment_type_id'].max()

    new_payment_types_list = [payment_type for payment_type in taxi_trips['payment_type'].values if payment_type not in payment_type_master['payment_type'].values]

    new_payment_types_df = pd.DataFrame({
    'payment_type_id' : range(payment_type_max_id + 1, payment_type_max_id + len(new_payment_types_list) + 1),
    'payment_type' : new_payment_types_list
    })

    updated_payment_type_master = pd.concat([payment_type_master, new_payment_types_df], ignore_index = True)

    return updated_payment_type_master

In [54]:
updated_payment_type_master = update_payment_type_master(taxi_trips = taxi_trips_payment_type_only, payment_type_master = payment_type_master)
updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Mobile
1,2,Cash
2,3,Credit Card
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,X
8,9,Y


### creating a generic update master table function

In [59]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_coulomn: str, value_column: str) -> pd.DataFrame:
    """
    Updates a master DataFrame by adding new values found in the taxi trips DataFrame.

    Args:
        taxi_trips (pd.DataFrame): DataFrame containing taxi trip information, including the column with new values.
        master (pd.DataFrame): DataFrame representing the master data to be updated, including specified ID and value columns.
        id_coulomn (str): Name of the column in the master DataFrame representing unique identifiers.
        value_column (str): Name of the column in the master DataFrame containing the values to be checked and updated.

    Returns:
        pd.DataFrame: Updated master DataFrame with new values added. New values are assigned unique IDs starting 
                      from the maximum value in the specified ID column of the existing master DataFrame.
    """

    max_id = master[id_coulomn].max()

    new_values_list = [value for value in taxi_trips[value_column].values if value not in master[value_column].values]

    new_values_df = pd.DataFrame({
    id_coulomn : range(max_id + 1, max_id + len(new_values_list) + 1),
    value_column : new_values_list
    })

    updated_master = pd.concat([master, new_values_df], ignore_index = True)

    return updated_master

In [60]:
test_payment_type_master = update_master(taxi_trips = taxi_trips_payment_type_only, master = payment_type_master, id_coulomn = 'payment_type_id', value_column = 'payment_type')
test_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Mobile
1,2,Cash
2,3,Credit Card
3,4,Prcard
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,X
8,9,Y


In [61]:
test_company_master = update_master(taxi_trips = taxi_trips_company_only, master = company_master, id_coulomn = 'company_id', value_column = 'company')
test_company_master

Unnamed: 0,company_id,company
0,1,Flash Cab
1,2,Tac - Checker Cab Dispatch
2,3,5 Star Taxi
3,4,Taxicab Insurance Agency Llc
4,5,Taxi Affiliation Services
5,6,City Service
6,7,Globe Taxi
7,8,Sun Taxi
8,9,Chicago Independents
9,10,Blue Ribbon Taxi Association


### update taxi_trips with the most recent company_master and payment_type_master function

In [67]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Updates the taxi trips DataFrame by merging it with payment type and company master DataFrames to replace 
    textual payment type and company data with their corresponding IDs.

    Args:
        taxi_trips (pd.DataFrame): DataFrame containing taxi trip information, including 'payment_type' and 'company' columns.
        payment_type_master (pd.DataFrame): DataFrame containing master data for payment types, with columns 
                                            'payment_type_id' and 'payment_type'.
        company_master (pd.DataFrame): DataFrame containing master data for companies, with columns 
                                       'company_id' and 'company'.

    Returns:
        pd.DataFrame: Updated taxi trips DataFrame where 'payment_type' and 'company' columns are replaced with 
                      'payment_type_id' and 'company_id' columns from the respective master DataFrames.
    """
    taxi_trips_id = taxi_trips.merge(payment_type_master, on = 'payment_type')

    taxi_trips_id = taxi_trips_id.merge(company_master, on = 'company')

    taxi_trips_id.drop(['payment_type', 'company'], axis = 1, inplace = True)

    return taxi_trips_id

In [69]:
taxi_trips_id = update_taxi_trips_with_master_data(taxi_trips = taxi_trips, payment_type_master = payment_type_master, company_master = company_master)

taxi_trips_id.sample(5)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
112,eda8c49a638e80bc48865caa79dbdd47a588359d,f9bc93a0ba6b1f18c9709a96c99bb9c5a99054b1711f80...,2025-01-18T23:15:00.000,2025-01-18T23:30:00.000,1068,4.37,8,6,16.27,0,0,0,16.77,41.899602111,-87.633308037,41.944226601,-87.655998182,2025-01-18 23:00:00,1,6
7800,050481f8b22bdefac18fda1e157973cf31edea42,1d375f1440c49fc2eaf259ecaeea2a10b11fba5b30a691...,2025-01-18T08:45:00.000,2025-01-18T09:00:00.000,1062,4.95,8,34,15.75,0,0,1,16.75,41.899602111,-87.633308037,41.842076117,-87.633973422,2025-01-18 08:00:00,2,1
563,cd44b1f4b2cf188ed57ea72e09bb7c524e614b75,c3ebfce9cb4f64d18d29e3e416e9f64dfe671eb8226d44...,2025-01-18T22:00:00.000,2025-01-18T22:00:00.000,474,1.57,32,28,7.5,2,0,0,10.0,41.878865584,-87.625192142,41.874005383,-87.66351755,2025-01-18 22:00:00,3,7
3828,d7a8f26626bd00e8b2bb3176f45e405cfd1c99a4,449fa4909552757130d09d98ebc7770e2dd94579036b0e...,2025-01-18T15:45:00.000,2025-01-18T16:30:00.000,3278,17.94,76,8,46.0,3,0,4,53.5,41.980264315,-87.913624596,41.899602111,-87.633308037,2025-01-18 15:00:00,3,1
3854,b6277a275e32bba63adb1ce3d9654d07d44b043e,72fd2a97a38a77919c31eb8875711aefb94f776ceb4fef...,2025-01-18T15:45:00.000,2025-01-18T16:00:00.000,1019,7.33,38,43,20.25,0,0,0,20.25,41.812948939,-87.617859676,41.761577908,-87.572781987,2025-01-18 15:00:00,4,1


### weather transformations function

In [71]:
def transform_weather_data(weather_data: json) -> pd.DataFrame:
    """
    Transforms weather data in JSON format into a structured pandas DataFrame for easier analysis.

    Args:
        weather_data (json): JSON object containing hourly weather data, including time, temperature, wind speed, 
                             rain, and precipitation.

    Returns:
        pd.DataFrame: A DataFrame with the following columns:
                      - 'datetime': Timestamps converted to datetime objects.
                      - 'tempretaure': Hourly temperatures (in °C or relevant unit).
                      - 'wind_speed': Hourly wind speed (in m/s or relevant unit).
                      - 'rain': Hourly rainfall data.
                      - 'precipitation': Hourly precipitation data.
    """
    weather_data_filtered = {
        "datetime": weather_data["hourly"]["time"],
        "tempretaure": weather_data["hourly"]["temperature_2m"],
        "wind_speed": weather_data["hourly"]["wind_speed_10m"],
        "rain": weather_data["hourly"]["rain"],
        "precipitation": weather_data["hourly"]["precipitation"],
    }

    weather_df = pd.DataFrame(weather_data_filtered)

    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df

In [72]:
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

url = "https://archive-api.open-meteo.com/v1/era5"

params = {
    "latitude": 41.85,
    "longitude": -87.65,
    "start_date": formatted_datetime,
    "end_date": formatted_datetime,
    "hourly": "temperature_2m,wind_speed_10m,rain,precipitation"
}

response = requests.get(url, params = params)
weather_data = response.json()

weather_data_df = transform_weather_data(weather_data = weather_data)

weather_data_df

Unnamed: 0,datetime,tempretaure,wind_speed,rain,precipitation
0,2025-01-21 00:00:00,-13.3,13.7,0.0,0.0
1,2025-01-21 01:00:00,-13.4,12.8,0.0,0.0
2,2025-01-21 02:00:00,-13.9,10.7,0.0,0.0
3,2025-01-21 03:00:00,-14.3,10.8,0.0,0.0
4,2025-01-21 04:00:00,-14.7,14.7,0.0,0.0
5,2025-01-21 05:00:00,-15.4,14.3,0.0,0.0
6,2025-01-21 06:00:00,-16.1,14.8,0.0,0.0
7,2025-01-21 07:00:00,-17.2,15.2,0.0,0.0
8,2025-01-21 08:00:00,-18.1,14.6,0.0,0.0
9,2025-01-21 09:00:00,-18.7,15.0,0.0,0.0
