In [None]:
import json
from io import StringIO
import boto3
import pandas as pd

def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """
    Extend a master DataFrame with values that it does not contain yet.

    Every distinct value of `value_column` found in `taxi_trips` that is missing
    from `master` is appended to the master with a freshly assigned id. New ids
    continue after the current maximum of `id_column` and are handed out in the
    order in which the new values first appear in `taxi_trips`, so the result is
    reproducible from run to run.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        A DataFrame containing taxi trip records. It must have a column named
        `value_column`.

    master : pd.DataFrame
        A DataFrame containing the master data. It must have the columns
        `id_column` and `value_column`.

    id_column: str
        The id column of the master data.

    value_column: str
        The value column of the master data.

    Returns
    -------
    pd.DataFrame
        A new DataFrame holding the rows of `master` followed by one row per
        newly found value. The index is reset; `master` itself is not modified.
    """
    # An empty master has no maximum (pandas yields NaN), so numbering starts at 1.
    current_max = None if master.empty else master[id_column].max()
    max_id = 0 if current_max is None or pd.isna(current_max) else int(current_max)

    known_values = set(master[value_column])
    # drop_duplicates keeps the first occurrence of each value, which makes the
    # id assignment deterministic (a plain set difference has arbitrary order).
    candidates = taxi_trips[value_column].drop_duplicates().tolist()
    new_values_list = [value for value in candidates if value not in known_values]

    if not new_values_list:
        # Nothing to add: same result as concatenating with an empty frame.
        return master.reset_index(drop=True)

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values_list) + 1),
        value_column: new_values_list,
    })
    return pd.concat([master, new_values_df], ignore_index=True)

def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master:pd.DataFrame) -> pd.DataFrame:
    """
    Replace the payment type and company values of the trips with master data columns.

    The trips are joined first with `payment_type_master` on "payment_type" and then
    with `company_master` on "company". Both joins use the pandas default (inner
    join), so a trip whose value is missing from either master is not part of the
    result. Afterwards the "payment_type" and "company" columns are removed.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The taxi trip records. Required columns:
        - "payment_type": The payment method used for each trip.
        - "company": The company associated with each trip.

    payment_type_master : pd.DataFrame
        The payment type master data. Required column:
        - "payment_type": The payment method the join is performed on.

    company_master : pd.DataFrame
        The company master data. Required column:
        - "company": The company value the join is performed on.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with the remaining trip columns plus the additional columns
        of both masters, without the "payment_type" and "company" columns.
        `taxi_trips` itself is left unchanged.
    """
    return (
        taxi_trips
        .merge(payment_type_master, on="payment_type")
        .merge(company_master, on="company")
        .drop(columns=["payment_type", "company"])
    )

def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Perform transformations with the taxi data.

    The census tract and centroid location columns are removed, every row that
    still contains a missing value is dropped, the community area columns get an
    "_id" suffix and a "datetime_for_weather" column is added that holds
    "trip_start_timestamp" rounded down to the full hour.

    Parameters
    ----------
    taxi_trips: pd.DataFrame
        The DataFrame holding the daily taxi trips. It is not modified.

    Returns
    -------
    pd.DataFrame
        A new, cleaned and transformed DataFrame holding the daily taxi trips.

    Raises
    ------
    TypeError
        If `taxi_trips` is not a pandas DataFrame.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("'taxi_trips' is not a valid pandas DataFrame.")

    # Work on a new object instead of using inplace=True, so the DataFrame
    # passed in by the caller stays untouched.
    cleaned = (
        taxi_trips
        .drop(columns=[
            "pickup_census_tract",
            "dropoff_census_tract",
            "pickup_centroid_location",
            "dropoff_centroid_location",
        ])
        .dropna()
        .rename(columns={
            "pickup_community_area": "pickup_community_area_id",
            "dropoff_community_area": "dropoff_community_area_id",
        })
    )
    # Hourly resolution: used by the caller to name the output file and,
    # judging by the column name, to match trips with hourly weather records.
    cleaned["datetime_for_weather"] = pd.to_datetime(cleaned["trip_start_timestamp"]).dt.floor("h")
    return cleaned

def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """
    Transform raw weather data from its parsed JSON form into a structured DataFrame.

    Only the hourly fields needed downstream are picked from the input and stored
    under shorter column names. The time values are converted to pandas datetimes.

    Parameters
    ----------
    weather_data : dict
        The parsed JSON weather data. It must contain the key "hourly" holding a
        dictionary with the following equally long lists:
        - "time": The datetime value of each hour.
        - "temperature_2m": Copied to the "temperature" column.
        - "windspeed_10m": Copied to the "wind_speed" column.
        - "rain": Copied to the "rain" column.
        - "precipitation": Copied to the "precipitation" column.

    Returns
    -------
    pd.DataFrame
        A DataFrame with the columns "datetime", "temperature", "wind_speed",
        "rain" and "precipitation", one row per entry of the hourly lists.

    Raises
    ------
    KeyError
        If "hourly" or one of the fields listed above is missing.
    """
    hourly = weather_data["hourly"]
    # Output column name -> key of the hourly section it is read from.
    column_sources = {
        "datetime": "time",
        "temperature": "temperature_2m",
        "wind_speed": "windspeed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }
    weather_df = pd.DataFrame({column: hourly[source] for column, source in column_sources.items()})
    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])
    return weather_df

def read_csv_from_s3(bucket: str, path: str, filename: str) -> pd.DataFrame:
    """
    Download a CSV object from S3 and parse it into a Pandas DataFrame.

    The object key is built by joining `path` and `filename` without adding a
    separator, so `path` has to end with "/" when it denotes a folder.

    Args:
        bucket (str): The name of the S3 bucket.
        path (str): The key prefix (folder path) under which the file is stored.
        filename (str): The name of the CSV file to be read.

    Returns:
        pd.DataFrame: The content of the CSV file.

    Note:
        The object body is decoded as UTF-8 before it is parsed.
    """
    client = boto3.client('s3')
    response = client.get_object(Bucket=bucket, Key=f"{path}{filename}")
    csv_text = response['Body'].read().decode('utf-8')
    return pd.read_csv(StringIO(csv_text))

def read_json_from_s3(bucket: str, key: str) -> dict:
    """
    Download a JSON object from S3 and parse it.

    Args:
        bucket (str): The name of the S3 bucket that holds the JSON file.
        key (str): The full S3 key of the JSON file within the bucket.

    Returns:
        dict: The parsed JSON content of the object.
    """
    client = boto3.client('s3')
    body = client.get_object(Bucket=bucket, Key=key)['Body']
    return json.loads(body.read())


def upload_dataframe_to_s3(dataframe: pd.DataFrame, bucket: str, path: str):
    """
    Serialize a Pandas DataFrame to CSV and store it as an object in S3.

    The CSV is written without the DataFrame index.

    Args:
        dataframe (pd.DataFrame): The DataFrame to upload.
        bucket (str): The name of the target S3 bucket.
        path (str): The full S3 object key, including the file name.

    Returns:
        None

    Example:
        upload_dataframe_to_s3(df, 'my-bucket', 'path/to/myfile.csv')

    Notes:
        AWS credentials with write permission on the bucket must be configured.
    """
    client = boto3.client('s3')
    # Without a target, to_csv returns the CSV content as a string.
    csv_text = dataframe.to_csv(index=False)
    client.put_object(Bucket=bucket, Key=path, Body=csv_text)


def upload_master_data_to_s3(bucket: str, path: str, file_type: str, dataframe: pd.DataFrame):
    """
    Uploads the given DataFrame as the master CSV file to S3 after backing up the previous version.

    The current master file `{path}{file_type}_master.csv` is first copied to
    `transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv`,
    so each master type keeps its own backup. Then the new DataFrame is uploaded
    to the master file's key, overwriting it.

    Args:
        bucket (str): The name of the S3 bucket where the file will be uploaded.
        path (str): The S3 folder path (ending with "/") where the master file is located.
        file_type (str): The master type (e.g. "company"), used to construct the file names.
        dataframe (pd.DataFrame): The DataFrame to be uploaded as a CSV to the S3 bucket.

    Returns:
        None

    Notes:
        The master file must already exist; otherwise the backup copy fails and
        nothing is uploaded.
    """
    s3 = boto3.client('s3')

    master_file_path = f"{path}{file_type}_master.csv"
    # This has to be an f-string: without the prefix every master type was backed
    # up to the same literal "{file_type}_master_previous_version.csv" key.
    # The key has no leading "/", in line with the other keys used in this file.
    previous_master_file_path = (
        f"transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv"
    )

    s3.copy_object(
        Bucket=bucket,
        CopySource={'Bucket': bucket, 'Key': master_file_path},
        Key=previous_master_file_path,
    )
    upload_dataframe_to_s3(dataframe, bucket, master_file_path)
def upload_and_move_file_on_s3(dataframe: pd.DataFrame, datetime_col: str, bucket: str,file_type: str, filename: str, source_path: str, target_path_raw: str, target_path_transformed: str):
    """
    Store a transformed DataFrame on S3 and move its raw source file to the processed location.

    Steps, in this order:
    1. The first value of `datetime_col` is formatted as YYYY-MM-DD.
    2. The DataFrame is uploaded as `{target_path_transformed}{file_type}_{date}.csv`.
    3. The source object `{source_path}{filename}` is copied to `{target_path_raw}{filename}`.
    4. The source object `{source_path}{filename}` is deleted.

    Args:
        dataframe (pd.DataFrame): The transformed data to upload. It needs at least one row.
        datetime_col (str): The column whose first value supplies the date in the output file name.
        bucket (str): The S3 bucket all objects live in.
        file_type (str): The prefix of the output file name (e.g. "taxi", "weather").
        filename (str): The file name of the raw source object.
        source_path (str): The key prefix of the raw source object.
        target_path_raw (str): The key prefix the raw source object is moved to.
        target_path_transformed (str): The key prefix the transformed CSV is uploaded to.

    Returns:
        None

    Example:
        upload_and_move_file_on_s3(df, 'date_column', 'my-bucket', 'data', 'data.csv', 'source/path/', 'raw/path/', 'transformed/path/')

    Notes:
        AWS credentials with read, write and delete permission on the bucket must be configured.
    """
    client = boto3.client("s3")

    first_timestamp = dataframe[datetime_col].iloc[0]
    date_part = first_timestamp.strftime("%Y-%m-%d")
    transformed_key = f"{target_path_transformed}{file_type}_{date_part}.csv"
    source_key = f"{source_path}{filename}"
    processed_key = f"{target_path_raw}{filename}"

    upload_dataframe_to_s3(dataframe, bucket, transformed_key)

    # S3 has no move operation: copy the raw file, then remove the original.
    copy_source = {'Bucket': bucket, 'Key': source_key}
    client.copy_object(Bucket=bucket, CopySource=copy_source, Key=processed_key)
    client.delete_object(Bucket=bucket, Key=source_key)


def _iter_json_files(s3, bucket: str, prefix: str):
    """Yield (key, filename) for every object under `prefix` whose file name ends with ".json"."""
    listing = s3.list_objects(Bucket=bucket, Prefix=prefix)
    # The response has no 'Contents' entry when nothing matches the prefix.
    for entry in listing.get('Contents', []):
        key = entry['Key']
        filename = key.split('/')[-1]
        # Skip folder placeholder keys (empty file name) and non-JSON objects.
        if filename.strip() and filename.endswith('.json'):
            yield key, filename


def lambda_handler(event, context):
    """
    Transform all raw taxi and weather JSON files waiting on S3.

    Taxi files: each file is cleaned, the company and payment type masters are
    extended with the values found in it, the trips are joined with the masters
    and uploaded as CSV, the raw file is moved to the processed folder and both
    masters are written back to S3. The extended masters are carried over to the
    next file, so ids assigned for one file are never handed out again for
    another file of the same run.

    Weather files: each file is converted to a DataFrame, uploaded as CSV and the
    raw file is moved to the processed folder.

    Args:
        event: The Lambda event. Not used.
        context: The Lambda context. Not used.

    Returns:
        None
    """
    s3 = boto3.client('s3')
    bucket = 'cubix-chicago-taxi-sp'
    raw_weather_folder = 'raw_data/to_processed/weather_data/'
    raw_taxi_trips_folder = 'raw_data/to_processed/taxi_data/'

    target_taxi_trips_folder = 'raw_data/processed/taxi_data/'
    target_weather_folder = 'raw_data/processed/weather_data/'

    transformed_taxi_trips_folder = 'transformed_data/taxi_data/'
    transformed_weather_folder = 'transformed_data/weather/'

    payment_type_master_folder = 'transformed_data/payment_type/'
    company_master_folder = 'transformed_data/company/'

    payment_type_master_file_name = 'payment_type_master.csv'
    company_master_file_name = 'company_master.csv'

    payment_type_master = read_csv_from_s3(bucket, payment_type_master_folder, payment_type_master_file_name)
    company_master = read_csv_from_s3(bucket, company_master_folder, company_master_file_name)

    # Taxi data transformation and loading
    for taxi_trips_key, filename in _iter_json_files(s3, bucket, raw_taxi_trips_folder):
        taxi_trips_data_json = read_json_from_s3(bucket=bucket, key=taxi_trips_key)

        taxi_trips_data_raw = pd.DataFrame(taxi_trips_data_json)
        taxi_trips_transformed = taxi_trips_transformations(taxi_trips_data_raw)

        company_master_updated = update_master(taxi_trips_transformed, company_master, "company_id", "company")
        payment_type_master_updated = update_master(taxi_trips_transformed, payment_type_master, "payment_type_id", "payment_type")

        taxi_trips = update_taxi_trips_with_master_data(taxi_trips_transformed, payment_type_master_updated, company_master_updated)

        upload_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col="datetime_for_weather",
            bucket=bucket,
            file_type="taxi",
            filename=filename,
            source_path=raw_taxi_trips_folder,
            target_path_raw=target_taxi_trips_folder,
            target_path_transformed=transformed_taxi_trips_folder,
        )

        upload_master_data_to_s3(bucket, company_master_folder, "company", company_master_updated)
        upload_master_data_to_s3(bucket, payment_type_master_folder, "payment_type", payment_type_master_updated)
        print(f"company_master_updated: {company_master_updated}")
        print(f"payment_type_master_updated: {payment_type_master_updated}")

        # Continue from the extended masters. Otherwise the next file would
        # start again from the masters read at the beginning, reuse the same
        # ids and overwrite the additions that were just uploaded.
        company_master = company_master_updated
        payment_type_master = payment_type_master_updated

    # Weather data transformation and loading
    for weather_key, filename in _iter_json_files(s3, bucket, raw_weather_folder):
        weather_data_json = read_json_from_s3(bucket=bucket, key=weather_key)

        weather_data = transform_weather_data(weather_data_json)

        upload_and_move_file_on_s3(
            dataframe=weather_data,
            datetime_col="datetime",
            bucket=bucket,
            file_type="weather",
            filename=filename,
            source_path=raw_weather_folder,
            target_path_raw=target_weather_folder,
            target_path_transformed=transformed_weather_folder,
        )
