# The final, working version

## DE_final_project_extract (AWS Lambda function)

### The following AWS Lambda function extracts data from the City of Chicago's official data portal and from the Open-Meteo weather service by calling their respective APIs. It runs on a daily trigger, fetches the data for the date two months before the run date, and saves the raw results as JSON files in an S3 bucket.

In [None]:
import boto3
from datetime import datetime
from dateutil.relativedelta import relativedelta
import json
import os
import requests
from typing import Dict, List


def get_taxi_data(formatted_datetime: str, timeout: float = 60) -> List:
    """
    Retrieve the Chicago taxi trips that started on the given date.

    Parameters
    ----------
        formatted_datetime (str): The date in YYYY-MM-DD format.
        timeout (float): Seconds to wait for the data portal before giving up
            (passed to ``requests.get``). Defaults to 60.

    Returns
    -------
        List: The list of taxi trip dictionaries returned by the API.

    Raises
    ------
        requests.exceptions.RequestException: If the HTTP request fails.
        ValueError: If the response body cannot be parsed as JSON.
    """
    # NOTE(review): $limit caps the result at 35000 rows; a busier day would be
    # silently truncated — confirm the cap is large enough.
    url_taxi = (
        f"https://data.cityofchicago.org/resource/ajtu-isnz.json?"
        f"$where=trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
        f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'&$limit=35000"
    )

    # Send the app token only when CHICAGO_KEY is configured.
    app_token = os.environ.get("CHICAGO_KEY")
    headers = {"X-App-Token": app_token} if app_token else {}

    try:
        response = requests.get(url_taxi, headers=headers, timeout=timeout)
        response.raise_for_status()
        taxi_data = response.json()
    except requests.exceptions.RequestException as req_exc:
        print(f"This HTTP Request failed: {req_exc}")
        # Re-raise: there is no data to return, and uploading a placeholder
        # would break the downstream transform step.
        raise
    except ValueError as v_e:
        print(f"The JSON's parsing has failed: {v_e}")
        raise

    # Log only the record count; dumping the whole payload floods the logs.
    print(len(taxi_data))
    return taxi_data
    
    
def get_weather_data(formatted_datetime: str, timeout: float = 60) -> Dict:
    """
    Fetch hourly weather data for Chicago from the Open-Meteo archive API.

    Parameters
    ----------
        formatted_datetime (str): The date in YYYY-MM-DD format.
        timeout (float): Seconds to wait for the API before giving up
            (passed to ``requests.get``). Defaults to 60.

    Returns
    -------
    Dict
        The decoded JSON response. Its "hourly" entry holds the time,
        temperature_2m, wind_speed_10m, rain and precipitation series.

    Raises
    ------
        requests.exceptions.RequestException: If the HTTP request fails.
        ValueError: If the response body cannot be parsed as JSON.
    """
    url_weather = "https://archive-api.open-meteo.com/v1/era5"

    params = {
        "latitude": 41.85,
        "longitude": -87.65,
        "start_date": formatted_datetime,
        "end_date": formatted_datetime,
        "hourly": "temperature_2m,wind_speed_10m,rain,precipitation",
        # Open-Meteo returns GMT timestamps by default. Request Chicago local
        # time so the hourly rows cover the same calendar day as the taxi
        # query (whose timestamps are assumed to be local Chicago time).
        "timezone": "America/Chicago",
    }

    try:
        response = requests.get(url_weather, params=params, timeout=timeout)
        response.raise_for_status()
        weather_data = response.json()
    except requests.exceptions.HTTPError as http_e:
        print(f"HTTP error occurred: {http_e}")
        # Re-raise: without data there is nothing meaningful to return.
        raise
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Other error occured: {e}")
        raise

    print(weather_data)
    print(len(weather_data))
    return weather_data
    

def upload_to_s3(data: "Dict | List", folder_name: str, file_name: str,
                 bucket: str = "egle.final-project-bucket") -> None:
    """
    Upload JSON-serializable data to the raw "to_processed" area of an S3 bucket.

    The object is written to ``raw_data/to_processed/{folder_name}/{file_name}``.

    Parameters
    ----------
        data (Dict | List): The data to upload (the weather dict or the taxi list).
        folder_name (str): The folder under ``raw_data/to_processed/``.
        file_name (str): The name of the object to create.
        bucket (str): The target bucket. Defaults to "egle.final-project-bucket".

    Returns
    -------
        None.

    Raises
    ------
    Exception
        Any error raised while serializing or uploading is logged and re-raised,
        so callers never report a failed upload as successful.
    """
    s3_client = boto3.client("s3")
    key = f"raw_data/to_processed/{folder_name}/{file_name}"

    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(data),
        )
    except Exception as e:
        print(f"Failed to upload to S3: {e}")
        raise
    

def lambda_handler(event, context):
    """
    Entry point of the extract Lambda.

    Fetch the taxi and weather data for the date two months before now, then
    store each raw payload as JSON under ``raw_data/to_processed/`` in S3.

    Parameters
    ----------
        event (dict): The AWS Lambda event (unused).
        context (object): The AWS Lambda runtime context (unused).
    """
    run_date = (datetime.now() - relativedelta(months = 2)).strftime("%Y-%m-%d")

    # Fetch both payloads first (taxi, then weather), then upload them in the same order.
    taxi_payload = get_taxi_data(run_date)
    weather_payload = get_weather_data(run_date)

    upload_plan = (
        (taxi_payload, "taxi_data", f"taxi_raw_{run_date}.json",
         "Taxi data has been uploaded!"),
        (weather_payload, "weather_data", f"weather_raw_{run_date}.json",
         "Weather data has been uploaded!"),
    )

    for payload, folder, object_name, done_message in upload_plan:
        upload_to_s3(data = payload, folder_name = folder, file_name = object_name)
        print(done_message)

## DE_final_project_transform-load (AWS Lambda function)

### The following code was created in an AWS Lambda environment to handle the Transform and Load steps of the ETL pipeline. It reads the raw data files that the "DE_final_project_extract" function stores in S3 and processes them with the functions defined below (each function documents its inputs and outputs). It then writes the results to separate, dedicated folders in the same S3 bucket.

### A trigger ensures that each new dataset pulled by the daily DE_final_project_extract run is processed automatically. The updated dimension (master) tables and their previous versions are kept in dedicated files, alongside the original raw files and the transformed files.

In [None]:
from io import StringIO
import json

import boto3

import pandas as pd
from typing import Literal

def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Clean and transform the daily taxi trips.

    The steps are:
    - drop the census-tract and centroid-location columns (if present);
    - drop every row that still contains a missing value;
    - rename the community-area columns to ``*_id``;
    - add ``datetime_for_weather``, the trip start floored to the hour, for
      joining with the hourly weather data.

    The input DataFrame is not modified.

    Parameters
    ----------
        taxi_trips (pd.DataFrame): The DataFrame holding the daily taxi trips.

    Returns
    -------
        DataFrame: A new, cleaned and transformed DataFrame of taxi trips.

    Raises
    ------
        TypeError: If ``taxi_trips`` is not a pandas DataFrame.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips is not a valid pandas DataFrame.")

    # errors="ignore": a column that is null for every record can be missing
    # entirely from the API's JSON, and dropping it must not fail.
    cleaned = (
        taxi_trips
        .drop(columns=["pickup_census_tract",
                       "dropoff_census_tract",
                       "pickup_centroid_location",
                       "dropoff_centroid_location"],
              errors="ignore")
        .dropna()
        .rename(columns={"pickup_community_area": "pickup_community_area_id",
                         "dropoff_community_area": "dropoff_community_area_id"})
    )

    # "h" is the current hourly alias ("H" is deprecated in recent pandas).
    return cleaned.assign(
        datetime_for_weather=lambda df: pd.to_datetime(df["trip_start_timestamp"]).dt.floor("h")
    )
    
################################################################################    

def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame,
                                       payment_type_master: pd.DataFrame,
                                       company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Replace the payment_type and company text columns with master-table ids.

    Parameters
    ----------
        taxi_trips (pd.DataFrame): Daily taxi trips with "payment_type" and "company" columns.
        payment_type_master (pd.DataFrame): Master table joined on "payment_type".
        company_master (pd.DataFrame): Master table joined on "company".

    Returns
    -------
        DataFrame: A new DataFrame carrying the master-table id columns, without
        the "payment_type" and "company" columns. Both joins are inner joins,
        so trips whose value is missing from a master table are dropped.
    """
    with_payment_ids = taxi_trips.merge(payment_type_master, on="payment_type")
    with_all_ids = with_payment_ids.merge(company_master, on="company")
    return with_all_ids.drop(columns=["payment_type", "company"])
    
################################################################################

def update_master(taxi_trips: pd.DataFrame, 
                  master: pd.DataFrame, 
                  id_column: str, 
                  value_column: str) -> pd.DataFrame:
    """
    Extend a master table with values seen in the taxi data but not yet present.

    New values get consecutive ids starting after the current maximum id, or
    from 1 if the master table is empty. They are numbered in the order they
    first appear in ``taxi_trips``, so the result is deterministic.

    Parameters
    ----------
        taxi_trips (pd.DataFrame): DataFrame holding the taxi data.
        master (pd.DataFrame): DataFrame holding the master data.
        id_column (str): The id column of the master DataFrame.
        value_column (str): Name of the column (in both frames) containing the values.

    Returns
    -------
    DataFrame: A new DataFrame with the original master rows followed by any new values.
    """
    known_values = set(master[value_column])
    # pd.unique keeps first-appearance order; iterating a set difference does
    # not, which made id assignment vary between runs.
    new_values_list = [value for value in pd.unique(taxi_trips[value_column])
                       if value not in known_values]

    if not new_values_list:
        return master.copy()

    max_id = master[id_column].max()
    # An empty master yields NaN here; start numbering from 1 in that case.
    max_id = 0 if pd.isna(max_id) else int(max_id)

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values_list) + 1),
        value_column: new_values_list,
    })

    if master.empty:
        return new_values_df

    return pd.concat([master, new_values_df], ignore_index=True)
    
################################################################################

def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """
    Turn the daily Open-Meteo API response into an hourly DataFrame.

    Parameters
    ----------
        weather_data (dict): The decoded JSON response. Its "hourly" entry must
            contain the "time", "temperature_2m", "wind_speed_10m", "rain" and
            "precipitation" series.

    Returns
    -------
        DataFrame: Columns datetime (parsed to datetime64), temperature,
        wind_speed, rain and precipitation, one row per hour.

    Raises
    ------
        KeyError: If "hourly" or one of the expected series is missing.
    """
    hourly = weather_data["hourly"]

    # Output column name -> series name in the API's "hourly" block.
    column_map = {
        "datetime": "time",
        "temperature": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    weather_df = pd.DataFrame(
        {out_col: hourly[api_col] for out_col, api_col in column_map.items()}
    )
    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df
    
################################################################################

def read_csv_from_s3(bucket: str, 
                     path: str, 
                     filename: str) -> pd.DataFrame:
    """
    Load a CSV object from S3 into a DataFrame.

    Parameters
    ----------
        bucket (str): The S3 bucket holding the object.
        path (str): The key prefix, including its trailing "/".
        filename (str): The object name appended to ``path``.

    Returns
    -------
        DataFrame: The parsed CSV content.
    """
    object_key = f"{path}{filename}"
    response = boto3.client("s3").get_object(Bucket=bucket, Key=object_key)
    return pd.read_csv(response["Body"])
    
################################################################################

def read_json_from_s3(bucket: str, 
                      path: str, 
                      filename: str) -> dict:
    """
    Fetch an object from S3 and decode it as JSON.

    Parameters
    ----------
        bucket (str): Name of the S3 bucket.
        path (str): The key prefix, including its trailing "/".
        filename (str): The object name appended to ``path``.

    Returns
    -------
        dict: The decoded JSON document. The taxi files actually decode to a list.
    """
    object_key = f"{path}{filename}"
    body_stream = boto3.client("s3").get_object(Bucket=bucket, Key=object_key)["Body"]
    return json.loads(body_stream.read())
    
################################################################################

def upload_dataframe_to_s3(dataframe: pd.DataFrame, 
                           bucket: str, 
                           path: str):
    """
    Serialize a DataFrame to CSV (without the index) and write it to S3.

    Parameters
    ----------
        dataframe (pd.DataFrame): The DataFrame to upload.
        bucket (str): Name of the target S3 bucket.
        path (str): The full object key to write.

    Returns
    -------
        None.
    """
    s3_client = boto3.client("s3")
    # With no target buffer, to_csv returns the CSV text directly.
    csv_text = dataframe.to_csv(index=False)
    s3_client.put_object(Bucket=bucket, Key=path, Body=csv_text)
                  
################################################################################

def upload_master_data_to_s3(bucket: str, 
                             path: str, 
                             file_type: str, 
                             dataframe: pd.DataFrame):
    """
    Replace a master table in S3, keeping a copy of the previous version.

    The current ``{path}{file_type}_master.csv`` is first copied to
    ``{path}previous_version/{file_type}_master_previous_version.csv``.
    The new ``dataframe`` is then written over the current file.

    Parameters
    ----------
        bucket (str): Name of the S3 bucket.
        path (str): The folder prefix of the master table, including its trailing "/".
        file_type (str): Either "company" or "payment_type".
        dataframe (pd.DataFrame): The new master table.

    Returns
    -------
        None.
    """
    current_key = f"{path}{file_type}_master.csv"
    backup_key = f"{path}previous_version/{file_type}_master_previous_version.csv"

    # Snapshot the existing master before it is overwritten.
    boto3.client("s3").copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": current_key},
        Key=backup_key,
    )

    upload_dataframe_to_s3(dataframe=dataframe, bucket=bucket, path=current_key)
                           
################################################################################

def upload_and_move_file_on_s3(dataframe: pd.DataFrame, 
                               datetime_col: str, 
                               bucket: str, 
                               file_type: str, 
                               filename: str, 
                               source_path: str, 
                               target_path_raw: str, 
                               target_path_transformed: str):
    """
    Upload a transformed DataFrame to S3, then move its raw source file.

    The transformed data is written to
    ``{target_path_transformed}{file_type}_{YYYY-MM-DD}.csv``. The date is taken
    from the first value of ``datetime_col``. The raw object at ``source_path``
    is then copied to ``{target_path_raw}{filename}`` and deleted from its
    original location.

    Parameters
    ----------
        dataframe (pd.DataFrame): The transformed DataFrame to upload.
        datetime_col (str): Datetime column from which the file date is derived.
        bucket (str): Name of the S3 bucket.
        file_type (str): "weather" or "taxi".
        filename (str): Name of the raw file to move.
        source_path (str): Full key of the raw file.
        target_path_raw (str): Folder where the raw file is moved to.
        target_path_transformed (str): Folder where the transformed data goes.

    Returns
    -------
        None.

    Raises
    ------
        ValueError: If ``dataframe`` is empty (no date can be derived).
        botocore ClientError: If the raw source object does not exist or another S3 call fails.
    """
    if dataframe.empty:
        raise ValueError(
            f"Cannot derive a file date from an empty {file_type} DataFrame "
            f"(source: {source_path})."
        )

    s3 = boto3.client("s3")

    formatted_date = dataframe[datetime_col].iloc[0].strftime("%Y-%m-%d")
    new_path_with_filename = f"{target_path_transformed}{file_type}_{formatted_date}.csv"

    # Verify the raw source exists before writing anything, so a missing
    # source does not leave an orphaned transformed file behind.
    try:
        s3.head_object(Bucket=bucket, Key=source_path)
    except s3.exceptions.ClientError as err:
        # HEAD responses have no body, so a missing key surfaces as a generic
        # ClientError with a "404" code rather than NoSuchKey.
        if err.response.get("Error", {}).get("Code") in ("404", "NoSuchKey", "NotFound"):
            print(f"Source path {source_path} does not exist.")
        raise

    upload_dataframe_to_s3(dataframe=dataframe,
                           bucket=bucket,
                           path=new_path_with_filename)

    # S3 has no rename: move = copy + delete.
    s3.copy_object(Bucket=bucket,
                   CopySource={"Bucket": bucket, "Key": source_path},
                   Key=f"{target_path_raw}{filename}")
    s3.delete_object(Bucket=bucket, Key=source_path)
    
################################################################################

################################  MAIN PART  ###################################

################################################################################

def _list_json_keys(s3, bucket: str, prefix: str) -> list:
    """Return every ``.json`` object key under ``prefix``, across all listing pages."""
    keys = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent when the prefix currently holds no objects.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            # Skip "folder" placeholder keys (ending in "/") and non-JSON files.
            if key.split("/")[-1].strip() != "" and key.split(".")[-1] == "json":
                keys.append(key)
    # Listed up front (not lazily) because processing deletes source objects.
    return keys


def lambda_handler(event, context):
    """
    Main function for AWS Lambda to handle taxi and weather data processing and uploading to S3.

    For each raw taxi JSON file it:
    - transforms the trips;
    - extends the company and payment_type master tables with new values;
    - replaces the text columns with master ids;
    - uploads the result and moves the raw file;
    - persists the updated master tables.

    Each raw weather JSON file is then transformed, uploaded and moved.
    The master tables are carried forward between taxi files, so several files
    processed in one invocation receive consistent, non-colliding ids.

    Parameters
    ----------
        event (dict): Event data passed by AWS Lambda.
        context (object): Runtime information provided by AWS Lambda.

    Returns
    -------
        None
    """
    s3 = boto3.client("s3")
    bucket = "egle.final-project-bucket"

    raw_taxi_trips_folder = "raw_data/to_processed/taxi_data/"
    raw_weather_folder = "raw_data/to_processed/weather_data/"
    target_taxi_trips_folder = "raw_data/processed/taxi_data/"
    target_weather_folder = "raw_data/processed/weather_data/"
    transformed_taxi_trips_folder = "transformed_data/taxi_trips/"
    transformed_weather_folder = "transformed_data/weather/"
    payment_type_master_folder = "transformed_data/payment_type/"
    company_master_folder = "transformed_data/company/"
    payment_type_master_file_name = "payment_type_master.csv"
    company_master_file_name = "company_master.csv"

    payment_type_master = read_csv_from_s3(bucket=bucket,
                                           path=payment_type_master_folder,
                                           filename=payment_type_master_file_name)

    company_master = read_csv_from_s3(bucket=bucket,
                                      path=company_master_folder,
                                      filename=company_master_file_name)

    for taxi_trip_key in _list_json_keys(s3, bucket, raw_taxi_trips_folder):
        filename = taxi_trip_key.split("/")[-1]
        taxi_trips_data_json = read_json_from_s3(bucket,
                                                 raw_taxi_trips_folder,
                                                 filename)
        taxi_trips_data_raw = pd.DataFrame(taxi_trips_data_json)
        taxi_trips_transformed = taxi_trips_transformations(taxi_trips_data_raw)

        # Reassign so the next file builds on these updates; otherwise each
        # file would restart from the originally loaded masters and new ids
        # would collide and overwrite each other.
        company_master = update_master(taxi_trips_transformed,
                                       company_master,
                                       "company_id",
                                       "company")

        payment_type_master = update_master(taxi_trips_transformed,
                                            payment_type_master,
                                            "payment_type_id",
                                            "payment_type")

        taxi_trips = update_taxi_trips_with_master_data(taxi_trips_transformed,
                                                        payment_type_master,
                                                        company_master)

        upload_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col="datetime_for_weather",
            bucket=bucket,
            file_type="taxi",
            filename=filename,
            source_path=taxi_trip_key,
            target_path_raw=target_taxi_trips_folder,
            target_path_transformed=transformed_taxi_trips_folder
        )
        print("taxi_trips is uploaded and moved.")

        upload_master_data_to_s3(bucket=bucket,
                                 path=payment_type_master_folder,
                                 file_type="payment_type",
                                 dataframe=payment_type_master)

        upload_master_data_to_s3(bucket=bucket,
                                 path=company_master_folder,
                                 file_type="company",
                                 dataframe=company_master)

        print("company_master has been updated.")

    for weather_key in _list_json_keys(s3, bucket, raw_weather_folder):
        filename = weather_key.split("/")[-1]
        weather_data_json = read_json_from_s3(bucket,
                                              raw_weather_folder,
                                              filename)
        weather_data = transform_weather_data(weather_data_json)

        upload_and_move_file_on_s3(
            dataframe=weather_data,
            datetime_col="datetime",
            bucket=bucket,
            file_type="weather",
            filename=filename,
            source_path=weather_key,
            target_path_raw=target_weather_folder,
            target_path_transformed=transformed_weather_folder
        )
        print("weather is uploaded and moved.")