In [None]:
from datetime import datetime   
from dateutil.relativedelta import relativedelta

from typing import Dict 

import pandas as pd
pd.set_option("display.max_columns", 30) 
import requests

In [None]:
"""
Steps for the Transform Load Lambda function
1. Get data from S3 (taxi, weather)
2. Weather data transformations -  DONE
3. Taxi data transformations  -  DONE
4. Update dim_payment_type   - DONE
5. Update dim_company  - DONE
6. Update fact_taxi_trips with the ids from the dim_payment_type and dim_company - DONE 
7. Upload dim_weather to S3
8. Upload fact_taxi_trips to S3
9. Upload dim_payment_type and dim_company (current, and previous version)

"""

In [None]:
# Query a single day, two months in the past (presumably because the public
# dataset is published with a lag — TODO confirm).
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

# Pass the SoQL filter via `params` so requests handles the URL encoding of
# spaces and quotes instead of relying on a hand-assembled query string.
url = "https://data.cityofchicago.org/resource/ajtu-isnz.json"
params = {
    "$where": (
        f"trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
        f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'"
    ),
    "$limit": 30000,
}

# A timeout keeps the cell from hanging forever; raise_for_status() stops an
# HTTP error payload from being parsed as if it were trip data.
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()
data = response.json()

len(data)  # Number of taxi trips returned for `formatted_datetime` (capped by $limit)

taxi_trips = pd.DataFrame(data)
taxi_trips.head()

#### Taxi data transformations

In [None]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """ Perform transformations on the taxi data.

    1.Drop selected columns
    2.Drop NULL values across all columns
    3.Rename selected columns
    4.Parse "trip_start_timestamp" into datetimes
    5.Create "datetime_for_weather" helper column (for dim_weather join)

    The input DataFrame is left untouched; a new DataFrame is returned, so the
    function can be re-run on the same raw data with the same result.

    :param taxi_trips: The DataFrame holding the daily taxi trips.
    :return: Transformed taxi trips DataFrame.
    :raises ValueError: If taxi_trips is not a pandas DataFrame.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise ValueError("Taxi trips is not a valid DataFrame.")

    # Each step returns a new frame, so the caller's DataFrame is never mutated.
    transformed = (
        taxi_trips
        .drop(
            columns=[
                "pickup_census_tract",
                "dropoff_census_tract",
                "pickup_centroid_location",
                "dropoff_centroid_location",
            ],
            errors="ignore",  # tolerate responses that lack some of these columns
        )
        .dropna()
        .rename(
            columns={
                "pickup_community_area": "pickup_community_area_id",
                "dropoff_community_area": "dropoff_community_area_id",
            }
        )
    )

    transformed["trip_start_timestamp"] = pd.to_datetime(transformed["trip_start_timestamp"])
    # Floor to the hour so each trip can be matched to an hourly weather record.
    transformed["datetime_for_weather"] = transformed["trip_start_timestamp"].dt.floor("h")

    return transformed

In [None]:
# Apply the cleaning and renaming steps to the fetched taxi trips.
taxi_trips_transformed = taxi_trips_transformations(taxi_trips)

# Preview the first rows of the transformed frame.
taxi_trips_transformed.head()

In [None]:
# Summarise the columns, dtypes and non-null counts of the transformed trips.
taxi_trips_transformed.info()

#### Dim Company and Dim Payment Type update

In [None]:
def update_dim_company_dim_payment_type(taxi_trips: pd.DataFrame, dim_df: pd.DataFrame, id_col: str, value_col: str) -> pd.DataFrame:
    """Extend the dimension DataFrame with new values if there are any.

    Values of ``value_col`` that occur in ``taxi_trips`` but not in ``dim_df`` are
    appended with consecutive ids continuing after the current maximum id
    (starting at 1 when the dimension is empty). Neither input is modified.

    :param taxi_trips:      DataFrame with the daily taxi trips.
    :param dim_df:          DataFrame with the dimension data (company, payment type).
    :param id_col:          The id column of the dimension DataFrame.
    :param value_col:       Name of the column in dimension DataFrame containing the values.
    :return:                The updated dimension data, if new values are in the taxi data, they will be loaded to it.
                            When there is nothing new, ``dim_df`` itself is returned.
    """

    todays_values = pd.Series(taxi_trips[value_col].unique(), name=value_col)
    new_values = todays_values[~todays_values.isin(dim_df[value_col])]

    if new_values.empty:
        return dim_df

    # max() of an empty column is NaN, which range() cannot consume.
    max_id = 0 if dim_df.empty else int(dim_df[id_col].max())

    # Build a fresh frame instead of assigning into a filtered slice, which
    # triggers SettingWithCopyWarning.
    new_dim_data = pd.DataFrame(
        {
            id_col: range(max_id + 1, max_id + 1 + len(new_values)),
            value_col: new_values.to_numpy(),
        }
    )

    if dim_df.empty:
        # Nothing to append to; keep the dimension's column layout.
        return new_dim_data.reindex(columns=dim_df.columns)

    return pd.concat([dim_df, new_dim_data], ignore_index=True)


In [None]:
def _build_dimension(values: pd.Series, id_col: str, value_col: str) -> pd.DataFrame:
    """Build a dimension table with ids 1..n from the distinct entries of ``values``.

    :param values:      Column holding the raw dimension values.
    :param id_col:      Name of the surrogate id column to create.
    :param value_col:   Name of the value column in the resulting table.
    :return:            DataFrame with the columns ``id_col`` and ``value_col``.
    """
    distinct_values = values.drop_duplicates().reset_index(drop=True)
    return pd.DataFrame(
        {
            id_col: range(1, len(distinct_values) + 1),
            value_col: distinct_values,
        }
    )


# Build both dimensions from the cleaned trips through one helper instead of two
# copy-pasted blocks. Reading from `taxi_trips_transformed` keeps the result
# independent of whether `taxi_trips` was mutated in place by the transform step.
dim_payment_type = _build_dimension(
    taxi_trips_transformed["payment_type"], "payment_type_id", "payment_type"
)
dim_company = _build_dimension(
    taxi_trips_transformed["company"], "company_id", "company"
)

In [None]:
# Extend each dimension with any values from the cleaned trips that it does not
# contain yet. The cleaned frame is passed explicitly so the result does not
# rely on `taxi_trips` having been modified in place by an earlier cell.
dim_payment_type_updated = update_dim_company_dim_payment_type(
    taxi_trips_transformed, dim_payment_type, "payment_type_id", "payment_type"
)
dim_company_updated = update_dim_company_dim_payment_type(
    taxi_trips_transformed, dim_company, "company_id", "company"
)


In [None]:
# Display the payment-type dimension returned by the update step.
dim_payment_type_updated


In [None]:
# Display the company dimension returned by the update step.
dim_company_updated

#### Update fact_taxi_trips with company and payment_type ids

In [None]:
def update_fact_taxi_trips_with_dimension_data(taxi_trips: pd.DataFrame, dim_payment_type: pd.DataFrame, dim_company: pd.DataFrame) -> pd.DataFrame:
    """ Replace the payment type and company values of the trips with their dimension ids.

    Both lookups are inner joins, so a trip whose payment type or company is
    missing from the corresponding dimension table is not part of the result.

    :param taxi_trips:          The DataFrame with the daily taxi trips.
    :param dim_payment_type:    The payment type dimension table.
    :param dim_company:         The company dimension table.
    :return:                    The taxi trips data, with only payment_type_id and company_id, without company and payment_type values.
    """

    return (
        taxi_trips
        .merge(dim_payment_type, on="payment_type")
        .merge(dim_company, on="company")
        # The ids now carry the information; drop the descriptive columns.
        .drop(columns=["payment_type", "company"])
    )

In [None]:
# Join against the *updated* dimensions: the lookup is an inner join, so using
# the pre-update tables would drop every trip whose company or payment type was
# only added by the update step.
taxi_trips_transformed_with_dim_ids = update_fact_taxi_trips_with_dimension_data(
    taxi_trips_transformed, dim_payment_type_updated, dim_company_updated
)
taxi_trips_transformed_with_dim_ids.head()

#### Weather transformations


In [None]:
def transform_weather(weather_data: dict) -> pd.DataFrame:
    """ Select and transform weather data.

    Picks the hourly series out of the API response, renames them and parses
    the timestamps into datetimes.

    :param weather_data:    The daily weather data from the Open Meteo API. Must contain an
                            "hourly" mapping with the keys "time", "temperature_2m",
                            "wind_speed_10m", "rain" and "precipitation".
    :return:                Transformed weather pandas DataFrame with the columns
                            datetime, temperature, wind_speed, rain and precipitation.
    :raises KeyError:       If "hourly" or one of the expected hourly series is missing.
    """

    # Read from the argument; the notebook-level `data` variable holds the taxi
    # response and must not leak into this function.
    hourly = weather_data["hourly"]

    selected = {
        "datetime": hourly["time"],
        "temperature": hourly["temperature_2m"],
        "wind_speed": hourly["wind_speed_10m"],
        "rain": hourly["rain"],
        "precipitation": hourly["precipitation"],
    }

    weather_df = pd.DataFrame(selected)
    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df

In [None]:
# Reuse the `formatted_datetime` computed for the taxi request rather than
# deriving it from datetime.now() again: the weather must cover exactly the same
# day as the trips, even if the notebook is run across midnight.
url = "https://archive-api.open-meteo.com/v1/era5"

params = {
    "latitude": 41.85,
    "longitude": -87.65,
    "start_date": formatted_datetime,
    "end_date": formatted_datetime,
    "hourly": "temperature_2m,wind_speed_10m,rain,precipitation",
}

# A timeout keeps the cell from hanging forever; raise_for_status() stops an
# HTTP error payload from being handed to the transformation step.
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()
weather_raw_data = response.json()

weather_raw_data

In [None]:
# Convert the raw Open-Meteo response into a tabular weather frame.
weather_df = transform_weather(weather_raw_data)

# Display the transformed weather data.
weather_df

In [None]:
# NOTE(review): this repeats the display of `weather_df` from the previous cell — consider removing.
weather_df