# AXA coding challenge
Data:
1. Citibike: https://s3.amazonaws.com/tripdata/index.html
2. NYPD:  https://data.cityofnewyork.us/Public-Safety/Motor-Vehicle-Collisions-Crashes/h9gi-nx95/about_data

In [1]:
# Install packages (only once)
#!pip install selenium webdriver-manager

# Import modules
import os # basic
import zipfile
import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn as sk
import dask.dataframe as dd

from selenium import webdriver # for downloading files automatically
from selenium.webdriver.chrome.service import Service

current_dir = os.getcwd() # current dir
print('Current directory: ' + current_dir)
extract_dir = current_dir + '/data/bike-tripdata'  # directory where extracted files from 1. will be saved
cleaned_dir = extract_dir + '_cleaned' # directory where cleaned and concatenated df will be saved

pd.options.display.float_format = '{:.4f}'.format # set pd output to 2 decimals

Current directory: C:\Users\Hanna\sciebo\AXA_coding-challenge


In [2]:
# Functions

# to download files from an url
def download_files(url, save_path):
    response = requests.get(url, stream=True)
    with open(save_path, 'wb') as file:
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                file.write(chunk)
    print(f"Downloaded {save_path}")
    
# to clean column names
def clean_column_names(df, column_mapping=None):
    # strip whitespace, convert to lowercase, and replace spaces with underscores
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('-', '_')
    
    # apply manual column mapping if specified
    if column_mapping:
        df.rename(columns=column_mapping, inplace=True)
    
    return df

# check unique column names across all .csv files in list_files 
def list_unique_col_names(list_files):
    unique_column_names = []
    for csv_file in list_files:
        file_path = os.path.join(extract_dir, csv_file)
        df = pd.read_csv(file_path, nrows=1)
        #print(df.columns) # visual check
        [unique_column_names.append(col) for col in df.columns if col not in unique_column_names]
    unique_column_names.sort()
    
    return unique_column_names

## Download Citibike data automatically from url

In [7]:
url = "https://s3.amazonaws.com/tripdata/index.html" # url to data files
driver_path = 'C:/Drivers/chromedriver-win64_128/chromedriver.exe' # Chrome driver for web interaction, needed by selenium - must match Chrome version

# Download files
service = Service(driver_path) # initialize the Chrome driver
driver = webdriver.Chrome(service=service)
driver.get(url) # navigate to website
time.sleep(5)  # give the page time to load the dynamic content
html = driver.page_source # get the page source after JavaScript has executed
soup = BeautifulSoup(html, 'html.parser') # parse the HTML

# find all .zip links
file_links = []
for link in soup.find_all('a', href=True):
    if link['href'].endswith('.zip'): # on this website, files are .zip format
        file_links.append(link['href'])
print(file_links[:2]) # check if the file paths are retrieved correctly by printing a few

driver.quit() # close the browser

if not os.path.exists(current_dir+'/downloads'): # directory to save the downloaded files
    os.makedirs(current_dir+'/downloads')

for file_link in file_links: # loop through all the zip links and download them
    filename = os.path.join(current_dir+'/downloads', os.path.basename(file_link))
    
    if not file_link.startswith('http'): # if the link is relative, make it an absolute URL by appending the base URL
        file_link = url + file_link

    download_files(file_link, filename) # download the file

<Response [200]>
[]


## Unzip & reorganize files

In [None]:
# - alternatively (instead of next cell), unzip first and then reorganize files

# Unzip files  
# zip_dir = current_dir+'/downloads' # directory containing the zip files
# extract_dir = current_dir+'/data' # directory where extracted files will be saved

# for filename in os.listdir(zip_dir): # loop through all files in the directory
#     if filename.endswith('.zip') :
#         zip_file_path = os.path.join(zip_dir, filename)
#         new_file_path = extract_dir + '/' + filename[:-4] + '.csv' # remove '.zip' and subfolders from the target path name
#         os.makedirs(new_file_path, exist_ok=True)  # create the directory if it doesn't exist

#         with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: # extract the zip file
#             for member in zip_ref.namelist():
#                 if '_MACOSX' not in member: # skip any file or folder inside "_MACOSX" (for MAC computers, not needed)
#                     zip_ref.extract(member, new_file_path) # extract to the specified directory

#             print(f'Extracted: {member} to {new_file_path}')


# # Move  files from subfolders in subfolders to 1 folder

# import shutil

# source_dir = current_dir + '/data'
# destination_dir = current_dir + '/data_test'
# os.makedirs(destination_dir, exist_ok=True)

# for root, dirs, files in os.walk(source_dir):
#     for file in files:
#         if file.endswith('.csv') and not file.startswith('.'): # select .csv files, skip files starting with '.' 
#             if '_MACOSX' in root:
#                 continue  # skip this directory and its contents, for MAC

#             source_file = os.path.join(root, file)
#             destination_file = os.path.join(destination_dir, file)
            
#             shutil.move(source_file, destination_file) # or shutil.copy
#             print(f"Moved: {source_file} -> {destination_file}")


In [95]:
# Unzip files & reorganize simultaneously
zip_dir = current_dir + '/downloads'  # directory containing the zip files
extract_dir = current_dir + '/data/bike-tripdata'  # directory where extracted files will be saved

os.makedirs(extract_dir, exist_ok=True)  # create the directory if it doesn't exist

for filename in os.listdir(zip_dir):  # loop through all files in the directory
    if filename.endswith('.zip'):
        zip_file_path = os.path.join(zip_dir, filename)

        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:  # extract the zip file
            for member in zip_ref.namelist():
                # skip any file or folder inside "_MACOSX" (for MAC computers, not needed), and files that do not end with .csv
                if '_MACOSX' not in member and member.endswith('.csv'):  
                    # get only the base name of the file (ignore folder structure in zip)
                    base_member = os.path.basename(member)
                    target_path = os.path.join(extract_dir, base_member)
                    
                    with zip_ref.open(member) as source, open(target_path, "wb") as target:
                        target.write(source.read())  # write the extracted content to the single folder

                    print(f'Extracted {base_member}')
    print(f'... from {filename} to {extract_dir}')

Extracted 201309-citibike-tripdata.csv
Extracted 201311-citibike-tripdata.csv
Extracted 201307-citibike-tripdata.csv
Extracted 201308-citibike-tripdata.csv
Extracted 201306-citibike-tripdata.csv
Extracted 201310-citibike-tripdata.csv
Extracted 201312-citibike-tripdata.csv
Extracted 201312-citibike-tripdata_1.csv
Extracted 201311-citibike-tripdata_1.csv
Extracted 201307-citibike-tripdata_1.csv
Extracted 201310-citibike-tripdata_2.csv
Extracted 201310-citibike-tripdata_1.csv
Extracted 201309-citibike-tripdata_2.csv
Extracted 201309-citibike-tripdata_1.csv
Extracted 201308-citibike-tripdata_1.csv
Extracted 201308-citibike-tripdata_2.csv
Extracted 201306-citibike-tripdata_1.csv
from 2013-citibike-tripdata.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted 201404-citibike-tripdata_1.csv
Extracted 201412-citibike-tripdata_1.csv
Extracted 201411-citibike-tripdata_1.csv
Extracted 201407-citibike-tripdata_1.csv
Extracted 201410-citibike-tripdata_1.csv
Extracted 20140

Extracted 202407-citibike-tripdata_1.csv
Extracted 202407-citibike-tripdata_2.csv
Extracted 202407-citibike-tripdata_3.csv
Extracted 202407-citibike-tripdata_4.csv
Extracted 202407-citibike-tripdata_5.csv
from 202407-citibike-tripdata.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted 202408-citibike-tripdata_3.csv
Extracted 202408-citibike-tripdata_2.csv
Extracted 202408-citibike-tripdata_1.csv
Extracted 202408-citibike-tripdata_5.csv
Extracted 202408-citibike-tripdata_4.csv
from 202408-citibike-tripdata.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-201509-citibike-tripdata.csv
from JC-201509-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-201510-citibike-tripdata.csv
from JC-201510-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-201511-citibike-tripdata.csv
from JC-201511-citibike-tripdata.csv.zip to C:\Users\H

Extracted JC-202007-citibike-tripdata.csv
from JC-202007-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-202008-citibike-tripdata.csv
from JC-202008-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-202009-citibike-tripdata.csv
from JC-202009-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-202010-citibike-tripdata.csv
from JC-202010-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-202011-citibike-tripdata.csv
from JC-202011-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-202012-citibike-tripdata.csv
from JC-202012-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_coding-challenge/data/bike-tripdata
Extracted JC-202101-citibike-tripdata.csv
from JC-202101-citibike-tripdata.csv.zip to C:\Users\Hanna\sciebo\AXA_

## Visualize dataset for cleaning
### 1. Check which unique column names exist across all files
### 2. Correct column names (strip uppercase and convert space to underscore)
### 3. Map names to manual

In [27]:
# Since CSV files do not contain the same column headers, check which ones exist in the dataset?
list_files = [f for f in os.listdir(extract_dir) if f.endswith('.csv')]
unique_column_names = []
for csv_file in list_files:
    file_path = os.path.join(extract_dir, csv_file)
    df = pd.read_csv(file_path, nrows=3)
    [unique_column_names.append(col) for col in df.columns if col not in unique_column_names]
print(unique_column_names)

['tripduration', 'starttime', 'stoptime', 'start station id', 'start station name', 'start station latitude', 'start station longitude', 'end station id', 'end station name', 'end station latitude', 'end station longitude', 'bikeid', 'usertype', 'birth year', 'gender', 'Trip Duration', 'Start Time', 'Stop Time', 'Start Station ID', 'Start Station Name', 'Start Station Latitude', 'Start Station Longitude', 'End Station ID', 'End Station Name', 'End Station Latitude', 'End Station Longitude', 'Bike ID', 'User Type', 'Birth Year', 'Gender', 'ride_id', 'rideable_type', 'started_at', 'ended_at', 'start_station_name', 'start_station_id', 'end_station_name', 'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng', 'member_casual', 'Unnamed: 0', 'rideable_type_duplicate_column_name_1']


It turns out that the column names are not consistent, e.g. some files contain the column "starttime" while others contain the column "Start Time". This should be corrected. Additionally, column names should not contain spaces ("start station latitude" vs "start_lat"). Last, there are 2 strange column names which need to be checked: "Unnamed: 0" and "rideable_type_duplicate_column_name_1".

### Correct column names

In [28]:
unique_column_names = list_unique_col_names(list_files)
print('Unique column names: \n  ' + str(unique_column_names))
column_mapping = {
    'bikeid': 'bike_id',
    'end_lat': 'end_station_latitude',
    'end_lng': 'end_station_longitude',
    'ended_at': 'end_datetime',
    'member_casual': 'user_type',
    'rideable_type_duplicate_column_name_1': 'duplicate_col',
    'start_lat': 'start_station_latitude',
    'start_lng': 'start_station_longitude',
    'starttime': 'start_datetime',
    'start_time': 'start_datetime',
    'started_at': 'start_datetime',
    'stoptime': 'end_datetime',
    'stop_time': 'end_datetime',
    'tripduration': 'trip_duration',
    'unnamed:_0': 'unnamed', # this is just an index column without name, present in some files -> can be discarded later
    'usertype': 'user_type'
}

# Exploratory correction, see if it solves the inconsistencies
unique_column_names=[]
for csv_file in list_files:
    file_path = os.path.join(extract_dir, csv_file)
    df = pd.read_csv(file_path, nrows=1)
#     if 'rideable_type_duplicate_column_name_1' in df.columns: # check what column this is -> just a duplicate -> can be discarded
#         print(df.head(2))
    df = clean_column_names(df, column_mapping)
    ##print(df.columns)
    
    [unique_column_names.append(col) for col in df.columns if col not in unique_column_names] # save new col names for checking
    unique_column_names.sort()

remove_names = ['unnamed','duplicate_col'] # column names to remove
final_column_names = [name for name in unique_column_names if name not in remove_names] # list with final universal column names
print(' ')
print('Unique column names after cleaning: \n ' + str(final_column_names)) # -> satisfied!
final_column_names.extend(['year','month']) # add the columns year and month, as I will add them from start_datetime

Unique column names: 
  ['Bike ID', 'Birth Year', 'End Station ID', 'End Station Latitude', 'End Station Longitude', 'End Station Name', 'Gender', 'Start Station ID', 'Start Station Latitude', 'Start Station Longitude', 'Start Station Name', 'Start Time', 'Stop Time', 'Trip Duration', 'Unnamed: 0', 'User Type', 'bikeid', 'birth year', 'end station id', 'end station latitude', 'end station longitude', 'end station name', 'end_lat', 'end_lng', 'end_station_id', 'end_station_name', 'ended_at', 'gender', 'member_casual', 'ride_id', 'rideable_type', 'rideable_type_duplicate_column_name_1', 'start station id', 'start station latitude', 'start station longitude', 'start station name', 'start_lat', 'start_lng', 'start_station_id', 'start_station_name', 'started_at', 'starttime', 'stoptime', 'tripduration', 'usertype']
 
Unique column names after cleaning: 
 ['bike_id', 'birth_year', 'end_datetime', 'end_station_id', 'end_station_latitude', 'end_station_longitude', 'end_station_name', 'gender',

In [29]:
# check if data type of files are the same
check_dtype = {col: [] for col in final_column_names} # create empty dict to store dtypes

list_files = [f for f in os.listdir(extract_dir) if f.endswith('.csv')]
for csv_file in list_files:
#df.memory_usage(deep=True).sum()
    file_path = os.path.join(extract_dir, csv_file)
    df = pd.read_csv(file_path, nrows=3)
    df = clean_column_names(df, column_mapping) # clean column names
    to_remove = ['duplicate_col','unnamed'] 
    for col in to_remove:
        if col in df.columns:
            df.drop(col, axis=1, inplace=True) # drop these columns
    for col in df.columns:
        #print(f'{col}: {df[col].dtype}')
        check_dtype[col].append(df[col].dtype)

for col in check_dtype:
    print(f"Column: {col}")
    print(f"Unique dtypes: {set(check_dtype[col])}\n")

Column: bike_id
Unique dtypes: {dtype('int64')}

Column: birth_year
Unique dtypes: {dtype('O'), dtype('int64'), dtype('float64')}

Column: end_datetime
Unique dtypes: {dtype('O')}

Column: end_station_id
Unique dtypes: {dtype('O'), dtype('int64'), dtype('float64')}

Column: end_station_latitude
Unique dtypes: {dtype('float64')}

Column: end_station_longitude
Unique dtypes: {dtype('float64')}

Column: end_station_name
Unique dtypes: {dtype('O')}

Column: gender
Unique dtypes: {dtype('int64')}

Column: ride_id
Unique dtypes: {dtype('O')}

Column: rideable_type
Unique dtypes: {dtype('O')}

Column: start_datetime
Unique dtypes: {dtype('O')}

Column: start_station_id
Unique dtypes: {dtype('O'), dtype('int64'), dtype('float64')}

Column: start_station_latitude
Unique dtypes: {dtype('float64')}

Column: start_station_longitude
Unique dtypes: {dtype('float64')}

Column: start_station_name
Unique dtypes: {dtype('O')}

Column: trip_duration
Unique dtypes: {dtype('int64')}

Column: user_type
Uniq

In [6]:
df.head()

Unnamed: 0,ride_id,rideable_type,start_datetime,end_datetime,start_station_name,start_station_id,end_station_name,end_station_id,start_station_latitude,start_station_longitude,end_station_latitude,end_station_longitude,user_type
0,17AE31FCAE74D287,electric_bike,2024-08-07 13:22:55.656,2024-08-07 13:25:09.654,7 St & Monroe St,HB304,4 St & Grand St,HB301,40.7464,-74.038,40.7423,-74.0351,member
1,FD9859BDBE0CDF70,electric_bike,2024-08-13 13:15:08.627,2024-08-13 13:17:44.971,7 St & Monroe St,HB304,4 St & Grand St,HB301,40.7464,-74.038,40.7423,-74.0351,member
2,AAC5ECD095AE5572,electric_bike,2024-08-12 20:07:26.975,2024-08-12 20:09:38.180,7 St & Monroe St,HB304,4 St & Grand St,HB301,40.7464,-74.038,40.7423,-74.0351,member


In [30]:
# Get universal entries for columns I am converting to str and then to category (global categories across csv files)
dtype_cat =['gender', 'user_type', 'rideable_type']

# get global categories (differing per file)
categories_dict = {col: set() for col in dtype_cat }

# Collect all unique categories across the DataFrames
for csv_file in list_files:  
    file_path = os.path.join(extract_dir, csv_file) # load individual file
    
    # Process the file in chunks to save memory
    df = pd.read_csv(file_path)
    df = clean_column_names(df, column_mapping) # clean column names
    to_remove = ['duplicate_col','unnamed'] 
    for col in to_remove:
        if col in df.columns:
            df.drop(col, axis=1, inplace=True) # drop these columns
        
    for col in df.columns:
        if col in categories_dict:
            df[col].fillna('unknown').dropna()   
            categories_dict[col].update(df[col].unique())  # update unique categories of df
            #categories_dict[col].add('unknown') # add category 'unknown' for missing data
categories_dict['user_type'].discard(np.nan) # for some reason still nan as category here
categories_dict

  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)
  df = pd.read_csv(file_path)


{'gender': {0, 1, 2},
 'user_type': {'Customer', 'Subscriber', 'casual', 'member'},
 'rideable_type': {'classic_bike', 'docked_bike', 'electric_bike'}}

## Load csv files, clean column names, change dtypes, concatenate into 1 dask df and save as dask parquet file

In [104]:
### This cell executed in the Anaconda powershell (clean_concat.py), since it´s faster/requires less memory on my 16GB mem laptop ###

if not os.path.exists(cleaned_dir):  # directory to save the cleaned df
    os.makedirs(cleaned_dir)

dtype_dict = { # I also convert latitude and longitude to float32 here since it greatly improves efficiency- float32 can hold only 7 decimals, but this should be enough (accurate to ~10m)
    'int32': ['birth_year', 'trip_duration'],
    'float32': ['end_station_latitude', 'end_station_longitude', 'start_station_latitude', 'start_station_longitude'],
    'str': ['start_station_id', 'start_station_name', 'end_station_id', 'end_station_name', 'ride_id'], 
    'category': ['gender', 'user_type', 'rideable_type'],
    'datetime64': ['start_datetime', 'end_datetime']
}

dtype_mapping = {} # dictionary with col: dtype, for changing data types per column
for dtype, columns in dtype_dict.items():
    for col in columns:
        dtype_mapping[col] = dtype

list_files = [f for f in os.listdir(extract_dir) if f.endswith('.csv')] # list of csv files in dir to loop over
chunksize = 1_000_000  # load in chunks to save memory, in case csv file is huge

ddf_list = []
for csv_file in list_files[0:2]: 
    file_path = os.path.join(extract_dir, csv_file) # load individual file
    print('loading ' +  csv_file)
    
    # Process the file in chunks to save memory
    chunk_iter = pd.read_csv(file_path, chunksize=chunksize, low_memory=True, parse_dates=True)

    for n, chunk in enumerate(chunk_iter):
        ddf = clean_column_names(chunk, column_mapping)  # Clean column names
 
        # Convert to Dask DataFrame for larger datasets
        #ddf = dd.from_pandas(chunk, npartitions=1) #
        #print('to dask converted')
        # Drop unwanted columns
        to_remove = ['duplicate_col', 'unnamed']
        ddf = ddf.drop(columns=[col for col in to_remove if col in ddf.columns])
        
        missing_cols = set(final_column_names) - set(ddf.columns) # add missing (universal) columns from final_column_names and fill with nans
        for col in missing_cols:
            ddf[col] = np.nan
                    
        # Convert column dtypes
        for col in ddf.columns:
            if col in dtype_dict['int32']:
                ddf[col] = ddf[col].replace('\\N', np.nan)  # handle missing values
                ddf[col] = ddf[col].astype('float32')
                ddf[col] = ddf[col].fillna(0).round(0).astype('int32') # replace nan with the place filler 0, round and convert to int
            elif col in dtype_dict['category']: # for categorical data, replace nans with 'unknown' cat
                ddf[col] = ddf[col].fillna('unknown')  # Replace NaNs with 'unknown'
                ddf[col] = ddf[col].astype('str') # convert to str first
                ddf[col] = ddf[col].astype('category') # string to category, as it needs less memory
                ddf[col] = ddf[col].cat.set_categories(new_categories=list(categories_dict[col])) # set global categories
            elif col in dtype_mapping.keys():
                ddf[col] = ddf[col].astype(dtype_mapping[col])
                
        ddf = ddf.drop_duplicates().sort_values(by='start_datetime')# drop duplicates and sort to start rental time/date
        ddf = ddf.reset_index(drop=True)
        #ddf = ddf.set_index('start_datetime') # set start_datetime as index

        ddf['year'] = ddf['start_datetime'].dt.year.astype('int32') # add year column for partitioning
        ddf['month'] = ddf['start_datetime'].dt.month.astype('int8') # add month column for partitioning
                                            
        ddf = ddf[final_column_names] # ensure consistent column order   

        ddf = dd.from_pandas(ddf, npartitions=5)
        ddf_list.append(ddf)   
    print('.... cleaned, converted to dask df and appended to ddf_list')     

ddf_comb = dd.concat(ddf_list, ignore_index=True) # concatenate all dask dfs
#del ddf_list
ddf_comb = ddf_comb.drop_duplicates() # remove duplicates

# check if all dtypes are consistent across ddfs
check_dtype = {col: [] for col in final_column_names}
for ddf in ddf_list:
    for col in ddf.columns:
        check_dtype[col].append(ddf[col].dtype)

for col in check_dtype.keys():
    print(col)
    print(pd.Series(check_dtype[col]).unique())
    if len(pd.Series(check_dtype[col]).unique()) > 1:
        print(col + ' has inconsistent dtypes')
    
ddf_comb.to_parquet(cleaned_dir + '/combined_dask_df.parquet', engine='pyarrow', partition_on=['year', 'month'], write_index=False) # write, partitioned on year and month
print('All files processed and saved to Parquet')

loading 201306-citibike-tripdata.csv
.... cleaned, converted to dask df and appended to ddf_list
loading 201306-citibike-tripdata_1.csv
.... cleaned, converted to dask df and appended to ddf_list
bike_id
[dtype('int64')]
birth_year
[dtype('int32')]
end_datetime
[dtype('<M8[ns]')]
end_station_id
[dtype('O')]
end_station_latitude
[dtype('float32')]
end_station_longitude
[dtype('float32')]
end_station_name
[dtype('O')]
gender
[CategoricalDtype(categories=[0, 1, 2], ordered=False)]
ride_id
[dtype('float64')]
rideable_type
[dtype('float64')]
start_datetime
[dtype('<M8[ns]')]
start_station_id
[dtype('O')]
start_station_latitude
[dtype('float32')]
start_station_longitude
[dtype('float32')]
start_station_name
[dtype('O')]
trip_duration
[dtype('int32')]
user_type
[CategoricalDtype(categories=['Subscriber', 'member', 'casual', 'Customer'], ordered=False)]
year
[dtype('int32')]
month
[dtype('int8')]
All files processed and saved to Parquet


## Load cleaned ddf (data bike rides)

In [29]:
# Load cleaned data from saved file
ddf = dd.read_parquet(cleaned_dir + '/combined_dask_df.parquet')
#ddf = dd.read_parquet('path_to_parquet_file', columns=['category_column', 'numeric_column']) # read only specific columns
#ddf_2020 = dd.read_parquet('path_to_parquet_file/year=2020') # read only specific partition

ddf.head()

Unnamed: 0,bike_id,birth_year,end_datetime,end_station_id,end_station_latitude,end_station_longitude,end_station_name,gender,ride_id,rideable_type,start_datetime,start_station_id,start_station_latitude,start_station_longitude,start_station_name,trip_duration,user_type,year,month
0,15839.0,1971,2019-01-01 00:07:07.581,3283.0,40.7882,-73.9704,W 89 St & Columbus Ave,,,,2019-01-01 00:01:47.401,3160.0,40.779,-73.9737,Central Park West & W 76 St,320,Subscriber,2019,1
1,32723.0,1964,2019-01-01 00:10:00.608,518.0,40.7478,-73.9734,E 39 St & 2 Ave,,,,2019-01-01 00:04:43.736,519.0,40.7519,-73.9777,Pershing Square North,316,Subscriber,2019,1
2,27451.0,1987,2019-01-01 00:15:55.438,3154.0,40.7731,-73.9586,E 77 St & 3 Ave,,,,2019-01-01 00:06:03.997,3171.0,40.7852,-73.9767,Amsterdam Ave & W 82 St,591,Subscriber,2019,1
3,21579.0,1990,2019-01-01 00:52:22.650,3709.0,40.738,-73.9964,W 15 St & 6 Ave,,,,2019-01-01 00:07:03.545,504.0,40.7322,-73.9817,1 Ave & E 16 St,2719,Subscriber,2019,1
4,35379.0,1979,2019-01-01 00:12:39.502,503.0,40.7383,-73.9875,E 20 St & Park Ave,,,,2019-01-01 00:07:35.945,229.0,40.7274,-73.9938,Great Jones St,303,Subscriber,2019,1


Now that all column names are consistent and all date is concatenated, check if categories in some columns are consistent

In [16]:
print('Unique categories:')
print(f'user_type - {ddf["user_type"].cat.as_known().cat.categories}')
print(f'rideable_type - {ddf["rideable_type"].cat.as_known().cat.categories}')

print('Unique string:')
print('start_station id, unique: '+str(ddf.start_station_id.nunique().compute()))
print('start_station name, unique: '+str(ddf.start_station_name.nunique().compute()))
print('end_station_id, unique: '+str(ddf.end_station_id.nunique().compute()))
print('end_station name, unique: '+str(ddf.end_station_name.nunique().compute()))

Unique categories:
user_type - Index(['Subscriber', 'Customer', 'member', 'casual'], dtype='object')
rideable_type - Index(['electric_bike', 'classic_bike', 'docked_bike'], dtype='object')
Unique string:
start_station id, unique: 4199
start_station name, unique: 2424
end_station_id, unique: 4252
end_station name, unique: 2445


In [17]:
print(ddf.dtypes)

bike_id                           float64
birth_year                          int32
end_datetime               datetime64[ns]
end_station_id                     object
end_station_latitude              float32
end_station_longitude             float32
end_station_name                   object
gender                           category
ride_id                            object
rideable_type                    category
start_datetime             datetime64[ns]
start_station_id                   object
start_station_latitude            float32
start_station_longitude           float32
start_station_name                 object
trip_duration                       int32
user_type                        category
year                             category
month                            category
dtype: object


In [30]:
# Unify string in rideable_type and user_type columns

#ddf['rideable_type'] = ddf['rideable_type'].astype('str').str.strip().str.lower().astype('category')# if needed

def replace_user_type(df):
    df = df.copy()  # make a copy to avoid SettingWithCopyWarning
    df['user_type'] = df['user_type'].astype(str).str.strip().str.lower() # temporarily convert to string (object)
    
    # Replace 'subscriber' with 'member' and 'customer' with 'casual'
    df.loc[df['user_type'] == 'subscriber', 'user_type'] = 'member'
    df.loc[df['user_type'] == 'customer', 'user_type'] = 'casual'

    df['user_type'] = pd.Categorical(df['user_type'], categories=['member', 'casual']) # convert back to category

    return df

# Use map_partitions to apply this function to the Dask DataFrame
ddf = ddf.map_partitions(replace_user_type)

print('Unique categories, after changing "subscriber" to "member" and "customer" to "casual", for consistency:')
print(f'user_type - {ddf["user_type"].cat.as_known().cat.categories}')
ddf.head(2) # check if it worked

Unique categories, after changing "subscriber" to "member" and "customer" to "casual", for consistency:
user_type - Index(['member', 'casual'], dtype='object')


Unnamed: 0,bike_id,birth_year,end_datetime,end_station_id,end_station_latitude,end_station_longitude,end_station_name,gender,ride_id,rideable_type,start_datetime,start_station_id,start_station_latitude,start_station_longitude,start_station_name,trip_duration,user_type,year,month
0,15839.0,1971,2019-01-01 00:07:07.581,3283.0,40.7882,-73.9704,W 89 St & Columbus Ave,,,,2019-01-01 00:01:47.401,3160.0,40.779,-73.9737,Central Park West & W 76 St,320,member,2019,1
1,32723.0,1964,2019-01-01 00:10:00.608,518.0,40.7478,-73.9734,E 39 St & 2 Ave,,,,2019-01-01 00:04:43.736,519.0,40.7519,-73.9777,Pershing Square North,316,member,2019,1


In [31]:
ddf.isna().sum().compute() # check how many nans in which columns

bike_id                    31328486
birth_year                        0
end_datetime                      0
end_station_id                    0
end_station_latitude          12755
end_station_longitude         12755
end_station_name                  0
gender                     52620105
ride_id                           0
rideable_type              21291619
start_datetime                    0
start_station_id                  0
start_station_latitude            0
start_station_longitude           0
start_station_name                0
trip_duration                     0
user_type                         0
year                              0
month                             0
dtype: int64

In [39]:
ddf[ddf['end_station_latitude'].isna()].compute().head(5)

Unnamed: 0,bike_id,birth_year,end_datetime,end_station_id,end_station_latitude,end_station_longitude,end_station_name,gender,ride_id,rideable_type,start_datetime,start_station_id,start_station_latitude,start_station_longitude,start_station_name,trip_duration,user_type,year,month
313,,0,2021-02-07 08:38:56,,,,,,9A0FF842D38924E8,docked_bike,2021-02-06 09:44:17,JC082,40.7216,-74.0429,Manila & 1st,0,casual,2021,2
526,,0,2021-02-08 16:44:41,,,,,,DF695B318E00BB00,docked_bike,2021-02-07 15:44:49,JC084,40.7144,-74.0666,Communipaw & Berry Lane,0,member,2021,2
546,,0,2021-02-08 13:03:06,,,,,,E7D301B8767D1015,docked_bike,2021-02-08 07:51:36,JC011,40.7165,-74.0496,JC Medical Center,0,member,2021,2
548,,0,2021-02-08 08:12:54,,,,,,2920267F4E3DDFFC,docked_bike,2021-02-08 07:58:42,JC011,40.7165,-74.0496,JC Medical Center,0,member,2021,2
552,,0,2021-02-08 13:03:06,,,,,,AD2EA0ED7DA9B9DB,docked_bike,2021-02-08 08:53:53,JC027,40.7253,-74.0456,Jersey & 6th St,0,member,2021,2


In [None]:
# Get some idea of the df content

print('Mean trip duration: ')
print(ddf.groupby('user_type').agg({'trip_duration': 'mean'}).compute())
ddf.groupby('gender').agg({'trip_duration': 'mean'}).compute()

Mean trip duration: 
           trip_duration
user_type               
member          335.7891
casual          674.6633


In [None]:
# save further cleaned ddf again, overwriting
ddf_comb.to_parquet(cleaned_dir + '/combined_dask_df.parquet', engine='pyarrow', partition_on=['year', 'month'], write_index=False, overwrite=True) # write, partitioned on year and month

## Load collision data

In [63]:
file_path = current_dir + '/data/Motor_Vehicle_Collisions_-_Crashes_20240922.csv'
df = pd.read_csv(file_path)

print(df.shape)
df.head(5)

  df = pd.read_csv(file_path)


(2120518, 29)


Unnamed: 0,CRASH DATE,CRASH TIME,BOROUGH,ZIP CODE,LATITUDE,LONGITUDE,LOCATION,ON STREET NAME,CROSS STREET NAME,OFF STREET NAME,...,CONTRIBUTING FACTOR VEHICLE 2,CONTRIBUTING FACTOR VEHICLE 3,CONTRIBUTING FACTOR VEHICLE 4,CONTRIBUTING FACTOR VEHICLE 5,COLLISION_ID,VEHICLE TYPE CODE 1,VEHICLE TYPE CODE 2,VEHICLE TYPE CODE 3,VEHICLE TYPE CODE 4,VEHICLE TYPE CODE 5
0,09/11/2021,2:39,,,,,,WHITESTONE EXPRESSWAY,20 AVENUE,,...,Unspecified,,,,4455765,Sedan,Sedan,,,
1,03/26/2022,11:45,,,,,,QUEENSBORO BRIDGE UPPER,,,...,,,,,4513547,Sedan,,,,
2,06/29/2022,6:55,,,,,,THROGS NECK BRIDGE,,,...,Unspecified,,,,4541903,Sedan,Pick-up Truck,,,
3,09/11/2021,9:35,BROOKLYN,11208.0,40.67,-73.87,"(40.667202, -73.8665)",,,1211 LORING AVENUE,...,,,,,4456314,Sedan,,,,
4,12/14/2021,8:13,BROOKLYN,11233.0,40.68,-73.92,"(40.683304, -73.917274)",SARATOGA AVENUE,DECATUR STREET,,...,,,,,4486609,,,,,


In [71]:
df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_') # clean column names
df = df.str.strip().str.lower() # clean string entries
print(df.dtypes)

crash_date                        object
crash_time                        object
borough                           object
zip_code                          object
latitude                         float64
longitude                        float64
location                          object
on_street_name                    object
cross_street_name                 object
off_street_name                   object
number_of_persons_injured        float64
number_of_persons_killed         float64
number_of_pedestrians_injured      int64
number_of_pedestrians_killed       int64
number_of_cyclist_injured          int64
number_of_cyclist_killed           int64
number_of_motorist_injured         int64
number_of_motorist_killed          int64
contributing_factor_vehicle_1     object
contributing_factor_vehicle_2     object
contributing_factor_vehicle_3     object
contributing_factor_vehicle_4     object
contributing_factor_vehicle_5     object
collision_id                       int64
vehicle_type_cod

In [None]:
dtype_dict = { # I also convert latitude and longitude to float32 here since it greatly improves efficiency- float32 can hold only 7 decimals, but this should be enough (accurate to ~10m)
    'float32': ['borough','zip_code','number_of_persons_injured', 'number_of_persons_killed', 'latitude', 'longitude'],
    'str': ['on_street_name','cross_street_name','off_street_name','contributing_factor_vehicle_1', 'contributing_factor_vehicle_2',
           'contributing_factor_vehicle_3','contributing_factor_vehicle_4','contributing_factor_vehicle_5','vehicle_type_code_1',
           'vehicle_type_code_2','vehicle_type_code_3','vehicle_type_code_4','vehicle_type_code_5'], 
    'int8': ['number_of_pedestrians_injured', 'number_of_pedestrians_killed', 'number_of_cyclist_injured', 'number_of_cyclist_killed', 
             'number_of_motorist_injured', 'number_of_motorist_killed','collision_id'],
    'datetime64': ['start_datetime', 'end_datetime']
}

dtype_mapping = {}
for dtype, columns in dtype_dict.items():
    for col in columns:
        dtype_mapping[col] = dtype

for col in df.columns:
    df[col] = df[col].astype(dtype_mapping[col])
    if col in dtype_dict['str']: # for categorical data, replace nans with 'unknown' cat
        df[col] = df[col].astype('category') # string to category, as it needs less memory
        df[col] = df[col].cat.add_categories('unknown').fillna('unknown') # add unknown category for nans

df['crash_datetime'] = pd.to_datetime(df['crash_date'] + ' ' + df['crash_time'])
df.drop(['crash_date','crash_time'], inplace=True)
df['year'] = df['crash_datetime'].dt.year.astype('int8') # add year column 
df['month'] = df['crash_datetime'].dt.month.astype('int8') # add month column 
        
df = df.drop_duplicates().sort_values(by='start_datetime')# drop duplicates and sort to start rental time/date
df = df.reset_index(drop=True)

df.to_csv(cleaned_dir + '/crashed_cleaned.csv')

## Data cleaning
### Which columns contain nans?
### Change data format

In [73]:
# Check which column has missing values (nans) and how many
summary_table = pd.DataFrame({
    'Nan_count': df.isna().sum(),
    'Total': df.shape[0]
})

print(summary_table)

                               Nan_count    Total
crash_date                             0  2120518
crash_time                             0  2120518
borough                           659498  2120518
zip_code                          659758  2120518
latitude                          247820  2120518
longitude                         247820  2120518
location                          247820  2120518
on_street_name                    453598  2120518
cross_street_name                 807416  2120518
off_street_name                  1759293  2120518
number_of_persons_injured             18  2120518
number_of_persons_killed              31  2120518
number_of_pedestrians_injured          0  2120518
number_of_pedestrians_killed           0  2120518
number_of_cyclist_injured              0  2120518
number_of_cyclist_killed               0  2120518
number_of_motorist_injured             0  2120518
number_of_motorist_killed              0  2120518
contributing_factor_vehicle_1       7107  2120518


In [96]:
# Check types of data in columns
#df['zip_code'].unique()
#df['number_of_persons_injured'].unique()
df['number_of_persons_killed'].unique()

array([ 0.,  1.,  2.,  3.,  4., nan,  8.,  5.])

In [None]:
convert_dict = {
    'borough': str,
}
df = df.astype(convert_dict)
    
df['datetime'] = pd.to_datetime(df['date'] + ' ' + df['time']) # create datetime column
df['date'] = pd.to_datetime(df['date'])
df['year'] = df['datetime'].dt.year # create year column for easy data selection

#df[col] = pd.to_numeric(df[col], errors='coerce')  # errors='coerce' will convert invalid parsing to NaN

In [None]:
df.describe()