In [None]:
#Data extract
import requests
from dateutil.relativedelta import relativedelta
from datetime import datetime as dt
import boto3 #s3 funkciói ebben vannak
import json
from typing import Dict


# 1. Fetch one day of taxi trip data from two months ago (T-2 months).
# 2. Fetch the weather data for the same day (T-2 months).
# 3. Upload both to S3 (raw_data/to_processed/weather_data and raw_data/to_processed/taxi_data).
# 4. Refactor the code.
# 5. Create the trigger.

def get_taxi_data(start_date: str, timeout: float = 120) -> list:
    """
    Retrieve one day of taxi trips from the City of Chicago data portal (Socrata API).

    Args:
        start_date (str): Day to fetch, formatted 'YYYY-MM-DD'. Trips whose
            ``trip_start_timestamp`` lies between 00:00:00 and 23:59:59 of that day
            are requested.
        timeout (float): Seconds ``requests`` waits for the connection / for the
            server to send data before giving up. Without it a stalled request
            would hang until the Lambda itself is killed.

    Returns:
        list: The parsed JSON response: a list of trip records, one dict per trip,
            carrying fields such as ``trip_start_timestamp``, ``company`` and
            ``payment_type``.

    Raises:
        requests.HTTPError: If the portal answers with a non-2xx status, so that an
            error payload is never passed on (and uploaded) as if it were trip data.
    """
    url = "https://data.cityofchicago.org/resource/ajtu-isnz.json"
    # Passing the SoQL clauses as params lets requests URL-encode them properly.
    params = {
        "$where": (
            f"trip_start_timestamp >= '{start_date}T00:00:00' "
            f"AND trip_start_timestamp <= '{start_date}T23:59:59'"
        ),
        # Socrata pages results (default 1000 rows); ask for effectively all rows.
        "$limit": 213000000,
    }
    response_taxi = requests.get(url, params=params, timeout=timeout)
    response_taxi.raise_for_status()
    return response_taxi.json()
    
def get_weather_data(weather_start_date: str, timezone: str = "America/Chicago", timeout: float = 60) -> Dict:
    """
    Retrieve hourly weather data for a single day from the Open-Meteo archive (ERA5) API.

    The location is fixed to Chicago (latitude 41.85, longitude -87.65).

    Args:
        weather_start_date (str): Day to fetch, formatted 'YYYY-MM-DD'. It is used as
            both the start and the end date, so exactly one day is requested.
        timezone (str): Time zone in which Open-Meteo expresses the hourly timestamps
            (the API's own default is GMT). The hours are later joined against taxi
            trips floored to the hour, so they must be in the same local time as the
            taxi timestamps; NOTE(review): Chicago portal timestamps are local time.
        timeout (float): Seconds ``requests`` waits for the connection / for the
            server to send data before giving up.

    Returns:
        Dict: The parsed JSON response. Hourly series are under ``["hourly"]`` with
            the keys ``time``, ``temperature_2m``, ``wind_speed_10m``,
            ``precipitation`` and ``rain``.

    Raises:
        requests.HTTPError: If the API answers with a non-2xx status (for example
            for an invalid or out-of-range date), instead of returning the error
            body as if it were weather data.
    """
    weather_url = "https://archive-api.open-meteo.com/v1/era5"
    params = {
        "latitude": 41.85,
        "longitude": -87.65,
        "start_date": weather_start_date,
        "end_date": weather_start_date,
        "hourly": "temperature_2m,wind_speed_10m,precipitation,rain",
        "timezone": timezone,
    }
    response = requests.get(weather_url, params=params, timeout=timeout)
    response.raise_for_status()
    return response.json()

def upload_data(bucket_name:str, bucket_key: str, bucket_data:Dict) -> None:
    """
    Serialize ``bucket_data`` to a JSON string and store it in S3 with ``put_object``.

    Args:
        bucket_name (str): Name of the target S3 bucket.
        bucket_key (str): Object key to write the JSON document to.
        bucket_data (Dict): JSON-serialisable payload (any value ``json.dumps`` accepts).

    Returns:
        None
    """
    s3_client = boto3.client('s3')
    serialized = json.dumps(bucket_data)
    s3_client.put_object(Bucket=bucket_name, Key=bucket_key, Body=serialized)

def lambda_handler(event, context):
    """
    AWS Lambda entry point for the extract step.

    Takes the calendar day two months before ``dt.now()``, downloads the taxi trips
    and the hourly weather for that day, and writes both raw JSON payloads to the
    'chicago-taxi' bucket under ``raw_data/to_processed/``.

    Args:
        event: Lambda event payload (not used).
        context: Lambda runtime context (not used).
    """
    bucket_name = 'chicago-taxi'
    day = (dt.now() - relativedelta(months=2)).strftime("%Y-%m-%d")

    # Both downloads happen before any upload, matching the original ordering.
    taxi_payload = get_taxi_data(day)
    weather_payload = get_weather_data(day)

    uploads = (
        (f"raw_data/to_processed/taxi_data/taxi_raw{day}.json", taxi_payload),
        (f'raw_data/to_processed/weather_data/weather_raw{day}.json', weather_payload),
    )
    for key, payload in uploads:
        upload_data(bucket_name, key, payload)
    


In [None]:
#Transformation
import json
from io import StringIO
import boto3
import pandas as pd #A layersben hozzá kell adni!!

def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Flatten an Open-Meteo hourly JSON payload into a pandas DataFrame.

    Parameters:
    weather_data (dict): Parsed JSON response. It must contain an ``"hourly"``
        mapping with the series ``time``, ``temperature_2m``, ``wind_speed_10m``,
        ``precipitation`` and ``rain`` (all of equal length).

    Returns:
    pd.DataFrame: One row per hour with the columns ``datetime`` (parsed with
        ``pd.to_datetime``), ``temperature``, ``wind``, ``precipitation`` and ``rain``.

    Raises:
    KeyError: If ``"hourly"`` or one of the expected hourly series is missing.
    """
    # Output column name -> Open-Meteo hourly field it is taken from.
    column_sources = {
        'datetime': 'time',
        'temperature': 'temperature_2m',
        'wind': 'wind_speed_10m',
        'precipitation': 'precipitation',
        'rain': 'rain',
    }
    hourly = weather_data['hourly']
    weather_df = pd.DataFrame({column: hourly[field] for column, field in column_sources.items()})
    weather_df['datetime'] = pd.to_datetime(weather_df['datetime'])
    return weather_df

def taxi_trips_transformations(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """
    Clean a raw taxi trips DataFrame and add the hour key used to join weather.

    Steps:
    - drop the census-tract and centroid-location columns (any that are absent
      are skipped),
    - drop every row that still contains a missing value,
    - rename ``pickup_community_area`` / ``dropoff_community_area`` to
      ``pickup_community_area_id`` / ``dropoff_community_area_id``,
    - add ``datetime_for_weather``: ``trip_start_timestamp`` floored to the hour.

    Parameters:
    - taxi_trips (pd.DataFrame): Raw taxi trips data. It is not modified.

    Returns:
    - pd.DataFrame: A new, transformed DataFrame.

    Raises:
    - TypeError: If ``taxi_trips`` is not a DataFrame.
    """
    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError('taxi_trips is not a valid data frame')

    columns_to_drop = ['pickup_census_tract', 'dropoff_census_tract',
                       'pickup_centroid_location', 'dropoff_centroid_location']
    return (
        taxi_trips
        # errors='ignore': a DataFrame built from JSON records has no column for a
        # field that no record in the batch carries; that must not abort the run.
        .drop(columns=columns_to_drop, errors='ignore')
        .dropna()
        .rename(columns={'pickup_community_area': 'pickup_community_area_id',
                         'dropoff_community_area': 'dropoff_community_area_id'})
        # assign() returns a new frame, so the caller's DataFrame is never mutated.
        .assign(datetime_for_weather=lambda df: pd.to_datetime(df['trip_start_timestamp']).dt.floor('h'))
    )
    
def update_master(taxi_trips : pd.DataFrame, master: pd.DataFrame, id_col_name:str, value_col_name:str) -> pd.DataFrame:
    """Append the values of ``taxi_trips`` that the master table does not know yet.

    Every distinct, previously unseen value of ``value_col_name`` gets exactly one
    new master row. Its id continues consecutively after the current maximum id;
    numbering starts at 1 when the master is empty. Neither input is modified.

    Args:
        taxi_trips (pd.DataFrame): The dataframe containing new values.
        master (pd.DataFrame): The master dataframe to be updated.
        id_col_name (str): The name of the column in the master dataframe that contains IDs.
        value_col_name (str): The name of the column (in both dataframes) that contains values.

    Returns:
        pd.DataFrame: A new dataframe: the original master rows followed by the new rows,
        in order of first appearance in ``taxi_trips``.
    """
    known_values = set(master[value_col_name])
    # dict.fromkeys de-duplicates while keeping first-appearance order, so a new value
    # occurring in many trips yields one master row (a duplicate row would later
    # duplicate trips when the master is merged back onto them).
    new_values = [value for value in dict.fromkeys(taxi_trips[value_col_name])
                  if value not in known_values]

    if not new_values:
        return master.copy()

    # max() of an empty column is NaN, which cannot seed a range.
    max_id = 0 if master.empty else int(master[id_col_name].max())

    new_data_df = pd.DataFrame({
        id_col_name: range(max_id + 1, max_id + len(new_values) + 1),
        value_col_name: new_values,
    })
    return pd.concat([master, new_data_df], ignore_index=True)
    
    

def read_csv_from_s3(bucket: str, path: str, filename: str) -> pd.DataFrame:
    """
    Download a CSV object from S3 and parse it into a pandas DataFrame.

    The object key is ``path`` and ``filename`` concatenated as-is, so ``path``
    is expected to end with ``/`` (e.g. ``'transformed_data/company/'``).
    The object body is decoded as UTF-8 before being handed to ``pd.read_csv``.

    Parameters:
    - bucket (str): Name of the S3 bucket.
    - path (str): Key prefix ("folder") holding the file.
    - filename (str): Name of the CSV file inside that prefix.

    Returns:
    - pd.DataFrame: The parsed CSV contents.
    """
    s3_client = boto3.client('s3')
    key = f'{path}{filename}'

    response = s3_client.get_object(Bucket=bucket, Key=key)
    csv_text = response['Body'].read().decode('utf-8')

    return pd.read_csv(StringIO(csv_text))

def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Replace payment type and company names in the trips with their master-table ids.

    Both joins are inner merges (pandas' default), on ``payment_type`` and ``company``
    respectively, so a trip whose payment type or company is missing from its master
    table does not appear in the result. The name columns are dropped afterwards,
    leaving only the id columns contributed by the master tables.

    Parameters:
    taxi_trips (pd.DataFrame): Taxi trips with ``payment_type`` and ``company`` columns.
    payment_type_master (pd.DataFrame): Master table containing a ``payment_type`` column.
    company_master (pd.DataFrame): Master table containing a ``company`` column.

    Returns:
    pd.DataFrame: A new DataFrame with the master columns merged in and the name
    columns removed.
    """
    enriched = (
        taxi_trips
        .merge(payment_type_master, on='payment_type')
        .merge(company_master, on='company')
    )
    return enriched.drop(columns=['payment_type', 'company'])
    
def upload_master_data_to_s3(bucket: str, path: str, file_type: str, dataframe: pd.DataFrame):
    """
    Overwrite a master table CSV in S3, keeping a copy of the version it replaces.

    The current object at ``{path}{file_type}_master.csv`` is first copied to
    ``transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv``
    (replacing whatever backup was there). ``dataframe`` is then written, as CSV
    without the index, to ``{path}{file_type}_master.csv``.

    Parameters:
    bucket (str): Name of the S3 bucket (source and destination of every call).
    path (str): Key prefix of the master file, expected to end with ``/``.
    file_type (str): Master table name used to build both keys, e.g. ``'company'``.
    dataframe (pd.DataFrame): The new master table.

    Returns:
    None

    Raises:
    botocore.exceptions.ClientError: From the S3 calls, e.g. when no master file
        exists yet at ``{path}{file_type}_master.csv`` so the backup copy fails.
    """
    s3_client = boto3.client("s3")

    current_key = f'{path}{file_type}_master.csv'
    backup_key = f'transformed_data/master_table_previous_version/{file_type}_master_previous_version.csv'

    # Back up the existing master before it is overwritten below.
    s3_client.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": current_key},
        Key=backup_key,
    )

    # to_csv() without a target returns the CSV text directly.
    s3_client.put_object(Bucket=bucket, Key=current_key, Body=dataframe.to_csv(index=False))


def upload_and_move_file_on_s3(dataframe: pd.DataFrame, datetime_col: str, bucket: str, target_path_transformed: str, file_type: str, source_path:str, target_path_raw: str, filename: str):
    """
    Upload a transformed DataFrame as CSV, then move its raw source file to the processed area.

    Steps:
    1. Build the output key ``{target_path_transformed}{file_type}_{YYYY-MM-DD}.csv``,
       where the date comes from the FIRST row of ``datetime_col`` (so the column
       must hold datetime-like values and the frame must not be empty).
    2. Upload ``dataframe`` as CSV (without the index) to that key.
    3. Copy ``{source_path}{filename}`` to ``{target_path_raw}{filename}``.
    4. Delete ``{source_path}{filename}``. Together with step 3 this is the "move";
       the delete only runs if the copy succeeded.

    Parameters:
    dataframe (pd.DataFrame): Transformed data to upload.
    datetime_col (str): Column whose first value supplies the date in the output name.
    bucket (str): Name of the S3 bucket used for every operation.
    target_path_transformed (str): Key prefix for the transformed CSV, ending with ``/``.
    file_type (str): Prefix of the output file name, e.g. ``'taxi'`` or ``'weather'``.
    source_path (str): Key prefix the raw file currently lives under.
    target_path_raw (str): Key prefix the raw file is moved to.
    filename (str): Name of the raw file (without prefix).

    Returns:
    None

    Raises:
    IndexError: If ``dataframe`` has no rows.
    botocore.exceptions.ClientError: If any of the S3 calls fails.

    Example key: target_path_transformed='transformed_data/taxi_trips/' and
    file_type='taxi_trips' give 'transformed_data/taxi_trips/taxi_trips_2023-09-06.csv'.
    """
    s3_client = boto3.client('s3')
    date_stamp = dataframe[datetime_col].iloc[0].strftime('%Y-%m-%d')
    transformed_key = f'{target_path_transformed}{file_type}_{date_stamp}.csv'
    source_key = f'{source_path}{filename}'

    s3_client.put_object(Bucket=bucket, Key=transformed_key, Body=dataframe.to_csv(index=False))

    s3_client.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": source_key},
        Key=f'{target_path_raw}{filename}',
    )
    s3_client.delete_object(Bucket=bucket, Key=source_key)

def _list_json_keys(s3, bucket: str, prefix: str) -> list:
    """Return every key under ``prefix`` in ``bucket`` that ends with ``.json`` (all result pages)."""
    keys = []
    for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from the response when the prefix holds no objects.
        for obj in page.get('Contents', []):
            if obj['Key'].endswith('.json'):
                keys.append(obj['Key'])
    return keys


def _read_json_from_s3(s3, bucket: str, key: str):
    """Download the object ``key`` from ``bucket`` and parse its body as JSON."""
    response = s3.get_object(Bucket=bucket, Key=key)
    return json.loads(response['Body'].read())


def lambda_handler(event, context):
    """
    AWS Lambda entry point for the transformation step.

    For every raw taxi JSON file under raw_data/to_processed/taxi_data/ it:
      1. cleans the trips (taxi_trips_transformations),
      2. registers unseen companies / payment types in the master tables,
      3. replaces company / payment type names with their master ids,
      4. uploads the result as CSV under transformed_data/taxi_trips/ and moves the
         raw file to raw_data/processed/taxi_data/,
      5. uploads both master tables, backing up the version being replaced.
    Every raw weather JSON file under raw_data/to_processed/weather_data/ is flattened,
    uploaded as CSV under transformed_data/weather/ and moved to
    raw_data/processed/weather_data/.

    Args:
        event: Lambda event payload (not used).
        context: Lambda runtime context (not used).
    """
    s3 = boto3.client("s3")
    bucket = 'chicago-taxi'

    raw_weather_folder = 'raw_data/to_processed/weather_data/'
    raw_taxi_trips_folder = 'raw_data/to_processed/taxi_data/'

    target_taxi_trips_folder = 'raw_data/processed/taxi_data/'
    target_weather_folder = 'raw_data/processed/weather_data/'

    payment_type_master_folder = 'transformed_data/payment_type/'
    payment_type_master_file = 'payment_type_master.csv'

    company_master_folder = 'transformed_data/company/'
    company_master_file = 'company_master.csv'

    transformed_taxi_trips_folder = 'transformed_data/taxi_trips/'
    transformed_weather_folder = 'transformed_data/weather/'

    payment_type_master = read_csv_from_s3(bucket=bucket, path=payment_type_master_folder, filename=payment_type_master_file)
    company_master = read_csv_from_s3(bucket=bucket, path=company_master_folder, filename=company_master_file)

    # Taxi data transformation and loading.
    # Keys are listed up front because each processed file is moved away inside the loop.
    for taxi_trip_key in _list_json_keys(s3, bucket, raw_taxi_trips_folder):
        filename = taxi_trip_key.rsplit('/', 1)[-1]

        taxi_trips_data_raw = pd.DataFrame(_read_json_from_s3(s3, bucket, taxi_trip_key))
        taxi_trips_transformed = taxi_trips_transformations(taxi_trips_data_raw)

        # Rebind the masters so that values added for one file are already known
        # (and their ids reserved) when the next file in this run is processed.
        company_master = update_master(taxi_trips_transformed, company_master, 'company_id', 'company')
        payment_type_master = update_master(taxi_trips_transformed, payment_type_master, 'payment_type_id', 'payment_type')

        taxi_trips = update_taxi_trips_with_master_data(taxi_trips_transformed, payment_type_master, company_master)

        upload_and_move_file_on_s3(
            dataframe=taxi_trips,
            datetime_col='datetime_for_weather',
            bucket=bucket,
            target_path_transformed=transformed_taxi_trips_folder,
            file_type='taxi',
            source_path=raw_taxi_trips_folder,
            target_path_raw=target_taxi_trips_folder,
            filename=filename)
        print('Taxi trips is uploaded.')

        upload_master_data_to_s3(bucket=bucket, path=payment_type_master_folder, file_type='payment_type', dataframe=payment_type_master)
        print('Payment has been updated')
        upload_master_data_to_s3(bucket=bucket, path=company_master_folder, file_type='company', dataframe=company_master)
        print('company updated')

    # Weather data transformation and loading.
    for weather_key in _list_json_keys(s3, bucket, raw_weather_folder):
        filename = weather_key.rsplit('/', 1)[-1]

        weather_data = transform_weather_data(_read_json_from_s3(s3, bucket, weather_key))

        upload_and_move_file_on_s3(
            dataframe=weather_data,
            datetime_col='datetime',
            bucket=bucket,
            target_path_transformed=transformed_weather_folder,
            file_type='weather',
            source_path=raw_weather_folder,
            target_path_raw=target_weather_folder,
            filename=filename)
        print('Weather data is uploaded.')
                
                