In [4]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
import json
import os

import pandas as pd
pd.set_option("display.max_columns", 30)
import requests

In [5]:
"""
1. get the data from s3
2. weather_data transformations
3. taxi_trips transformations - DONE
4. update payment_type_master - DONE
5. update company_master - DONE
6. update taxi_trips with company and payment_type id-s (replace the string values with id-s from the latest master tables) - DONE
7. upload weather data to s3
8. upload taxi data to s3
9. upload the newest payment_type_master and company_master
"""

'\n1. get the data from s3\n2. weather_data transformations\n3. taxi_trips transormations\n4. update payment_type_master\n5. update company_master\n6. update taxi_trips with company and payment_type id-s (replace the string values with id-s from the latest master tables) \n7. upload weather data to s3\n8. upload taxi data to s3\n9. upload the newest payment_type_master and company_master\n'

### taxi_trips transformation

In [6]:
# Extract T-2 months' data

# The target day is "today minus two months", formatted as YYYY-MM-DD for the query.
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

# Query: every trip whose start timestamp falls on the target day, capped at 30,000 rows.
url = f"https://data.cityofchicago.org/resource/ajtu-isnz.json?$where=trip_start_timestamp >= '{formatted_datetime}T00:00:00' AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'&$limit=30000"

# headers = {"X-App-Token": os.environ.get("CHICAGO_API_TOKEN")}
# response = requests.get(url, headers=headers)

# Without a timeout a stalled connection would block this cell indefinitely.
response = requests.get(url, timeout=60)

# Stop here on an HTTP error (4xx/5xx) instead of handing an error payload to pandas.
response.raise_for_status()

data = response.json()

In [7]:
# Build a DataFrame from the parsed API response.
taxi_trips = pd.DataFrame(data)

In [8]:
# Clean the raw trips in one chain. Every step returns a new frame and the column
# drop ignores columns that are already gone, so re-running this cell is safe
# (the previous in-place drops raised a KeyError on a second run).
taxi_trips = (
    taxi_trips
    .drop(
        columns=[
            "pickup_census_tract", "dropoff_census_tract",
            "pickup_centroid_location", "dropoff_centroid_location",
        ],
        errors="ignore",
    )
    # Keep only the trips that have a value in every remaining column.
    .dropna()
    .rename(columns={"pickup_community_area": "pickup_community_area_id",
                     "dropoff_community_area": "dropoff_community_area_id"})
    # Trip start rounded down to the hour. The lowercase "h" alias replaces the
    # uppercase "H", which recent pandas versions flag as deprecated.
    .assign(
        datetime_for_weather=lambda trips: pd.to_datetime(trips["trip_start_timestamp"]).dt.floor("h")
    )
)

  taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("H")


In [9]:
taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
0,9a0eee16b60afd495822f50ab2b7559f775777d5,de9289a1fce135051f6e9206044f8211c86a1a6acd68d4...,2024-06-12T23:45:00.000,2024-06-13T00:15:00.000,1616,12.86,8,48,34.0,0.0,0,0.0,34.0,Prcard,Flash Cab,41.899602111,-87.633308037,41.729676423,-87.572717134,2024-06-12 23:00:00
1,99a385190a881371dcde15bd12a49e3b7865b1f0,8dbaf4164524af926ddeab5dd084183e585f4cb8cc47fe...,2024-06-12T23:45:00.000,2024-06-13T00:15:00.000,1210,12.65,76,4,32.0,7.3,0,4.0,43.8,Credit Card,Medallion Leasin,41.980264315,-87.913624596,41.975170943,-87.687515515,2024-06-12 23:00:00
3,8c83163b2a8d6dd025276d1b9ae201f86152b010,1dca7e7332893fa1836f448df24637325574b860632c18...,2024-06-12T23:45:00.000,2024-06-13T00:15:00.000,1589,5.93,76,10,21.75,0.0,0,7.5,29.25,Cash,Flash Cab,41.980264315,-87.913624596,41.985015101,-87.804532006,2024-06-12 23:00:00
4,8c12322c21accd3d99f37bf7b01bd5b2bf5204e9,545ac2dfd5b722e0f0d884cc68ec27d5b0537410edd8bf...,2024-06-12T23:45:00.000,2024-06-12T23:45:00.000,275,0.57,28,32,9.0,1.15,0,0.0,10.15,Mobile,Tac - American United Dispatch,41.874005383,-87.66351755,41.878865584,-87.625192142,2024-06-12 23:00:00
5,88effcf5d6be3f6d66678f4c59f052eac1254aaf,95c90b10f68a3ce3430ee2b6107608ada7453b1cce0303...,2024-06-12T23:45:00.000,2024-06-13T00:15:00.000,1851,12.07,8,44,29.71,8.97,0,0.0,38.68,Mobile,Sun Taxi,41.899602111,-87.633308037,41.740205756,-87.615969523,2024-06-12 23:00:00


#### taxi_trips transformation function

In [10]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Perform transformations with the taxi data.

    The input DataFrame is left untouched; a new, cleaned DataFrame is returned.

    Steps:
    1. Drop the census tract and centroid location columns (columns that are
       not present are ignored).
    2. Drop every row that has a missing value in any remaining column.
    3. Rename the community area columns to ``*_id``.
    4. Add ``datetime_for_weather``: the trip start timestamp rounded down to
       the hour.

    Parameters
    ---------
    taxi_trips: pd.DataFrame
        The Dataframe holding the daily taxi trips.

    Returns
    ---------
    pd.DataFrame
        The cleaned, transformed DataFrame holding the daily taxi trips.

    Raises
    ---------
    TypeError
        If ``taxi_trips`` is not a pandas DataFrame.
    KeyError
        If the ``trip_start_timestamp`` column is missing.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips is not a valid pandas DataFrame")

    columns_to_drop = [
        "pickup_census_tract", "dropoff_census_tract",
        "pickup_centroid_location", "dropoff_centroid_location",
    ]
    renamed_columns = {
        "pickup_community_area": "pickup_community_area_id",
        "dropoff_community_area": "dropoff_community_area_id",
    }

    # drop / dropna / rename / assign each return a new frame, so the caller's
    # DataFrame is never modified.
    return (
        taxi_trips
        .drop(columns=columns_to_drop, errors="ignore")
        .dropna()
        .rename(columns=renamed_columns)
        # Lowercase "h" is the hourly alias; the uppercase "H" spelling is deprecated.
        .assign(
            datetime_for_weather=lambda trips: pd.to_datetime(trips["trip_start_timestamp"]).dt.floor("h")
        )
    )

In [11]:
# taxi_trips_transformations("String")

### company update codes

In [12]:
# Distinct company names in first-seen order, re-indexed from 0.
unique_companies = taxi_trips["company"].drop_duplicates().reset_index(drop=True)

# Master table: one row per company with sequential ids starting at 1.
company_master = pd.DataFrame({
    "company_id": range(1, len(unique_companies) + 1),
    "company": unique_companies,
})
company_master

Unnamed: 0,company_id,company
0,1,Flash Cab
1,2,Medallion Leasin
2,3,Tac - American United Dispatch
3,4,Sun Taxi
4,5,Globe Taxi
5,6,Taxi Affiliation Services
6,7,City Service
7,8,5 Star Taxi
8,9,Choice Taxi Association
9,10,Taxicab Insurance Agency Llc


In [13]:
# Sample incoming company names used to try out the master-update logic.
new_company_names = ["5167 - 71969 5167 Taxi Inc", "x", "y"]
new_company_data = [{"company": name} for name in new_company_names]

new_company_mapping = pd.DataFrame(new_company_data)
new_company_mapping

Unnamed: 0,company
0,5167 - 71969 5167 Taxi Inc
1,x
2,y


In [14]:
# Highest id currently in use in the company master table.
company_max_id = company_master["company_id"].max()
company_max_id

np.int64(33)

In [15]:
# Companies already present in the master table. A set gives constant-time
# membership checks instead of scanning the whole column for every candidate.
known_companies = set(company_master["company"])

# Explicit loop version. `seen_companies` keeps a name that occurs several times
# in the mapping from being listed (and later assigned an id) more than once.
new_companies_list = []
seen_companies = set()

for company in new_company_mapping["company"]:
    if company not in known_companies and company not in seen_companies:
        seen_companies.add(company)
        new_companies_list.append(company)

# one line: dict.fromkeys removes repeats while keeping first-seen order
new_companies_list_one_line = list(dict.fromkeys(
    company for company in new_company_mapping["company"] if company not in known_companies
))

new_companies_list_one_line

['x', 'y']

In [16]:
# Ids for the new companies continue right after the highest id already in use.
first_new_id = company_max_id + 1

new_companies_df = pd.DataFrame({
    "company_id": range(first_new_id, first_new_id + len(new_companies_list)),
    "company": new_companies_list,
})

new_companies_df

Unnamed: 0,company_id,company
0,34,x
1,35,y


In [17]:
# Append the new companies to the existing master; ignore_index renumbers the rows from 0.
updated_company_master = pd.concat([company_master, new_companies_df], ignore_index=True)

updated_company_master.tail()

Unnamed: 0,company_id,company
30,31,Tac - Yellow Non Color
31,32,Metro Jet Taxi A.
32,33,Petani Cab Corp
33,34,x
34,35,y


In [18]:
def update_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the company master with the companies that are new in the taxi data.

    Each new company is appended exactly once, no matter how many trips it
    appears in, and receives the next free ``company_id``. Neither input is
    modified.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        DataFrame holding the daily taxi trips. Must contain a ``company`` column.
    company_master: pd.DataFrame
        DataFrame holding the company_master data (``company_id`` and ``company`` columns).

    Returns
    -------
    pd.DataFrame
        The updated company_master data with a fresh 0..n-1 index. If the taxi
        data contains no new companies, the rows are the same as in ``company_master``.
    """

    # One candidate per distinct company, in first-seen order. Checking every trip
    # row instead would append a new company once per trip, each with its own id.
    candidate_companies = taxi_trips["company"].drop_duplicates()
    is_known = candidate_companies.isin(company_master["company"])
    new_companies_list = candidate_companies[~is_known].tolist()

    if not new_companies_list:
        # Nothing to add; still return a new, re-indexed frame like the concat below does.
        return company_master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot handle: start from 0 instead.
    company_max_id = 0 if company_master.empty else int(company_master["company_id"].max())

    new_companies_df = pd.DataFrame({
        "company_id": range(company_max_id + 1, company_max_id + len(new_companies_list) + 1),
        "company": new_companies_list,
    })

    return pd.concat([company_master, new_companies_df], ignore_index=True)


In [19]:
# Small stand-in for the taxi data: one company that is presumably already in the
# master (verify against company_master) plus two made-up names.
taxi_trips_company_only = pd.DataFrame(
    [
        (1, "6574 - Babylon Express Inc."),
        (2, "x"),
        (3, "y"),
    ],
    columns=["company_id", "company"],
)
taxi_trips_company_only

Unnamed: 0,company_id,company
0,1,6574 - Babylon Express Inc.
1,2,x
2,3,y


In [20]:
# Try the function with the small stand-in frame instead of the full taxi data.
updated_company_master = update_company_master(taxi_trips=taxi_trips_company_only, company_master=company_master)

In [21]:
updated_company_master.tail()

Unnamed: 0,company_id,company
30,31,Tac - Yellow Non Color
31,32,Metro Jet Taxi A.
32,33,Petani Cab Corp
33,34,x
34,35,y


### payment_type_master codes

In [22]:
# Distinct payment types in first-seen order, re-indexed from 0.
unique_payment_types = taxi_trips["payment_type"].drop_duplicates().reset_index(drop=True)

# Master table: one row per payment type with sequential ids starting at 1.
payment_type_master = pd.DataFrame({
    "payment_type_id": range(1, len(unique_payment_types) + 1),
    "payment_type": unique_payment_types,
})

# Small stand-in for the taxi data used to try out the master-update logic.
taxi_trips_payment_type_only = pd.DataFrame(
    [
        (1, "Credit Card"),
        (2, "x"),
        (3, "y"),
    ],
    columns=["payment_type_id", "payment_type"],
)
taxi_trips_payment_type_only

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,x
2,3,y


In [23]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment_type master with the payment types that are new in the taxi data.

    Each new payment type is appended exactly once, no matter how many trips
    use it, and receives the next free ``payment_type_id``. Neither input is
    modified.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        DataFrame holding the daily taxi trips. Must contain a ``payment_type`` column.
    payment_type_master: pd.DataFrame
        DataFrame holding the payment_type_master data
        (``payment_type_id`` and ``payment_type`` columns).

    Returns
    -------
    pd.DataFrame
        The updated payment_type_master data with a fresh 0..n-1 index. If the
        taxi data contains no new payment types, the rows are the same as in
        ``payment_type_master``.
    """

    # One candidate per distinct payment type, in first-seen order. Checking every
    # trip row instead would append a new type once per trip, each with its own id.
    candidate_payment_types = taxi_trips["payment_type"].drop_duplicates()
    is_known = candidate_payment_types.isin(payment_type_master["payment_type"])
    new_payment_types_list = candidate_payment_types[~is_known].tolist()

    if not new_payment_types_list:
        # Nothing to add; still return a new, re-indexed frame like the concat below does.
        return payment_type_master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot handle: start from 0 instead.
    payment_type_max_id = (
        0 if payment_type_master.empty else int(payment_type_master["payment_type_id"].max())
    )

    new_payment_types_df = pd.DataFrame({
        "payment_type_id": range(payment_type_max_id + 1, payment_type_max_id + len(new_payment_types_list) + 1),
        "payment_type": new_payment_types_list,
    })

    return pd.concat([payment_type_master, new_payment_types_df], ignore_index=True)

In [24]:
# Try the function with the small stand-in frame instead of the full taxi data.
updated_payment_type_master = update_payment_type_master(taxi_trips=taxi_trips_payment_type_only, payment_type_master=payment_type_master)

In [25]:
updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Prcard
1,2,Credit Card
2,3,Cash
3,4,Mobile
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,x
8,9,y


### Creating a generic update master table function

In [26]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """Extend a master table with the values that are new in the taxi data.

    Each new value is appended exactly once, no matter how many trips contain
    it, and receives the next free id. Neither input is modified.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        DataFrame holding the daily taxi trips. Must contain ``value_column``.
    master: pd.DataFrame
        DataFrame holding the master data.
    id_column: str
        Name of the id column of the master DataFrame.
    value_column: str
        Name of the column holding the values, both in ``master`` and in ``taxi_trips``.

    Returns
    -------
    pd.DataFrame
        The updated master data with a fresh 0..n-1 index. If the taxi data
        contains no new values, the rows are the same as in ``master``.
    """

    # One candidate per distinct value, in first-seen order. Checking every trip
    # row instead would append a new value once per trip, each with its own id.
    candidate_values = taxi_trips[value_column].drop_duplicates()
    is_known = candidate_values.isin(master[value_column])
    new_values_list = candidate_values[~is_known].tolist()

    if not new_values_list:
        # Nothing to add; still return a new, re-indexed frame like the concat below does.
        return master.reset_index(drop=True)

    # max() of an empty column is NaN, which range() cannot handle: start from 0 instead.
    max_id = 0 if master.empty else int(master[id_column].max())

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values_list) + 1),
        value_column: new_values_list,
    })

    return pd.concat([master, new_values_df], ignore_index=True)

In [28]:
# Check the generic function against the payment type master.
test_payment_type_master = update_master(taxi_trips=taxi_trips_payment_type_only, master=payment_type_master, id_column="payment_type_id", value_column="payment_type")

In [29]:
test_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Prcard
1,2,Credit Card
2,3,Cash
3,4,Mobile
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,x
8,9,y


In [30]:
# Check the generic function against the company master.
test_company_master = update_master(taxi_trips=taxi_trips_company_only, master=company_master, id_column="company_id", value_column="company")

In [32]:
test_company_master.tail()

Unnamed: 0,company_id,company
30,31,Tac - Yellow Non Color
31,32,Metro Jet Taxi A.
32,33,Petani Cab Corp
33,34,x
34,35,y


### update taxi_trips with the most recent company_master and payment_type master function

In [44]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Update the taxi_trips DataFrame with the company_master and payment_type_master ids, and delete the string columns.

    Both lookups are inner joins, so a trip whose ``payment_type`` or
    ``company`` is missing from the corresponding master table is not part of
    the result. Update the master tables before calling this function. The
    input DataFrames are not modified.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The DataFrame with the daily taxi trips.
    payment_type_master : pd.DataFrame
        The payment type master table.
    company_master : pd.DataFrame
        The company master table.

    Returns
    -------
    pd.DataFrame
        The taxi_trips data, with only payment_type id and company_id, without company or payment_type values.

    Raises
    ------
    pandas.errors.MergeError
        If a payment type or a company occurs more than once in its master
        table. Such a duplicate would otherwise silently multiply the
        matching trip rows.
    """
    # many_to_one: a key may repeat in the trips but must be unique in the master table.
    taxi_trips_id = taxi_trips.merge(payment_type_master, on="payment_type", validate="many_to_one")
    taxi_trips_id = taxi_trips_id.merge(company_master, on="company", validate="many_to_one")

    # The ids now carry the information; the string columns are no longer needed.
    return taxi_trips_id.drop(columns=["payment_type", "company"])

In [45]:
# Replace the company / payment_type strings with the ids from the master tables.
taxi_trips_id = update_taxi_trips_with_master_data(taxi_trips=taxi_trips, payment_type_master=payment_type_master, company_master=company_master)

# A fixed seed shows the same five rows on every run, so the output stays reproducible.
taxi_trips_id.sample(5, random_state=42)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
8290,7e151ba0c3d76599a118515c3942637bde1c1a36,74978c2926ee71122a47e2cc503ba65ba909b764c443af...,2024-06-12T15:45:00.000,2024-06-12T16:15:00.000,2388,14.47,22,76,37.85,8.33,0,0,46.18,41.92276062,-87.699155343,41.980264315,-87.913624596,2024-06-12 15:00:00,4,5
6847,a2a79a32a8f67a929ac498471b852573d3a572b2,515dbaaba624daeb95c3dfefb93bfc1764b99ed2ff96b7...,2024-06-12T16:45:00.000,2024-06-12T17:00:00.000,960,6.3,76,10,18.5,0.0,0,0,18.5,41.980264315,-87.913624596,41.985015101,-87.804532006,2024-06-12 16:00:00,3,6
20251,7198b6cf0d5cd110557a34991ec5f4b64fc63937,1b9ab0532482023ae1a4cd4364c4e40f33028c7f1a2b03...,2024-06-12T06:15:00.000,2024-06-12T07:00:00.000,2110,18.82,33,76,46.59,7.69,0,0,54.28,41.857183858,-87.620334624,41.980264315,-87.913624596,2024-06-12 06:00:00,4,11
9341,6a9e9362c0f82ffa346f1b4ba50b58be5cb31cbf,e931cc8f41df1e511815b487341e4896fb01f01b2f8fc6...,2024-06-12T15:00:00.000,2024-06-12T16:00:00.000,3180,12.7,56,32,39.75,8.8,0,4,52.55,41.785998518,-87.750934289,41.884987192,-87.620992913,2024-06-12 15:00:00,2,9
11142,1a8c7d8d9b469fa87ba3210607b357050e99add2,b689cccdd4fd4d537758da1055f25864f9780258416bfd...,2024-06-12T13:45:00.000,2024-06-12T14:00:00.000,297,0.6,28,32,5.25,2.0,0,1,8.75,41.879255084,-87.642648998,41.880994471,-87.632746489,2024-06-12 13:00:00,2,2


### weather transformations function

In [47]:
def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Make transformations on the daily weather api response.

    Picks the hourly series out of the response, renames them to short column
    names and parses the timestamps.

    Parameters
    ----------
    weather_data : dict
        The parsed JSON body of the daily weather response from the Open Meteo
        API. Its ``"hourly"`` entry must contain the keys ``time``,
        ``temperature_2m``, ``wind_speed_10m``, ``rain`` and ``precipitation``.

    Returns
    -------
    pd.DataFrame
        A DataFrame representation of the data with the columns ``datetime``,
        ``temperature``, ``wind_speed``, ``rain`` and ``precipitation``.

    Raises
    ------
    KeyError
        If ``"hourly"`` or one of the expected hourly series is missing.
    """
    hourly = weather_data["hourly"]

    # Output column name -> key of the series in the API response.
    column_sources = {
        "datetime": "time",
        "temperature": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    weather_df = pd.DataFrame(
        {column: hourly[source] for column, source in column_sources.items()}
    )

    # The API delivers the timestamps as strings; convert them to datetimes.
    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df

In [48]:
# Test

# Same target day as for the taxi data: today minus two months.
current_datetime = datetime.now() - relativedelta(months=2)

formatted_datetime = current_datetime.strftime("%Y-%m-%d")

url = "https://archive-api.open-meteo.com/v1/era5"

# One day of hourly values; start and end date are the same day.
params = {
    "latitude": 41.85,
    "longitude": -87.65,
    "start_date": formatted_datetime,
    "end_date": formatted_datetime,
    "hourly": "temperature_2m,wind_speed_10m,rain,precipitation"
}

# Without a timeout a stalled connection would block this cell indefinitely.
response = requests.get(url, params=params, timeout=60)

# Stop here on an HTTP error (4xx/5xx) instead of failing later with a KeyError on "hourly".
response.raise_for_status()

weather_data = response.json()

weather_data_df = transform_weather_data(weather_data)

In [49]:
weather_data_df.head()

Unnamed: 0,datetime,temperature,wind_speed,rain,precipitation
0,2024-06-12 00:00:00,22.2,13.9,0.0,0.0
1,2024-06-12 01:00:00,21.2,10.7,0.0,0.0
2,2024-06-12 02:00:00,20.7,8.7,0.0,0.0
3,2024-06-12 03:00:00,20.2,12.1,0.0,0.0
4,2024-06-12 04:00:00,19.3,11.5,0.0,0.0
