In [834]:
import pandas as pd
from loguru import logger

In [835]:
# Send every loguru record at DEBUG level or above to transform.log.
# The default stderr sink is dropped first, so nothing is echoed into the notebook.
# NOTE(review): per the loguru docs, `retention` removes old log files at rotation
# or program end; '1 minute' may therefore delete transform.log quickly — confirm intended.
logger.remove()
_ = logger.add(
    'transform.log',
    level='DEBUG',
    retention='1 minute',
    format="{time:YYYY-MM-DD at HH:mm:ss} {level} --- {message}",
)


In [836]:
# Input locations. Both CSVs live in a single directory; point DATA_DIR elsewhere
# to run this notebook on another machine.
DATA_DIR = '/Users/joe/Desktop/ETL Pipelines/data/transformation_data'
crash_file_csv = f'{DATA_DIR}/traffic_crashes.csv'
vehicle_crash_csv = f'{DATA_DIR}/traffic_crash_vehicle.csv'


In [837]:
# Load both raw tables for interactive inspection. The functions defined below
# read the CSV files themselves and do not use these module-level frames.
df_crashes, df_vehicles = map(pd.read_csv, (crash_file_csv, vehicle_crash_csv))


In [838]:
#Grouped simple pipeline
def transformed_data(crash_file, vehicle_crash_file):
    """Load, clean, merge and aggregate the crash and vehicle tables in one step.

    Args:
        crash_file: path (or file-like object) accepted by ``pd.read_csv`` for
            the crashes table; it must have a ``crash_record_id`` column.
        vehicle_crash_file: path (or file-like object) for the vehicles table;
            it must have ``crash_record_id`` and ``vehicle_type`` columns.

    Returns:
        DataFrame with one row per vehicle type. Its columns are ``vehicletypes``
        and ``crash_record_id`` (the count of merged rows for that type). If any
        step raises, the error is printed and an empty DataFrame is returned.
    """
    try:
        # load data
        df_crashes = pd.read_csv(crash_file)
        df_vehicles = pd.read_csv(vehicle_crash_file)

        # cleaning: keep only crash rows with at least 2 non-null values, and
        # carry the cleaned frame forward (the dropna result must not be discarded)
        crashes_clean = df_crashes.dropna(axis='index', thresh=2)
        crashes_filled = crashes_clean.fillna(value={'report_type': 'ON SCENE'})

        # merge crashes and vehicles, then count merged rows per vehicle type;
        # rows whose vehicle_type is missing (unmatched crashes) are dropped by groupby
        df = crashes_filled.merge(df_vehicles, how='left', on='crash_record_id', suffixes=('_left', '_right'))
        df_agg = df.groupby('vehicle_type').agg({'crash_record_id': 'count'}).reset_index()

        # transform column names for output data
        df_agg = df_agg.rename(columns={'vehicle_type': 'vehicletypes'})
    except Exception as e:
        print(f"An error occurred: {e}")
        df_agg = pd.DataFrame()

    return df_agg
           

In [839]:
df_agg_result=transformed_data(crash_file_csv,vehicle_crash_csv)
df_agg_result

Unnamed: 0,vehicletypes,crash_record_id
0,BUS OVER 15 PASS.,5
1,MOPED OR MOTORIZED BICYCLE,1
2,OTHER,20
3,OTHER VEHICLE WITH TRAILER,1
4,PASSENGER,633
5,PICKUP,33
6,SINGLE UNIT TRUCK WITH TRAILER,2
7,SPORT UTILITY VEHICLE (SUV),138
8,TRACTOR W/ SEMI-TRAILER,5
9,TRACTOR W/O SEMI-TRAILER,2


Splitting the grouped function into individual, easier-to-manipulate functions

In [840]:

#loading data
def loading_data(crash_data):
    """Read one CSV source into a DataFrame.

    ``crash_data`` is anything ``pd.read_csv`` accepts (a path or a file-like
    object). Despite its name, the pipeline uses it for both input CSVs.
    """
    frame = pd.read_csv(crash_data)
    return frame

In [841]:
#removing rows with null values
def cleaned_data(df, thresh=2):
    """Drop rows that have fewer than ``thresh`` non-null values.

    Args:
        df: input DataFrame; it is not modified.
        thresh: minimum number of non-null values a row needs in order to be
            kept. The default of 2 is the value the pipeline was written with.

    Returns:
        A new DataFrame containing only the rows that meet the threshold.
    """
    return df.dropna(axis='index', thresh=thresh)


In [842]:
#fill missing values
def filling_values(df):
    """Return a copy of ``df`` in which missing ``report_type`` values become 'ON SCENE'.

    Other columns are left untouched. A frame without a ``report_type``
    column comes back with its values unchanged.
    """
    defaults = {'report_type': 'ON SCENE'}
    return df.fillna(value=defaults)

In [843]:
#merging crashes and vehicle_crashes
def merging_tables(df_crashes, df_vehicles):
    """Left-join vehicles onto crashes, then count joined rows per vehicle type.

    Despite its name, this step both merges and aggregates. The result has the
    columns ``vehicle_type`` and ``crash_record_id`` (the per-type count).
    """
    combined = pd.merge(
        df_crashes,
        df_vehicles,
        how='left',
        on='crash_record_id',
        suffixes=('_left', '_right'),
    )
    per_type = combined.groupby('vehicle_type').agg({'crash_record_id': 'count'})
    return per_type.reset_index()

In [844]:
#mapping columns
def mapped_column(df):
    """Rename ``vehicle_type`` to the output name ``vehicletypes``; other columns are untouched."""
    return df.rename(columns=dict(vehicle_type='vehicletypes'))

Pipeline Start

In [845]:
#load data
def data_pipeline(crash_file, vehicle_crash_file):
    """Load the crash and vehicle CSVs via ``loading_data``.

    Returns:
        ``(df_crash, df_vehicle)``. If reading raises an ``Exception``, the error
        is logged and any frame not yet loaded is returned as an empty DataFrame.
    """
    df_crash = pd.DataFrame()
    df_vehicle = pd.DataFrame()
    try:
        df_crash = loading_data(crash_file)
        df_vehicle = loading_data(vehicle_crash_file)
    except Exception as e:
        logger.error(f'Exception in reading data file:{e}')
    # Return outside try/finally: a `return` inside `finally` would also swallow
    # KeyboardInterrupt, SystemExit and any other non-Exception error.
    return df_crash, df_vehicle

In [846]:
#pipeline removing null values
def nonnull_data_pipeline(df_crash, df_vehicle):
    """Apply ``cleaned_data`` (drop sparse rows) to both frames.

    Returns:
        ``(df_crash, df_vehicle)``. If cleaning raises an ``Exception``, the error
        is logged and any frame not yet cleaned is returned as given.
    """
    try:
        df_crash = cleaned_data(df_crash)
        df_vehicle = cleaned_data(df_vehicle)
    except Exception as e:
        logger.error(f'Exception in nonnull cleaning{e}')
    # Return outside try/finally so non-Exception errors (e.g. KeyboardInterrupt)
    # still propagate instead of being silently discarded.
    return df_crash, df_vehicle

In [847]:
#pipeline to fill data
def fill_data_pipeline(df_crash, df_vehicle):
    """Apply ``filling_values`` (fill missing ``report_type``) to both frames.

    Returns:
        ``(df_crash, df_vehicle)``. If filling raises an ``Exception``, the error
        is logged and any frame not yet filled is returned as given.
    """
    try:
        df_crash = filling_values(df_crash)
        df_vehicle = filling_values(df_vehicle)
    except Exception as e:
        # this step fills values; the previous message wrongly said "loading data"
        logger.error(f'Exception in filling data: {e}')
    # Return outside try/finally so non-Exception errors still propagate.
    return df_crash, df_vehicle
      

In [848]:
#merging pipeline
def merge_pipeline(df_crash, df_vehicle):
    """Merge and aggregate the two frames via ``merging_tables``.

    Returns:
        The per-vehicle-type count frame, or an empty DataFrame (with the error
        logged) if merging raises an ``Exception``.
    """
    # Pre-initialise so a failure returns an empty frame instead of raising
    # UnboundLocalError when the result is returned.
    df_merged = pd.DataFrame()
    try:
        df_merged = merging_tables(df_crash, df_vehicle)
    except Exception as e:
        # f-prefix added: without it the literal text "{e}" was logged
        logger.error(f'Exception in merging tables: {e}')
    return df_merged

In [849]:
#mapping pipeline
def mapped_pipeline(df_merged):
    """Rename output columns via ``mapped_column``.

    Returns:
        The renamed frame. If renaming raises an ``Exception``, the error is
        logged and ``df_merged`` is returned unchanged (matching the other
        single-step pipelines).
    """
    # Default to the input so the name is always bound when returned.
    df_results = df_merged
    try:
        df_results = mapped_column(df_merged)
    except Exception as e:
        logger.error(f'Exception in mapping data columns{e}')
    return df_results

THE DATA 

In [850]:
#input data
crash_file_csv='/Users/joe/Desktop/ETL Pipelines/data/transformation_data/traffic_crashes.csv'
vehicle_crash_csv='/Users/joe/Desktop/ETL Pipelines/data/transformation_data/traffic_crash_vehicle.csv'

#load data
df_crash, df_vehicle = data_pipeline(crash_file_csv, vehicle_crash_csv)

#nonnull cleaning
df_crash, df_vehicle = nonnull_data_pipeline(df_crash, df_vehicle)

#fill missing values: fill_data_pipeline takes both frames and returns both
#(calling it with only df_crash raised TypeError and would have bound a tuple)
df_crash, df_vehicle = fill_data_pipeline(df_crash, df_vehicle)

#merge data
df_merged = merge_pipeline(df_crash, df_vehicle)

#mapping columns
df_result = mapped_pipeline(df_merged)

df_result




Unnamed: 0,vehicletypes,crash_record_id
0,BUS OVER 15 PASS.,5
1,MOPED OR MOTORIZED BICYCLE,1
2,OTHER,20
3,OTHER VEHICLE WITH TRAILER,1
4,PASSENGER,633
5,PICKUP,33
6,SINGLE UNIT TRUCK WITH TRAILER,2
7,SPORT UTILITY VEHICLE (SUV),138
8,TRACTOR W/ SEMI-TRAILER,5
9,TRACTOR W/O SEMI-TRAILER,2
