In [7]:
import pandas as pd
from datetime import datetime, timedelta as delta
import random
import numpy as np

import calendar
import math
import os

# Read in probability distributions #

Some of the distributions for the script should be generated from survey data specific to the location the trips are being generated for. 
 - Leave home time (weekend / weekday) based on custom probability distribution
 - parameters for gamma distribution describing distance between home and work

Other distributions can use basic assumptions like time spent at work or traffic disruption


In [8]:
path = path=r"S:\E3 Projects\CEC EPRI V2G\E3 Modeling"
leave_home_probs = pd.read_csv(path + r'\NHTS_NE_leave_home_probabilities.csv', header=0, index_col=0)

Function requires a list of tuples (time of departure, probability) so will generate this from the data

In [9]:
pdf_wkda = [(index, val) for index, val in leave_home_probs['weekday'].iteritems() if index > 4 and index < 11.5]
pdf_wked = [(index, val) for index, val in leave_home_probs['weekend'].iteritems() if index > 4.5 and index < 21]

# Random Driving Pattern Function #

random_distr used to select a value based on a probability of that value occuring

rand_gen is the main function used to generate driving profiles 

In [10]:
def random_distr(pdf):
    
    """ Function to use a customised cumulative distribution function with a randomised threshold 
    
    pdf = tuple of sequential values or items and their associated probabilities of occuring, 
    create a cumulative distribution function from this pdf"""
    
    #ransomised threshold
    rand_threshold = random.uniform(0, 1)
    cumulative_prob = 0
    
    for value, prob in pdf:
        cumulative_prob += prob
        if cumulative_prob >= rand_threshold:
            return value
        elif value == pdf[-1][0]:
            value = 9999
            
    return value

In [11]:
def distance_time(leave_home):
    """Used to increase journey times that start earlier in the day
    REVISIT THIS - REVERSE CAUSAL ISSUE: should be make leave home earlier if journey is longer....
    Also need to find out where I got  these numbers from!"""
    
    dist_factor = -0.07813*leave_home + 1.890625
    return dist_factor

In [12]:
def check_distance(ev_type, dp, rtrn_jrny):
    """Checks the drive period does not exceed battery limit
    Assumes global parameters: power, hr_per_period, period_per_hr, battery_limit are available
    args:
    ev_type: 1 = BEV, else PHEV
    dp = float representing number of periods driving 
    rtrn_jrny = if battery is needed to get to destination and back without charging, 2, if charging is present at destination,1 
    returms:
    electric drive period: number of intervals driving in electric mode
    drive_period: number of intervals for journey in total"""
    
    if ev_type == 1:
        edp = dp
        if ((dp*hr_per_period)*power) > (0.8 * (battery_limit / rtrn_jrny)):
            dp = (((0.8 * (battery_limit / rtrn_jrny)) / power) * period_per_hr)
            edp = (((0.8 * (battery_limit / rtrn_jrny)) / power) * period_per_hr)
        drive_period = int(math.ceil(dp))
        electric_drive_period = drive_period
    else:
        drive_period = int(math.ceil(dp))
        if ((dp*hr_per_period)*power) > (0.95*(battery_limit/rtrn_jrny)):
            edp = (((0.95*(battery_limit / rtrn_jrny)) / power) * period_per_hr)
            electric_drive_period = int(math.ceil(edp))
        else:
            electric_drive_period = drive_period
            edp = dp
    
    return electric_drive_period, edp, drive_period, dp

In [13]:
def rand_gen(num_evs, pdf_we, pdf_wd, BEV_PHEV, gam_a=2, gam_b=2.5, gam_c=10, miles_per_period=8.75, power_consumption=12, 
             battery_size=60, freq=15, year=2017):
    
    """ Random EV profile generator
    
    num_evs = int of total number of ev profiles you want to make
    pdf_w_e = probability distribution for time leaving home on a weekend, list of tuples (time leaving, probability)
    pdf_w_d = as above but for week day 
    BEV_PHEV = list of vehicle types, 1 = BEV, 2 = PHEV, list len must match num_evs
    gam_a - c = parameters for gamma distribution describing the distance between a drivers home and work location 
    energy_consumption = average power consumption (kw) during an hour of continuous city driving (~30 mph), 
                        float or list (size must be same as num_evs)
    battery_size = size of EV battery in kWh, default is 75 (if list is passed, size must be same as num_evs)
    freq = timestep for time series, 60 = 60 mins per time step, 15 = 15 min per timestep (4 x more timesteps than freq = 60)
    year = int of year, to recognise if leap or not
    
    Output is 3 different dataframes, so need 3 variables to assign to outputs when using this function """ 
    
    days = 366 if calendar.isleap(year) else 365
    home_avail = pd.DataFrame()
    work_avail = pd.DataFrame()
    driv_disch = pd.DataFrame()
    driv_miles = pd.DataFrame()
    
    global period_per_hr
    period_per_hr = (60.0/freq)
    global hr_per_period
    hr_per_period = (1/period_per_hr)
    
    print('Creating profiles for ' + str(num_evs) + ' EVs in the year ' + str(year) + ' at ' + str(freq) + ' min timesteps')
    
    for ev in range(num_evs):
               
        #create dataframes to fill 
        home_ev_ts = pd.DataFrame()
        work_ev_ts = pd.DataFrame()
        drive_ev_ts = pd.DataFrame()
        miles_ev_ts = pd.DataFrame()
        
        #establish EV technical parameters
        global power
        if type(power_consumption) is not list:
            power = power_consumption                
        elif num_evs != len(power_consumption):
            raise Exception('The power_consumption argument was input as a list so its length must equal the number of EVs!')
        else:
            power = float(power_consumption[ev])
        
        if num_evs != len(BEV_PHEV):
            raise Exception('The BEV_PHEV argument list must have a length equal to the number of EVs!')
        global battery_limit
        if type(battery_size) is not list: 
            battery_limit = battery_size
        elif num_evs != len(battery_size):
            raise Exception('The battery_size argument was input as a list so its length must equal the number of EVs!')
        else:
            battery_limit = float(battery_size[ev])
        
        for day in range(days):

            #dataframe of minutes
            nminutes = 1440
            start = datetime(year, 1,1,0) + delta(days=day)
            dates = [start + delta(minutes =x) for x in range(0, nminutes)]
            values = 1
            tsa = pd.Series(values, index=dates)
            ts = tsa.resample(str(freq) +'Min').mean()
            ts_h = ts
            ts_w = ts-1
            ts_dr = ts-1
            ts_mdr = ts-1

            if start.weekday() > 4:

                # WEEKEND - WAHOOO
                # Start of day - use custom pdf
                pdf_w_e = pdf_we
                leave_home = int(random_distr(pdf_w_e)*(period_per_hr))
                
                # Account for trips futher from home leaving earlier in the day
                leave_home_factor = distance_time(leave_home*hr_per_period)

                # Drive Period (both ways) resampled, very broad distribution
                odp = (np.random.gamma(2,5)/10)*(period_per_hr)* 1 #leave_home_factor
               
                # Check the energy consumed during trip does not exceed battery limit, only applicable for BEVs
                electric_drive_period, edp, drive_period, dp = check_distance(BEV_PHEV[ev], odp, rtrn_jrny=2)
            
                # out of house period
                ohp = np.random.normal(3*period_per_hr, 3*period_per_hr)
                if ohp < (1*period_per_hr):
                    ohp = (1*period_per_hr)
                play_period = int(round(ohp))

                #Sum total time out of the house for availability at home
                total_period = drive_period + play_period + drive_period
                
                #time away from home
                ts_h[leave_home:(leave_home + total_period)] = 0
                #time driving to destination
                ts_mdr[leave_home:(leave_home + drive_period)] = ((miles_per_period*dp)/drive_period)
                #time driving to destination under electric power
                ts_dr[leave_home:(leave_home + electric_drive_period)] = ((power*edp)/electric_drive_period)
                #time driving home
                ts_mdr[(leave_home + drive_period + play_period):(leave_home + total_period)] = (
                    (miles_per_period*dp)/drive_period)
                #time driving home under electric power
                ts_dr[(leave_home + drive_period + play_period):(leave_home + total_period - drive_period + 
                                                                 electric_drive_period)] = ((power*edp)/electric_drive_period)

            else:

                # WORKDAY
                # Start of day - use custom pdf
                pdf_w_d = pdf_wd
                leave_home = int(random_distr(pdf_w_d)*period_per_hr)

                # Work Commute Period (wcp) generated based on gamma distribution (every day could be completely different) 
                wcp = (np.random.gamma(gam_a,gam_b)/gam_c)*period_per_hr # units are freq increments 2, 2.5 is best
                if wcp > (2*period_per_hr): # check commute time not crazy long (2 hrs max)
                    wcp = (2*period_per_hr)
                if wcp < (0.25*period_per_hr): #commute must be at least 15 mins driving
                    wcp = (0.25*period_per_hr)
                
                # WCP adjusted for traffic conditions
                odwp = np.random.normal(wcp, wcp/5) #frequency already corrected
                if odwp <= (0.08*period_per_hr): #time cannot be less than 5 mins
                    odwp = (0.08*period_per_hr)
                
                electric_drive_work, edwp, drive_work, dwp = check_distance(BEV_PHEV[ev], odwp, rtrn_jrny=2)
                
                # At work
                wp = np.random.normal(9*period_per_hr, 1*period_per_hr)
                if wp < (3*period_per_hr): # must be at work for at least 3 hrs
                    wp = (3*period_per_hr)
                work_period = int(round(wp))

                # drive home, vary wcp in same was as for dwp to calculate drive home period
                odhp = np.random.normal(wcp, wcp/5) #frequency already correct
                if odhp <= (0.08*period_per_hr): #time cannot be less than 5 mins
                    odhp = (0.08*period_per_hr)
                
                electric_drive_home, edhp, drive_home, dhp = check_distance(BEV_PHEV[ev], odhp, rtrn_jrny=2)

                #Sum total time out of the house for availability at home
                total_period = drive_work + work_period + drive_home
                
                #time away from home
                ts_h[leave_home:(leave_home + total_period)] = 0
                #time at work
                ts_w[(leave_home + drive_work):(leave_home + drive_work + work_period)] = 1
                #time driving to work
                ts_mdr[leave_home:(leave_home + drive_work)] = ((miles_per_period*dwp)/drive_work)
                #time driving to work under electric power
                ts_dr[leave_home:(leave_home + electric_drive_work)] = ((power*edwp)/electric_drive_work)
                #time driving home 
                ts_mdr[(leave_home + drive_work + work_period):(leave_home + total_period)] = (
                    (miles_per_period*dhp)/drive_home)
                #time driving home under electric power
                ts_dr[(leave_home + drive_work + work_period):(leave_home + total_period - drive_home + 
                                                               electric_drive_home)] = ((power*edhp)/electric_drive_home)

            home_ev_ts = pd.concat((home_ev_ts, ts_h), axis=0, ignore_index=False)
            work_ev_ts = pd.concat((work_ev_ts, ts_w), axis=0, ignore_index=False)
            drive_ev_ts = pd.concat((drive_ev_ts, ts_dr), axis=0, ignore_index=False)
            miles_ev_ts = pd.concat((miles_ev_ts, ts_mdr), axis=0, ignore_index=False)
        
        ev_string = '{}EV_{}'.format(('B' if BEV_PHEV[ev] == 1 else 'PH'), ev+1)
        home_ev_ts.columns = [ev_string]
        home_avail = pd.concat([home_avail, home_ev_ts], axis=1)
        
        work_ev_ts.columns = [ev_string]
        work_avail = pd.concat([work_avail, work_ev_ts], axis=1)
        
        drive_ev_ts.columns = [ev_string]
        driv_disch = pd.concat([driv_disch, drive_ev_ts], axis=1)
        
        miles_ev_ts.columns = [ev_string]
        driv_miles = pd.concat([driv_miles, miles_ev_ts], axis=1)
        
        print('{}EV_{} complete'.format(('B' if BEV_PHEV[ev] == 1 else 'PH'), ev+1))
        
    return work_avail, home_avail, driv_disch, driv_miles

In [14]:
work_avail, home_avail, driv_disch, driv_miles = rand_gen(num_evs=12, pdf_we=pdf_wked, pdf_wd=pdf_wkda, gam_b=2, 
                                                          miles_per_period=6.95,
                                                          BEV_PHEV=[1,1,1,1,1,1,2,2,2,2,2,2],
                                                          power_consumption=9.235880399,
                                                          battery_size=[65,65,65,65,65,65,17,17,17,17,17,17], 
                                                          freq=15, year=2017)

Creating profiles for 12 EVs in the year 2017 at 15 min timesteps
BEV_1 complete
BEV_2 complete
BEV_3 complete
BEV_4 complete
BEV_5 complete
BEV_6 complete
PHEV_7 complete
PHEV_8 complete
PHEV_9 complete
PHEV_10 complete
PHEV_11 complete
PHEV_12 complete


In [15]:
driv_miles.sum() #/ (driv_disch.sum() / 4)

BEV_1      10985.949720
BEV_2      10295.157784
BEV_3       9885.148862
BEV_4      10239.089155
BEV_5      10515.430910
BEV_6      10587.272208
PHEV_7     10611.908569
PHEV_8     11117.255639
PHEV_9     10521.336746
PHEV_10    10346.540538
PHEV_11    11093.609173
PHEV_12    10238.775686
dtype: float64

In [16]:
driv_disch.sum() / 4

BEV_1      3649.817183
BEV_2      3420.318201
BEV_3      3284.102612
BEV_4      3401.690749
BEV_5      3493.498641
BEV_6      3517.366182
PHEV_7     2918.104727
PHEV_8     2990.503255
PHEV_9     2946.516021
PHEV_10    2804.316299
PHEV_11    2990.942863
PHEV_12    2812.923048
dtype: float64

In [17]:
path = r'S:\E3 Projects\National Grid EV Regulatory Support\Research\Random Driving Pattern Outputs'

home_avail.to_csv(os.path.join(path,r'charging_availability.csv'))
work_avail.to_csv(os.path.join(path,r'charging_availability_work.csv'))
driv_disch.to_csv(os.path.join(path,r'driving_kw.csv'))
driv_miles.to_csv(os.path.join(path,r'driving_miles.csv'))

# Testing #

The following code is all for testing the outputs of the function to see how random and realistic the outputs are. Note that its a lot better to just use one EV for testing purposes!!

First test the speed of the function

In [None]:
pdf_w_e = [(5, 0.0074), (6, 0.0035), (7, 0.01023), (8, 0.02758), (9, 0.05276), (10, 0.07554), 
            (11,0.0773),(12,0.07098),(13,0.07754)]
for i in range(100):
    print(int(random_distr(pdf_w_e)))

In [None]:
%timeit rand_gen(num_evs=4, power_consumption=1.197625, freq=15, year=2016)

Run the function (preferably use high number of timesteps

# Combine Work and Home #

Combine all the function outputs together for easier comparison.

In [None]:
outputs = pd.concat([work_avail, home_avail, driv_disch], axis=1)

In [None]:
outputs.head(20)

## General Tests ##

Create day of the week column

In [None]:
outputs['day_of_week'] = outputs.index.dayofweek

In [None]:
outputs.describe()

At 12pm every day, where is the EV through the year?

In [None]:
outputs[outputs.index.hour == 12].sum()

### Weekend Analysis###
Where do people tend to be on weekends?

In [None]:
outputs[outputs.index.dayofweek > 4].describe()

(1 - sum of mean for home and drive should be the average time spent out on weekends) 

In [None]:
1 - outputs[outputs.index.dayofweek > 4]['EV_h_1'].mean() - outputs[outputs.index.dayofweek > 4]['EV_dr_1'].mean()

### Week day analysis ###

In [None]:
outputs[outputs.index.dayofweek < 4].describe()

Check that on the weekdays, the location is either work, home, or drive, since they should only be one of those places in the current function. Filter rows so that the sum of all three columns is less than one since during the week there should always be one non-zero column.

In [None]:
outputs.loc[(outputs.index.dayofweek < 5) & (outputs['EV_w_1'] + outputs['EV_h_1'] + outputs['EV_dr_1'] < 1)]

## Longest Streaks ##

Each set of consecutive values (e.g. streak of 1s or streak of 0s) is a block.

group by blocks and then sum to get total hours in each place or total energy spent driving in one session

In [None]:
outputs['block_w'] = (outputs['EV_w_1'] != outputs['EV_w_1'].shift(1)).astype(int).cumsum() # blocks of consecutive work values
x = outputs.groupby('block_w').sum() #.transform(lambda x: range(1, len(x) + 1))

In [None]:
x.head()

In [None]:
x.loc[x['EV_w_1'] > 0]['EV_w_1'].describe()

Count = numbeer of work sessions (i.e. work days per year)

In [None]:
import matplotlib.pyplot as plt

plot = x.loc[x['EV_w_1']>0]['EV_w_1']
count, bins, ignored = plt.hist(plot, 50, normed=True)
plt.show()

When was the longest day spent at work? Output will be the block number the streak is located in, will then use the block number to find the date 

In [None]:
x.loc[x['EV_w_1'] == x['EV_w_1'].max()].index[0]

In [None]:
outputs.loc[outputs['block_w'] == x.loc[x['EV_w_1'] == x['EV_w_1'].max()].index[0]]

How many work sessions over 8hrs? Remeber to account for frequenecy (so 8 * 60/freq if checking for hrs)

In [None]:
x[x['EV_w_1'] > 8*4].describe()

Check which block max streak located in

### Home Streak ###
Do same thing for home

In [None]:
outputs['block_h'] = (outputs['EV_h_1'] != outputs['EV_h_1'].shift(1)).astype(int).cumsum()
x = outputs.groupby('block_h').sum()

In [None]:
x.head()

In [None]:
x.loc[x['EV_h_1'] > 0]['EV_h_1'].describe()

Count will likely be 1+total number of days in year beacause year starts with EV at home and ends with EV at home

In [None]:
import matplotlib.pyplot as plt

plot = x.loc[x['EV_h_1']>0]['EV_h_1']
count, bins, ignored = plt.hist(plot, 50, normed=True)
plt.show()

When did the longest time spent at home occur?

In [None]:
x.loc[x['EV_h_1'] == x['EV_h_1'].max()].index[0]

In [None]:
outputs.loc[outputs['block_h'] == x.loc[x['EV_h_1'] == x['EV_h_1'].max()].index[0]].head(50)

How many days over 24 hrs at home???

In [None]:
x[x['EV_h_1'] > 20*4].describe()

In [None]:
x[x['EV_h_1'] > 20].median()

### Driving Energy consumed per drive ###

In [None]:
outputs['block_dr'] = (outputs['EV_dr_1'] != outputs['EV_dr_1'].shift(1)).astype(int).cumsum()
x = outputs.groupby('block_dr').sum()

In [None]:
x.head()

In [None]:
x.loc[x['EV_dr_1'] > 0]['EV_dr_1'].describe()

In [None]:
import matplotlib.pyplot as plt

plot = x.loc[x['EV_dr_1']>0]['EV_dr_1'].values
count, bins, ignored = plt.hist(plot, 50, normed=True)
plt.show()

values will be discrete and represent 1hr, 2hrs, 3hrs etc. of driving

# Random testing #

In [None]:
pdf = [(5, 0.1), (6, 0.2), (7, 0.2), (8, 0.3), (9, 0.15), (10, 0.05)]

test = {}

for i in range(1,1000):
    out = random_distr(pdf)
    test[i] = out

fin = pd.DataFrame(test.items())

In [None]:
fin.describe()

In [None]:
outputs['test'] = outputs['EV_dr_1'].apply(lambda x: x > 1)

In [None]:
outputs['day_of_week'] = outputs.index.dayofweek