In [None]:
import json
import boto3
import pandas as pd
from io import StringIO


def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Perform the cleaning transformations on the daily taxi trips.

    The input dataframe is left untouched; a transformed copy is returned.

    Parameters:
    -----------
        taxi_trips : pd.DataFrame
            The dataframe holding the daily taxi trips. It must contain the
            columns ``pickup_census_tract``, ``dropoff_census_tract``,
            ``pickup_centroid_location``, ``dropoff_centroid_location`` and
            ``trip_start_timestamp``.

    Returns:
    --------
        pd.DataFrame
        The cleaned, transformed dataframe holding the taxi trips: the census
        tract and centroid columns are removed, the community area columns get
        an ``_id`` suffix, and a ``datetime_for_weather`` column (trip start
        floored to the hour) is added.

    Raises:
    -------
        TypeError
            If ``taxi_trips`` is not a pandas DataFrame.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips is not a valid pandas dataframe.")

    # drop/rename without inplace=True return new frames, so the caller's
    # dataframe is not modified as a side effect.
    transformed = taxi_trips.drop(
        columns=[
            "pickup_census_tract",
            "dropoff_census_tract",
            "pickup_centroid_location",
            "dropoff_centroid_location",
        ]
    ).rename(
        columns={
            "pickup_community_area": "pickup_community_area_id",
            "dropoff_community_area": "dropoff_community_area_id",
        }
    )

    # Floor the trip start to the hour so it can be matched to an hourly weather row.
    transformed["datetime_for_weather"] = pd.to_datetime(
        transformed["trip_start_timestamp"]
    ).dt.floor("h")

    return transformed
    

def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Replace the payment type and company names of the trips with master ids.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The dataframe with the daily taxi trips; must contain the columns
        ``payment_type`` and ``company``.
    payment_type_master : pd.DataFrame
        The payment type master table (``payment_type`` -> ``payment_type_id``).
    company_master : pd.DataFrame
        The company master table (``company`` -> ``company_id``).

    Returns
    -------
    pd.DataFrame
        The taxi trips with the ``payment_type`` and ``company`` columns
        replaced by ``payment_type_id`` and ``company_id``. Both lookups are
        inner joins, so a trip whose payment type or company is absent from
        the corresponding master table is not part of the result.
    """
    with_payment_ids = taxi_trips.merge(payment_type_master, on="payment_type")
    with_all_ids = with_payment_ids.merge(company_master, on="company")

    # The name columns are now redundant: the ids carry the same information.
    return with_all_ids.drop(columns=["payment_type", "company"])
    

def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, value_column: str) -> pd.DataFrame:
    """Extend the master dataframe with values it does not contain yet.

    Every distinct value of ``taxi_trips[value_column]`` that is missing from
    ``master[value_column]`` is appended exactly once, in order of first
    appearance, with consecutive ids following the current maximum id
    (starting at 1 when the master is empty).

    Parameters:
    -----------
    taxi_trips: pd.DataFrame
        Dataframe holding the daily taxi trips.
    master: pd.DataFrame
        Dataframe holding the master data.
    id_column: str
        The id column of the master dataframe.
    value_column: str
        Name of the column, present in both dataframes, holding the values.

    Returns:
    --------
    pd.DataFrame
        The master data extended with the new values (a new frame with a fresh
        RangeIndex; the inputs are not modified).
    """
    current_max = master[id_column].max()
    # max() of an empty column is NaN; number new entries from 1 in that case.
    max_id = 0 if pd.isna(current_max) else int(current_max)

    # Deduplicate first: a new value occurring on many trips must get a single
    # id, otherwise the later merge with the trips would multiply those rows.
    candidate_values = pd.Series(taxi_trips[value_column].unique())
    # Series.isin is hash based and treats missing values as matching each
    # other, so a NaN already in the master is not appended again every run.
    new_values = candidate_values[~candidate_values.isin(master[value_column])].tolist()

    if not new_values:
        return master.reset_index(drop=True)

    new_values_df = pd.DataFrame({
        id_column: range(max_id + 1, max_id + len(new_values) + 1),
        value_column: new_values,
    })

    return pd.concat([master, new_values_df], ignore_index=True)
    

def transform_weather_data(weather_data) -> pd.DataFrame:
    """Turn the daily Open Meteo API response into an hourly weather dataframe.

    Parameters
    ----------
    weather_data : JSON
        The parsed daily weather response; its ``"hourly"`` entry must hold
        the lists ``time``, ``temperature_2m``, ``wind_speed_10m``, ``rain``
        and ``precipitation``.

    Returns
    -------
    pd.DataFrame
        One row per hour with the columns ``datetime`` (parsed to datetime),
        ``temperature``, ``wind_speed``, ``rain`` and ``precipitation``.
    """
    hourly = weather_data["hourly"]

    # Output column name -> key in the API's "hourly" section.
    column_sources = {
        "datetime": "time",
        "temperature": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    frame = pd.DataFrame({column: hourly[source] for column, source in column_sources.items()})
    frame["datetime"] = pd.to_datetime(frame["datetime"])

    return frame
    
    
def read_csv_from_s3(bucket: str, path: str, filename: str) -> pd.DataFrame:
    """Download a CSV file from an S3 bucket and load it into a dataframe.

    Parameters:
    -----------
    bucket : str
        The bucket where the file is stored.
    path : str
        The folder prefix of the file. It is prepended verbatim, so it must
        include any trailing "/".
    filename : str
        Name of the file.

    Returns:
    --------
    pd.DataFrame
        A dataframe of the downloaded file (decoded as UTF-8).
    """
    s3 = boto3.client("s3")

    full_path = f"{path}{filename}"

    # Named "response"/"csv_text" rather than "object" to avoid shadowing the builtin.
    response = s3.get_object(Bucket=bucket, Key=full_path)
    csv_text = response["Body"].read().decode("utf-8")

    return pd.read_csv(StringIO(csv_text))
    

def upload_dataframe_to_s3(dataframe: pd.DataFrame, bucket: str, path: str):
    """
    Serialise a dataframe to CSV and store it in S3 under the given key.

    Parameters:
    -----------
    dataframe : pd.DataFrame
        The dataframe to be uploaded; its index is not written.

    bucket : str
        Name of the S3 bucket.

    path : str
        Full object key (folder prefix plus file name) to write to.

    Returns:
    --------
    None

    """
    client = boto3.client("s3")
    # With no target buffer, to_csv returns the CSV text as a string.
    csv_text = dataframe.to_csv(index=False)
    client.put_object(Bucket=bucket, Key=path, Body=csv_text)
    

def upload_master_data_to_s3(bucket: str, path: str, file_type: str, dataframe: pd.DataFrame):
    """
    Back up the current master file on S3, then overwrite it with ``dataframe``.

    Parameters:
    -----------

    bucket : str
        Name of the S3 bucket.
    path : str
        Folder prefix of the master file (including the trailing "/").
    file_type : str
        Either "payment_type" or "company"; the file is ``{file_type}_master.csv``.
    dataframe : pd.DataFrame
        The new master data to be uploaded.

    Returns:
    --------
    None

    """
    master_key = f"{path}{file_type}_master.csv"
    backup_key = f"transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv"

    client = boto3.client("s3")

    # Preserve the version that is about to be replaced before overwriting it.
    client.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": master_key},
        Key=backup_key,
    )

    upload_dataframe_to_s3(bucket=bucket, dataframe=dataframe, path=master_key)
    

def upload_and_move_file_on_s3(
        dataframe: pd.DataFrame,
        datetime_col: str,
        bucket: str,
        file_type: str,
        filename: str,
        source_path: str,
        target_path_raw: str,
        target_path_transformed: str
    ):
    """Upload a transformed dataframe to S3, then move its raw source file.

    The dataframe is written to
    ``{target_path_transformed}{file_type}_{YYYY-MM-DD}.csv``. The raw file
    ``{source_path}{filename}`` is then copied to
    ``{target_path_raw}{filename}`` and deleted from ``source_path``.

    Parameters:
    ----------
    dataframe : pd.DataFrame
        The transformed data to upload.
    datetime_col : str
        Datetime column whose first value provides the date in the file name.
    bucket : str
        Name of the bucket.
    file_type : str
        Prefix of the uploaded file name, e.g. "weather" or "taxi".
    filename : str
        Name of the raw source file to move.
    source_path : str
        Folder prefix the raw file currently lives in.
    target_path_raw : str
        Folder prefix the raw file is moved to.
    target_path_transformed : str
        Folder prefix the transformed CSV is uploaded to.

    Returns:
    --------
        None

    Raises:
    -------
        ValueError
            If ``dataframe`` is empty, so no date can be derived for the file name.
    """
    if dataframe.empty:
        raise ValueError(f"Cannot upload an empty dataframe for {filename}.")

    s3 = boto3.client("s3")

    # Only the first row's date is used; rows from other days share this file name.
    formatted_date = dataframe[datetime_col].iloc[0].strftime("%Y-%m-%d")
    transformed_key = f"{target_path_transformed}{file_type}_{formatted_date}.csv"

    upload_dataframe_to_s3(bucket=bucket, dataframe=dataframe, path=transformed_key)

    # "Move" = copy to the processed folder, then delete the original object.
    source_key = f"{source_path}{filename}"
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": source_key},
        Key=f"{target_path_raw}{filename}",
    )

    s3.delete_object(Bucket=bucket, Key=source_key)


def _list_json_keys(s3, bucket: str, prefix: str) -> list:
    """Return the keys of all ``.json`` objects under ``prefix`` (folder markers skipped)."""
    keys = []
    # The paginator follows continuation tokens, so more than 1000 objects are listed.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page has no "Contents" entry when nothing matches the prefix.
        for entry in page.get("Contents", []):
            key = entry["Key"]
            if key.split("/")[-1].strip() and key.endswith(".json"):
                keys.append(key)
    # Returned as a list so files can be moved/deleted while processing them.
    return keys


def _read_json_from_s3(s3, bucket: str, key: str):
    """Download the object at ``key`` and parse its content as JSON."""
    response = s3.get_object(Bucket=bucket, Key=key)
    return json.loads(response["Body"].read())


def lambda_handler(event, context):
    """AWS Lambda entry point: transform and load all pending taxi and weather files.

    For every JSON file under the "to_processed" taxi folder, this:

    - cleans the trips;
    - extends the payment type and company master tables with new values;
    - replaces the names with master ids;
    - uploads the result and moves the raw file to the "processed" folder;
    - persists the updated master tables.

    Every pending weather JSON file is transformed, uploaded and moved the same way.
    ``event`` and ``context`` are not used.
    """
    s3 = boto3.client("s3")
    bucket = "cubix-chicago-taxi-goga"

    raw_weather_folder = "raw_data/to_processed/weather_data/"
    raw_taxi_trips_folder = "raw_data/to_processed/taxi_data/"

    target_taxi_trips_folder = "raw_data/processed/taxi_data/"
    target_weather_folder = "raw_data/processed/weather_data/"

    transformed_taxi_trips_folder = "transformed_data/taxi_trips/"
    transformed_weather_folder = "transformed_data/weather/"

    payment_type_master_folder = "transformed_data/payment_type/"
    company_type_master_folder = "transformed_data/company/"

    payment_type_master_filename = "payment_type_master.csv"
    company_master_filename = "company_master.csv"

    payment_type_master = read_csv_from_s3(bucket=bucket, path=payment_type_master_folder, filename=payment_type_master_filename)
    company_master = read_csv_from_s3(bucket=bucket, path=company_type_master_folder, filename=company_master_filename)

    # Taxi data transformation and loading
    for taxi_trip_key in _list_json_keys(s3, bucket, raw_taxi_trips_folder):
        filename = taxi_trip_key.split("/")[-1]

        taxi_trip_data_raw = pd.DataFrame(_read_json_from_s3(s3, bucket, taxi_trip_key))
        taxi_trips_transformed = taxi_trips_transformations(taxi_trip_data_raw)

        company_master_updated = update_master(taxi_trips_transformed, company_master, "company_id", "company")
        payment_type_master_updated = update_master(taxi_trips_transformed, payment_type_master, "payment_type_id", "payment_type")

        taxi_trips = update_taxi_trips_with_master_data(taxi_trips_transformed, payment_type_master_updated, company_master_updated)

        upload_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col="datetime_for_weather",
            bucket=bucket,
            file_type="taxi",
            filename=filename,
            source_path=raw_taxi_trips_folder,
            target_path_raw=target_taxi_trips_folder,
            target_path_transformed=transformed_taxi_trips_folder
        )

        print("Taxi trips is uploaded ans moved.")

        upload_master_data_to_s3(bucket=bucket, path=payment_type_master_folder, file_type="payment_type", dataframe=payment_type_master_updated)
        print("Payment_type_master has been updated.")
        upload_master_data_to_s3(bucket=bucket, path=company_type_master_folder, file_type="company", dataframe=company_master_updated)
        print("Company_master has been updated.")

        # Carry the extended masters into the next file. Without this, each
        # file would start from the originally loaded masters, so ids handed
        # out for an earlier file could be reassigned and overwritten.
        payment_type_master = payment_type_master_updated
        company_master = company_master_updated

    # Weather data transformation and loading
    for weather_key in _list_json_keys(s3, bucket, raw_weather_folder):
        filename = weather_key.split("/")[-1]

        weather_data = transform_weather_data(_read_json_from_s3(s3, bucket, weather_key))

        upload_and_move_file_on_s3(
            dataframe=weather_data,
            datetime_col="datetime",
            bucket=bucket,
            file_type="weather",
            filename=filename,
            source_path=raw_weather_folder,
            target_path_raw=target_weather_folder,
            target_path_transformed=transformed_weather_folder
        )

        print("Weather is uploaded ans moved.")
                
