# ELECTRIC VEHICLE CHARGE SCHEDULING 
Objective: Minimize the cost incurred by the charging station.

In [14]:
# Import libraries
import numpy as np
import copy
import random
import fileinput

In [15]:
# Source of the delta E value for EV i in time slot h.
# gen_rand = True: draw a fresh random float between fMin and fMax.
# gen_rand = False: return the value already stored in rand_vals[i][h].
def fRand (fMin,fMax, h, i, gen_rand, rand_vals):
  """Return either a new uniform sample or the stored value rand_vals[i][h]."""
  if (not gen_rand):
    # Replay mode: the bounds are ignored, the stored value is used as is.
    return rand_vals[i][h]
  return random.uniform(fMin, fMax)

In [16]:
# Assigns the delta E of time slot h to every EV (fields deltaE and deltaE_const)
# and reports whether the values supplied in rand_vals stay inside the per-EV bounds.
def cal_deltaE (h, EV, deltaT, n, gen_rand, rand_vals):
  """Set deltaE / deltaE_const of EVs 1..n for slot h.

  EVs for which h*deltaT lies outside (T_init, T_fin] get 0. For the others a
  lower and an upper bound are derived from the EV's fields and the value is
  obtained from fRand (random when gen_rand is True, rand_vals[i][h] otherwise).

  Returns (EV, valid); valid is False when gen_rand is False and at least one
  checked rand_vals[i][h] falls outside its [lower, upper] interval.
  """
  now = h*deltaT
  within_bounds = True
  for idx in range(1, n+1):
    ev = EV[idx]
    if (now > ev['T_fin'] or now <= ev['T_init']):
      # EV is not connected during this slot.
      ev['deltaE_const'] = ev['deltaE'] = 0
      continue

    gap = ev['E_fin'] - ev['E_curr']
    time_left = ev['T_fin'] - now
    if (ev['type'] == False):
      lower = max(ev['MinPwLmt']*deltaT, gap)
      upper = min(0.0, gap - ev['MinPwLmt']*time_left)
    else:
      lower = max(ev['MaxCap'] - ev['E_curr'], ev['MinPwLmt']*deltaT, gap - ev['MaxPwLmt']*time_left)
      upper = min(-1*ev['E_curr'], ev['MaxPwLmt']*deltaT, gap - ev['MinPwLmt']*time_left)

    if (not gen_rand):
      candidate = rand_vals[idx][h]
      if (candidate < lower or candidate > upper):
        within_bounds = False
    # The value is stored even when it was flagged as out of bounds.
    ev['deltaE_const'] = ev['deltaE'] = fRand(lower, upper, h, idx, gen_rand, rand_vals)

  return EV, within_bounds

In [17]:
# Find energy drawn from grid in one time slot.
# Performs the energy exchange between the EVs using a prioritization algorithm.
# Updates the current energy (E_curr) and EPT values of the EVs accordingly.
def cal_E_g(h, EV, deltaT, n, P_PV, C_g, len): 
  """Settle the energy exchange of time slot h and return the grid energy used.

  EVs present in the slot (T_init < h*deltaT <= T_fin) are split in two groups:
    * suppliers: type-True EVs whose deltaE is positive, keyed by EPT[h-1];
    * consumers: every other present EV, keyed by len+1 (type False) or
      T_fin (type True).
  The supplier queue additionally holds an entry with index 0 and cost 0, which
  is served from the PV energy P_PV[h]*deltaT, and a sentinel with index n+1 and
  cost 1e12, which is served from the grid. The queue is sorted by ascending
  cost; consumers are visited in descending key order.

  The amount a consumer takes is -deltaE, so consumers are presumably expected
  to carry deltaE <= 0 here (TODO confirm against cal_deltaE's bounds).

  Parameters
    h       index of the time slot being settled.
    EV      indexable collection of EV records (1..n used); mutated in place:
            E_curr, deltaE, C_curr and EPT[h].
    deltaT  length of one time slot.
    n       number of EVs.
    P_PV    PV power per slot, indexed by h.
    C_g     grid price per slot, indexed by h.
    len     only used as len+1, the sort key of type-False consumers.
            Shadows the builtin len inside this function.

  Returns (E_g_curr, EV): the energy taken from the grid in this slot and the
  updated EV collection.
  """
  # Supplier queue. np.append without an axis flattens, so despite the (0, 2)
  # shape of the seed array the result is a 1-D structured array.
  dC_EVs = np.empty((0, 2), dtype=[('energy_cost', np.float64), ('index', np.int64)])
  # Index 0: PV source with cost 0.
  dC_EVs = np.append(dC_EVs, np.array([tuple([0,0])], dtype=dC_EVs.dtype))
  # Index n+1: grid sentinel; its cost of 1e12 sorts it behind any EPT below that value.
  dC_EVs = np.append(dC_EVs, np.array([tuple([1000000000000,n+1])], dtype=dC_EVs.dtype))

  # Consumer queue.
  C_EVs = np.empty((0, 2), dtype=[('dep_time', np.float64), ('index', np.int64)])

  for i in range(1, n+1, 1):
    if (h*deltaT <= EV[i]['T_fin'] and h*deltaT > EV[i]['T_init']):
      if (EV[i]['type'] == True and EV[i]['deltaE'] > 0):
        # Supplier: offered at its previous EPT, which is carried over to slot h.
        dC_EVs = np.append(dC_EVs, np.array([tuple([EV[i]['EPT'][h-1], i])], dtype=dC_EVs.dtype))
        EV[i]['EPT'][h] = EV[i]['EPT'][h-1]
      else:
        if (EV[i]['type'] == False):
          # Type-False EVs get key len+1.
          C_EVs = np.append(C_EVs, np.array([tuple([len+1, i])], dtype=C_EVs.dtype))
        else:
          C_EVs = np.append(C_EVs, np.array([tuple([EV[i]['T_fin'], i])], dtype=C_EVs.dtype))

  dC_EVs = np.sort(dC_EVs, order=['energy_cost'])
  # Ascending sort reversed: consumers with the largest key are served first.
  C_EVs = np.sort(C_EVs, order=['dep_time'])[::-1]

  # j is the cursor into the supplier queue. It is shared by all consumers and
  # never reset, so a supplier that was drained is not visited again.
  j = 0
  for i in range(1, n+1, 1):
    if (EV[i]['type'] == True):
      # Recomputed for every type-True EV, present in this slot or not.
      EV[i]['C_curr'] = -1*EV[i]['EPT'][h-1]*EV[i]['E_curr']

  E_PV_curr = P_PV[h]*deltaT
  E_g_curr = 0;

  for i in range(0, C_EVs.size, 1):
    C_EV = C_EVs[i]['index']

    # While the current supplier (PV or EV) cannot cover the consumer's remaining
    # demand -deltaE, take everything it has and move on to the next supplier.
    # The loop never advances past the grid sentinel (index n+1).
    while ((dC_EVs[j]['index'] == 0 and -EV[C_EV]['deltaE'] > E_PV_curr) or (dC_EVs[j]['index'] != 0 and dC_EVs[j]['index'] != (n+1) and -EV[C_EV]['deltaE'] > EV[dC_EVs[j]['index']]['deltaE'])):
      if (dC_EVs[j]['index'] == 0):
        # Use up all remaining PV energy; C_curr is not changed in this branch.
        EV[C_EV]['E_curr'] += -E_PV_curr
        EV[C_EV]['deltaE'] += E_PV_curr
        E_PV_curr = 0
      else:
        # Use up the supplier EV's whole offer, valued at the supplier's EPT[h-1].
        EV[C_EV]['E_curr'] += -EV[dC_EVs[j]['index']]['deltaE']
        if (EV[C_EV]['type'] == True):
          EV[C_EV]['C_curr'] += EV[dC_EVs[j]['index']]['EPT'][h-1]*EV[dC_EVs[j]['index']]['deltaE']
        EV[C_EV]['deltaE'] += EV[dC_EVs[j]['index']]['deltaE']
        EV[dC_EVs[j]['index']]['E_curr'] += EV[dC_EVs[j]['index']]['deltaE']
        EV[dC_EVs[j]['index']]['deltaE'] = 0
      j += 1

    # The current supplier can cover whatever demand is left.
    if (dC_EVs[j]['index'] == 0):
      # Covered by PV: the PV budget changes by deltaE.
      EV[C_EV]['E_curr'] += EV[C_EV]['deltaE']
      E_PV_curr += EV[C_EV]['deltaE']
      EV[C_EV]['deltaE'] = 0

    elif (dC_EVs[j]['index'] == n+1):
      # Covered by the grid: valued at the grid price C_g[h] and added to E_g_curr.
      EV[C_EV]['E_curr'] += EV[C_EV]['deltaE']
      if (EV[C_EV]['type'] == True):
        EV[C_EV]['C_curr'] += C_g[h]*(-1*EV[C_EV]['deltaE'])
      E_g_curr += -1*EV[C_EV]['deltaE']
      EV[C_EV]['deltaE'] = 0  
    else:
      # Covered by a supplier EV, whose remaining offer shrinks accordingly.
      EV[C_EV]['E_curr'] += EV[C_EV]['deltaE']
      if (EV[C_EV]['type'] == True):
        EV[C_EV]['C_curr'] += EV[dC_EVs[j]['index']]['EPT'][h-1]*(-1*EV[C_EV]['deltaE'])
    
      EV[dC_EVs[j]['index']]['E_curr'] += -1*EV[C_EV]['deltaE']
      EV[dC_EVs[j]['index']]['deltaE'] += EV[C_EV]['deltaE']    
      EV[C_EV]['deltaE'] = 0

  # Any supply that no consumer took is still applied to the supplier's own E_curr.
  for i in range(1, n+1, 1):
    if (h*deltaT <= EV[i]['T_fin'] and h*deltaT > EV[i]['T_init']):
      if (EV[i]['type'] == True and EV[i]['deltaE'] > 0):
        EV[i]['E_curr'] += EV[i]['deltaE']
        EV[i]['deltaE'] = 0
  
  # New EPT for type-True EVs whose original request (deltaE_const) was negative:
  # accumulated cost divided by -E_curr.
  # NOTE(review): divides by zero when E_curr is 0 — confirm this cannot happen.
  # NOTE(review): a present type-True EV with deltaE_const == 0 matches neither
  # this branch nor the supplier branch above, so its EPT[h] is not written here.
  for i in range(1, n+1, 1):
    if (h*deltaT <= EV[i]['T_fin'] and h*deltaT > EV[i]['T_init']):
      if (EV[i]['type'] == True and EV[i]['deltaE_const'] < 0):
        EV[i]['EPT'][h] = EV[i]['C_curr']/((-1)*EV[i]['E_curr'])

  return E_g_curr, EV 

In [18]:
def found(ch, curr_pop, x_len, y_len):
  """Tell whether chromosome ch is already present in curr_pop.

  Each population entry is indexed as entry[0][i][j]; ch matches an entry when
  all x_len*y_len compared cells are equal.
  """
  wanted = x_len*y_len
  for member in curr_pop:
    equal_cells = sum(
      1
      for row in range(x_len)
      for col in range(y_len)
      if ch[row][col] == member[0][row][col]
    )
    if (equal_cells == wanted):
      return True
  return False

In [19]:
# Extends curr_pop with P more randomly generated chromosomes.
def generate_pop(P, curr_pop, EV, deltaT, len, n, P_PV, C_g, f):
  """Append P new chromosomes, none of which is already in curr_pop.

  A chromosome is an (n+1) x (len+1) array of delta E values (EV -> row,
  time slot -> column). It is stored as (chromosome, (tot_cost, OF)).
  Candidates that `found` reports as duplicates are discarded and regenerated.
  The parameter `len` is the number of time slots and shadows the builtin.

  NOTE(review): the loop only ends once P reaches exactly 0.
  """
  while (P):
    rand_vals = np.zeros(shape=(n+1,len+1), dtype = np.float64)
    OF = 0
    tot_cost = 0
    # Every candidate starts from the EVs' initial energy.
    for i in range(1, n+1):
      EV[i]['E_curr'] = EV[i]['E_init']
      EV[i]['deltaE'] = EV[i]['deltaE_const'] = 0

    for h in range(1, len+1):
      EV, _ = cal_deltaE(h, EV, deltaT, n, True, rand_vals)
      E_g_curr, EV = cal_E_g(h, EV, deltaT, n, P_PV, C_g, len)
      grid_bill = C_g[h]*E_g_curr
      OF += grid_bill
      tot_cost += grid_bill

      now = h*deltaT
      for i in range(1, n+1):
        ev = EV[i]
        if (now <= ev['T_fin'] and now > ev['T_init'] and ev['type'] == True):
          payout = max(0, ev['EPT'][h-1]*ev['deltaE_const']*f)
          tot_cost += payout
          OF += (ev['EPT'][h-1] - C_g[h])*ev['deltaE_const'] + payout
        # Record the value drawn for this EV and slot in the chromosome.
        rand_vals[i][h] = ev['deltaE_const']

    if (not found(rand_vals, curr_pop, n+1, len+1)):
      curr_pop.append((rand_vals, (tot_cost, OF)))
      P -= 1
  return curr_pop

In [20]:
def selection(curr_pop, sel_cnt):
  """Pick sel_cnt distinct chromosomes by roulette wheel and return deep copies.

  curr_pop is a list of (chromosome, (tot_cost, OF)) tuples; the draw weight of
  an entry is its tot_cost. Indices are drawn with random.choices until
  sel_cnt different ones have been collected.

  Returns a list of deep-copied chromosomes (order follows set iteration).

  Raises ValueError when sel_cnt is negative or larger than the number of
  entries with a positive weight; the draw loop could never finish in that case.
  """
  # NOTE(review): weights are the raw tot_cost values, so costlier chromosomes
  # are drawn more often; confirm this is intended for a minimisation problem.
  weights = [member[1][0] for member in curr_pop]

  # An entry with zero weight is never returned by random.choices, so only
  # positively weighted entries can contribute distinct indices.
  drawable = sum(1 for weight in weights if weight > 0)
  if (sel_cnt < 0 or sel_cnt > drawable):
    raise ValueError(
      "cannot select " + str(sel_cnt) + " distinct chromosomes: only "
      + str(drawable) + " have a positive selection weight"
    )

  positions = list(range(len(curr_pop)))
  chosen = set()
  while (len(chosen) < sel_cnt):
    chosen.add(random.choices(positions, weights)[0])

  # Deep copies keep later in-place crossover/mutation away from the population.
  return [copy.deepcopy(curr_pop[pos][0]) for pos in chosen]

In [21]:
def mutation(chr, EV):
  """Overwrite one randomly chosen gene of chr in place and return chr.

  Row 0 and column 0 of the chromosome are never touched. The new value is
  drawn uniformly between the MinPwLmt and MaxPwLmt of the EV owning the gene.
  """
  slots = len(chr[0]) - 1
  gene_cnt = (len(chr) - 1)*slots
  pick = random.randint(0, gene_cnt - 1)
  # Flat gene index -> (EV row, time-slot column), both 1-based.
  ev_offset, slot_offset = divmod(pick, slots)
  EV_no = ev_offset + 1
  time_slot = slot_offset + 1
  chr[EV_no][time_slot] = random.uniform(EV[EV_no]['MinPwLmt'], EV[EV_no]['MaxPwLmt'])
  return chr

In [22]:
def cross_over(p1, p2):
  """One-point crossover: swap the tail columns of p1 and p2 in place.

  A cut column is drawn from 2 .. tot_slots-1 and every column from the cut to
  the last one is exchanged between the parents for rows 1 and up (row 0 and
  columns before the cut are left alone). Returns (p1, p2), the same objects.

  random.randint includes its upper bound, so the bound is tot_slots-1: a cut
  equal to tot_slots would exchange nothing and hand back the parents
  unchanged. Chromosomes with at most two columns have no usable cut point and
  are returned as they are.
  """
  tot_slots = len(p1[0])
  if (tot_slots <= 2):
    return p1, p2

  rand_slot = random.randint(2, tot_slots - 1)
  for i in range(rand_slot, tot_slots):
    for j in range(1, len(p1)):
      p1[j][i], p2[j][i] = p2[j][i], p1[j][i]
  return p1, p2

In [23]:
def get_invalid_chromosome_cnt(curr_pop, ch_list, EV, deltaT, len, n, P_PV, C_g, f):
  """Evaluate the chromosomes in ch_list and add the acceptable ones to curr_pop.

  Each chromosome is replayed slot by slot (cal_deltaE with gen_rand False,
  then cal_E_g). A chromosome is rejected when cal_deltaE reports it invalid in
  some slot, or when `found` says it is already in curr_pop. Accepted ones are
  appended as (chromosome, (tot_cost, OF)).
  The parameter `len` is the number of time slots and shadows the builtin.

  Returns (curr_pop, number_of_rejected_chromosomes).
  """
  rejected = 0
  for ch in ch_list:
    OF = 0
    tot_cost = 0
    # Replay starts from the EVs' initial energy.
    for i in range(1, n+1):
      EV[i]['E_curr'] = EV[i]['E_init']
      EV[i]['deltaE'] = EV[i]['deltaE_const'] = 0

    for h in range(1, len+1):
      EV, valid = cal_deltaE(h, EV, deltaT, n, False, ch)
      if (not valid):
        # Out-of-bounds value: stop replaying this chromosome.
        rejected += 1
        break
      E_g_curr, EV = cal_E_g(h, EV, deltaT, n, P_PV, C_g, len)
      grid_bill = C_g[h]*E_g_curr
      OF += grid_bill
      tot_cost += grid_bill

      now = h*deltaT
      for i in range(1, n+1):
        ev = EV[i]
        if (now <= ev['T_fin'] and now > ev['T_init'] and ev['type'] == True):
          payout = max(0, ev['EPT'][h-1]*ev['deltaE_const']*f)
          tot_cost += payout
          OF += (ev['EPT'][h-1] - C_g[h])*ev['deltaE_const'] + payout

    if (valid):
      if (found(ch, curr_pop, n+1, len+1)):
        # Duplicates count as rejected too.
        rejected += 1
      else:
        curr_pop.append((ch, (tot_cost, OF)))
  return curr_pop, rejected

In [24]:
def survivor_selection(remove_cnt, curr_pop):
  """Delete the remove_cnt entries with the highest tot_cost from curr_pop.

  Entries are (chromosome, (tot_cost, OF)) tuples. Among equal costs the entry
  with the larger list position is removed first. The list is modified in
  place and also returned.
  """
  # Positions ranked from most to least expensive.
  ranked = sorted(
    range(len(curr_pop)),
    key=lambda pos: (curr_pop[pos][1][0], pos),
    reverse=True,
  )

  doomed = []
  for pos in ranked:
    if (not remove_cnt):
      break
    doomed.append(pos)
    remove_cnt -= 1

  # Delete from the back so the remaining positions stay valid.
  for pos in sorted(doomed, reverse=True):
    del curr_pop[pos]
  return curr_pop

In [25]:
# Read the input file and collect its whitespace-separated tokens, in order, in `data`.
data = []
with fileinput.input(files = ('sample_data/n_16.txt')) as f:
    for line in f:
        data.extend(line.split())

# `word` is the read cursor into `data`; every value consumed below advances it by one.
word = 0
# One record per EV. Records are addressed from index 1 (index 0 stays unused),
# so with 100 records n must stay below 100; EPT holds 1000 values per EV.
# NOTE(review): both capacities are hard-coded.
EV = np.zeros(100, 
       dtype=[('type', np.bool_), ('SOC_init', np.float64), ('SOC_fin', np.float64), ('MaxCap', np.float64), ('MaxPwLmt', np.float64), ('MinPwLmt', np.float64), ('E_init', np.float64), ('E_fin', np.float64), ('E_curr', np.float64), ('TI', np.float64), ('T_init', np.float64), ('T_fin', np.float64), ('deltaE', np.float64), ('deltaE_const', np.float64), ('C_curr', np.float64), ('EPT', np.float64, 1000)])

# Per-slot series, addressed from index 1; 1000 entries each (hard-coded).
new_arr = np.zeros(1000, dtype = np.float64)
C_g = copy.deepcopy(new_arr)   # grid price per time slot
P_PV = copy.deepcopy(new_arr)  # photovoltaic power per time slot
E_g = copy.deepcopy(new_arr)   # allocated here; not used in the cells visible in this notebook excerpt

# The prompts below are echoed together with the value read from the file.
H = float(data[word])
word += 1
print('Enter the length of the day: ' + str(H))

deltaT = float(data[word])
word += 1
print('Enter the length of one time unit: ' + str(deltaT))
# l = number of whole time slots in the day; H is cut down to a multiple of deltaT.
l = (int)(H//deltaT)  
H = l*deltaT;
i = 0

print('Enter the grid prices over the whole day (' + str(l) + ' space separated no.s): ')
for i in range(1,l+1,1):
  print(data[word] + " ", end = '')
  C_g[i] = float(data[word])
  word += 1

print('\nEnter the photo voltaic power generated over the whole day (' + str(l) + ' space separated no.s): ')
for i in range(1,l+1,1):
  print(data[word] + " ", end = '')
  P_PV[i] = float(data[word])
  word += 1

# Number of EVs, followed by one group of fields per EV (stored in EV[1..n]).
n = int(data[word])
word += 1
print("\nEnter number of EVs: " + str(n))
for i in range(1, n+1, 1):
  print("\nEnter details of EV" + str(i) + " :- \n")
  ev_type = data[word]
  word += 1
  print("Enter the type of EV (VV/GV): " + ev_type)
  # 'GV' is stored as type False; any other token is stored as type True.
  if (ev_type == "GV"):
    EV[i]['type'] = False
  else:
    EV[i]['type'] = True
  
  EV[i]['SOC_init'] = float(data[word])
  word += 1
  print("Enter initial SOC of the EV: " + str(EV[i]['SOC_init']))
  EV[i]['SOC_fin'] = float(data[word])
  word += 1
  print("Enter final SOC of the EV: " + str(EV[i]['SOC_fin']))
  EV[i]['MaxCap'] = float(data[word])
  word += 1
  print("Enter the max capacity of the EV: " + str(EV[i]['MaxCap']))

  # MaxCap is kept with a negative sign from here on, so E_init, E_fin and
  # E_curr derived from it below carry that sign as well.
  EV[i]['MaxCap'] = -1*EV[i]['MaxCap']
  EV[i]['MinPwLmt'] = float(data[word])
  word += 1
  print("Enter the min power limit of the EV [-ve]: " + str(EV[i]['MinPwLmt']))
  EV[i]['MaxPwLmt'] = float(data[word])
  word += 1
  print("Enter the max power limit of the EV (0 for GV)[+ve]: " + str(EV[i]['MaxPwLmt']))
  EV[i]['TI'] = float(data[word])
  word += 1
  print("Enter the time interval of stay of EV: " + str(EV[i]['TI']))
  # SOC values are percentages of MaxCap.
  EV[i]['E_fin'] = (EV[i]['SOC_fin']*EV[i]['MaxCap'])/100
  EV[i]['E_curr'] = EV[i]['E_init'] = (EV[i]['SOC_init']*EV[i]['MaxCap'])/100
  # The stay is too short when running at MinPwLmt for the whole interval cannot
  # cover the required change E_fin - E_init.
  # NOTE(review): the replacement value comes from input(), not from the file,
  # so this blocks a non-interactive run.
  while (EV[i]['TI']*EV[i]['MinPwLmt'] > EV[i]['E_fin'] - EV[i]['E_init']):
    EV[i]['TI'] = float(input("Invalid time interval of stay of EV.\nEnter a larger time interval: "))   

  EV[i]['T_init'] = float(data[word])
  word += 1
  print("Enter arrival time of the EV: " + str(EV[i]['T_init']))
  # Departure time = arrival time + length of stay.
  EV[i]['T_fin'] = EV[i]['T_init'] + EV[i]['TI']

# f rebinds the name used for the input file handle above; from here on it is
# the fraction of the charging price given to the EV owners.
f = float(data[word])
word += 1
print("Enter the fraction of charging price to give to the EV owners: " + str(f))

# I: number of GA generations.
I = int(data[word])
word += 1
print("\n\nEnter the number of generations for the Genetic Algorithm (GA): " + str(I))

# NOTE(review): the population size is fixed at 20; the corresponding token in
# the file is skipped (the cursor still advances) instead of being parsed.
P = 20 #int(data[word])
word += 1
print("\n\nEnter the initial population for GA: " + str(P))


Enter the length of the day: 5.0
Enter the length of one time unit: 1.0
Enter the grid prices over the whole day (5 space separated no.s): 
1 4 4 3 2 
Enter the photo voltaic power generated over the whole day (5 space separated no.s): 
1 2 3 4 5 
Enter number of EVs: 16

Enter details of EV1 :- 

Enter the type of EV (VV/GV): GV
Enter initial SOC of the EV: 20.0
Enter final SOC of the EV: 70.0
Enter the max capacity of the EV: 100.0
Enter the min power limit of the EV [-ve]: -10.0
Enter the max power limit of the EV (0 for GV)[+ve]: 0.0
Enter the time interval of stay of EV: 5.0
Enter arrival time of the EV: 0.0

Enter details of EV2 :- 

Enter the type of EV (VV/GV): GV
Enter initial SOC of the EV: 30.0
Enter final SOC of the EV: 60.0
Enter the max capacity of the EV: 100.0
Enter the min power limit of the EV [-ve]: -15.0
Enter the max power limit of the EV (0 for GV)[+ve]: 0.0
Enter the time interval of stay of EV: 3.0
Enter arrival time of the EV: 2.0

Enter details of EV3 :- 

Ent

In [26]:
# Genetic Algorithm
# - Create an initial population of P chromosomes.
# In each generation:
# - Using the roulette wheel, pick 3 chromosomes for mutation and crossover.
# - Mutate a copy of ch[2].
# - Cross over ch[0] and ch[1], giving c1 and c2.
# - Validate the three offspring: valid, unseen ones join the population; the
#   others are counted as invalid.
# - Generate invalid_cnt fresh random chromosomes to replace the rejected ones.
# - Remove the three highest-cost chromosomes from the population.

# curr_pop is a list of tuples: (chromosome-numpy.ndarray, (tot_cost, OF))
curr_pop = generate_pop(P, [], EV, deltaT, l, n, P_PV, C_g, f)
while (I):
  sel_cnt = 3
  ch = selection(curr_pop, sel_cnt)
  # Mutation runs before crossover, on its own copy of the third pick.
  mutant = mutation(copy.deepcopy(ch[2]), EV)
  c1, c2 = cross_over(ch[0], ch[1])
  new_ch = [mutant, c1, c2]
  curr_pop, invalid_cnt = get_invalid_chromosome_cnt(curr_pop, new_ch, EV, deltaT, l, n, P_PV, C_g, f)
  curr_pop = generate_pop(invalid_cnt, curr_pop, EV, deltaT, l, n, P_PV, C_g, f)
  remove_cnt = 3
  curr_pop = survivor_selection(remove_cnt, curr_pop)
  I -= 1

# Report the cheapest chromosome left in the population (first one on ties).
best_score = curr_pop[0][1]
for entry in curr_pop[1:]:
  if (entry[1][0] < best_score[0]):
    best_score = entry[1]
min_tot_cost, min_OF = best_score

print("\nMin cost to the Charging Station using GA: " + str(min_tot_cost))


Min cost to the Charging Station using GA: 1690.9627637400545
