In [60]:
import pandas as pd
import os
import json
import requests
from datetime import datetime
from dateutil.relativedelta import relativedelta
pd.set_option("display.max_columns", 30)

In [None]:
"""

1. get the data from S3
2. weather data transformation
3. taxi trips transformation - DONE
4. update payment_type_master - DONE
5. update company_master - DONE
6. update taxi_trips with with company and payment_type ids(replace the string values with ids from the latest master tables) - DONE
7. upload weather data to S3
8. upload taxi data to S3
9. upload the newest payment_type_master and company_master to S3

"""

### Taxi trips transformation codes

In [None]:
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")
url = (
    f"https://data.cityofchicago.org/resource/ajtu-isnz.json?"
    f"$where=trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
    f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'&$limit=30000"
)
response = requests.get(url)
data = response.json()

In [None]:
taxi_trips = pd.DataFrame(data)

In [None]:
taxi_trips.drop(["pickup_census_tract", "dropoff_census_tract"], axis=1, inplace=True)
taxi_trips.drop(["pickup_centroid_location", "dropoff_centroid_location"], axis=1, inplace=True)

taxi_trips.dropna(inplace=True)

taxi_trips.rename(columns={"pickup_community_area": "pickup_community_area_id", 
                           "dropoff_community_area": "dropoff_community_area_id"},
                  inplace=True)

taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"])

In [None]:
taxi_trips.head(5)

### Taxi trips transformation function

In [None]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Perform transformations with the taxi data

    Args:
        taxi_trips (pd.DataFrame): The data frame holding the daily taxi trips.

    Returns:
        pd.DataFrame: The cleaned, transformed data frame holding the daily taxi trips.
    """
    if not isinstance (taxi_trips, pd.DataFrame):
        raise TypeError ("taxi_trips is not a valid pandas DataFrame.")
    
    taxi_trips.drop(["pickup_census_tract", "dropoff_census_tract"], axis=1, inplace=True)
    taxi_trips.drop(["pickup_centroid_location", "dropoff_centroid_location"], axis=1, inplace=True)
    
    taxi_trips.dropna(inplace=True)
    
    taxi_trips.rename(columns={"pickup_community_area": "pickup_community_area_id", 
        "dropoff_community_area": "dropoff_community_area_id"}, inplace=True)
    
    taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"])
    
    return taxi_trips

#### Company code updates

In [None]:
company_master = taxi_trips["company"].drop_duplicates().reset_index(drop=True)
company_master = pd.DataFrame(
    {
        "company_id": range(1, len(company_master) + 1),
        "company": company_master
        
    }
    
)

company_master

In [None]:
new_company_data = [
{"company": "3556 - 36214 RC Andrews Cab"},
{"company": "x"},
{"company": "y"}
]

new_company_mapping = pd.DataFrame(new_company_data)

new_company_mapping

In [None]:
company_max_id_value = company_master["company_id"].max()
company_max_id_value

In [None]:
new_companies_list = []

for company in new_company_mapping["company"].values:
    if company not in company_master["company"].values:
        new_companies_list.append(company)
new_companies_list

In [None]:
new_companies_df = pd.DataFrame({
    "company_id": range(company_max_id_value + 1, company_max_id_value + len(new_companies_list) + 1),
    "company": new_companies_list
    })

In [None]:
new_companies_df

In [None]:
updated_company_master = pd.concat([company_master, new_companies_df], ignore_index = True)
updated_company_master.tail()

In [None]:
def update_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the company master with new companies if there are new companies.

    Args:
        taxi_trips (pd.DataFrame): Dataframe holding the daily taxi trips.
        company_master (pd.DataFrame): Dataframe holding the company_master data .

    Returns:
        pd.DataFrame: The updated company master data, if new companyies are in the taxi data, they will be loaded to it.
    """
    company_max_id_value = company_master["company_id"].max()
    
    new_companies_list = []
    for company in taxi_trips["company"].values:
        if company not in company_master["company"].values:
            new_companies_list.append(company)
            
    new_companies_df = pd.DataFrame({
    "company_id": range(company_max_id_value + 1, company_max_id_value + len(new_companies_list) + 1),
    "company": new_companies_list
    })
    
    updated_company_master = pd.concat([company_master, new_companies_df], ignore_index = True)
    updated_company_master.tail()
    
    return updated_company_master


In [None]:
taxi_trips_company_only = pd.DataFrame({
    "company_id":[1, 2, 3],
    "company":["6574 - Babylon Express Inc.", "X", "Y"]
})

taxi_trips_company_only

In [None]:
updated_company_master = update_company_master(taxi_trips=taxi_trips_company_only, company_master=company_master)

In [None]:
update_company_master.tail()

#### payment_type_master codes

In [39]:
payment_type_master = taxi_trips["payment_type"].drop_duplicates().reset_index(drop=True)
payment_type_master = pd.DataFrame(
    {
        "payment_type_id": range(1, len(payment_type_master) + 1),
        "payment_type": payment_type_master
        
    }
    
)

taxi_trips_payment_type_only = pd.DataFrame({
    "payment_type_id":[1, 2, 3],
    "payment_type":["Credit Card", "X", "Y"]
})

taxi_trips_payment_type_only


Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,X
2,3,Y


In [None]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment type master with new payment types if there are payment types.

    Args:
        taxi_trips (pd.DataFrame): Dataframe holding the daily taxi trips.
        payment_type_master (pd.DataFrame): Dataframe holding the payment_type data .

    Returns:
        pd.DataFrame: The updated payment type master data, if new payments are in the taxi data, they will be loaded to it.
    """
    payment_type_max_id_value = payment_type_master["payment_type_id"].max()
    
    new_payment_types_list = []
    for payment_type in taxi_trips["payment_type"].values:
        if payment_type not in payment_type_master["payment_type"].values:
            new_payment_types_list.append(payment_type)
            
    new_payment_type_df = pd.DataFrame({
    "payment_type_id": range(payment_type_max_id_value + 1, payment_type_max_id_value + len(new_payment_types_list) + 1),
    "payment_type": new_payment_types_list
    })
    
    updated_payment_type_master = pd.concat([payment_type_master, new_payment_type_df], ignore_index = True)
    
    return updated_payment_type_master


In [40]:
updated_payment_type_master = update_payment_type_master(taxi_trips=taxi_trips_payment_type_only, payment_type_master=payment_type_master)

In [41]:
updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,Credit Card
2,3,Prcard
3,4,Mobile
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,X
8,9,Y


## Creating a generic master table  function

In [None]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str ) -> pd.DataFrame:
    """Extend the payment type master with master value.

    Args:
        taxi_trips (pd.DataFrame): Dataframe holding the daily taxi trips.
        master (pd.DataFrame): Dataframe holding the payment_type data .
        id_column : str
        The id column of the master dataframe.
        value_column : str
        Name of the in the master_df contining the values.
    Returns:
        pd.DataFrame: The updated  master data if new values are in the taxi data, they will be loaded to it.
    """
    max_id_value = master[id_column].max()
    
    new_values_list = []
    for value in taxi_trips[value_column].values:
        if value not in master[value_column].values:
            new_values_list.append(value)
            
    new_values_df = pd.DataFrame({
    id_column: range(max_id_value + 1, max_id_value + len(new_values_list) + 1),
    value_column: new_values_list
    })
    
    updated_master = pd.concat([master, new_values_df], ignore_index = True)
    
    return updated_master


In [46]:
test_payment_type_master = update_master(taxi_trips=taxi_trips_payment_type_only, master=payment_type_master, id_column="payment_type_id", value_column="payment_type")

In [47]:
test_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Cash
1,2,Credit Card
2,3,Prcard
3,4,Mobile
4,5,Unknown
5,6,No Charge
6,7,Dispute
7,8,X
8,9,Y


In [48]:
test_company_master = update_master(taxi_trips=taxi_trips_company_only, master=company_master, id_column="company_id", value_column="company")

In [None]:
test_company_master

### update taxi_trips with the most recent payment_type_master and company_master_codes

In [62]:
def update_taxi_trips_with_master_data (taxi_trips: pd.DataFrame,payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """updathe the taxi_trips Dataframe with the company_master and payment_type_master ids, and delete the string columns.

    Args:
        taxi_trips (pd.DataFrame): Dataframe holding the daily taxi trips.
        payment_type_master (pd.DataFrame): payment master table.
        company_master (pd.DataFrame): company master table.
    Returns:
        pd.DataFrame: The taxi trips data, with only payment type id and company id, without payment_type and company values.
    """
    taxi_trips_id = taxi_trips.merge(payment_type_master, on = "payment_type")
    taxi_trips_id = taxi_trips_id.merge(company_master, on = "company")

    taxi_trips_id.drop(["payment_type","company"], axis = 1, inplace = True)
    
    return taxi_trips_id

In [None]:
taxi_trips_id = update_taxi_trips_with_master_data(taxi_trips=taxi_trips, payment_type_master=payment_type_master, company_master=company_master)

taxi_trips_id.sample(5)

### Weather transformation function

In [64]:
def transform_weather_data(weather_data: json) -> pd.DataFrame:
    """Make transformations on the daily API response

    Args:
        weather_data (json): The daily weather data from open meteo API.

    Returns:
        pd.DataFrame: The dataframe representation of the weather data.
    """
    weather_data_filtered = {
        "datetime": weather_data["hourly"]["time"],
        "tempretaure": weather_data["hourly"]["temperature_2m"],
        "wind_speed": weather_data["hourly"]["wind_speed_10m"],
        "rain": weather_data["hourly"]["rain"],
        "precipitation": weather_data["hourly"]["precipitation"],
    } 

    weather_df = pd.DataFrame(weather_data_filtered)
    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df


In [65]:
#test

current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")
url = "https://archive-api.open-meteo.com/v1/era5"
params = {
    "latitude": 41.85,
    "longitude": -87.65,
    "start_date": formatted_datetime,
    "end_date": formatted_datetime,
    "hourly": "temperature_2m,wind_speed_10m,rain,precipitation"
}
response = requests.get(url, params=params)
weather_data = response.json()
weather_data_df = transform_weather_data(weather_data)



In [66]:
weather_data_df.head

<bound method NDFrame.head of               datetime  tempretaure  wind_speed  rain  precipitation
0  2025-01-23 00:00:00         -5.2        15.6   0.0            0.0
1  2025-01-23 01:00:00         -5.5        14.5   0.0            0.0
2  2025-01-23 02:00:00         -5.4        15.7   0.0            0.0
3  2025-01-23 03:00:00         -5.4        14.7   0.0            0.1
4  2025-01-23 04:00:00         -5.5        13.7   0.0            0.1
5  2025-01-23 05:00:00         -5.6        13.1   0.0            0.2
6  2025-01-23 06:00:00         -5.7        13.2   0.0            0.2
7  2025-01-23 07:00:00         -5.7        13.4   0.0            0.1
8  2025-01-23 08:00:00         -5.8        13.0   0.0            0.1
9  2025-01-23 09:00:00         -5.8        12.6   0.0            0.0
10 2025-01-23 10:00:00         -5.9        13.0   0.0            0.0
11 2025-01-23 11:00:00         -5.2        12.7   0.0            0.1
12 2025-01-23 12:00:00         -5.0        16.2   0.0            0.1
13 2