# Chicago taxi data: transform and load

## Section initial

### Import necessary modules

In [None]:
from io import StringIO
import json
import boto3
import pandas as pd

### Define functions

In [None]:
def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Clean a raw taxi-trips DataFrame and prepare it for joining with weather data.

    Steps, in order:

    1. Drops the columns "pickup_census_tract", "dropoff_census_tract",
       "pickup_centroid_location" and "dropoff_centroid_location".
    2. Drops every remaining row that contains a missing value.
    3. Renames "pickup_community_area" / "dropoff_community_area" to
       "pickup_community_area_id" / "dropoff_community_area_id".
    4. Adds "datetime_for_weather": "trip_start_timestamp" parsed as a datetime
       and floored (truncated down) to the start of its hour, e.g. 10:45 -> 10:00.

    The input DataFrame is left unmodified; a new DataFrame is returned.

    Args:
        taxi_trips (pd.DataFrame): Raw taxi trip data containing all columns named above.

    Returns:
        pd.DataFrame: The transformed copy.

    Raises:
        TypeError: If taxi_trips is not a pandas DataFrame.
        KeyError: If one of the columns to drop or "trip_start_timestamp" is missing.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('taxi_trips is not a valid pandas DataFrame.')

    # Non-inplace operations so the caller's DataFrame is not mutated as a side effect.
    # The census/centroid columns are dropped BEFORE dropna so their missing values
    # do not discard otherwise complete trips.
    transformed = (
        taxi_trips
        .drop(columns=["pickup_census_tract", "dropoff_census_tract",
                       "pickup_centroid_location", "dropoff_centroid_location"])
        .dropna()
        .rename(columns={"pickup_community_area": "pickup_community_area_id",
                         "dropoff_community_area": "dropoff_community_area_id"})
    )

    # Floor (not round): every trip starting within 10:00-10:59 maps to the 10:00 weather row.
    # pd.offsets.Hour() avoids the "H" string alias deprecated in pandas 2.2.
    transformed["datetime_for_weather"] = (
        pd.to_datetime(transformed["trip_start_timestamp"]).dt.floor(pd.offsets.Hour())
    )

    return transformed

In [None]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Swap the textual 'payment_type' and 'company' columns of the trips for their master-data IDs.

    Both lookups are inner joins (the pandas merge default). A trip whose payment type
    or company has no row in the corresponding master table is therefore dropped, so
    the masters should be brought up to date before calling this.

    Args:
        taxi_trips (pd.DataFrame): Trip data with 'payment_type' and 'company' columns.
        payment_type_master (pd.DataFrame): Lookup table with 'payment_type_id' and 'payment_type'.
        company_master (pd.DataFrame): Lookup table with 'company_id' and 'company'.

    Returns:
        pd.DataFrame: A new DataFrame holding the trip columns plus 'payment_type_id' and
                      'company_id', without the original 'payment_type' and 'company' columns.
    """
    with_payment_ids = taxi_trips.merge(payment_type_master, on='payment_type')
    with_all_ids = with_payment_ids.merge(company_master, on='company')

    return with_all_ids.drop(columns=['payment_type', 'company'])

In [None]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """
    Return the master table extended with values from the trips that it does not contain yet.

    Each distinct new value is added exactly once, in order of first appearance in
    taxi_trips[value_column]. New IDs are consecutive integers starting at
    max(master[id_column]) + 1, or at 1 when the master has no IDs yet.
    The input master is not modified.

    Args:
        taxi_trips (pd.DataFrame): Trip data containing value_column.
        master (pd.DataFrame): Master table with id_column and value_column.
        id_column (str): Name of the master's ID column.
        value_column (str): Name of the column whose values are looked up / added.

    Returns:
        pd.DataFrame: The existing master rows followed by the new rows, with a fresh RangeIndex.
    """
    # Set lookup: O(1) per value instead of scanning the master array each time.
    existing_values = set(master[value_column])

    # pd.unique keeps first-appearance order and removes repeats, so a new value that
    # occurs in many trips gets a single master row (and a single ID).
    new_values_list = [value for value in pd.unique(taxi_trips[value_column])
                       if value not in existing_values]

    max_id = master[id_column].max()
    # An empty (or all-missing) ID column has no maximum; start numbering at 1 then.
    start_id = 1 if pd.isna(max_id) else int(max_id) + 1

    new_values_df = pd.DataFrame({
        id_column: range(start_id, start_id + len(new_values_list)),
        value_column: new_values_list,
    })

    updated_master = pd.concat([master, new_values_df], ignore_index=True)

    return updated_master

In [None]:
def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """
    Flatten an hourly weather JSON payload into a DataFrame.

    Args:
        weather_data (dict): Parsed JSON (e.g. the result of json.loads) with an "hourly"
                             mapping holding equally long lists under "time",
                             "temperature_2m", "wind_speed_10m", "rain" and "precipitation".

    Returns:
        pd.DataFrame: One row per hourly entry, with columns:
                      - 'datetime': "time" parsed with pd.to_datetime.
                      - 'tempretaure': values of "temperature_2m" (units as delivered by the source).
                      - 'wind_speed': values of "wind_speed_10m".
                      - 'rain': values of "rain".
                      - 'precipitation': values of "precipitation".

    Raises:
        KeyError: If "hourly" or one of the listed keys is missing.
    """
    hourly = weather_data["hourly"]

    # Output column -> source key inside weather_data["hourly"].
    # NOTE: 'tempretaure' is misspelled, but it is part of the CSV schema this pipeline
    # writes; rename it only together with whatever reads the transformed weather files.
    column_sources = {
        "datetime": "time",
        "tempretaure": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    weather_df = pd.DataFrame({column: hourly[source] for column, source in column_sources.items()})

    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df

In [None]:
def read_csv_from_s3(bucket: str, path: str, filename: str) -> pd.DataFrame:
    """
    Read a CSV object from S3 into a pandas DataFrame.

    The object key is the plain concatenation f'{path}{filename}', so path is expected
    to end with '/' when it names a folder.

    Args:
        bucket (str): Name of the S3 bucket.
        path (str): Key prefix (folder) of the object.
        filename (str): File name appended to path to form the key.

    Returns:
        pd.DataFrame: The parsed CSV content.

    Raises:
        botocore.exceptions.ClientError: If the object cannot be fetched.
        UnicodeDecodeError: If the content is not valid UTF-8.
        pd.errors.EmptyDataError: If the file has no content to parse.
    """
    s3 = boto3.client('s3')

    full_path = f'{path}{filename}'

    # Named 'response'/'csv_text' instead of reusing 'object', which shadowed the builtin.
    response = s3.get_object(Bucket=bucket, Key=full_path)
    csv_text = response['Body'].read().decode('utf-8')

    return pd.read_csv(StringIO(csv_text))

In [None]:
def upload_dataframes_to_s3(bucket: str, path: str, dataframe: pd.DataFrame):
    """
    Serialize a DataFrame to CSV (without the index) and store it as one S3 object.

    Args:
        bucket (str): Target S3 bucket.
        path (str): Full object key including file name, e.g. "folder/file.csv".
        dataframe (pd.DataFrame): Data to upload.

    Returns:
        None
    """
    client = boto3.client('s3')

    with StringIO() as csv_buffer:
        dataframe.to_csv(csv_buffer, index = False)
        csv_body = csv_buffer.getvalue()

    client.put_object(Bucket = bucket, Key = path, Body = csv_body)

In [None]:
def upload_master_data_to_s3(bucket: str, path: str, file_type: str, dataframe: pd.DataFrame,
                             previous_version_path: str = 'transformed_data/master_table_previous_version/'):
    """
    Replace a master-data CSV in S3, keeping a copy of the version being replaced.

    Args:
        bucket (str): S3 bucket holding the master data.
        path (str): Folder prefix of the master file; the key is f'{path}{file_type}_master.csv'.
        file_type (str): Master name prefix, e.g. "company" or "payment_type".
        dataframe (pd.DataFrame): New master content.
        previous_version_path (str): Folder prefix for the backup copy. The backup key is
            f'{previous_version_path}{file_type}_master_previous_version.csv'. The default
            keeps the original hard-coded location.

    Functionality:
        1. Copies the current master object to the backup key (overwriting any older backup).
        2. Uploads dataframe as CSV to the master key.

    Returns:
        None
    """
    s3 = boto3.client('s3')

    master_file_path = f'{path}{file_type}_master.csv'
    previous_master_file_path = f'{previous_version_path}{file_type}_master_previous_version.csv'

    # Back up first so the previous master survives even if the upload below fails.
    # NOTE(review): copy_object raises if the master object does not exist yet (first run).
    s3.copy_object(
        Bucket = bucket,
        CopySource = {'Bucket': bucket, 'Key': master_file_path},
        Key = previous_master_file_path
    )

    upload_dataframes_to_s3(bucket = bucket, path = master_file_path, dataframe = dataframe)

In [None]:
def upload_and_move_file_on_s3(
        dataframe: pd.DataFrame,
        datetime_column: str,
        bucket: str,
        file_type: str,
        filename: str,
        source_path: str,
        target_path_raw: str,
        target_path_transformed: str,
    ):
    """
    Store a transformed DataFrame in S3 and archive the raw file it was built from.

    Args:
        dataframe (pd.DataFrame): Transformed data to upload as CSV.
        datetime_column (str): Column whose first value supplies the date in the output
            file name; it is formatted with strftime, so it must hold datetime-like values.
        bucket (str): S3 bucket used for every operation.
        file_type (str): Output file name prefix, e.g. "weather" or "taxi".
        filename (str): Name of the raw file to archive.
        source_path (str): Folder prefix the raw file currently lives in.
        target_path_raw (str): Folder prefix the raw file is moved to.
        target_path_transformed (str): Folder prefix for the uploaded CSV.

    Functionality:
        1. Uploads dataframe to f'{target_path_transformed}{file_type}_<YYYY-MM-DD>.csv'.
        2. Copies f'{source_path}{filename}' to f'{target_path_raw}{filename}'.
        3. Deletes f'{source_path}{filename}', completing the move.

    Returns:
        None

    Raises:
        IndexError: If dataframe is empty (there is no first row to take the date from).
    """
    client = boto3.client('s3')

    first_timestamp = dataframe[datetime_column].iloc[0]
    date_label = first_timestamp.strftime("%Y-%m-%d")
    transformed_key = f'{target_path_transformed}{file_type}_{date_label}.csv'

    upload_dataframes_to_s3(bucket = bucket, path = transformed_key, dataframe = dataframe)

    raw_key = f'{source_path}{filename}'
    archived_key = f'{target_path_raw}{filename}'

    client.copy_object(
        Bucket = bucket,
        CopySource = {'Bucket': bucket, 'Key': raw_key},
        Key = archived_key
    )

    # The delete only runs after copy_object returned, so a failed copy keeps the raw file in place.
    client.delete_object(Bucket = bucket, Key = raw_key)

In [None]:
def read_json_from_s3(bucket: str, key: str) -> dict:
    """
    Download an S3 object and parse its body as JSON.

    Args:
        bucket (str): S3 bucket containing the object.
        key (str): Full object key.

    Returns:
        dict: The value produced by json.loads for the object's body
              (a dict when the document's top level is a JSON object).
    """
    client = boto3.client("s3")

    raw_bytes = client.get_object(Bucket = bucket, Key = key)["Body"].read()

    return json.loads(raw_bytes)


## Section main

In [None]:
def _list_json_keys(s3, bucket: str, prefix: str) -> list:
    """Return every object key under prefix that ends in '.json' (handles >1000 objects)."""
    paginator = s3.get_paginator('list_objects_v2')
    json_keys = []
    for page in paginator.paginate(Bucket = bucket, Prefix = prefix):
        # A page has no 'Contents' entry when nothing matches the prefix.
        for s3_object in page.get('Contents', []):
            key = s3_object['Key']
            # endswith avoids the IndexError split('.')[1] raised on keys without a dot.
            if key.endswith('.json'):
                json_keys.append(key)
    # Materialized up front: the loop in lambda_handler deletes objects while processing them.
    return json_keys


def lambda_handler(event, context):
    """
    AWS Lambda entry point: transform and load all pending raw taxi and weather JSON files.

    For each taxi JSON under raw_data/to_processed/taxi_data/:
        - builds a DataFrame, cleans it, and extends the company / payment_type masters
          with any new values;
        - replaces company / payment_type text with master IDs;
        - uploads the result to transformed_data/taxi_trips/ and moves the raw file to
          raw_data/processed/taxi_data/;
        - uploads the updated masters (backing up the previous versions).
    For each weather JSON under raw_data/to_processed/weather_data/:
        - flattens it into a DataFrame, uploads it to transformed_data/weather/ and moves
          the raw file to raw_data/processed/weather_data/.

    event and context are not used.
    """
    s3 = boto3.client('s3')

    bucket = 'cubix-chicago-taxi-ld'

    raw_taxi_trips_folder = 'raw_data/to_processed/taxi_data/'
    raw_weather_folder = 'raw_data/to_processed/weather_data/'
    target_taxi_trips_folder = 'raw_data/processed/taxi_data/'
    target_weather_folder = 'raw_data/processed/weather_data/'

    transformed_taxi_trips_folder = 'transformed_data/taxi_trips/'
    transformed_weather_folder = 'transformed_data/weather/'

    payment_type_master_folder = 'transformed_data/payment_type/'
    payment_type_master_filename = 'payment_type_master.csv'

    company_master_folder = 'transformed_data/company/'
    company_master_filename = 'company_master.csv'

    payment_type_master = read_csv_from_s3(bucket = bucket, path = payment_type_master_folder, filename = payment_type_master_filename)
    company_master = read_csv_from_s3(bucket = bucket, path = company_master_folder, filename = company_master_filename)

    # Taxi data transformation and loading
    for taxi_trip_key in _list_json_keys(s3, bucket, raw_taxi_trips_folder):
        filename = taxi_trip_key.split('/')[-1]

        taxi_trips_data_json = read_json_from_s3(bucket = bucket, key = taxi_trip_key)
        taxi_trips_data_raw = pd.DataFrame(taxi_trips_data_json)
        taxi_trips_transformed = taxi_trips_transformations(taxi_trips = taxi_trips_data_raw)

        # Reassign the masters so the next file builds on this file's additions. Otherwise
        # IDs minted here would be reassigned to other values by the next file, and the
        # uploaded masters would lose this file's new entries.
        company_master = update_master(taxi_trips = taxi_trips_transformed, master = company_master, id_column = 'company_id', value_column = 'company')
        payment_type_master = update_master(taxi_trips = taxi_trips_transformed, master = payment_type_master, id_column = 'payment_type_id', value_column = 'payment_type')

        taxi_trips = update_taxi_trips_with_master_data(taxi_trips = taxi_trips_transformed, payment_type_master = payment_type_master, company_master = company_master)

        upload_and_move_file_on_s3(
            dataframe = taxi_trips,
            datetime_column = 'datetime_for_weather',
            bucket = bucket,
            file_type = 'taxi',
            filename = filename,
            source_path = raw_taxi_trips_folder,
            target_path_raw = target_taxi_trips_folder,
            target_path_transformed = transformed_taxi_trips_folder
            )
        print('taxi_trips is uploaded and moved.')

        upload_master_data_to_s3(bucket = bucket, path = payment_type_master_folder, file_type = 'payment_type', dataframe = payment_type_master)
        print('payment_type_master has been updated.')
        upload_master_data_to_s3(bucket = bucket, path = company_master_folder, file_type = 'company', dataframe = company_master)
        print('company_master has been updated.')

    # Weather data transformation and loading
    for weather_key in _list_json_keys(s3, bucket, raw_weather_folder):
        filename = weather_key.split('/')[-1]

        weather_data_json = read_json_from_s3(bucket = bucket, key = weather_key)
        weather_data = transform_weather_data(weather_data_json)

        upload_and_move_file_on_s3(
            dataframe = weather_data,
            datetime_column = 'datetime',
            bucket = bucket,
            file_type = 'weather',
            filename = filename,
            source_path = raw_weather_folder,
            target_path_raw = target_weather_folder,
            target_path_transformed = transformed_weather_folder
            )
        print('weather_data is uploaded and moved.')