In [None]:
import json
import boto3
import pandas as pd
from io import StringIO
import os

def transform_weather_data(weather_data) -> pd.DataFrame:
    """
    Transforms raw weather data into a pandas DataFrame.

    Parameters
    ----------
    weather_data : dict
        A dictionary containing weather information with a structure that includes
        'hourly' data for 'time', 'temperature_2m', 'wind_speed_10m', 'rain', and 'precipitation'.

    Returns
    -------
    pd.DataFrame
        A DataFrame with the following columns:
        - 'datetime': Converted to pandas datetime format
        - 'temperature': Temperature at 2 meters
        - 'wind_speed': Wind speed at 10 meters
        - 'rain': Rain amount
        - 'precipitation': Precipitation amount

    Raises
    ------
    KeyError
        If 'hourly' or one of the expected hourly series is missing.
    """
    hourly = weather_data['hourly']

    weather_data_filtered = {
        'datetime': hourly['time'],
        'temperature': hourly['temperature_2m'],
        'wind_speed': hourly['wind_speed_10m'],
        'rain': hourly['rain'],
        'precipitation': hourly['precipitation'],
    }

    weather_df = pd.DataFrame(weather_data_filtered)
    # The raw 'time' values are converted so callers can use datetime methods on the column.
    weather_df['datetime'] = pd.to_datetime(weather_df['datetime'])

    return weather_df

def taxi_trips_transformation(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Transforms the taxi trips DataFrame.

    The census tract and centroid location columns are dropped, rows with any
    missing value are removed, the community area columns are renamed to
    '*_id', and a 'datetime_for_weather' column is added holding
    'trip_start_timestamp' floored to the hour.

    Note that the transformation is applied in place: the DataFrame passed in
    is modified and the same object is returned.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        A DataFrame containing taxi trip data.

    Returns
    -------
    pd.DataFrame
        The transformed DataFrame with the specified columns removed, renamed,
        and a new 'datetime_for_weather' column added.

    Raises
    ------
    TypeError
        If taxi_trips is not a pandas DataFrame.
    KeyError
        If one of the columns to drop, or 'trip_start_timestamp', is missing.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('taxi_trips is not a valid pandas DataFrame')

    taxi_trips.drop(
        columns=[
            'pickup_census_tract',
            'dropoff_census_tract',
            'pickup_centroid_location',
            'dropoff_centroid_location',
        ],
        inplace=True,
    )

    taxi_trips.dropna(inplace=True)

    taxi_trips.rename(
        columns={
            'pickup_community_area': 'pickup_community_area_id',
            'dropoff_community_area': 'dropoff_community_area_id',
        },
        inplace=True,
    )

    # Floor to the hour so each trip carries an hourly timestamp.
    taxi_trips['datetime_for_weather'] = pd.to_datetime(taxi_trips['trip_start_timestamp']).dt.floor('h')

    return taxi_trips

def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str ) -> pd.DataFrame:
    """Extend the master DataFrame with new items if there are any.

    Every distinct value of ``value_column`` found in ``taxi_trips`` that is
    not yet present in ``master`` is appended exactly once, in order of first
    appearance, and receives the next free consecutive id.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips.
    master : pd.DataFrame
        DataFrame holding master data.
    id_column : str
        Name of the id column in the master DataFrame.
    value_column : str
        Name of the value column, present in both the master DataFrame and
        the taxi trips.

    Returns
    -------
    pd.DataFrame
        A new DataFrame with the master rows followed by one row per new
        value. The index is reset; the input master is not modified.
    """
    # An empty (or all-missing) id column has no usable maximum; start at 1.
    current_max = master[id_column].max() if len(master) else None
    if current_max is None or pd.isna(current_max):
        next_id = 1
    else:
        next_id = int(current_max) + 1

    # De-duplicate first: a value occurring in many trips must yield a single
    # master row, otherwise joining trips against the master multiplies rows.
    candidates = taxi_trips[value_column].drop_duplicates()
    new_value_list = candidates[~candidates.isin(master[value_column])].tolist()

    if not new_value_list:
        # Nothing to add; still return a fresh, re-indexed frame.
        return master.reset_index(drop=True)

    new_value_df = pd.DataFrame({
        id_column: range(next_id, next_id + len(new_value_list)),
        value_column: new_value_list,
    })

    return pd.concat([master, new_value_df], ignore_index=True)

def read_csv_from_s3(bucket: str, path: str, file_name: str) -> pd.DataFrame:
    """Load a CSV object stored in S3 into a DataFrame.

    Parameters
    ----------
    bucket : str
        The name of the S3 bucket.
    path : str
        The key prefix of the CSV file in the bucket. It is concatenated
        with ``file_name`` as-is, without inserting a separator.
    file_name : str
        The name of the file.

    Returns
    -------
    pd.DataFrame
        The DataFrame parsed from the UTF-8 decoded CSV content.
    """
    client = boto3.client('s3')

    object_key = f'{path}{file_name}'
    response = client.get_object(Bucket=bucket, Key=object_key)

    # The body is read fully and decoded before parsing.
    csv_text = response['Body'].read().decode('utf-8')

    return pd.read_csv(StringIO(csv_text))

def read_json_from_s3(bucket: str, file_key: str) -> pd.DataFrame:
    """
    Load a JSON object stored in S3 and wrap its parsed content in a DataFrame.

    Parameters
    ----------
    bucket : str
        Name of the S3 bucket.
    file_key : str
        Key (path) to the JSON file in the bucket.

    Returns
    -------
    pd.DataFrame
        DataFrame built directly from the parsed JSON document.
    """
    client = boto3.client('s3')

    body = client.get_object(Bucket=bucket, Key=file_key)['Body']
    parsed = json.loads(body.read())

    return pd.DataFrame(parsed)

def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """
    Join the taxi trips with the payment type and company master tables.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        Taxi trip records, including 'payment_type' and 'company' columns.
    payment_type_master : pd.DataFrame
        Payment type master table; joined on 'payment_type'.
    company_master : pd.DataFrame
        Company master table; joined on 'company'.

    Returns
    -------
    pd.DataFrame
        A new DataFrame holding the trips enriched with the columns of both
        master tables, with the 'payment_type' and 'company' columns removed.
        Both joins use pandas' default merge type, so trips without a match
        in either master table are not part of the result.
    """
    enriched = (
        taxi_trips
        .merge(payment_type_master, on='payment_type')
        .merge(company_master, on='company')
    )

    # The join keys are dropped once the master columns have been attached.
    return enriched.drop(columns=["payment_type", "company"])

def upload_dataframe_to_s3(bucket: str, dataframe: pd.DataFrame, path: str) -> None:
    """
    Serialize a DataFrame to CSV (without the index) and store it in S3.

    Parameters
    ----------
    bucket : str
        Name of the S3 bucket where we want to store the file.

    dataframe : pd.DataFrame
        The DataFrame to be uploaded.

    path : str
        Full object key within the bucket under which the CSV is written.

    Returns
    -------
    None
    """
    client = boto3.client('s3')

    # With no target buffer, to_csv returns the CSV content as a string.
    csv_content = dataframe.to_csv(index=False)

    client.put_object(Bucket=bucket, Key=path, Body=csv_content)

def upload_master_data_to_s3(bucket: str, path: str, file_type: str, dataframe: pd.DataFrame) -> None:
    """
    Back up the current master file in S3, then overwrite it with a new version.

    The existing ``{path}{file_type}_master.csv`` object is first copied to
    the fixed 'transformed_data/master_table_previous_version/' prefix, and
    the given DataFrame is then uploaded under the original key.

    Parameters
    ----------
    bucket : str
        The name of the S3 bucket where the file will be stored.

    path : str
        The key prefix within the bucket where the master file lives.

    file_type : str
        The type of master data to upload; used to build the file names.

    dataframe : pd.DataFrame
        The DataFrame containing the data to be uploaded.

    Returns
    -------
    None
    """
    client = boto3.client('s3')

    current_key = f"{path}{file_type}_master.csv"
    backup_key = f"transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv"

    # Keep the previous version before it gets overwritten.
    client.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": current_key},
        Key=backup_key,
    )

    # Write the new master file under the original key.
    upload_dataframe_to_s3(bucket=bucket, dataframe=dataframe, path=current_key)

def uplod_and_move_file_on_s3(
        dataframe: pd.DataFrame, 
        datetime_col: str, 
        bucket: str, 
        file_type: str, 
        filename: str,
        source_path: str,
        target_path_raw: str,
        target_path_transformed: str
    ):
    """
    Upload a transformed DataFrame to S3 and move its raw source file.

    The DataFrame is written as
    ``{target_path_transformed}{file_type}_{YYYY-MM-DD}.csv``, where the date
    comes from the first row of ``datetime_col``. The raw file is then copied
    from ``source_path`` to ``target_path_raw`` and deleted from
    ``source_path``.

    Parameters
    ----------
    dataframe : pd.DataFrame
        The DataFrame to upload to S3.

    datetime_col : str
        Name of the column in the DataFrame containing the datetime values.

    bucket : str
        Name of the S3 bucket.

    file_type : str
        A string to identify the file type; used as the output file name prefix.

    filename : str
        The name of the original raw file.

    source_path : str
        The S3 path (prefix) where the original file is currently stored.

    target_path_raw : str
        The S3 path (prefix) the original file is moved to.

    target_path_transformed : str
        The S3 path (prefix) where the transformed (new) file will be uploaded.

    Returns
    -------
    None
    """
    client = boto3.client("s3")

    # The first row's timestamp names the output file.
    first_timestamp = dataframe[datetime_col].iloc[0]
    date_tag = first_timestamp.strftime("%Y-%m-%d")
    transformed_key = f"{target_path_transformed}{file_type}_{date_tag}.csv"

    upload_dataframe_to_s3(bucket=bucket, dataframe=dataframe, path=transformed_key)

    # The move is done as copy + delete.
    raw_source_key = f"{source_path}{filename}"
    raw_archive_key = f"{target_path_raw}{filename}"

    client.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": raw_source_key},
        Key=raw_archive_key,
    )
    client.delete_object(Bucket=bucket, Key=raw_source_key)




# Main function

def lambda_handler(event, context):
    """
    Lambda entry point: transform all pending raw taxi and weather files.

    For every '.json' object under the raw taxi prefix the trips are
    transformed, the company and payment type master tables are extended with
    any new values, the trips are joined with the master tables, the result
    is uploaded, the raw file is moved to the processed prefix, and both
    master tables are written back to S3. Every '.json' object under the raw
    weather prefix is transformed, uploaded and moved the same way.

    Parameters
    ----------
    event
        Lambda invocation event (not used).
    context
        Lambda runtime context (not used).

    Returns
    -------
    None
    """
    s3 = boto3.client('s3')

    bucket = 'de-chicago-taxi'
    raw_weather_folder = 'raw_data/to_processed/weather_data/'
    raw_taxi_folder = 'raw_data/to_processed/taxi_data/'
    target_taxi_folder = 'raw_data/processed/taxi_data/'
    target_weather_folder = 'raw_data/processed/weather_data/'
    transformed_taxi_folder = 'transformed_data/taxi_trips/'
    transformed_weather_folder = 'transformed_data/weather/'

    path_payment_type_master = 'transformed_data/payment_type/'
    path_company_master = 'transformed_data/company/'
    payment_type_master_file_name = 'payment_type_master.csv'
    company_master_file_name = 'company_master.csv'

    # Master data loading
    company_master = read_csv_from_s3(bucket, path_company_master, company_master_file_name)
    payment_type_master = read_csv_from_s3(bucket, path_payment_type_master, payment_type_master_file_name)

    # Taxi data transformation and loading.
    # The listing has no 'Contents' entry when nothing matches the prefix.
    taxi_listing = s3.list_objects(Bucket=bucket, Prefix=raw_taxi_folder)
    for file in taxi_listing.get('Contents', []):
        taxi_key = file['Key']
        file_name = taxi_key.split('/')[-1]

        # Skip keys without a file name part and anything that is not JSON.
        if file_name.strip() == '' or not taxi_key.endswith('.json'):
            continue

        response = s3.get_object(Bucket=bucket, Key=taxi_key)
        taxi_data_json = json.loads(response['Body'].read())
        print(taxi_key)
        taxi_data_raw_df = pd.DataFrame(taxi_data_json)
        taxi_trips_transformed = taxi_trips_transformation(taxi_data_raw_df)

        # Update master data. The extended tables replace the in-memory
        # masters so that the next file builds on them; otherwise ids handed
        # out for this file could be reused for different values later and
        # the last upload would drop the rows added here.
        company_master = update_master(taxi_trips_transformed, company_master, 'company_id', 'company')
        payment_type_master = update_master(taxi_trips_transformed, payment_type_master, 'payment_type_id', 'payment_type')

        # Update taxi trips with master data
        taxi_trips = update_taxi_trips_with_master_data(
            taxi_trips=taxi_trips_transformed,
            payment_type_master=payment_type_master,
            company_master=company_master,
        )

        uplod_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col='datetime_for_weather',
            bucket=bucket,
            file_type='taxi',
            filename=file_name,
            source_path=raw_taxi_folder,
            target_path_raw=target_taxi_folder,
            target_path_transformed=transformed_taxi_folder,
        )

        print('Taxi trips transformed and moved')

        # Upload master data to S3
        upload_master_data_to_s3(bucket=bucket, path=path_company_master, file_type='company', dataframe=company_master)
        upload_master_data_to_s3(bucket=bucket, path=path_payment_type_master, file_type='payment_type', dataframe=payment_type_master)

    # Weather data transformation and loading
    weather_listing = s3.list_objects(Bucket=bucket, Prefix=raw_weather_folder)
    for file in weather_listing.get('Contents', []):
        weather_key = file['Key']
        file_name = weather_key.split('/')[-1]

        # Same filter as for the taxi files.
        if file_name.strip() == '' or not weather_key.endswith('.json'):
            continue

        response = s3.get_object(Bucket=bucket, Key=weather_key)
        weather_data_json = json.loads(response['Body'].read())

        weather_data_df = transform_weather_data(weather_data_json)

        uplod_and_move_file_on_s3(
            dataframe=weather_data_df,
            datetime_col='datetime',
            bucket=bucket,
            file_type='weather',
            filename=file_name,
            source_path=raw_weather_folder,
            target_path_raw=target_weather_folder,
            target_path_transformed=transformed_weather_folder,
        )

        print('weather data transformed and moved')
        
