In [None]:
# configs.py
#
# Central configuration for the Chicago taxi / weather ETL: the S3 bucket name
# and the key prefixes ("folders") used by the processing code. Every folder
# value ends with "/" because keys are built by plain string concatenation,
# e.g. f"{path}{filename}".

# S3 Bucket and Folder Configuration
BUCKET = "cubix-chicago-taxi-bb"

# Raw Data Folders
# New raw JSON files waiting to be processed are listed from these prefixes.
RAW_WEATHER_FOLDER = "raw_data/to_processed/weather_data/"
RAW_TAXI_TRIPS_FOLDER = "raw_data/to_processed/taxi_data/"

# Processed Data Folders
# Raw JSON files are moved here once their transformed CSV has been uploaded.
TARGET_TAXI_TRIPS_FOLDER = "raw_data/processed/taxi_data/"
TARGET_WEATHER_FOLDER = "raw_data/processed/weather_data/"

# Transformed Data Folders
# Destination of the cleaned CSV files ("<file_type>_<YYYY-MM-DD>.csv").
TRANSFORMED_TAXI_TRIPS_FOLDER = "transformed_data/taxi_trips/"
TRANSFORMED_WEATHER_FOLDER = "transformed_data/weather/"

# Master Data Folders and Files
# Mapping tables that translate payment type / company names into integer ids.
PAYMENT_TYPE_MASTER_FOLDER = "transformed_data/payment_type/"
COMPANY_MASTER_FOLDER = "transformed_data/company/"

# Master file names
# Must match the f"{file_type}_master.csv" name that upload_master_data_to_s3
# writes, otherwise the next run would read a stale master table.
PAYMENT_TYPE_MASTER_FILE = "payment_type_master.csv"
COMPANY_MASTER_FILE = "company_master.csv"

In [None]:
# functions.py

from io import StringIO
import json
from typing import List, Dict, Union

import boto3
import pandas as pd


# Module-level S3 client, created once when this module is imported. The S3
# helper functions below use this client directly (they take no client
# argument), so a client created by a caller is not the one used here.
s3 = boto3.client("s3")


def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Perform transformations on the taxi data.

    1. Drop selected columns.
    2. Drop rows that have a NULL value in any of the remaining columns.
    3. Rename the community area columns to their ``*_id`` form.
    4. Create the "datetime_for_weather" column: the trip start timestamp
       floored to the hour, used to join trips with the hourly weather data.

    The input DataFrame is left untouched; a new DataFrame is returned.

    :param taxi_trips:  The DataFrame holding the daily taxi trips.
    :raises TypeError:  When taxi_trips parameter is not a valid pandas DataFrame.
    :raises KeyError:   When one of the columns to drop is missing.
    :return:            Transformed, cleaned taxi_trips DataFrame.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips is not a valid pandas DataFrame.")

    # The census tract / centroid columns are dropped *before* dropna so that
    # nulls in those columns do not cause otherwise valid trips to be discarded.
    return (
        taxi_trips
        .drop(columns=[
            "pickup_census_tract",
            "dropoff_census_tract",
            "pickup_centroid_location",
            "dropoff_centroid_location",
        ])
        .dropna()
        .rename(columns={
            "pickup_community_area": "pickup_community_area_id",
            "dropoff_community_area": "dropoff_community_area_id",
        })
        # "h" is the hourly alias; upper-case "H" is deprecated since pandas 2.2.
        .assign(
            datetime_for_weather=lambda df: pd.to_datetime(df["trip_start_timestamp"]).dt.floor("h")
        )
    )
    
   
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Swap the payment type and company names in the trips for their master-table ids.

    Both lookups are pandas' default (inner) merges on the shared value column,
    so a trip whose payment_type or company is absent from the corresponding
    master table does not appear in the result.

    :param taxi_trips:              The DataFrame with the daily taxi trips.
    :param payment_type_master:     The payment type master table.
    :param company_master:          The company master table.
    :return:                        The taxi trips data carrying payment_type_id and company_id
                                    instead of the payment_type and company values.
    """
    with_payment_ids = taxi_trips.merge(payment_type_master, on="payment_type")
    with_all_ids = with_payment_ids.merge(company_master, on="company")
    return with_all_ids.drop(columns=["payment_type", "company"])

   
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """Extend the master DataFrame with new values if there are any.

    Values of ``value_column`` found in ``taxi_trips`` but not yet in ``master``
    are appended with consecutive ids starting at ``max(id) + 1`` (or at 1 when
    the master table is empty). New values are numbered in order of their first
    appearance in ``taxi_trips``, so the assigned ids are reproducible between
    runs. Neither input DataFrame is modified.

    :param taxi_trips:      DataFrame with the daily taxi trips.
    :param master:          DataFrame with the master data (company, payment_type).
    :param id_column:       The id column of the master DataFrame.
    :param value_column:    Name of the column (present in both DataFrames) containing the values.
    :return:                The updated master data (with a fresh 0..n-1 index); if new values
                            are in the taxi data, they are appended to it.
    """
    current_max = master[id_column].max()
    # An empty master table has no maximum (NaN); start numbering at 1 then.
    max_id = 0 if pd.isna(current_max) else int(current_max)

    # pd.unique keeps first-appearance order; a set difference would make the
    # id assignment depend on hash ordering, which can change between runs.
    known_values = set(master[value_column])
    new_values = [value for value in pd.unique(taxi_trips[value_column]) if value not in known_values]

    if not new_values:
        return master.reset_index(drop=True)

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values) + 1),
        value_column: new_values,
    })
    return pd.concat([master, new_values_df], ignore_index=True)


def transform_weather_data(weather_data: Dict) -> pd.DataFrame:
    """Select and transform weather data.

    Picks the hourly time, temperature, wind speed, rain and precipitation
    series from the API response and parses the timestamps.

    :param weather_data:    The daily weather data from the Open Meteo API, as a parsed JSON
                            dict. It must contain an "hourly" mapping with the keys "time",
                            "temperature_2m", "wind_speed_10m", "rain" and "precipitation".
    :raises KeyError:       When one of the expected keys is missing.
    :return:                Transformed weather data with the columns datetime, tempretaure,
                            wind_speed, rain and precipitation.
    """
    hourly = weather_data["hourly"]

    # NOTE(review): "tempretaure" is misspelled, but it is the column name written
    # to the transformed CSV files. Renaming it changes the output schema, so
    # coordinate with any consumer of those files before correcting it.
    weather_df = pd.DataFrame({
        "datetime": hourly["time"],
        "tempretaure": hourly["temperature_2m"],
        "wind_speed": hourly["wind_speed_10m"],
        "rain": hourly["rain"],
        "precipitation": hourly["precipitation"],
    })
    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df


def read_csv_from_s3(bucket: str, path: str, filename: str) -> pd.DataFrame:
    """Download a CSV file from an S3 bucket and load it into a DataFrame.

    Uses the module-level ``s3`` client. The object body is decoded as UTF-8.

    :param bucket:      The bucket where the file is.
    :param path:        Key prefix ("folder") of the file. It is concatenated directly
                        with ``filename``, so it should end with "/".
    :param filename:    Name of the file to read.
    :return:            A DataFrame created from the csv file.
    """
    key = f"{path}{filename}"

    # Renamed from `object`, which shadowed the builtin.
    response = s3.get_object(Bucket=bucket, Key=key)
    csv_text = response["Body"].read().decode("utf-8")

    return pd.read_csv(StringIO(csv_text))


def read_json_from_s3(bucket: str, filename: str) -> Union[List[Dict], Dict]:
    """Download a JSON file from an S3 bucket and parse it.

    Uses the module-level ``s3`` client.

    :param bucket:      The bucket where the file is.
    :param filename:    Full object key of the file to read (prefix included).
    :return:            The parsed JSON: a list of dictionaries or a dictionary
                        (taxi or weather data).
    """
    response = s3.get_object(Bucket=bucket, Key=filename)
    # Generic name: this helper reads both taxi and weather files.
    parsed = json.loads(response["Body"].read())

    return parsed
    
    
def upload_dataframe_to_s3(dataframe: pd.DataFrame, bucket: str, path: str) -> None:
    """Upload a DataFrame as a CSV file (without the index) to the given S3 key.

    Uses the module-level ``s3`` client.

    :param dataframe:   The DataFrame to be uploaded.
    :param bucket:      Name of the S3 bucket where the file will be stored.
    :param path:        Full object key (prefix and filename) to write to.
    """
    # to_csv without a path returns the CSV text directly; no buffer needed.
    csv_body = dataframe.to_csv(index=False)
    s3.put_object(Bucket=bucket, Key=path, Body=csv_body)

 
def upload_master_data_to_s3(bucket: str, path: str, file_type: str, dataframe: pd.DataFrame) -> None:
    """Upload master data (payment_type or company) to S3, keeping one previous version.

    The current ``{path}{file_type}_master.csv`` object is first copied to
    ``transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv``
    and then overwritten with ``dataframe``. The backup key is fixed, so every
    call replaces the previous backup. Uses the module-level ``s3`` client.

    :param bucket:      Name of the S3 bucket where the file will be stored.
    :param path:        Key prefix of the master file; concatenated directly with the
                        filename, so it should end with "/".
    :param file_type:   Either "company" or "payment_type".
    :param dataframe:   The DataFrame to be uploaded.
    :raises ValueError: Raised when file_type is not "company" or "payment_type".
    """
    if file_type not in ("company", "payment_type"):
        raise ValueError("file_type must be either 'company' or 'payment_type'.")

    master_file_path = f"{path}{file_type}_master.csv"
    previous_master_file_path = f"transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv"

    # Back up the current master before overwriting it.
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": master_file_path},
        Key=previous_master_file_path,
    )

    upload_dataframe_to_s3(bucket=bucket, dataframe=dataframe, path=master_file_path)
    
def move_file_on_s3(bucket: str, source_key: str, target_key: str):
    """
    Move an object inside ``bucket`` from ``source_key`` to ``target_key``.

    The move is a copy to the new key followed by a delete of the old key; the
    delete is only issued after the copy call has returned. Uses the
    module-level ``s3`` client and prints a confirmation line.

    :param bucket:     Name of the S3 bucket.
    :param source_key: Current key (including filename) of the object.
    :param target_key: New key (including filename) for the object.
    """
    origin = {"Bucket": bucket, "Key": source_key}
    s3.copy_object(Bucket=bucket, CopySource=origin, Key=target_key)
    s3.delete_object(Bucket=bucket, Key=source_key)

    print(f"File moved from s3://{bucket}/{source_key} to s3://{bucket}/{target_key}.")


def upload_and_move_file_on_s3(
        dataframe: pd.DataFrame,
        datetime_col: str,
        bucket: str,
        file_type: str,
        filename: str,
        source_path: str,
        target_path_raw: str,
        target_path_transformed: str
    ):
    """
    Upload a transformed DataFrame to S3, then move its raw source file.

    The DataFrame is written to ``{target_path_transformed}{file_type}_{YYYY-MM-DD}.csv``,
    where the date comes from the first row of ``datetime_col``. The raw file
    ``{source_path}{filename}`` is then moved to ``{target_path_raw}{filename}``.
    The upload happens first, so the raw file stays in place if the upload fails.

    :param dataframe:               The DataFrame to upload.
    :param datetime_col:            Datetime column name, used to derive the date for the filename.
    :param bucket:                  Name of the S3 bucket.
    :param file_type:               "weather" or "taxi"; prefix of the transformed filename.
    :param filename:                Name of the raw file to be moved.
    :param source_path:             Source path (key prefix) of the raw file within the bucket.
    :param target_path_raw:         Target path within the bucket where the raw data is moved.
    :param target_path_transformed: Target path within the bucket where the transformed CSV is uploaded.
    :raises ValueError:             When ``dataframe`` is empty, so no date can be derived.
    """
    if dataframe.empty:
        # Without any row there is no date for the output filename; fail with a
        # clear message instead of an IndexError from .iloc[0].
        raise ValueError(
            f"Cannot upload an empty {file_type} DataFrame (source file: {filename})."
        )

    formatted_date = dataframe[datetime_col].iloc[0].strftime("%Y-%m-%d")
    transformed_path = f"{target_path_transformed}{file_type}_{formatted_date}.csv"

    upload_dataframe_to_s3(bucket=bucket, dataframe=dataframe, path=transformed_path)

    source_key = f"{source_path}{filename}"
    target_key = f"{target_path_raw}{filename}"
    move_file_on_s3(bucket, source_key, target_key)
  

In [None]:
# lambda_function.py

import boto3
import pandas as pd

from configs import (
    BUCKET,
    RAW_TAXI_TRIPS_FOLDER,
    RAW_WEATHER_FOLDER,
    TARGET_TAXI_TRIPS_FOLDER,
    TARGET_WEATHER_FOLDER,
    TRANSFORMED_TAXI_TRIPS_FOLDER,
    TRANSFORMED_WEATHER_FOLDER,
    PAYMENT_TYPE_MASTER_FOLDER,
    COMPANY_MASTER_FOLDER,
    PAYMENT_TYPE_MASTER_FILE,
    COMPANY_MASTER_FILE,
)
from functions import (
    taxi_trips_transformations,
    update_taxi_trips_with_master_data,
    update_master,
    transform_weather_data,
    read_csv_from_s3,
    read_json_from_s3,
    upload_master_data_to_s3,
    upload_and_move_file_on_s3,
)


def process_taxi_data(s3: boto3.client, payment_type_master: pd.DataFrame, company_master: pd.DataFrame):
    """Process and transform taxi trip data.

    1. Check the raw taxi trips folder for new JSON files.
    2. Read and transform them.
    3. Update the Company Master and Payment Type Master mapping tables.
    4. Update the transformed taxi data with Company and Payment Type ids (change string values to ids).
    5. Move the raw files from the "to_processed" to the "processed" folder, and upload the cleaned and transformed DataFrame.
    6. Update the Company Master and Payment Type Master tables.

    The master tables are carried over from one file to the next. Ids given to
    new companies / payment types while processing one file are therefore kept
    (not reassigned or lost) when later files in the same run are processed.

    :param s3:                      S3 client, used to list the raw taxi trips folder.
    :param payment_type_master:     Payment Type Master DataFrame.
    :param company_master:          Company Master DataFrame.
    """
    # "Contents" is absent from the response when no object matches the prefix.
    # NOTE(review): list_objects returns at most 1000 keys per call; switch to a
    # paginator if more files can accumulate between runs.
    listing = s3.list_objects(Bucket=BUCKET, Prefix=RAW_TAXI_TRIPS_FOLDER)
    for file in listing.get("Contents", []):
        taxi_trip_key = file["Key"]
        filename = taxi_trip_key.split("/")[-1]

        # Skip folder placeholder keys (ending in "/") and non-JSON objects.
        if not filename.endswith(".json"):
            continue

        taxi_trips_data_json = read_json_from_s3(BUCKET, taxi_trip_key)
        taxi_trips_data_raw = pd.DataFrame(taxi_trips_data_json)
        taxi_trips_transformed = taxi_trips_transformations(taxi_trips_data_raw)

        # Rebind so the next file starts from the tables updated by this one.
        company_master = update_master(taxi_trips_transformed, company_master, "company_id", "company")
        payment_type_master = update_master(taxi_trips_transformed, payment_type_master, "payment_type_id", "payment_type")

        taxi_trips = update_taxi_trips_with_master_data(taxi_trips_transformed, payment_type_master, company_master)

        upload_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col="datetime_for_weather",
            bucket=BUCKET,
            file_type="taxi",
            filename=filename,
            source_path=RAW_TAXI_TRIPS_FOLDER,
            target_path_raw=TARGET_TAXI_TRIPS_FOLDER,
            target_path_transformed=TRANSFORMED_TAXI_TRIPS_FOLDER
        )
        print(f"Taxi trips file {filename} uploaded and moved.")

        upload_master_data_to_s3(bucket=BUCKET, path=PAYMENT_TYPE_MASTER_FOLDER, file_type="payment_type", dataframe=payment_type_master)
        print("Payment Type Master has been updated.")
        upload_master_data_to_s3(bucket=BUCKET, path=COMPANY_MASTER_FOLDER, file_type="company", dataframe=company_master)
        print("Company Master has been updated.")


def process_weather_data(s3: boto3.client):
    """Process and transform weather data.

    1. Check the raw weather folder for new JSON files.
    2. Read and transform them.
    3. Move the raw files from the "to_processed" to the "processed" folder, and upload the cleaned and transformed DataFrame.

    Objects that are not JSON files (e.g. a folder placeholder key ending in "/")
    are skipped without any output.

    :param s3:  S3 client, used to list the raw weather folder.
    """
    # "Contents" is absent from the response when no object matches the prefix.
    # NOTE(review): list_objects returns at most 1000 keys per call; switch to a
    # paginator if more files can accumulate between runs.
    listing = s3.list_objects(Bucket=BUCKET, Prefix=RAW_WEATHER_FOLDER)
    for file in listing.get("Contents", []):
        weather_key = file["Key"]
        filename = weather_key.split("/")[-1]

        if not filename.endswith(".json"):
            continue

        weather_data_json = read_json_from_s3(BUCKET, weather_key)
        weather_data = transform_weather_data(weather_data_json)

        upload_and_move_file_on_s3(
            dataframe=weather_data,
            datetime_col="datetime",
            bucket=BUCKET,
            file_type="weather",
            filename=filename,
            source_path=RAW_WEATHER_FOLDER,
            target_path_raw=TARGET_WEATHER_FOLDER,
            target_path_transformed=TRANSFORMED_WEATHER_FOLDER
        )
        # Only reached for files that were actually processed, so `filename`
        # always refers to the file just uploaded.
        print(f"Weather file {filename} uploaded and moved.")


def lambda_handler(event, context):
    """AWS Lambda entry point: process every pending taxi and weather file.

    The payment type and company master tables are read from S3 once, up front,
    and handed to the taxi step; weather files are processed afterwards.

    :param event:   Lambda event payload (not used).
    :param context: Lambda runtime context (not used).
    """
    s3_client = boto3.client("s3")

    # Dict displays evaluate left to right, so payment types are read first.
    masters = {
        "payment_type": read_csv_from_s3(bucket=BUCKET, path=PAYMENT_TYPE_MASTER_FOLDER, filename=PAYMENT_TYPE_MASTER_FILE),
        "company": read_csv_from_s3(bucket=BUCKET, path=COMPANY_MASTER_FOLDER, filename=COMPANY_MASTER_FILE),
    }

    process_taxi_data(s3_client, masters["payment_type"], masters["company"])
    process_weather_data(s3_client)

    print("All files have been processed.")


In [None]:
import boto3
import pandas as pd

# The constants are defined in configs.py (the same module lambda_function.py
# imports as `configs`); importing `config` would not find them.
from configs import (
    BUCKET,
    RAW_TAXI_TRIPS_FOLDER,
    RAW_WEATHER_FOLDER,
    TARGET_TAXI_TRIPS_FOLDER,
    TARGET_WEATHER_FOLDER,
    TRANSFORMED_TAXI_TRIPS_FOLDER,
    TRANSFORMED_WEATHER_FOLDER,
    PAYMENT_TYPE_MASTER_FOLDER,
    COMPANY_MASTER_FOLDER,
    PAYMENT_TYPE_MASTER_FILE,
    COMPANY_MASTER_FILE,
)

from functions import (
    taxi_trips_transformations,
    update_taxi_trips_with_master_data,
    update_master,
    transform_weather_data,
    read_csv_from_s3,
    read_json_from_s3,
    upload_master_data_to_s3,
    upload_and_move_file_on_s3
)


def lambda_handler(event, context):
    """AWS Lambda entry point: transform pending taxi and weather files and load them to S3.

    Taxi files: each JSON file under RAW_TAXI_TRIPS_FOLDER is transformed. The
    company and payment type master tables are extended with any new values,
    and the trips are mapped to master ids and uploaded. The raw file is moved to
    TARGET_TAXI_TRIPS_FOLDER, and both master tables are re-uploaded.

    Weather files: each JSON file under RAW_WEATHER_FOLDER is transformed and
    uploaded, and the raw file is moved to TARGET_WEATHER_FOLDER.

    The master tables are carried from one taxi file to the next, so values added
    while processing one file keep their ids for the rest of the run.

    :param event:   Lambda event payload (not used).
    :param context: Lambda runtime context (not used).
    """
    s3 = boto3.client("s3")

    payment_type_master = read_csv_from_s3(bucket=BUCKET, path=PAYMENT_TYPE_MASTER_FOLDER, filename=PAYMENT_TYPE_MASTER_FILE)
    company_master = read_csv_from_s3(bucket=BUCKET, path=COMPANY_MASTER_FOLDER, filename=COMPANY_MASTER_FILE)

    # TAXI DATA TRANSFORMATION AND LOADING
    # "Contents" is absent from the response when no object matches the prefix.
    # NOTE(review): list_objects returns at most 1000 keys per call; switch to a
    # paginator if more files can accumulate between runs.
    for file in s3.list_objects(Bucket=BUCKET, Prefix=RAW_TAXI_TRIPS_FOLDER).get("Contents", []):
        taxi_trip_key = file["Key"]
        filename = taxi_trip_key.split("/")[-1]

        # Skip folder placeholder keys (ending in "/") and non-JSON objects.
        if not filename.endswith(".json"):
            continue

        taxi_trips_data_json = read_json_from_s3(BUCKET, taxi_trip_key)

        taxi_trips_data_raw = pd.DataFrame(taxi_trips_data_json)
        taxi_trips_transformed = taxi_trips_transformations(taxi_trips_data_raw)

        # Rebind so the next file starts from the tables updated by this one.
        company_master = update_master(taxi_trips_transformed, company_master, "company_id", "company")
        payment_type_master = update_master(taxi_trips_transformed, payment_type_master, "payment_type_id", "payment_type")

        taxi_trips = update_taxi_trips_with_master_data(taxi_trips_transformed, payment_type_master, company_master)

        upload_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col="datetime_for_weather",
            bucket=BUCKET,
            file_type="taxi",
            filename=filename,
            source_path=RAW_TAXI_TRIPS_FOLDER,
            target_path_raw=TARGET_TAXI_TRIPS_FOLDER,
            target_path_transformed=TRANSFORMED_TAXI_TRIPS_FOLDER
        )
        print("taxi_trips is uploaded and moved.")

        upload_master_data_to_s3(bucket=BUCKET, path=PAYMENT_TYPE_MASTER_FOLDER, file_type="payment_type", dataframe=payment_type_master)
        print("payment_type_master has been updated.")
        upload_master_data_to_s3(bucket=BUCKET, path=COMPANY_MASTER_FOLDER, file_type="company", dataframe=company_master)
        print("company_master has been updated.")

    # WEATHER DATA TRANSFORMATION AND LOADING
    for file in s3.list_objects(Bucket=BUCKET, Prefix=RAW_WEATHER_FOLDER).get("Contents", []):
        weather_key = file["Key"]
        filename = weather_key.split("/")[-1]

        if not filename.endswith(".json"):
            continue

        weather_data_json = read_json_from_s3(BUCKET, weather_key)

        weather_data = transform_weather_data(weather_data_json)

        upload_and_move_file_on_s3(
            dataframe=weather_data,
            datetime_col="datetime",
            bucket=BUCKET,
            file_type="weather",
            filename=filename,
            source_path=RAW_WEATHER_FOLDER,
            target_path_raw=TARGET_WEATHER_FOLDER,
            target_path_transformed=TRANSFORMED_WEATHER_FOLDER
        )
        print("weather is uploaded and moved.")