# Kinetic Model for "A bumpy ride to electrification: High fidelity energy consumption estimates for paratransit vehicles in South Africa"

Welcome! This Jupyter Markdown file walks through the kinetic model used to estimate vehicle energy consumption from 1Hz GPS tracking data for  "A bumpy ride to electrification: High fidelity energy consumption estimates for paratransit vehicles in South Africa" - Hull, et al (2022) </br>
• paper doi: </br>
• data-in-brief doi: </br>
• link to data repository: </br>

Email addresses for questions, comments, concerns, suggestions, connections, etc: </br> mjbooysen@sun.ac.za </br> katherine.collett@eng.ox.ac.uk </br> christopher.hull@eng.ox.ac.uk


Author: Christopher Hull </br>
Date created: 10 June, 2022 </br>
Date last updated: 10 June, 2022

## Download the Data
To begin, download the data files from the 'Raw Data' folder at the Mendeley Data repository link above. 

## Import Packages
Run the following cell to import all of the packages necessary for data processing

In [111]:
import pandas as pd
import os
import numpy as np
from numpy import inf
import datetime
from datetime import datetime, timedelta
import descartes
import geopandas as gpd
import geopy.distance
from pathlib import Path

## Functions to Load Data
The "get_raw_data" function assumes the same file hierarchy as the data is stored in the Mendeley Data repository. Once you have downloaded the data, all you must do to load the data is provide a path from your current working directory to the 'path' string variable, and run the get_raw_data function.

Read file function

In [101]:
''' 
This function takes the filename of a GPS trip stored in a .csv file, and path to the file.

It reads the file, cleans it, adds columns that are necessary for the kinetic model,
and returns the file in a pandas dataframe
'''

def read_file(filename, path):

    journey = pd.read_csv(path + "/" + filename)

    ############################ Data cleaning #####################################

    if 'Aatitude' in journey.columns:
        journey['Altitude'] = journey['Aatitude']
        journey.drop(columns = ['Aatitude'])

    if 'GPS Speed' in journey.columns:
        journey['Speed'] = journey['GPS Speed']
        journey.drop(columns = ['GPS Speed'])

    # Join date and time columns into one column
    journey['DateTime'] = pd.to_datetime(journey['Date'] + journey['Time'], format = '%m/%d/%Y%H:%M:%S')

    # Check if the time logger got stuck for a second, and correct if so
    for i in range(len(journey) - 1):                                   
         if journey['DateTime'][i] == journey['DateTime'][i + 1]:
            journey['DateTime'][i + 1] += timedelta(seconds = 1)  
            

            
    #######################################################################
  
    ############ Set up dataframe for energy consumption estimations ############

    
    # Convert speed in km/h to m/s 
    journey['Velocity'] = np.where(journey['Speed'] >= 0.5, journey['Speed']/3.6, 0) # 
    
    
   # for i in range(len(journey)): ### Observations less than 0.5 km/h are set to 0, due to GPS noise 
   #     if journey['Speed'][i] < 0.5:
      #      journey['Velocity'][i] = 0

    #Calculate elevation change
    journey['ElevChange'] = np.where(abs(journey['Altitude'].shift(-1) - journey['Altitude']) >= 0.2, journey['Altitude'].shift(-1) - journey['Altitude'], 0)
   # for i in range(len(journey)): # If measured elevation change < 0.2, then set to 0 due to GPS noise. 
    #    if abs(journey['ElevChange'][i]) < 0.2:
     #       journey['ElevChange'][i] = 0 

    # Calculate time between samples. Useful for kinetic model. 
    # This is needed because some samples are 1/2Hz. 
    journey['DeltaT'] = journey['DateTime'].shift(-1) - journey['DateTime']
    journey.DeltaT = journey['DeltaT']/ np.timedelta64(1, 's') # Convert from timedelta to float
    
    # Calculate change in velocity between each timestep 
    journey['DeltaV'] = journey['Velocity'].shift(-1) - journey['Velocity']

    # Calculate acceleration
    journey['Acceleration'] = np.where(journey['DeltaT'] > 0, journey['DeltaV']/journey['DeltaT'], 0)

    # Joins lat/lon Coords into one column, useful for getDist function
    journey['Coordinates'] = list(zip(journey.Latitude, journey.Longitude)) 

    return journey

Get raw data function

In [44]:
''' 
This function takes in a folder directory where the raw GPS data files are stored,
reads the data files using the read_file function, 
and returns a nested dictionary containing all trip files, 
in the same hierarchy as the folders in which they are stored
'''

def get_raw_data(path):
    raw_trips = {} # Dictionary to store all trips in as dataframes
    
    for root, dirs, files in os.walk(path, topdown = True): # Walk through directory
        for file in files:    
            if file.lower().endswith('.csv'): #make sure to only reading csv files and not auxiliary files in the directory
                

                this_trip = read_file(file,root)

                route = root.split(os.sep)[1] # Get current route name
                time_of_day = root.split(os.sep)[2] # Get current time of day
                unit = os.path.splitext(file)[0] # Get unit
                print("Reading:", route,"/",time_of_day,"/",unit)

                if route not in raw_trips: 
                    raw_trips[route] = {} # Create  dictionary for current route

                if time_of_day not in raw_trips[route]:
                    raw_trips[route][time_of_day] = {} # Create dictionary for current time of day within route

                raw_trips[route][time_of_day][unit] = {} #Create dictionary entry to enter trip in 
                raw_trips[route][time_of_day][unit] = this_trip
            
    return raw_trips

• Each route and time of day combo is given its own nested dictionary to store data files in. </br>
Thus, the dictionary structure in which the data is stored reflects the folder hierarchy. </br>

• For example, there is a trip file UnitMain9.csv that was recorded in the morning on the route from Pniel - STB. Therefore it is stored in 'Pniel - STB'/'Morning'/'UnitMain_9.csv'. </br>
Therefore, to access it in the raw_trips: raw_trips['Pniel - STB']['Morning']['UnitMain_9.csv'] 

• Now that we have the data loaded in, let's build the model...  

## Functions to Process Data

Get slope angle and geodesic distance

In [86]:
''' 
Inputs: dataframe with vehicle journey data

Calculates distance between each successive pair of lat/lon coordinates
(accounts for elevation difference)

Calculates slope angle faced by vehicle at each timestamp

'''
def getDistSlope(journey):
    Distance = np.zeros(len(journey))
    Slope = np.zeros(len(journey))
    l_route = journey.shape[0]

    for i in range(0,l_route-1): 
      #  elev_change = journey['ElevChange'][i]
        elev_change = journey['Altitude'][i + 1] - journey['Altitude'][i]
        if abs(elev_change) < 0.2:
            elev_change = 0
        
        dist_lateral = geopy.distance.geodesic(journey.Coordinates.iloc[i],  # (m) Lateral distance between two lat/lon coord pairs
                                       journey.Coordinates.iloc[i+1]).m
        dist_3d = np.sqrt(dist_lateral**2 + elev_change**2)  # (m) 3D Distance using pythogoras to account for elevation change
        if i == 0:
            Distance[i] = 0
        else:
            Distance[i] = dist_3d
        if Distance[i] != 0 and elev_change != 0:
            Slope[i] = np.arcsin(elev_change/dist_3d)
    journey['Displacement_m'] = list(Distance) # Used for computing kWh/km 
    journey['slope_rad'] = list(Slope) 

Functions for retrieving environmental forces

In [6]:
'''
Functions that calculate three environmental forces acting on the vehicle in (N)

'''

# Rolling Resistance (road friction) (N)
def getRollingResistance(mass, c_rr, slope, vel, grav = 9.81): #c_rr is coeff of rolling resistance
    rr = 0
    if vel > 0.3:
        rr = -mass * grav * c_rr * np.cos(slope)
    return rr
 
# Aerodynamic Drag Force (N)
def getAerodynamicDrag(c_d, A, vel, rho = 1.184): # rho is air density 20C, c_d is air drag coeff
    return -0.50 * rho * c_d * A * vel**2

# Road Slope Force (N)
def getRoadSlopeDrag(mass, slope, grav = 9.81):
    return -mass * grav * np.sin(slope)

Drivecycle class

In [20]:
class Drivecycle: 
    """
    Inputs: dataframe with journey info
    """
    def __init__(self, journey): 
        self.displacement = journey.Displacement_m # m
        self.velocity = journey.Velocity # m/s
        self.acceleration = journey.Acceleration #m/s^2
        self.slope = journey.slope_rad # rad
        self.dt = journey.DeltaT # Time elapsed between each timestamp
        self.dv = journey.DeltaV
        
        

Vehicle class
- Get energy expenditure function

In [42]:
class Vehicle:
    """
    Inputs: Physical parameters of vehicle for modeling energy consumption
    Returns an array of vehicle energy consumption at each timestamp
    
    Default assumptions from "A bumpy ride to electrification":
    mass = 3900 kg
    cd = 0.36
    crr = 0.02
    A = 4 m^2
    eff = 0.90
    rgbeff = 0.65
    p0 = 100 W
    """
    
    def __init__(self, mass = 3900, cd = 0.36, crr = 0.02, A = 4,
                 eff = 0.9, rgbeff = 0.65, p0 = 100):

        # Vehicle physical parameters
        self.mass = mass # (kg) Total veh weight including payload
        self.crr = crr # Rolling resistance coeff
        self.cd = cd  # Air drag coeff
        self.A = A # (m^2) Approximation of vehicle frontal area 
        self.eff = eff # Powertrain efficiency
        self.rgbeff = rgbeff # Recuperative braking efficiency
        self.p0 = p0 # (W) Constant power loss (to power auxiliary vehicle systems)

        
    # Computes energy expenditure from a Drivecycle object
    def getEnergyExpenditure(self,cycle):
        
        v = cycle.velocity # (m/s) 
        s = cycle.slope # (rad)
        a = cycle.acceleration # (m/s^2)
        dt = cycle.dt # (s)
        d = cycle.displacement # (m)
        dv = cycle.dv # (m/s)
            
            
        Er = [] # Total energy consumption (J), from battery if positive, to battery if negative (RGbrake)

        
                
        for slope,vel,acc,delta_t, delta_v in zip(s,v,a,dt,dv):
            
            force, frr, fad, fsd = 0,0,0,0
            
            if vel != 0: #No drag if velocity = 0
                frr = getRollingResistance(self.mass,self.crr, slope, vel) # (N)
                fad = getAerodynamicDrag(self.cd, self.A, vel) # (N)
                fsd = getRoadSlopeDrag(self.mass, slope) # (N)
                
            force = frr + fad + fsd # (N)
            
            exp_speed_delta = force * delta_t / self.mass # (N-s/kg)
            
            unexp_speed_delta = delta_v - exp_speed_delta # (m/s)
            
            try:
                prop_brake_force = self.mass * unexp_speed_delta / delta_t # (kg * m/s)/(s) = (N)
            
                kinetic_power = prop_brake_force * vel # (N)*(m/s) = (W)
                
                propultion_work = kinetic_power * delta_t  # (W)*(s) = (J) 
                
            except ZeroDivisionError:
                prop_brake_force = 0
                kinetic_power = 0
                propultion_work = 0

            Er.append(propultion_work) # (J)

        ErP = [0.0]*len(Er)
        ErB = [0.0]*len(Er)

        for i in range(len(Er)):            
            if Er[i] > 0: # Propulsion Energy 
                ErP[i] = Er[i]/self.eff # (J)
                
            elif Er[i] < 0: # Recuperated Energy (from regen braking system)
                ErB[i]= Er[i]*self.rgbeff # (J)
    
    
    
        return ErP,ErB 

Function to loop through raw trips and process them

In [114]:
''' 
Intakes dictionary of raw trip data
Outputs dictionary of processed trip data, with energy expenditure computations
'''
def process_data(trip_dict, SUMO = False, mass = 3900, cd = 0.36, crr = 0.02, A = 4.0, eff = 0.90, rgbeff = 0.65, p0=100):
    processed_trips = trip_dict #initialize dictionary of same structure
    for route, v in trip_dict.items():
        for time_of_day, v in trip_dict[route].items():
            for unit,journey in trip_dict[route][time_of_day].items():
                
                print("Processing journey:", route, time_of_day, unit) 
                minibus = Vehicle(mass = mass, cd = cd, crr = crr, A = A, eff = eff, rgbeff = rgbeff, p0 = p0)
                
                if SUMO:
                    drvcycle = SUMO_Drivecycle(journey) # Create drivecycle object with trip data
                else:
                    getDistSlope(journey) # Augment journey dataframe with distance and slope columns
                    drvcycle = Drivecycle(journey) # Create drivecycle object with trip data
                
                # Compute energy expenditure (with and without regenerative braking)
                journey['Propultion Work (J)'],journey['Braking Work (J)'] = minibus.getEnergyExpenditure(drvcycle)
                journey['Offload Work (J)'] = journey['DeltaT'] * p0 # (J)

                journey['Energy Consumption (kWh)'] = (journey['Propultion Work (J)'] + journey['Braking Work (J)'] + journey['Offload Work (J)'])/3.6e6 # (kWh)                
                
                #Rename columns for clarity
                journey['Displacement (m)'] = journey['Displacement_m']
                journey['Slope Angle (rad)'] = journey['slope_rad']
                
            
                #Drop duplicate and intermediate calculation columns to reduce clutter
                journey = journey.drop(['Displacement_m', 'DeltaT', 'slope_rad', 'Velocity', 'DeltaV',
                                       'Acceleration', 'Coordinates', 'DateTime', 'ElevChange'], axis = 1)
                
                #Insert processed journey into dictionary
                processed_trips[route][time_of_day][unit] = journey
                
    return processed_trips

## Call read and process data functions

In [115]:
path = "DataCollection/"
raw_trips = get_raw_data(path)
processed_trips = process_data(raw_trips)

Reading: Pniel - STB / Morning / UnitMain_9
Reading: Pniel - STB / Morning / Unit1_2
Reading: Pniel - STB / Morning / UnitMain_10
Reading: Pniel - STB / Midday / Unit3_5
Reading: Pniel - STB / Midday / Unit1_6
Reading: Pniel - STB / Midday / Unit4_4
Reading: Pniel - STB / Midday / UnitMain_3
Reading: Pniel - STB / Afternoon / UnitMain_11
Reading: Pniel - STB / Afternoon / UnitMain_7
Reading: Pniel - STB / Afternoon / Unit3_8
Reading: KMNDI - STB / Morning / Unit3_3
Reading: KMNDI - STB / Morning / Unit4_2
Reading: KMNDI - STB / Morning / UnitOG_5
Reading: KMNDI - STB / Morning / UnitMain_1
Reading: KMNDI - STB / Midday / Unit1_10
Reading: KMNDI - STB / Midday / Unit4_7
Reading: KMNDI - STB / Midday / Unit2_9
Reading: KMNDI - STB / Midday / UnitMain_6
Reading: KMNDI - STB / Midday / Unit3_8
Reading: KMNDI - STB / Afternoon / Unit3_13
Reading: KMNDI - STB / Afternoon / Unit1_15
Reading: KMNDI - STB / Afternoon / UnitMain_11


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  journey['DateTime'][i + 1] += timedelta(seconds = 1)


Reading: KMNDI - STB / Afternoon / Unit4_12
Reading: STB - KMNDI / Morning / Unit3_2
Reading: STB - KMNDI / Morning / Unit1_3
Reading: STB - KMNDI / Morning / UnitOG_4
Reading: STB - KMNDI / Morning / UnitMain_1
Reading: STB - KMNDI / Midday / Unit3_6
Reading: STB - KMNDI / Midday / UnitMain_5
Reading: STB - KMNDI / Midday / Unit1_9
Reading: STB - KMNDI / Afternoon / Unit3_12
Reading: STB - KMNDI / Afternoon / Unit1_14
Reading: STB - KMNDI / Afternoon / UnitMain_10
Reading: SW - STB / Morning / Unit4_3
Reading: SW - STB / Morning / UnitOG_1
Reading: SW - STB / Morning / UnitMain_2
Reading: SW - STB / Midday / Unit1_7
Reading: SW - STB / Midday / Unit3_6
Reading: SW - STB / Midday / Unit4_5
Reading: SW - STB / Midday / UnitMain_4
Reading: SW - STB / Afternoon / Unit1_10
Reading: SW - STB / Afternoon / Unit4_8
Reading: SW - STB / Afternoon / Unit2_9
Reading: STB - SW / Morning / Unit4_2
Reading: STB - SW / Morning / UnitOG_3
Reading: STB - SW / Morning / UnitMain_1
Reading: STB - SW / Mi

## How to analyze`

Now you have all of the trips stored in the 'processed_trips' dictionary 
To loop through this dictionary and perform analysis on the trips you can use the for loops in the following block of code:

In [73]:

for route, route_subdict in processed_trips.items():
    for time_of_day, route_tod_subdict in processed_trips[route].items():
        for unit, journey_df in processed_trips[route][time_of_day].items():
            print("Here lies the dataframe for a trip taken on/during/recorded with: ", route, "/", time_of_day, "/", unit, "\n")

            

Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Morning Recorded on:  UnitMain_9
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Morning Recorded on:  Unit1_2
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Morning Recorded on:  UnitMain_10
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Afternoon Recorded on:  Unit3_5
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Afternoon Recorded on:  Unit1_6
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Afternoon Recorded on:  Unit4_4
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Afternoon Recorded on:  UnitMain_3
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Evening Recorded on:  UnitMain_11
Here lies the dataframe for a trip taken between:  Pniel - STB 
 During the:  Evening Recorded on:  UnitMain_7
Her

Example usage of this loop to perform summary statistics on energy consumptions for all trips

In [120]:
energy_consumption = []
routes = []
tod = []

for route, route_subdict in processed_trips.items():
    for time_of_day, route_tod_subdict in processed_trips[route].items():
        for unit, journey_df in processed_trips[route][time_of_day].items():
            energy_consumption.append(journey_df['Energy Consumption (kWh)'].sum()/(journey_df['Displacement (m)'].sum()/1e3))
            tod.append(time_of_day)
            routes.append(route)
            if journey_df.Speed[0] > 50:
                print(route, time_of_day, unit)
            
            
summ_df = pd.DataFrame({'Energy Consumption (kWh)': pd.Series(energy_consumption), 
              'Time of Day': pd.Series(tod), 
              'Route': pd.Series(route)})

print(round(summ_df.describe(),2))

Pniel - STB Afternoon Unit3_8
KMNDI - STB Afternoon Unit1_15
       Energy Consumption (kWh)
count                     64.00
mean                       0.39
std                        0.05
min                        0.31
25%                        0.35
50%                        0.38
75%                        0.42
max                        0.51


Write Files to "Processed Data" Folder (must create a Processed Data folder or equivalent to store the data in)

In [119]:
def write_processed_files(folder, all_journeys):
    for route, x in all_journeys.items():
        for tod, m in all_journeys[route].items():
            for unit, journey in all_journeys[route][tod].items():
                filename = folder + "/" + route + "/" + tod + "/" + unit + ".csv"
                output_file = Path(filename)
                output_file.parent.mkdir(exist_ok=True, parents=True)
                journey.to_csv(filename)


output_folder = "Processed Data"
write_processed_files(output_folder, processed_trips)