In [2]:
import os
import pandas as pd
import pandasql as ps
from my_s3_func import *
from func.df_fix_str_cols_to_dtime_and_conv import *

In [3]:
# my s3 global variables
# ---- bucket ---------------------------------------------------------------
s_mybucket = "jozsi-chicago-taxi-bb"

# ---- key prefixes ("directories") inside the bucket ------------------------
# NOTE: 'transfomed_data' (sic) is kept verbatim because these are literal S3
# key prefixes; the 'wheater' variable names are kept because other cells use them.
pathdir_proc_taxi, pathdir_processed_taxi, pathdir_transf_taxi = (
    'raw_data/to_process/taxi_data/',
    'raw_data/processed/taxi_data/',
    'transfomed_data/taxi_data/',
)
pathdir_proc_wheater, pathdir_processed_wheater, pathdir_transf_wheater = (
    'raw_data/to_process/weather_data/',
    'raw_data/processed/weather_data/',
    'transfomed_data/weather_data/',
)
pathdir_pay_type_master = 'transfomed_data/payment_type/'
pathdir_company_master = 'transfomed_data/company/'
pathdir_prev_masters = 'transfomed_data/master_table_previous_version/'
pathdir_areas = 'transfomed_data/community_areas/'
pathdir_date = 'transfomed_data/date/'

# ---- file names -------------------------------------------------------------
s_pay_types, s_companies, s_taxi_data = 'payment_types.csv', 'companies.csv', 'taxi_data.csv'
s_wheater_data, s_areas, s_date = 'wheater_data.csv', 'community_areas.csv', 'date.csv'

# ---- credentials, read from the environment (None when a variable is unset) ---
s_aws_id = os.environ.get('AWS_ACCESS_ID')
s_aws_key = os.environ.get('AWS_SEC_KEY')


### Get data from S3

In [4]:
# single files
# (S3 directory prefix, file name) pairs for the lookup / master tables.
ls_path_file = [
    [pathdir_areas, s_areas],
    [pathdir_company_master, s_companies],
    [pathdir_date, s_date],
    [pathdir_pay_type_master, s_pay_types],
]

# Load each file from S3 into a global DataFrame named after the file
# (e.g. community_areas.csv -> df_community_areas); later cells reference these names.
# The name is computed once and reused in the message. The original nested the same
# quote type inside an f-string field, which is a SyntaxError before Python 3.12.
for s_path, s_file in ls_path_file:
    s_df_name = 'df_' + s_file.replace('.csv', '')
    globals()[s_df_name] = load_s3_csv_to_df(
        s_bucket=s_mybucket,
        path_file=s_path + s_file,
        s_access_id=s_aws_id,
        s_sec_key=s_aws_key,
    )
    print(f'{s_file} has been loaded to {s_df_name}')


community_areas.csv has been loaded to df_community_areas
companies.csv has been loaded to df_companies
date.csv has been loaded to df_date
payment_types.csv has been loaded to df_payment_types


In [5]:
# Load every CSV under the transformed taxi-data prefix and concatenate them into one
# DataFrame via the project helper load_s3_dir_csv_files (per the original "load & concat" note).
taxi_dir_load_args = dict(
    s_bucket=s_mybucket,
    path_work=pathdir_transf_taxi,
    s_access_id=s_aws_id,
    s_sec_key=s_aws_key,
)
df_taxi_data_concat = load_s3_dir_csv_files(**taxi_dir_load_args)


taxi_data_2024-10-18.csv has loaded and added
taxi_data_2024-10-19.csv has loaded and added
taxi_data_2024-10-20.csv has loaded and added
taxi_data_2024-10-21.csv has loaded and added
taxi_data_2024-10-22.csv has loaded and added
taxi_data_2024-10-23.csv has loaded and added
taxi_data_2024-10-24.csv has loaded and added
taxi_data_2024-10-25.csv has loaded and added
taxi_data_2024-10-26.csv has loaded and added
taxi_data_2024-10-27.csv has loaded and added
taxi_data_2024-10-28.csv has loaded and added


In [6]:
# Load every CSV under the transformed weather-data prefix and concatenate them into one
# DataFrame via the project helper load_s3_dir_csv_files (per the original "load & concat" note).
# NOTE: 'wheather' in the variable name is a typo, kept because the enrichment cell uses this name.
weather_dir_load_args = dict(
    s_bucket=s_mybucket,
    path_work=pathdir_transf_wheater,
    s_access_id=s_aws_id,
    s_sec_key=s_aws_key,
)
df_wheather_data_concat = load_s3_dir_csv_files(**weather_dir_load_args)

wheater_data_2024-10-18.csv has loaded and added
wheater_data_2024-10-19.csv has loaded and added
wheater_data_2024-10-20.csv has loaded and added
wheater_data_2024-10-21.csv has loaded and added
wheater_data_2024-10-22.csv has loaded and added
wheater_data_2024-10-23.csv has loaded and added
wheater_data_2024-10-24.csv has loaded and added
wheater_data_2024-10-25.csv has loaded and added
wheater_data_2024-10-26.csv has loaded and added
wheater_data_2024-10-27.csv has loaded and added
wheater_data_2024-10-28.csv has loaded and added


### Enrichments

In [22]:
# Enrich the trips with the lookup tables and the weather data.
# Left joins keep every trip even when a lookup key has no match (the lookup columns become NaN).
# They can however *duplicate* trips when a lookup key is not unique, so the row count is checked below.
# Joining df_community_areas twice leaves 'area code_x' / 'area code_y'; a later cell drops them.
df_taxi_data_full = (
    df_taxi_data_concat
    .merge(df_community_areas, left_on='pickup_community_area_id', right_on='area code', how='left')
    .rename(columns={'community name': 'pickup_area_name'})
    .merge(df_community_areas, left_on='dropoff_community_area_id', right_on='area code', how='left')
    # NOTE(review): 'dropof' is a typo for 'dropoff'; kept in case downstream code uses this name.
    .rename(columns={'community name': 'dropof_area_name'})
    .merge(df_payment_types, on='payment_type_id', how='left')
    .merge(df_companies, on='company_id', how='left')
    .merge(df_wheather_data_concat, left_on='datetime_for_weather', right_on='datetime', how='left')
)

# Each lookup must match at most one row per key; otherwise trips are duplicated and totals inflate.
assert len(df_taxi_data_full) == len(df_taxi_data_concat), (
    f'enrichment changed the row count: {len(df_taxi_data_concat)} -> {len(df_taxi_data_full)}; '
    'check the lookup tables for duplicate keys'
)


In [23]:
# Join the calendar table (df_date) on the trip's start date.
# First convert both trip timestamps to datetime; the project helper is described as fixing
# values where possible. Its return value is not used, so it presumably converts the columns
# in place (the .info() output in this notebook shows them as datetime64[ns]).
s_columns = ['trip_start_timestamp', 'trip_end_timestamp']
df_fix_str_cols_to_dtime_and_conv(df_taxi_data_full, s_columns)

# ISO date string key (str(datetime.date) -> 'YYYY-MM-DD'); assumes df_date['Date'] uses the same format.
start_dates = df_taxi_data_full['trip_start_timestamp'].dt.date
df_taxi_data_full['trip_start_date'] = start_dates.astype(str)

df_taxi_data_full = df_taxi_data_full.merge(
    df_date, how='left', left_on='trip_start_date', right_on='Date'
)


In [24]:
# Drop join-key columns left over from the merges: the two community-area keys (suffixed
# because df_community_areas is joined twice), the weather 'datetime' key and the calendar 'Date' key.
# errors='ignore' keeps this cell re-runnable after the columns are already gone.
ls_drop_cols = ['area code_x', 'area code_y', 'datetime', 'Date']
df_taxi_data_full = df_taxi_data_full.drop(ls_drop_cols, axis='columns', errors='ignore')


In [10]:
# ''' 
# 1. Which 10 companies make the most money?
# Use two columns: Company name and the sum of the fare. Order by descending order.
# Make sure that the sum of the fares (total_fare) is not in scientific notation.
# '''

# pd.options.display.float_format = lambda x: '{:,.0f}'.format(x).replace(',', ' ')

# query = '''
# SELECT 
# 	company, 
#     ROUND(SUM(trip_total),0) AS trips_total_usd
# FROM df_taxi_data_full
# GROUP BY company_id
# ORDER BY trips_total_usd DESC
# LIMIT 10
# '''
# df_res = ps.sqldf(query, locals())
# df_res


In [11]:
# Inspect the enriched frame: column names, non-null counts and dtypes
# (df_taxi_data_full.describe(include='all') would give per-column summary statistics instead).
df_taxi_data_full.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 189782 entries, 0 to 189781
Data columns (total 34 columns):
 #   Column                      Non-Null Count   Dtype         
---  ------                      --------------   -----         
 0   trip_id                     189782 non-null  object        
 1   taxi_id                     189782 non-null  object        
 2   trip_start_timestamp        189782 non-null  datetime64[ns]
 3   trip_end_timestamp          189782 non-null  datetime64[ns]
 4   trip_seconds                189782 non-null  int64         
 5   trip_miles                  189782 non-null  float64       
 6   pickup_community_area_id    189782 non-null  int64         
 7   fare                        189782 non-null  float64       
 8   tips                        189782 non-null  float64       
 9   tolls                       189782 non-null  float64       
 10  extras                      189782 non-null  float64       
 11  trip_total                  189782 non-

In [12]:
# '''
# 2. Show the 10 pickup community areas with the most rides.
# Use two columns: community area name and count of rides per area, in descending order for
# the rides.
# '''

# query = '''
# SELECT 
#     pickup_area_name,
#     COUNT(*) AS rides_count
# FROM df_taxi_data_full
# GROUP BY pickup_community_area_id
# ORDER BY rides_count DESC
# LIMIT 10
# '''
# df_res = ps.sqldf(query, locals())
# df_res

In [None]:
# '''
# 3. Get the count of taxi rides per day of week. In other words, we'd like to see which day has
# the most rides.
# Use the names of the days instead of numbers (Monday = 1, Tuesday = 2, etc.).
# HINT: Check CASE WHEN statements for replacing names of days. For ordering the results by
# the day you can use MIN(day_of_the_week).
# '''

# query = '''
# SELECT 
#     CASE 
#         WHEN day_of_week = '1' THEN 'Monday'
#         WHEN day_of_week = '2' THEN 'Tuesday'
#         WHEN day_of_week = '3' THEN 'Wednesday'
#         WHEN day_of_week = '4' THEN 'Thursday'
#         WHEN day_of_week = '5' THEN 'Friday'
#         WHEN day_of_week = '6' THEN 'Saturday'
#         WHEN day_of_week = '7' THEN 'Sunday'
#         ELSE 'wrong data'
#     END AS day_of_week,
#     COUNT(*) AS rides_count
# FROM df_taxi_data_full
# GROUP BY day_of_week
# ORDER BY rides_count DESC
# '''
# df_res = ps.sqldf(query, locals())
# df_res

In [25]:
# Inspect the enriched frame again: column names, non-null counts and dtypes
# (df_taxi_data_full.describe(include='all') would give per-column summary statistics instead).
df_taxi_data_full.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 189782 entries, 0 to 189781
Data columns (total 34 columns):
 #   Column                      Non-Null Count   Dtype         
---  ------                      --------------   -----         
 0   trip_id                     189782 non-null  object        
 1   taxi_id                     189782 non-null  object        
 2   trip_start_timestamp        189782 non-null  datetime64[ns]
 3   trip_end_timestamp          189782 non-null  datetime64[ns]
 4   trip_seconds                189782 non-null  int64         
 5   trip_miles                  189782 non-null  float64       
 6   pickup_community_area_id    189782 non-null  int64         
 7   fare                        189782 non-null  float64       
 8   tips                        189782 non-null  float64       
 9   tolls                       189782 non-null  float64       
 10  extras                      189782 non-null  float64       
 11  trip_total                  189782 non-

In [28]:


# Alias, not a copy: columns added to df_data below (e.g. 'anomaly_types') also appear on
# df_taxi_data_full. Use df_taxi_data_full.copy() if the two must stay independent.
df_data = df_taxi_data_full

# error detection function
def detect_errors(row, max_time_diff_s=900, ignore_cols=('anomaly_types',)):
    """Return the list of anomaly labels that apply to one taxi-trip row.

    Labels:
      - 'missing_data': any column not listed in ``ignore_cols`` is null.
      - 'distance': ``trip_miles`` is 0 although all four pickup/dropoff centroid
        coordinates are known and the pickup and dropoff points differ.
      - 'trip_time': the duration implied by the start/end timestamps and the
        reported ``trip_seconds`` differ by ``max_time_diff_s`` seconds or more
        (in either direction).

    Args:
        row: one row of the trip frame (a pandas Series, as passed by
            ``DataFrame.apply(..., axis=1)``).
        max_time_diff_s: tolerated gap in seconds between the two durations.
            The default of 900 (15 min) is meant to absorb the timestamps'
            granulation, per the original comment.
        ignore_cols: labels excluded from the null check. ``'anomaly_types'`` is
            excluded by default, because re-running the cell that writes that
            column would otherwise mark every previously clean row (value None)
            as 'missing_data'.

    Returns:
        list[str]: the matching labels in the order above; empty if none apply.
    """
    errors = []

    # missing data (ignoring derived columns such as a previous run's result)
    if row.drop(labels=list(ignore_cols), errors='ignore').isnull().any():
        errors.append('missing_data')

    # trip distance = 0, but the pickup and dropoff coordinates are different.
    # Only judged when all four coordinates are present: NaN != NaN is True, so rows
    # with unknown coordinates would otherwise be flagged as well.
    if row['trip_miles'] == 0.0:
        coords = row[['pickup_centroid_latitude', 'pickup_centroid_longitude',
                      'dropoff_centroid_latitude', 'dropoff_centroid_longitude']]
        if coords.notna().all() and (
            row['pickup_centroid_latitude'] != row['dropoff_centroid_latitude']
            or row['pickup_centroid_longitude'] != row['dropoff_centroid_longitude']
        ):
            errors.append('distance')

    # reported trip time vs. timestamp-derived trip time.
    # total_seconds() counts whole days (Timedelta.seconds drops them), and the gap is
    # compared in absolute value so over- and under-reported trip_seconds are both caught.
    timestamp_diff = abs(row['trip_end_timestamp'] - row['trip_start_timestamp']).total_seconds()
    if abs(timestamp_diff - row['trip_seconds']) >= max_time_diff_s:
        errors.append('trip_time')

    return errors

# new column for the error types: one list of anomaly labels per row, from detect_errors.
# df_data is the same object as df_taxi_data_full (assigned without .copy()),
# so the column is added to df_taxi_data_full as well.
df_data['anomaly_types'] = df_data.apply(detect_errors, axis=1)

# Clean rows (empty list) become None; rows with anomalies keep their list of labels.
# The column therefore holds lists or None, not strings.
df_data['anomaly_types'] = df_data['anomaly_types'].map(lambda labels: labels or None)

