In [2]:
import os
from datetime import datetime
from dateutil.relativedelta import relativedelta

import json
import pandas as pd
pd.set_option("display.max_columns", 30)

import requests

In [3]:
"""
1. get the data from S3
2. weather data transformations
3. taxi_trips transformations
4. update payment_type_master
5. update company_master
6. update taxi_trips with company and payment type ids (replace the string values with ids from the latest master tables)
7. upload weather data to S3
8. upload taxi data to S3
9. upload the newest payment type master and company master data to S3 as well
"""

'\n1. get the data from S3\n2. weather date transformations\n3. taxi_trips transformations\n4. update payment_type_master\n5. update company_master\n6. update taxi_trips with company and payment type ids (replace the string values with ids from latest master tables)\n7. upload weather date to s3\n8. upload taxi date to s3\n9. upload the newest payment tyope master and company master date to s3 as well\n'

## taxi trips transformation codes

In [4]:
# Extract the Chicago taxi trips for the single day that lies two months
# before today (T-2 months).
current_date_time = datetime.now() - relativedelta(months=2)
formated_datetime = current_date_time.strftime("%Y-%m-%d")

url = f"https://data.cityofchicago.org/resource/ajtu-isnz.json?$where=trip_start_timestamp >= '{formated_datetime}T00:00:00' AND trip_start_timestamp <= '{formated_datetime}T23:59:59'&$limit=30000"

# Only send the app token header when the environment variable is set.
app_token = os.environ.get("CHICAGO_API_TOKEN")
headers = {"X-App-Token": app_token} if app_token else {}

# The headers were previously built but never passed to the request.
# A timeout keeps a stalled connection from hanging the notebook, and
# raise_for_status() stops here on an HTTP error instead of letting an
# error payload be turned into a DataFrame further down.
response = requests.get(url, headers=headers, timeout=60)
response.raise_for_status()

data = response.json()

In [5]:
# Load the JSON records returned by the API into a DataFrame and show one
# random row as a quick sanity check.
taxi_trips = pd.DataFrame(data=data)
taxi_trips.sample(n=1)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,pickup_centroid_location,dropoff_community_area,dropoff_centroid_latitude,dropoff_centroid_longitude,dropoff_centroid_location,pickup_census_tract,dropoff_census_tract
10198,70e7069e9f69909fcab8e61a5012b694f4c92389,476c0017d13687fac2a4fc4c45c9dd1d8a1fb3dfb0da84...,2025-01-08T11:30:00.000,2025-01-08T11:30:00.000,447,1.04,8,9,2.88,0,0,11.88,Mobile,City Service,41.898331794,-87.620762865,"{'type': 'Point', 'coordinates': [-87.62076286...",8,41.892507781,-87.626214906,"{'type': 'Point', 'coordinates': [-87.62621490...",17031081300,17031081500


In [6]:
# Columns removed before any further processing: the census tract columns
# (noted as not containing real data) and the two centroid "location"
# columns.
columns_to_drop = [
    "pickup_census_tract",
    "dropoff_census_tract",
    "dropoff_centroid_location",
    "pickup_centroid_location",
]

taxi_trips = (
    taxi_trips
    # errors="ignore" keeps this cell re-runnable (the columns are already
    # gone on a second run) and tolerates a payload that lacks one of them.
    .drop(columns=columns_to_drop, errors="ignore")
    # drop every row that has a missing value in any remaining column
    .dropna()
    # mark the community area columns as ids
    .rename(columns={"pickup_community_area": "pickup_community_area_id",
                     "dropoff_community_area": "dropoff_community_area_id"})
    # helper column for the weather data: trip start floored to the hour
    .assign(
        datetime_for_weather=lambda trips: pd.to_datetime(
            trips["trip_start_timestamp"]
        ).dt.floor("h")
    )
)

In [7]:
# Inspect three random rows of the cleaned trips.
taxi_trips.sample(n=3)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area_id,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
9644,a80f88780b38468b8b565013aeb7e11652432d7a,6c6606251e8d2b1609f34d755bf884c4d972ab44b47bd7...,2025-01-08T12:00:00.000,2025-01-08T12:00:00.000,225,0.76,32,5.25,0,0,0,5.25,Cash,Flash Cab,41.880994471,-87.632746489,8,41.892042136,-87.63186395,2025-01-08 12:00:00
5257,14bda77cfe44c06977814821a4ee78486c8b22e1,66d3807bc1b0bcf7c3b90c19f7c13356cdd60995199804...,2025-01-08T16:15:00.000,2025-01-08T16:45:00.000,1513,2.99,12,14.5,0,0,0,14.5,Prcard,Flash Cab,41.993930128,-87.758353588,16,41.953582125,-87.72345239,2025-01-08 16:00:00
15158,9aa8c5cf302e59727c2ce0167a3f5af72e768f28,16a233f62883c48f7462a0d5b87191190c49a46fe52f37...,2025-01-08T05:30:00.000,2025-01-08T05:45:00.000,741,11.68,76,29.5,0,0,0,29.5,Prcard,Flash Cab,41.980264315,-87.913624596,21,41.938666196,-87.711210593,2025-01-08 05:00:00


#### taxi trips transformation function

In [8]:
def taxi_trips_transformation(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean and reshape the daily taxi trips data.

    The input frame is left untouched; a new DataFrame is returned.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The DataFrame holding the daily taxi trips.

    Returns
    -------
    pd.DataFrame
        The cleaned and transformed DataFrame holding the daily taxi trips:
        census tract and centroid location columns removed, rows with any
        missing value dropped, community area columns renamed to ``*_id`` and
        a ``datetime_for_weather`` column (trip start floored to the hour)
        added.

    Raises
    ------
    TypeError
        If ``taxi_trips`` is not a pandas DataFrame.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi trips is not a valid pandas DateFrame.")

    columns_to_drop = [
        "pickup_census_tract",
        "dropoff_census_tract",
        "dropoff_centroid_location",
        "pickup_centroid_location",
    ]

    return (
        taxi_trips
        # errors="ignore": a frame that lacks one of these columns is still
        # accepted instead of raising KeyError.
        .drop(columns=columns_to_drop, errors="ignore")
        .dropna()
        .rename(columns={"pickup_community_area": "pickup_community_area_id",
                         "dropoff_community_area": "dropoff_community_area_id"})
        # Helper column used to line trips up with hourly weather data.
        .assign(
            datetime_for_weather=lambda trips: pd.to_datetime(
                trips["trip_start_timestamp"]
            ).dt.floor("h")
        )
    )

    

#### Company code updates

In [9]:
# Build the initial company master: one row per distinct company name,
# numbered from 1 in order of first appearance in the trips.
distinct_companies = taxi_trips["company"].drop_duplicates().reset_index(drop=True)

company_master = pd.DataFrame({
    "company_id": range(1, distinct_companies.size + 1),
    "company": distinct_companies,
})
company_master.tail(2)

Unnamed: 0,company_id,company
33,34,Petani Cab Corp
34,35,Metro Jet Taxi A.


In [10]:
# Hand-made list of incoming company names used to try out the update logic.
new_company_data = [
    {"company": name}
    for name in ("Tac - Blue Diamond Dispatch", "X", "Y")
]

new_company_mapping = pd.DataFrame(new_company_data)
new_company_mapping

Unnamed: 0,company
0,Tac - Blue Diamond Dispatch
1,X
2,Y


In [11]:
# Highest company id currently in use; ids for new companies continue after it.
company_max_id = company_master.loc[:, "company_id"].max()
company_max_id

np.int64(35)

In [12]:
# Find the incoming company names that the master does not contain yet.
# The same result is computed twice, in two different styles, for comparison.
known_companies = company_master["company"].values
incoming_companies = new_company_mapping["company"].values

# filter-based form
new_company_list = list(
    filter(lambda name: name not in known_companies, incoming_companies)
)

# comprehension form
new_company_list_one_line = [
    name for name in incoming_companies if name not in known_companies
]

new_company_list_one_line

['X', 'Y']

In [13]:
# Give the new companies consecutive ids that start right after the current
# maximum id of the master.
first_new_id = company_max_id + 1
new_companies_df = pd.DataFrame({
    "company_id": range(first_new_id, first_new_id + len(new_company_list)),
    "company": new_company_list,
})
new_companies_df

Unnamed: 0,company_id,company
0,36,X
1,37,Y


In [14]:
# Append the new companies below the existing master and renumber the index.
updated_company_master = pd.concat(
    objs=[company_master, new_companies_df],
    ignore_index=True,
)
updated_company_master.tail(n=5)

Unnamed: 0,company_id,company
32,33,U Taxicab
33,34,Petani Cab Corp
34,35,Metro Jet Taxi A.
35,36,X
36,37,Y


In [15]:
def updated_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the company master with companies that are new in the taxi data.

    Each company that appears in ``taxi_trips`` but not in ``company_master``
    is added exactly once, no matter how many trips reference it. New ids
    continue after the current maximum id, in order of first appearance.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips; must have a ``company`` column.
    company_master : pd.DataFrame
        DataFrame holding the company master data, with ``company_id`` and
        ``company`` columns.

    Returns
    -------
    pd.DataFrame
        The updated company master data. If the taxi data contains no new
        company, a copy of the master with a fresh index is returned.
    """
    # An empty master has no maximum id (max() would be NaN), so start from 0.
    max_id = 0 if company_master.empty else int(company_master["company_id"].max())

    # Distinct, non-null company names in order of first appearance. Without
    # de-duplication a new company would get one master row (and id) per trip.
    seen_companies = taxi_trips["company"].dropna().drop_duplicates()
    new_company_list = seen_companies[
        ~seen_companies.isin(company_master["company"])
    ].tolist()

    if not new_company_list:
        return company_master.reset_index(drop=True)

    new_companies_df = pd.DataFrame({
        "company_id": range(max_id + 1, max_id + len(new_company_list) + 1),
        "company": new_company_list,
    })

    return pd.concat([company_master, new_companies_df], ignore_index=True)


In [16]:
# Small hand-made stand-in for the taxi data, used to exercise the company
# master update with a few company names.
taxi_trips_company_only = pd.DataFrame(
    [(1, "Tac - Yellow Non Color"), (2, "x"), (3, "y")],
    columns=["company_id", "company"],
)
taxi_trips_company_only

Unnamed: 0,company_id,company
0,1,Tac - Yellow Non Color
1,2,x
2,3,y


In [17]:
# NOTE(review): this assignment rebinds the name `updated_company_master` from
# the function to the DataFrame it returns. Running this cell a second time
# therefore tries to call a DataFrame and fails until the cell that defines
# the function has been run again.
updated_company_master = updated_company_master(taxi_trips=taxi_trips_company_only, company_master=company_master)


In [18]:
# Show the last three rows of the updated company master.
updated_company_master.tail(n=3)

Unnamed: 0,company_id,company
34,35,Metro Jet Taxi A.
35,36,x
36,37,y


### payment type master code

In [19]:
# Build the initial payment type master: one row per distinct payment type,
# numbered from 1 in order of first appearance in the trips.
distinct_payment_types = taxi_trips["payment_type"].drop_duplicates().reset_index(drop=True)

payment_type_master = pd.DataFrame({
    "payment_type_id": range(1, distinct_payment_types.size + 1),
    "payment_type": distinct_payment_types,
})

# Small hand-made stand-in for the taxi data, used to exercise the payment
# type master update.
taxi_trips_payment_type_only = pd.DataFrame(
    [(1, "Cash"), (2, "x"), (3, "y")],
    columns=["payment_type_id", "payment_type"],
)

In [20]:
# Display the hand-made payment type test frame.
taxi_trips_payment_type_only 

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,x
2,3,y


In [21]:
def updated_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment type master with payment types that are new in the taxi data.

    Each payment type that appears in ``taxi_trips`` but not in
    ``payment_type_master`` is added exactly once, no matter how many trips
    use it. New ids continue after the current maximum id, in order of first
    appearance.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips; must have a ``payment_type``
        column.
    payment_type_master : pd.DataFrame
        DataFrame holding the payment type master data, with
        ``payment_type_id`` and ``payment_type`` columns.

    Returns
    -------
    pd.DataFrame
        The updated payment type master data. If the taxi data contains no
        new payment type, a copy of the master with a fresh index is returned.
    """
    # An empty master has no maximum id (max() would be NaN), so start from 0.
    max_id = (
        0
        if payment_type_master.empty
        else int(payment_type_master["payment_type_id"].max())
    )

    # Distinct, non-null payment types in order of first appearance. Without
    # de-duplication a new payment type would get one master row per trip.
    seen_payment_types = taxi_trips["payment_type"].dropna().drop_duplicates()
    new_payment_types_list = seen_payment_types[
        ~seen_payment_types.isin(payment_type_master["payment_type"])
    ].tolist()

    if not new_payment_types_list:
        return payment_type_master.reset_index(drop=True)

    new_payment_types_df = pd.DataFrame({
        "payment_type_id": range(max_id + 1, max_id + len(new_payment_types_list) + 1),
        "payment_type": new_payment_types_list,
    })

    return pd.concat([payment_type_master, new_payment_types_df], ignore_index=True)

In [22]:
# NOTE(review): this assignment rebinds the name `updated_payment_type_master`
# from the function to the DataFrame it returns. Running this cell a second
# time therefore tries to call a DataFrame and fails until the cell that
# defines the function has been run again.
updated_payment_type_master = updated_payment_type_master(taxi_trips=taxi_trips_payment_type_only, payment_type_master=payment_type_master)

In [23]:
# Display the payment type master after the update.
updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,Prcard
2,3,Cash
3,4,Mobile
4,5,Unknown
5,6,Dispute
6,7,No Charge
7,8,x
8,9,y


### creating a generic update master table function 

In [24]:
def updated_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """Extend a master DataFrame with the values that are new in the taxi data.

    Each value of ``taxi_trips[value_column]`` that is not yet present in
    ``master[value_column]`` is added exactly once, no matter how many trips
    carry it. New ids continue after the current maximum of
    ``master[id_column]``, in order of first appearance.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips.
    master : pd.DataFrame
        DataFrame holding the master data.
    id_column : str
        Name of the id column of the master DataFrame.
    value_column : str
        Name of the column, present in both ``taxi_trips`` and ``master``,
        that contains the values.

    Returns
    -------
    pd.DataFrame
        The updated master data. If the taxi data contains no new value, a
        copy of the master with a fresh index is returned.
    """
    # An empty master has no maximum id (max() would be NaN), so start from 0.
    max_id = 0 if master.empty else int(master[id_column].max())

    # Distinct, non-null values in order of first appearance. Without
    # de-duplication a new value would get one master row (and id) per trip.
    seen_values = taxi_trips[value_column].dropna().drop_duplicates()
    new_values_list = seen_values[~seen_values.isin(master[value_column])].tolist()

    if not new_values_list:
        return master.reset_index(drop=True)

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values_list) + 1),
        value_column: new_values_list,
    })

    return pd.concat([master, new_values_df], ignore_index=True)

In [25]:
# Exercise the generic helper on the payment type master.
test_payment_type_master = updated_master(
    taxi_trips=taxi_trips_payment_type_only,
    master=payment_type_master,
    id_column="payment_type_id",
    value_column="payment_type",
)

In [26]:
# Display the payment type master produced by the generic helper.
test_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,Prcard
2,3,Cash
3,4,Mobile
4,5,Unknown
5,6,Dispute
6,7,No Charge
7,8,x
8,9,y


In [27]:
# Exercise the generic helper on the company master.
test_company_master = updated_master(
    taxi_trips=taxi_trips_company_only,
    master=company_master,
    id_column="company_id",
    value_column="company",
)

In [28]:
# Show the last five rows of the company master produced by the generic helper.
test_company_master.tail(n=5)

Unnamed: 0,company_id,company
32,33,U Taxicab
33,34,Petani Cab Corp
34,35,Metro Jet Taxi A.
35,36,x
36,37,y


### update taxi trips with the most recent company master and payment type function

In [29]:
# Show the first five trips before the string columns are replaced with ids.
taxi_trips.head(n=5)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area_id,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
1,01bb9827e01a9a996c0f016ecf612e5932d0553b,91a036e3de3c1f459f2664801ea9388870751ae14e196e...,2025-01-08T23:45:00.000,2025-01-09T00:00:00.000,1033,12.71,56,32.25,11.02,0,4,47.77,Credit Card,City Service,41.79259236,-87.769615453,24,41.901206994,-87.676355989,2025-01-08 23:00:00
2,0bd7c09d4d3d7d3c0d0139e79e27e763a846dcb4,7f9134f3f8ee8c217f4fc14b7beb8d56af92f9faf4d849...,2025-01-08T23:45:00.000,2025-01-09T00:15:00.000,1378,13.18,1,33.5,0.0,0,0,33.5,Prcard,Flash Cab,42.009622881,-87.670166857,28,41.874005383,-87.66351755,2025-01-08 23:00:00
3,32e3af588af3ddf8526ce088b1aae9b55d42547c,4cebb9edbffeb3a0eace8cccef967730a62f5a978869e2...,2025-01-08T23:45:00.000,2025-01-09T00:15:00.000,1388,18.46,76,45.25,0.0,0,7,52.25,Cash,Flash Cab,41.980264315,-87.913624596,32,41.878865584,-87.625192142,2025-01-08 23:00:00
4,33f1c10b2026ce24b6d938c80ddf22905d01c851,13016372e777da1289d557edbe4ce2be8a68e77bc64768...,2025-01-08T23:45:00.000,2025-01-09T00:00:00.000,383,0.94,8,6.79,0.0,0,0,6.79,Mobile,Taxicab Insurance Agency Llc,41.899602111,-87.633308037,8,41.899602111,-87.633308037,2025-01-08 23:00:00
5,40829bb16444d5838ea730d8ba86aec580837014,3835dded237bfc6b12d2181ec37148bd8385590f81597f...,2025-01-08T23:45:00.000,2025-01-08T23:45:00.000,193,0.9,8,6.77,0.0,0,0,6.77,Mobile,Blue Ribbon Taxi Association,41.899602111,-87.633308037,8,41.899602111,-87.633308037,2025-01-08 23:00:00


In [30]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) ->pd.DataFrame:
    """Replace the company and payment type strings of the taxi trips with master ids.

    The trips are merged with both master tables on the string columns, and
    the string columns are dropped afterwards. The input frames are not
    modified.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The DataFrame with the daily taxi trips; must have ``payment_type``
        and ``company`` columns.
    payment_type_master : pd.DataFrame
        The payment type master table, keyed by ``payment_type``.
    company_master : pd.DataFrame
        The company master table, keyed by ``company``.

    Returns
    -------
    pd.DataFrame
        The taxi trips data with the columns brought in from the master
        tables (the ids) and without the ``payment_type`` and ``company``
        string columns. Because the merges are inner joins, trips whose
        payment type or company is missing from the master are not included.

    Raises
    ------
    pandas.errors.MergeError
        If a master table contains the same ``payment_type`` / ``company``
        value more than once.
    """
    # validate="many_to_one" makes a master table with duplicate keys fail
    # loudly instead of silently multiplying the trip rows in the merge.
    taxi_trips_id = (
        taxi_trips
        .merge(payment_type_master, on="payment_type", validate="many_to_one")
        .merge(company_master, on="company", validate="many_to_one")
        .drop(columns=["payment_type", "company"])
    )

    return taxi_trips_id

In [31]:
# Swap the payment type and company strings of the trips for their master ids.
taxi_trips_id = update_taxi_trips_with_master_data(
    taxi_trips=taxi_trips,
    payment_type_master=payment_type_master,
    company_master=company_master,
)

In [32]:
# Inspect five random rows of the id-based trips table.
taxi_trips_id.sample(n=5)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_community_area_id,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
10081,759e0d204df777100cf08094af66a4a1826e6f5f,25aa142281b3f0fb607daf214eea6509f2cc36e66c10b8...,2025-01-08T10:15:00.000,2025-01-08T10:30:00.000,413,1.05,8,6.5,1.0,0,0.0,8.0,41.89321636,-87.63784421,28,41.88528132,-87.6572332,2025-01-08 10:00:00,1,1
10990,c069f859b6265f8feccdfc281721e068080f64ae,14ba4e86beb4e185e52c5f3d95e680f46e5932a0c5553b...,2025-01-08T09:15:00.000,2025-01-08T09:30:00.000,225,1.0,32,5.5,3.0,0,0.0,9.0,41.884987192,-87.620992913,32,41.877406123,-87.621971652,2025-01-08 09:00:00,1,15
8895,117f516599289641e2c975c43f5fab96b036a1d3,73ffbd5c5ca98e3d17567b8253a7f72a3b1f3b26d82747...,2025-01-08T11:45:00.000,2025-01-08T12:00:00.000,437,1.49,8,7.25,2.0,0,1.0,10.75,41.892507781,-87.626214906,32,41.880994471,-87.632746489,2025-01-08 11:00:00,1,1
455,94392ea4845ef4c0cb54d7f315f09856bc669045,0fdb568ec3259f133b145bbd37341939cdf524b5380df5...,2025-01-08T21:45:00.000,2025-01-08T22:00:00.000,1308,16.72,76,41.75,7.01,0,4.5,53.76,41.980264315,-87.913624596,8,41.899602111,-87.633308037,2025-01-08 21:00:00,1,8
5061,89fb9e80baae51ac3f385a4de9c1c3bfe39c7f23,8ff96b8befe47605908893fbb12cc9a4845f2b6f9bfc38...,2025-01-08T15:45:00.000,2025-01-08T15:45:00.000,0,0.0,56,45.75,0.0,0,0.0,45.75,41.785998518,-87.750934289,56,41.785998518,-87.750934289,2025-01-08 15:00:00,1,7


### Weather transformation function 

In [33]:
def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Turn the daily weather API response into a DataFrame.

    Parameters
    ----------
    weather_data : dict
        The decoded JSON response with the daily weather data from the Open
        Meteo API. It must contain an ``"hourly"`` mapping with the keys
        ``time``, ``temperature_2m``, ``wind_speed_10m``, ``rain`` and
        ``precipitation``.

    Returns
    -------
    pd.DataFrame
        A DataFrame with the columns ``datetime`` (parsed with
        ``pd.to_datetime``), ``temperature``, ``wind_speed``, ``rain`` and
        ``precipitation``.

    Raises
    ------
    KeyError
        If ``"hourly"`` or one of the expected hourly keys is missing.
    """
    hourly = weather_data["hourly"]

    # output column name -> key in the "hourly" section of the response
    column_sources = {
        "datetime": "time",
        "temperature": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    weather_df = pd.DataFrame(
        {column: hourly[source] for column, source in column_sources.items()}
    )

    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df

In [34]:
# Test of the extract step: request the hourly weather for the day that lies
# two months before today (the date is recomputed here from the current time).
current_date_time = datetime.now() - relativedelta(months=2)
formated_datetime = current_date_time.strftime("%Y-%m-%d")

url = "https://archive-api.open-meteo.com/v1/era5"

# Single-day window (start_date == end_date) at fixed coordinates, with the
# four hourly variables that transform_weather_data reads.
params = {
    "latitude" : 41.85,
    "longitude" : -87.65,
    "start_date" : formated_datetime,
    "end_date" : formated_datetime,
    "hourly" : "temperature_2m,wind_speed_10m,rain,precipitation"
}

# A timeout keeps a stalled connection from hanging the notebook, and
# raise_for_status() stops here on an HTTP error instead of passing an error
# payload on to the transformation step.
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()

weather_data = response.json()

In [35]:
# Convert the raw weather response into a tidy hourly DataFrame.
weather_data_df = transform_weather_data(weather_data=weather_data)

In [36]:
# Show the first five hourly weather rows.
weather_data_df.head(n=5)

Unnamed: 0,datetime,temperature,wind_speed,rain,precipitation
0,2025-01-08 00:00:00,-5.7,2.5,0.0,0.0
1,2025-01-08 01:00:00,-5.6,3.8,0.0,0.0
2,2025-01-08 02:00:00,-5.6,6.3,0.0,0.0
3,2025-01-08 03:00:00,-5.9,10.1,0.0,0.0
4,2025-01-08 04:00:00,-6.3,10.4,0.0,0.0
