In [99]:
from datetime import datetime
from dateutil.relativedelta import relativedelta
import json
import os

import pandas as pd
pd.set_option("display.max_columns", 30)            # mutassa az összes értéket ne rakjon ... okat
import requests

In [69]:
""" 
1. get the data from s3 - DONE
2. weather data transformations - DONE
3. taxi trips transformations - DONE
4. update payment_type_master - DONE
5. update company_master - DONE
6. update taxi_trips with company and payment_type ids
(replace the string values with ids from the latest master tables) - DONE
7. upload weather data to s3 - DONE
8. upload taxi data to s3 - DONE
9. upload the newest payment_type_master and company_master - DONE
"""


' \n1. get the data from s3\n2. weather data transformations\n3. taxi trips transformations - DONE\n4. update payment_type_master - DONE\n5. update company_master - DONE\n6. update taxi_trips with company and payment_type ids\n(replace the string values with ids from the latest master tables) - DONE\n7. upload weather data to s3\n8. upload taxi data to s3\n9. upload the newest payment_type_master and company_master\n'

### taxi_trips transformation code

In [70]:
# Query the trips of the calendar day two months before today.
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

# Base endpoint only; the SoQL filter is passed via `params` so that requests
# takes care of the URL encoding (spaces, quotes, `$`).
url = "https://data.cityofchicago.org/resource/ajtu-isnz.json"

response = requests.get(
    url,
    params={
        "$where": (
            f"trip_start_timestamp >= '{formatted_datetime}T00:00:00' "
            f"AND trip_start_timestamp <= '{formatted_datetime}T23:59:59'"
        ),
        "$limit": 30000,
    },
    timeout=60,  # without a timeout a stalled connection blocks the kernel forever
)
# Fail loudly on HTTP errors instead of turning an error payload into a DataFrame.
response.raise_for_status()

data = response.json()

In [71]:
taxi_trips = pd.DataFrame(data)

In [72]:
# Drop the columns that are not needed, remove rows with missing values and
# rename the community area columns to *_id.
# The result is reassigned instead of mutated in place and missing columns are
# ignored, so this cell can be re-run without raising a KeyError.
taxi_trips = (
    taxi_trips
    .drop(
        columns=[
            "pickup_census_tract",
            "dropoff_census_tract",
            "pickup_centroid_location",
            "dropoff_centroid_location",
        ],
        errors="ignore",
    )
    .dropna()
    .rename(columns={"pickup_community_area": "pickup_community_area_id",
                     "dropoff_community_area": "dropoff_community_area_id"})
)

# Trip start floored to the full hour; "h" replaces the deprecated "H" alias.
taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("h")


  taxi_trips["datetime_for_weather"] = pd.to_datetime(taxi_trips["trip_start_timestamp"]).dt.floor("H")


In [73]:
taxi_trips.head()

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,payment_type,company,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather
0,53e7ac3cefa012a32335bf6551271746fa0bd986,f962126062d38e2ef0fa177812eba73ccd30513c1c6cfa...,2024-06-15T23:45:00.000,2024-06-15T23:45:00.000,656,1.42,6,6,8.0,2.0,0,2,12.5,Credit Card,City Service,41.944226601,-87.655998182,41.944226601,-87.655998182,2024-06-15 23:00:00
1,55bb9b3e555901affa0ba1c8cff3c47815e55de8,39c5b1afbd9856f33218efea0a669e7c7f8c186197b83e...,2024-06-15T23:45:00.000,2024-06-15T23:45:00.000,523,1.56,28,32,7.75,0.82,0,0,9.07,Mobile,Flash Cab,41.88528132,-87.6572332,41.884987192,-87.620992913,2024-06-15 23:00:00
2,56889b170bfba3ecdb75d54e013d9d741d3f159a,9946e42a2123fb60e2fa9c39ce15b11a5cfd39cd899460...,2024-06-15T23:45:00.000,2024-06-15T23:45:00.000,846,4.37,8,6,15.52,0.0,0,0,15.52,Mobile,City Service,41.899602111,-87.633308037,41.944226601,-87.655998182,2024-06-15 23:00:00
5,59eef492824b7fe529db0fbd8e1dbe3f93f479eb,8f4cde3f6d55c38289233d1679f415c644227354939888...,2024-06-15T23:45:00.000,2024-06-16T00:00:00.000,212,0.65,6,6,10.0,2.87,0,0,12.87,Mobile,Chicago Independents,41.944226601,-87.655998182,41.944226601,-87.655998182,2024-06-15 23:00:00
6,5b646c51a9d4cba367ca3b41fcba23522b945c33,3f2a24e813695ce63232d6cccfc688ad3875df09b609fa...,2024-06-15T23:45:00.000,2024-06-15T23:45:00.000,360,1.0,8,8,6.0,2.0,0,0,8.0,Credit Card,Chicago City Taxi Association,41.899602111,-87.633308037,41.899602111,-87.633308037,2024-06-15 23:00:00


#### taxi_trips transformation function

In [74]:
def taxi_trips_transformation(taxi_trips: pd.DataFrame) -> pd.DataFrame:
    """Clean and transform the daily taxi trips data.

    The census tract and centroid location columns are dropped (if present),
    rows containing any missing value are removed, the community area columns
    are renamed with an ``_id`` suffix, and a ``datetime_for_weather`` column
    holding the trip start floored to the hour is added.

    The input DataFrame is not modified; a new DataFrame is returned.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The DataFrame holding the daily taxi trips.

    Returns
    -------
    pd.DataFrame
        The cleaned, transformed DataFrame holding the daily taxi trips.

    Raises
    ------
    TypeError
        If ``taxi_trips`` is not a pandas DataFrame.
    """

    if not isinstance(taxi_trips, pd.DataFrame):
        raise TypeError("taxi_trips is not a valid pandas DataFrame.")

    unused_columns = [
        "pickup_census_tract",
        "dropoff_census_tract",
        "pickup_centroid_location",
        "dropoff_centroid_location",
    ]

    return (
        taxi_trips
        # errors="ignore": a frame that no longer has these columns is still accepted.
        .drop(columns=unused_columns, errors="ignore")
        .dropna()
        .rename(columns={"pickup_community_area": "pickup_community_area_id",
                         "dropoff_community_area": "dropoff_community_area_id"})
        # "h" is the non-deprecated spelling of the hourly frequency alias.
        .assign(
            datetime_for_weather=lambda trips: pd.to_datetime(
                trips["trip_start_timestamp"]
            ).dt.floor("h")
        )
    )

In [75]:
#taxi_trips_transformation("string")

### company_master update code

In [76]:
# Master table of the distinct companies found in the trips, in first-seen
# order, numbered from 1.
company_master = (
    taxi_trips["company"]
    .drop_duplicates()
    .reset_index(drop=True)
    .to_frame(name="company")
)
company_master.insert(0, "company_id", range(1, len(company_master) + 1))

company_master.tail()

Unnamed: 0,company_id,company
27,28,6574 - Babylon Express Inc.
28,29,Patriot Taxi Dba Peace Taxi Associat
29,30,Petani Cab Corp
30,31,3556 - 36214 RC Andrews Cab
31,32,5167 - 71969 5167 Taxi Inc


In [77]:
# Hand-made sample of incoming company names used to try out the master update.
new_company_data = [
    {"company": company_name}
    for company_name in ("Koam Taxi Association", "X", "Y")
]

new_company_mapping = pd.DataFrame.from_records(new_company_data)

new_company_mapping

Unnamed: 0,company
0,Koam Taxi Association
1,X
2,Y


In [78]:
company_max_id = company_master["company_id"].max()


company_max_id

np.int64(32)

In [79]:
# Companies that appear in the mapping but not yet in the master table.
# One vectorised isin() lookup replaces the explicit loop and the list
# comprehension that recomputed the very same list; drop_duplicates() keeps
# each new company only once, in first-seen order.
new_companies_list = (
    new_company_mapping.loc[
        ~new_company_mapping["company"].isin(company_master["company"]),
        "company",
    ]
    .drop_duplicates()
    .tolist()
)

new_companies_list

['X', 'Y']

In [80]:
new_companies_df = pd.DataFrame({
    "company_id": range(company_max_id +1, company_max_id + len(new_companies_list) +1),
    "company": new_companies_list
})

new_companies_df

Unnamed: 0,company_id,company
0,33,X
1,34,Y


In [81]:
updated_company_master = pd.concat([company_master, new_companies_df], ignore_index = True)

updated_company_master.tail()

Unnamed: 0,company_id,company
29,30,Petani Cab Corp
30,31,3556 - 36214 RC Andrews Cab
31,32,5167 - 71969 5167 Taxi Inc
32,33,X
33,34,Y


In [82]:
def update_company_master(taxi_trips: pd.DataFrame, company_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the company master with the companies that are new in the taxi data.

    Every company that occurs in ``taxi_trips["company"]`` but not in
    ``company_master["company"]`` is appended exactly once, in first-seen
    order, with ids continuing after the current maximum ``company_id``.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips.
    company_master : pd.DataFrame
        DataFrame holding the company_master data.

    Returns
    -------
    pd.DataFrame
        The updated company_master data. If the taxi data holds no new
        company, the master is returned unchanged (with a fresh 0..n-1 index).
    """

    # De-duplicate: the trips contain one row per trip, so a new company
    # usually shows up many times but must get a single id.
    is_new = ~taxi_trips["company"].isin(company_master["company"])
    new_companies = taxi_trips.loc[is_new, "company"].drop_duplicates()

    if new_companies.empty:
        return company_master.reset_index(drop=True)

    # An empty master has no max id (NaN), so numbering starts at 1.
    next_id = 1 if company_master.empty else int(company_master["company_id"].max()) + 1

    new_companies_df = pd.DataFrame({
        "company_id": range(next_id, next_id + len(new_companies)),
        "company": new_companies.tolist(),
    })

    return pd.concat([company_master, new_companies_df], ignore_index=True)


In [83]:
# Small hand-made frame standing in for the taxi trips when trying out
# update_company_master.
taxi_trips_company_only = pd.DataFrame(
    [(1, "Koam Taxi Association"), (2, "X"), (3, "Y")],
    columns=["company_id", "company"],
)

taxi_trips_company_only

Unnamed: 0,company_id,company
0,1,Koam Taxi Association
1,2,X
2,3,Y


In [84]:
updated_company_master = update_company_master(taxi_trips = taxi_trips_company_only, 
                                               company_master = company_master )

In [85]:
updated_company_master.tail()

Unnamed: 0,company_id,company
29,30,Petani Cab Corp
30,31,3556 - 36214 RC Andrews Cab
31,32,5167 - 71969 5167 Taxi Inc
32,33,X
33,34,Y


### payment_type_master update code

In [86]:
# Master table of the distinct payment types found in the trips, in
# first-seen order, numbered from 1.
payment_type_master = (
    taxi_trips["payment_type"]
    .drop_duplicates()
    .reset_index(drop=True)
    .to_frame(name="payment_type")
)
payment_type_master.insert(0, "payment_type_id", range(1, len(payment_type_master) + 1))

# Small hand-made frame standing in for the taxi trips when trying out
# update_payment_type_master.
taxi_trips_payment_type_only = pd.DataFrame(
    [(1, "Credit Card"), (2, "X"), (3, "Y")],
    columns=["payment_type_id", "payment_type"],
)

taxi_trips_payment_type_only

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,X
2,3,Y


In [87]:
def update_payment_type_master(taxi_trips: pd.DataFrame, payment_type_master: pd.DataFrame) -> pd.DataFrame:
    """Extend the payment type master with the payment types that are new in the taxi data.

    Every payment type that occurs in ``taxi_trips["payment_type"]`` but not
    in ``payment_type_master["payment_type"]`` is appended exactly once, in
    first-seen order, with ids continuing after the current maximum
    ``payment_type_id``.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips.
    payment_type_master : pd.DataFrame
        DataFrame holding the payment_type_master data.

    Returns
    -------
    pd.DataFrame
        The updated payment_type_master data. If the taxi data holds no new
        payment type, the master is returned unchanged (with a fresh 0..n-1
        index).
    """

    # De-duplicate: the trips contain one row per trip, so a new payment type
    # usually shows up many times but must get a single id.
    is_new = ~taxi_trips["payment_type"].isin(payment_type_master["payment_type"])
    new_payment_types = taxi_trips.loc[is_new, "payment_type"].drop_duplicates()

    if new_payment_types.empty:
        return payment_type_master.reset_index(drop=True)

    # An empty master has no max id (NaN), so numbering starts at 1.
    if payment_type_master.empty:
        next_id = 1
    else:
        next_id = int(payment_type_master["payment_type_id"].max()) + 1

    new_payment_type_df = pd.DataFrame({
        "payment_type_id": range(next_id, next_id + len(new_payment_types)),
        "payment_type": new_payment_types.tolist(),
    })

    return pd.concat([payment_type_master, new_payment_type_df], ignore_index=True)

In [88]:
updated_payment_type_master = update_payment_type_master(taxi_trips = taxi_trips_payment_type_only, 
                                               payment_type_master = payment_type_master)

In [89]:
updated_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,Mobile
2,3,Unknown
3,4,Cash
4,5,Prcard
5,6,Dispute
6,7,No Charge
7,8,X
8,9,Y


### Creating a generic update master table function

In [90]:
def update_master(taxi_trips: pd.DataFrame, master: pd.DataFrame, id_column: str, 
                  value_column: str ) -> pd.DataFrame:
    """Extend the master DataFrame with new values if there are any.

    Every value that occurs in ``taxi_trips[value_column]`` but not in
    ``master[value_column]`` is appended exactly once, in first-seen order,
    with ids continuing after the current maximum of ``master[id_column]``.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        DataFrame holding the daily taxi trips.
    master : pd.DataFrame
        DataFrame holding the master data.
    id_column : str
        Name of the id column of the master DataFrame.
    value_column : str
        Name of the column holding the values; it must exist in both
        ``taxi_trips`` and ``master``.

    Returns
    -------
    pd.DataFrame
        The updated master data. If the taxi data holds no new value, the
        master is returned unchanged (with a fresh 0..n-1 index).
    """

    # De-duplicate: the trips contain one row per trip, so a new value
    # usually shows up many times but must get a single id.
    is_new = ~taxi_trips[value_column].isin(master[value_column])
    new_values = taxi_trips.loc[is_new, value_column].drop_duplicates()

    if new_values.empty:
        return master.reset_index(drop=True)

    # An empty master has no max id (NaN), so numbering starts at 1.
    next_id = 1 if master.empty else int(master[id_column].max()) + 1

    new_values_df = pd.DataFrame({
        id_column: range(next_id, next_id + len(new_values)),
        value_column: new_values.tolist(),
    })

    return pd.concat([master, new_values_df], ignore_index=True)

In [91]:
test_payment_type_master = update_master(taxi_trips = taxi_trips_payment_type_only, master = payment_type_master, 
                                         id_column = "payment_type_id", value_column ="payment_type")

In [92]:
test_payment_type_master

Unnamed: 0,payment_type_id,payment_type
0,1,Credit Card
1,2,Mobile
2,3,Unknown
3,4,Cash
4,5,Prcard
5,6,Dispute
6,7,No Charge
7,8,X
8,9,Y


In [93]:
test_company_master = update_master(taxi_trips = taxi_trips_company_only, master = company_master, 
                                         id_column = "company_id", value_column ="company")

In [94]:
test_company_master.tail()

Unnamed: 0,company_id,company
29,30,Petani Cab Corp
30,31,3556 - 36214 RC Andrews Cab
31,32,5167 - 71969 5167 Taxi Inc
32,33,X
33,34,Y


### Function to update taxi_trips with the most recent company_master and payment_type_master

In [95]:
def update_taxi_trips_with_master_data(taxi_trips: pd.DataFrame, 
                                       payment_type_master: pd.DataFrame, 
                                       company_master: pd.DataFrame) -> pd.DataFrame:
    """Replace the payment type and company names in taxi_trips with their master ids.

    The trips are merged with both master tables on the string columns and the
    string columns are removed afterwards. Both merges use pandas' default
    inner join, so trips whose payment type or company is missing from the
    corresponding master table are not part of the result.

    Parameters
    ----------
    taxi_trips : pd.DataFrame
        The DataFrame with the daily taxi trips.
    payment_type_master : pd.DataFrame
        The payment type master table.
    company_master : pd.DataFrame
        The company master table.

    Returns
    -------
    pd.DataFrame
        The taxi trips data with the master table columns merged in and the
        ``payment_type`` and ``company`` string columns removed.
    """
    # The string columns are only needed as join keys; drop them once the ids are attached.
    join_keys = ["payment_type", "company"]

    return (
        taxi_trips
        .merge(payment_type_master, on="payment_type")
        .merge(company_master, on="company")
        .drop(columns=join_keys)
    )

In [96]:
taxi_trips_id = update_taxi_trips_with_master_data(taxi_trips = taxi_trips, 
                                                   payment_type_master = payment_type_master, 
                                                   company_master = company_master)

taxi_trips_id.sample(4)

Unnamed: 0,trip_id,taxi_id,trip_start_timestamp,trip_end_timestamp,trip_seconds,trip_miles,pickup_community_area_id,dropoff_community_area_id,fare,tips,tolls,extras,trip_total,pickup_centroid_latitude,pickup_centroid_longitude,dropoff_centroid_latitude,dropoff_centroid_longitude,datetime_for_weather,payment_type_id,company_id
433,1aa803a8c2a48b0b4114dd6b42b78856536c2ccf,be435bc4d6744155b3272e9edded016bd4afb347774701...,2024-06-15T23:15:00.000,2024-06-15T23:45:00.000,1049,7.19,76,17,20.5,3.75,0,4,28.75,41.980264315,-87.913624596,41.94651142,-87.806020002,2024-06-15 23:00:00,1,7
6061,547486ac72bdca2b5039beb0c08b736496cfc573,b04098ea3832d5a9ccc78d361c80f01229133e99aa3a8d...,2024-06-15T16:15:00.000,2024-06-15T16:30:00.000,978,2.69,8,24,11.09,2.91,0,0,14.0,41.899602111,-87.633308037,41.901206994,-87.676355989,2024-06-15 16:00:00,2,3
6730,bc5f57e6437b577f8f42cde9c8deee9e82a5a54e,eaf9ff311ef5bc5e9f673e209710280ab0cc565517ee62...,2024-06-15T15:30:00.000,2024-06-15T15:45:00.000,528,1.06,32,32,9.0,2.3,0,0,11.3,41.884987192,-87.620992913,41.870607372,-87.622172937,2024-06-15 15:00:00,2,14
2006,1fc3f99b0884311bffa680355ca658f2acdf7b3a,c0591d33660deb744cef729db6457d5a0924498c9ed3bb...,2024-06-15T20:45:00.000,2024-06-15T22:15:00.000,4626,13.91,76,76,35.25,0.0,0,0,35.25,41.980264315,-87.913624596,41.980264315,-87.913624596,2024-06-15 20:00:00,4,13


### weather transformations function

In [101]:
def transform_weather_data(weather_data: dict) -> pd.DataFrame:
    """Make transformations on the daily weather api response.

    Picks the time, temperature, wind speed, rain and precipitation series out
    of the ``"hourly"`` section of the response, puts them into a DataFrame
    under shorter column names and parses the time values as datetimes.

    Parameters
    ----------
    weather_data : dict
        The decoded JSON body (e.g. ``response.json()``) of the daily weather
        data from the Open Meteo API.

    Returns
    -------
    pd.DataFrame
        A DataFrame representation of the data with the columns ``datetime``,
        ``temperature``, ``wind_speed``, ``rain`` and ``precipitation``.

    Raises
    ------
    KeyError
        If the ``"hourly"`` section or one of the expected series is missing.
    """
    # Output column name -> key of the series inside weather_data["hourly"].
    hourly_fields = {
        "datetime": "time",
        "temperature": "temperature_2m",
        "wind_speed": "wind_speed_10m",
        "rain": "rain",
        "precipitation": "precipitation",
    }

    hourly = weather_data["hourly"]

    weather_df = pd.DataFrame(
        {column: hourly[field] for column, field in hourly_fields.items()}
    )

    weather_df["datetime"] = pd.to_datetime(weather_df["datetime"])

    return weather_df


In [102]:
# Test

# Query the weather of the calendar day two months before today.
current_datetime = datetime.now() - relativedelta(months=2)
formatted_datetime = current_datetime.strftime("%Y-%m-%d")

# Base URL only: everything after the "?" is passed through `params`.
url = "https://archive-api.open-meteo.com/v1/era5"

params = {
    "latitude": 41.85,
    "longitude": -87.65,
    "start_date": formatted_datetime,
    "end_date": formatted_datetime,
    "hourly": "temperature_2m,wind_speed_10m,rain,precipitation"
}
# timeout: without it a stalled connection blocks the kernel forever.
response = requests.get(url, params=params, timeout=60)
# Fail loudly on HTTP errors instead of passing an error payload on.
response.raise_for_status()

weather_data = response.json()

weather_data_df = transform_weather_data(weather_data)

In [103]:
weather_data_df.head()

Unnamed: 0,datetime,temperature,wind_speed,rain,precipitation
0,2024-06-15 00:00:00,19.0,24.4,0.0,0.0
1,2024-06-15 01:00:00,17.6,18.7,0.0,0.0
2,2024-06-15 02:00:00,16.7,17.4,0.0,0.0
3,2024-06-15 03:00:00,16.5,15.7,0.0,0.0
4,2024-06-15 04:00:00,16.2,13.7,0.0,0.0
