In [29]:
import gym
from gym import spaces
import pandas as pd
from datetime import datetime, timedelta
import random
import numpy as np

gym.logger.set_level(40)

class ChargingEnv():
    """
    ### 动作空间

    动作是形状为`(1,)`的`ndarray`, 表示该小时内汽车充放电的功率, 正数代表充电, 复数代表放电.

    | 序号 | 动作     | 最小值 | 最大值 |
    |-----|----------|--------|--------|
    | 0   | power    | -20.0   | 20.0    |


    ### 观察空间

    观察是形状为`(2,)`的`ndarray`, 表示汽车的当前电量和当前电价。

    | 序号 | 观察                | 最小值 | 最大值 |
    |-----|--------------------|--------|--------|
    | 0   | SOC                | 0      | 77     |
    | 1   | e_price            | 30     | 120    |

    ### 奖励

    奖励函数定义为：

    *r = -(power * e_price)*

    ### 起始状态

    起始状态是SOC为77(即满电状态), 电价取决于开始时的电价数据。
    """
    def __init__(self, penalty_factor=0.1):
        #环境参数
        # 生成上班时间，范围在七点到九点
        self.start_time = self.generate_random_time(7, 9)
        # 生成下班时间，范围在四点到六点
        self.end_time = self.generate_random_time(16, 18)
        #计算出实际情况下的开始充电和停止充电时间(整点)
        self.real_start_time, self.real_end_time = self.calculate_real_time(self.start_time, self.end_time)
        self.battery_capacity = 77
        self.SOC = 77
        self.soc_min = 0.0
        self.soc_max = 77.0
        self.e_price_min = 0.0
        self.e_price_max = 200.0
        self.power_max = 20.0
        self.current_step = 0
        self.df_prices = pd.read_csv("GR-data-11-20.csv", header=None, names=["DateTime", "ElectricityPrice"])
        self.penalty_factor = penalty_factor

        print("length of prices", len(self.df_prices))

        # 观察空间和动作空间的定义
        # 定义观察空间
        self.observation_space = spaces.Box(low=np.array([self.soc_min, self.e_price_min]),
                                            high=np.array([self.soc_max, self.e_price_max]),
                                            dtype=np.float32)       

        # 定义动作空间
        self.action_space = spaces.Box(low=np.array([-self.power_max]),
                                       high=np.array([self.power_max]),
                                       dtype=np.float32)
        
    def generate_random_time(self, start_hour, end_hour):
        hour = random.randint(start_hour, end_hour)
        minute = random.randint(0, 59)
        second = random.randint(0, 59)
        return datetime.now().replace(hour=hour, minute=minute, second=second)
    
    def calculate_real_time(self, start_time, end_time):
        real_start_time = (start_time + timedelta(hours = 1))
        real_start_time = real_start_time.replace(minute=0, second=0)
        real_end_time = (end_time - timedelta(hours = 0))
        real_end_time = real_end_time.replace(minute=0, second=0)
        return real_start_time, real_end_time
        
    def read_e_price(self,index):  
        # Ensure the index is within the range of the dataframe
        if 0 <= index < len(self.df_prices):
            one_price = self.df_prices["ElectricityPrice"].iloc[index] / 1000
            return one_price
        else:
            # Handle the case where the index is out of range
            print("Index out of range.")
            return None
        
    def step(self, power):
        """
        在环境中执行一步动作，并返回新的观察、奖励等信息。

        参数：
        - `power`：该小时内汽车充电的功率。

        返回：
        - `observation`：新的观察。
        - `reward`：当前步的奖励。
        - `done`：标志是否完成（截断剧集）。
        - `info`：其他信息（空字典）。
        """
        SOC, e_price = self.state  # th := theta

        # 对功率进行裁剪，确保在合理范围内
        power = np.clip(power, -self.power_max, self.power_max)[0]
        
        # 计算新的SOC
        newSOC = SOC + power
        newSOC = np.clip(newSOC, self.soc_min, self.soc_max)
        self.SOC = newSOC

        # 计算成本，根据功率和电价
        costs = (newSOC - SOC) * e_price
        penalty1 = self.penalty_factor * min(0, power) * e_price
        penalty2 = self.penalty_factor * max(0, power) * e_price
        costs += penalty1
        costs -= penalty2

        if (self.current_step + 1) % 24 == self.real_start_time.hour or (self.current_step + 1) % 24 == self.real_end_time.hour:
            if (self.SOC < 10):
                costs += 500
                print("SOC is less than 10%: ", self.SOC)
            
        #取出新的电价
        newe_price = self.read_e_price(self.current_step+1)
        newe_price = np.clip(newe_price, self.e_price_min, self.e_price_max)
        self.current_step += 1

        self.state = np.array([newSOC, newe_price])

        # 返回新的观察、奖励、是否完成、其他信息
        return self._get_obs(), -costs, False, False, {}
    
    def reset(self):
        """
        重置环境到初始状态。

        返回：
        - `observation`：初始观察。
        - `info`：空字典。
        """
        # 恢复起始状态
        # self.current_step = 0
        self.state = np.array([77, self.read_e_price(self.current_step)])
        
        # 返回初始观察和空字典
        return self._get_obs(), {}
    
    def _get_obs(self):
        """
        返回当前观察。

        返回：
        - `observation`：当前观察。
        """
        SOC, e_price = self.state
        return np.array([SOC,e_price], dtype=np.float32)

In [30]:
# Agent

import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device type: ", device)

# Hyperparameters
LR_ACTOR = 1e-4
LR_CRITIC = 1e-3
GAMMA = 0.99
MEMORY_SIZE = 100000
BATCH_SIZE = 64
TAU = 5e-3

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.tanh(self.fc3(x)) * 20
        return x
    
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, x, a):
        x = torch.cat([x, a], 1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)
    
class ReplayMemory:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add_memo(self, state, action, reward, next_state, done):
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        return np.concatenate(state), action, reward, np.concatenate(next_state), done
    
    def __len__(self):
        return len(self.buffer)

class DDPGAgent:
    def __init__(self, state_dim, action_dim):
        self.actor = Actor(state_dim, action_dim).to(device)
        self.actor_target = Actor(state_dim, action_dim).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=LR_ACTOR)

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=LR_CRITIC)

        self.replay_buffer = ReplayMemory(MEMORY_SIZE)

    def get_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = self.actor(state)
        return action.detach().cpu().numpy()[0]
    
    def update(self):
        if len(self.replay_buffer) < BATCH_SIZE:
            return
        
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(BATCH_SIZE)
        states = torch.FloatTensor(states).to(device)
        actions = torch.FloatTensor(actions).to(device)
        rewards = torch.FloatTensor(rewards).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.FloatTensor(dones).to(device)

        # Update critic
        next_actions = self.actor_target(next_states)
        target_Q = self.critic_target(next_states,
                                      next_actions.detach())  # .detach() means the gradient won't be backpropagated to the actor
        target_Q = rewards + (GAMMA * target_Q * (1 - dones))
        current_Q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_Q, target_Q.detach())  # nn.MSELoss() means Mean Squared Error
        self.critic_optimizer.zero_grad()  # .zero_grad() clears old gradients from the last step
        critic_loss.backward()  # .backward() computes the derivative of the loss
        self.critic_optimizer.step()  # .step() is to update the parameters

        # Update actor 
        actor_loss = -self.critic(states, self.actor(states)).mean()  # .mean() is to calculate the mean of the tensor
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Update target networks of critic and actor
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
            
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)

Device type:  cpu


In [31]:
from geopy.distance import geodesic
import random
import pandas as pd

def generate_parking_locations(center_lat, center_lon, city_radius, downtown_radius, suburb_radius, num_parking_lots, electricity_prices_file):
    parking_lots = []

    # Calculate the number of parking lots in each zone
    # Assume that the number of parking lots in each area is roughly equal
    num_downtown_parking_lots = round(num_parking_lots / 3)
    num_suburb_parking_lots = round(num_parking_lots / 3)
    num_city_center_parking_lots = num_parking_lots - num_downtown_parking_lots - num_suburb_parking_lots

    # Define the probability of business area, office district, Residential district
    downtown_district_probs = {"Business district": 0.4, "Office district": 0.3, "Residential district": 0.3}
    city_district_probs = {"Business district": 0.3, "Office district": 0.4, "Residential district": 0.3}
    suburb_district_probs = {"Business district": 0.2, "Office district": 0.2, "Residential district": 0.6}

    # Read the price of electricity for the day
    df_prices = pd.read_csv(electricity_prices_file, header=None, names=["DateTime", "ElectricityPrice"])
    one_day_prices_list = df_prices["ElectricityPrice"][:24].tolist()

    for _ in range(num_city_center_parking_lots):
        location_type = "city_center"
        # Generates latitude and longitude within the city center
        angle = random.uniform(0, 2 * 3.14159)
        distance = random.uniform(0, downtown_radius)
        destination = geodesic(kilometers=distance).destination((center_lat, center_lon), angle)
        
        # Randomly select the attributes of business district, office district and Residential district
        district_type = random.choices(list(downtown_district_probs.keys()), weights=list(downtown_district_probs.values()))[0]
        
        new_one_day_prices_list = []
        # Set parking fees and the number of charging points available
        if district_type == "Business district":
            parking_fee = round(random.uniform(1.1, 1.6) * random.uniform(7, 15), 2)
            available_charging_stations = random.randint(0, int(0.2 * 0.9 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(1.1, 1.2) * random.uniform(1.2, 1.5), 2)
                new_one_day_prices_list.append(new_price)
        elif district_type == "Office district":
            parking_fee = round(random.uniform(1.1, 1.6) * random.uniform(7, 15), 2)
            available_charging_stations = random.randint(0, int(0.2 * 0.1 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(1.1, 1.2) * random.uniform(1.2, 1.5), 2)
                new_one_day_prices_list.append(new_price)
        elif district_type == "Residential district":
            parking_fee = round(random.uniform(7, 15), 2)
            available_charging_stations = random.randint(0, int(0.2 * 0.8 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(1.1, 1.2) , 2)
                new_one_day_prices_list.append(new_price)

        parking_lots.append((destination.latitude, destination.longitude, distance, location_type, district_type, parking_fee, available_charging_stations, new_one_day_prices_list))

    for _ in range(num_downtown_parking_lots):
        location_type = "downtown"
        # Generate latitude and longitude within the city limits
        angle = random.uniform(0, 2 * 3.14159)
        distance = random.uniform(downtown_radius, suburb_radius)
        destination = geodesic(kilometers=distance).destination((center_lat, center_lon), angle)
        
        # Randomly select the attributes of business district, office district and Residential district
        district_type = random.choices(list(city_district_probs.keys()), weights=list(city_district_probs.values()))[0]
        
        new_one_day_prices_list = []
        # Set parking fees and the number of charging points available
        if district_type == "Business district":
            parking_fee = round(random.uniform(1.1, 1.6) * random.uniform(4, 8), 2)
            available_charging_stations = random.randint(0, int(0.5 * 0.9 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(1.2, 1.5), 2)
                new_one_day_prices_list.append(new_price)
        elif district_type == "Office district":
            parking_fee = round(random.uniform(1.1, 1.6) * random.uniform(4, 8), 2)
            available_charging_stations = random.randint(0, int(0.5 * 0.1 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(1.2, 1.5), 2)
                new_one_day_prices_list.append(new_price)
        elif district_type == "Residential district":
            parking_fee = round(random.uniform(4, 8), 2)
            available_charging_stations = random.randint(0, int(0.5 * 0.8 * 10))
            for price in one_day_prices_list:
                new_price = round(price, 2)
                new_one_day_prices_list.append(new_price)

        parking_lots.append((destination.latitude, destination.longitude, distance, location_type, district_type, parking_fee, available_charging_stations, new_one_day_prices_list))

    for _ in range(num_suburb_parking_lots):
        location_type = "suburb"
        # Generate latitude and longitude within the suburbs
        angle = random.uniform(0, 2 * 3.14159)
        distance = random.uniform(suburb_radius, city_radius)
        destination = geodesic(kilometers=distance).destination((center_lat, center_lon), angle)
        
        # Randomly select the attributes of business district, office district and Residential district
        district_type = random.choices(list(suburb_district_probs.keys()), weights=list(suburb_district_probs.values()))[0]
        
        new_one_day_prices_list = []
        # Set parking fees and the number of charging points available
        if district_type == "Business district":
            parking_fee = round(random.uniform(1.1, 1.6) * random.uniform(2, 5), 2)
            available_charging_stations = random.randint(0, int(0.8 * 0.9 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(0.8, 0.9) * random.uniform(1.2, 1.5), 2)
                new_one_day_prices_list.append(new_price)
            
        elif district_type == "Office district":
            parking_fee = round(random.uniform(1.1, 1.6) * random.uniform(2, 5), 2)
            available_charging_stations = random.randint(0, int(0.8 * 0.1 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(0.8, 0.9) * random.uniform(1.2, 1.5), 2)
                new_one_day_prices_list.append(new_price)
        elif district_type == "Residential district":
            parking_fee = round(random.uniform(2, 5), 2)
            available_charging_stations = random.randint(0, int(0.8 * 0.8 * 10))
            for price in one_day_prices_list:
                new_price = round(price * random.uniform(0.8, 0.9), 2)
                new_one_day_prices_list.append(new_price)

        parking_lots.append((destination.latitude, destination.longitude, distance, location_type, district_type, parking_fee, available_charging_stations, new_one_day_prices_list))

    return parking_lots

# Set the longitude, latitude and radius of the city center
# Suppose that the coordinates of the city center point are expressed in latitude and longitude as (30,120).
city_center_lat = 30
city_center_lon = 120
city_radius = 10  # 10 km radius of the city
downtown_radius = 2  # Downtown radius
suburb_radius = 6  # Suburban radius

# Generates longitude, latitude, properties, and parking fees for ten parking lots
num_parking_lots = 10
electricity_prices_file = "GR-data-11-20.csv"
parking_locations = generate_parking_locations(city_center_lat, city_center_lon, city_radius, downtown_radius, suburb_radius, num_parking_lots, electricity_prices_file)

# Print result
for i, location in enumerate(parking_locations):
    print(f"停车场 {i + 1}: 经度 - {location[1]}, 纬度 - {location[0]}, 距离城市中心的距离 - {location[2]} 公里, 距离属性 - {location[3]}, 区域属性 - {location[4]}, 停车费 - {location[5]} 元/小时, 可用充电桩 - {location[6]} 个, 一天电价 - {location[7]}")


停车场 1: 经度 - 120.00036932942429, 纬度 - 30.014196651364355, 距离城市中心的距离 - 1.57413852541602 公里, 距离属性 - city_center, 区域属性 - Business district, 停车费 - 10.88 元/小时, 可用充电桩 - 1 个, 一天电价 - [86.93, 93.7, 82.69, 91.64, 88.99, 91.77, 80.15, 84.45, 99.72, 58.04, 57.25, 55.81, 53.55, 37.83, 64.18, 79.37, 101.31, 135.63, 86.51, 96.1, 88.38, 103.87, 82.95, 60.31]
停车场 2: 经度 - 120.00043226638924, 纬度 - 30.010351663157298, 距离城市中心的距离 - 1.1482656826875315 公里, 距离属性 - city_center, 区域属性 - Business district, 停车费 - 13.91 元/小时, 可用充电桩 - 1 个, 一天电价 - [83.27, 85.81, 83.27, 86.29, 100.47, 80.95, 74.15, 89.83, 92.74, 56.69, 63.58, 56.55, 65.54, 44.34, 62.79, 87.54, 89.82, 156.1, 103.19, 103.8, 95.38, 101.48, 73.06, 65.88]
停车场 3: 经度 - 120.00034249242887, 纬度 - 30.004852170103035, 距离城市中心的距离 - 0.5388892302524622 公里, 距离属性 - city_center, 区域属性 - Residential district, 停车费 - 14.74 元/小时, 可用充电桩 - 1 个, 一天电价 - [66.42, 68.49, 68.42, 68.57, 65.18, 66.97, 56.07, 62.86, 67.27, 45.86, 46.63, 43.34, 43.05, 31.43, 46.03, 64.4, 69.11, 109.12, 67

In [32]:
import random

class Parking_lot:
    # This class is used to represent a parking lot
    """
        :param location: (latitude, longitude)
            description: The location of the parking lot, represented by coordinates composed of latitude and longitude
        :param hourly_parking_rate: float
            description: The hourly parking fee charged by the parking lot
        :param available_charging_stations: int
            description: The parking lot has available charging stations
        :param one_day_prices_list: list[float]
            description: The price of electricity in the parking lot during the day
        :param parking_type: string
            description: The type of parking lot, e.g. office area, shopping area, residential area
        :param location_type: string
            description: The type of parking lot location, e.g. city centre, urban, suburban, etc
        :param id: string
            description: The id of the parking lot
    """
    def __init__(self, location, hourly_parking_rate, available_charging_stations, one_day_prices_list, parking_type, location_type, id):
        # Define parking types and location types
        self.location_type_probs = ["city_centre", "urban", "suburban"]
        self.parking_type_probs = ["shopping_area", "office_area", "residential_area"]

        self.latitude, self.longitude = location
        self.hourly_parking_rate = hourly_parking_rate
        self.available_charging_stations = available_charging_stations
        self.one_day_prices_list = one_day_prices_list
        self.parking_type = parking_type
        self.location_type = location_type
        self.id = id

    # Add a method to get the coordinates here so that you can get them directly
    def get_location(self):
        return self.latitude, self.longitude
    
    # Update available_charging_stations when the vehicle enters the parking lot
    def vehicle_enter(self):
        self.available_charging_stations -= 1

    # Update available_charging_stations when the vehicle exits the parking lot
    def vehicle_exit(self):
        self.available_charging_stations += 1
    

In [33]:
from geopy.distance import geodesic

class Vehicle:
    # This class is used to represent vehicles
    """
        :param home_location: (latitude, longitude)
            description: The location of the owner's home, represented by coordinates composed of latitude and longitude
        :param company_location: (latitude, longitude)
            description: The location of the owner's company, represented by coordinates composed of latitude and longitude
        :param battery_capacity: int
            description: Maximum capacity of vehicle battery
        :param start_time: datetime
            description: The time the owner began using the vehicle
        :param end_time: datetime
            description: The time when the vehicle owner finishes using the vehicle
        :param parking_lot: object
            description: The parking lot where the vehicle is parked
        :param speed: int
            description: The speed of vehicle
        :param soc: float
            description: The current battery level of the vehicle
        :param EC: float
            description: Tram energy consumption (per kilometer)
    """
    def __init__(self, home_location, company_location, battery_capacity, start_time, end_time, parking_lot=None, speed = 40, soc = 77):
        self.home_latitude, self.home_longitude = home_location
        self.company_latitude, self.company_longitude = company_location
        self.battery_capacity = battery_capacity
        self.start_time = start_time
        self.end_time = end_time
        self.speed = speed
        self.soc = soc
        self.EC = 0.145

    # Add a method to get the coordinates here so that you can get them directly
    def get_home_location(self):
        return self.home_latitude, self.home_longitude

    def get_company_location(self):
        return self.company_latitude, self.company_longitude
    
    # Calculate the distance from the vehicle owner's company to all parking lots
    def calculate_distance_to_parking_lot(self, parking_lots):
        distances = [geodesic(self.get_company_location(), parking_lot.get_location()).kilometers
                     for parking_lot in parking_lots]
        return distances
    
    # Calculate the time it takes the vehicle to get from the starting point to the destination (assuming constant speed)
    def calculate_driving_time(self, start_point, destination, alpha):
        if alpha:
            self.soc -= geodesic(start_point, destination).kilometers * self.EC
        return geodesic(start_point, destination).kilometers / self.speed
    
    # Calculate the distance between the starting point and the destination
    def calculate_driving_distance(self, start_point, destination):
        return geodesic(start_point, destination).kilometers
    
    # Calculate the driving time from the vehicle owner's company to all parking lots
    def calculate_driving_time_to_parking_lot(self, parking_lots):
        driving_times = [self.calculate_driving_time(self.get_company_location(), parking_lot.get_location(), False)
                         for parking_lot in parking_lots]
        return driving_times
    
    # Call this method when the vehicle decide to goto a parking lot
    def enter_parking_lot(self, parking_lot):
        self.parking_lot = parking_lot
        self.parking_lot.vehicle_enter()
        print(f"The available charging stations are {self.parking_lot.available_charging_stations}.")

    # Call this method when the vehicle decide to leave a parking lot
    def exit_parking_lot(self):
        self.parking_lot.vehicle_exit()
        print(f"Vehicle exited the parking lot named {self.parking_lot.id}.")
        print(f"The available charging stations are {self.parking_lot.available_charging_stations}.")
    
    def __str__(self):
        return f'Vehicle: {self.home_latitude}, {self.home_longitude}, {self.company_latitude}, {self.company_longitude}, {self.battery_capacity}, {self.start_time}, {self.end_time}'
    

In [34]:
import os
import math

# 初始化充放电决策的神经网络
def initialize_network():
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("Device type: ", device)

    # Initialize env
    env = ChargingEnv()
    STATE_DIM = env.observation_space.shape[0]
    ACTION_DIM = env.action_space.shape[0]

    # Load para
    current_path = os.getcwd()
    model = current_path + '/models/'
    actor_path = model + "best_actor_model.pth"
    # actor_path = model + "ddpg_actor.pth"

    class Actor(nn.Module):
        def __init__(self, state_dim, action_dim, hidden_dim=64):
            super(Actor, self).__init__()
            self.fc1 = nn.Linear(state_dim, hidden_dim)
            self.fc2 = nn.Linear(hidden_dim, hidden_dim)
            self.fc3 = nn.Linear(hidden_dim, action_dim)

        def forward(self, x):
            x = torch.relu(self.fc1(x))
            x = torch.relu(self.fc2(x))
            x = torch.tanh(self.fc3(x)) * 20
            return x
        
    actor = Actor(STATE_DIM, ACTION_DIM).to(device)
    actor.load_state_dict(torch.load(actor_path))
    
    return env, actor

In [35]:
def select_parking_lot(test_vehicle_1, parking_lots, current_time, env, actor, vehicle_location, end_time):
    # 选择停车场
    min_cost_parking_lot = None
    min_cost = float('inf')  # 初始化成本为无穷大
    min_temp = float('inf')

    NUM_EPISODE = 10
    NUM_STEP = 24

    for parking_lot in parking_lots:
        temp_time = current_time
        env.reset()
        cost = 0
        # 距离 * 功耗 * 电价 + 停车费
        cost += (test_vehicle_1.calculate_driving_distance(vehicle_location, parking_lot.get_location()) * test_vehicle_1.EC * parking_lot.one_day_prices_list[current_time.hour] / 1000 
        + parking_lot.hourly_parking_rate * math.ceil((end_time - current_time).total_seconds() /3600) / 7) # 此处除以7是汇率问题，计算停车费的单位是人民币，将其换算为美元
        
        temp = cost

        if temp < min_temp:
            min_temp = temp
            
        print(f"Parking Lot {parking_lot.id}: CostWithoutReward: {cost}")


        time_used = test_vehicle_1.calculate_driving_time(vehicle_location, parking_lot.get_location(),False)
        temp_time += timedelta(hours=time_used)
        env.SOC = test_vehicle_1.soc - test_vehicle_1.EC * test_vehicle_1.calculate_driving_distance(vehicle_location, parking_lot.get_location())
        NUM_STEP = test_vehicle_1.end_time.hour - temp_time.hour

        # List to store episode rewards
        episode_rewards = []

        for episode_i in range(NUM_EPISODE):
            state, others = env.reset()
            episode_reward = 0

            for tep_i in range(NUM_STEP):
                action = actor(torch.FloatTensor(state).unsqueeze(0).to(device)).detach().cpu().numpy()[0]
                # print("Power", action)
                next_state, reward, done, truncation, info = env.step(action)
                state = next_state
                episode_reward += reward

            episode_rewards.append(episode_reward)

        # Calculate average reward
        average_reward = np.mean(episode_rewards)

        print(f"Parking Lot {parking_lot.id}: Expected_reward: {average_reward}")

        # 成本减去在停车场充放电的期望收益
        
        cost -= average_reward
        print(f"Parking Lot {parking_lot.id}: CostWithReward: {cost}")

        if cost < min_cost:
            min_cost = cost
            min_cost_parking_lot = parking_lot

    print(f"The parking lot with the minimum cost is {min_cost_parking_lot.id} with a cost of {min_cost}")
    return min_temp, min_cost, min_cost_parking_lot

In [36]:
# 模拟汽车从一个地方行驶到另一个地方
def drive(current_time, test_vehicle_1, vehicle_location, destination_location, one_day_prices_list):
    current_time = current_time + timedelta(hours=test_vehicle_1.calculate_driving_time(vehicle_location, destination_location, True))
    cost = test_vehicle_1.calculate_driving_distance(vehicle_location, destination_location) * test_vehicle_1.EC * one_day_prices_list[current_time.hour] / 1000
    print("Current Time:", current_time)
    print("SOC", test_vehicle_1.soc)
    print("Cost", cost)
    return current_time, cost

In [37]:
import math
import time
from datetime import datetime, timedelta

# 模拟汽车的充放电
def charge_policy(test_vehicle_1, env, actor, start_time, end_time):
    NUM_STEP = math.floor((end_time - start_time).total_seconds() / 3600)
    state, others = env.reset()
    episode_reward = 0
    env.SOC = test_vehicle_1.soc
    for tep_i in range(NUM_STEP):
        action = actor(torch.FloatTensor(state).unsqueeze(0).to(device)).detach().cpu().numpy()[0]
        next_state, reward, done, truncation, info = env.step(action)
        state = next_state
        episode_reward += reward / 2
    test_vehicle_1.soc = env.SOC
    print("After charging and discharging in parking lot")
    print("Real_Reward", episode_reward)
    print("SOC", test_vehicle_1.soc)
    return episode_reward

In [38]:
# 正常的行程，即用户没有特殊出行计划
def normal_itinerary(test_vehicle_1, parking_lots, one_day_prices_list):
    print("Without user event")

    # 初始时间
    current_time = test_vehicle_1.start_time
    print("Current Time:", current_time)
    
    # 初始化总成本为0
    total_cost = 0

    # 从车主家到公司所需的时间
    current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_home_location(), test_vehicle_1.get_company_location(), one_day_prices_list)
    total_cost += cost
    print("Total cost", total_cost)

    # Initialize env
    env, actor = initialize_network()

    # 选择停车场
    min_temp, min_cost, min_cost_parking_lot = select_parking_lot(test_vehicle_1, parking_lots, current_time, env, actor, test_vehicle_1.get_company_location(), test_vehicle_1.end_time)

    # 前往选择的停车场
    current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_company_location(), min_cost_parking_lot.get_location(), one_day_prices_list)
    total_cost += min_temp
    print("Total cost", total_cost)

    # 进行充放电
    reward = charge_policy(test_vehicle_1, env, actor, current_time, test_vehicle_1.end_time)
    total_cost -= reward
    current_time = test_vehicle_1.end_time
    print("Total cost", total_cost)

    # 从停车场前往公司
    current_time, cost = drive(current_time, test_vehicle_1, min_cost_parking_lot.get_location(), test_vehicle_1.get_company_location(), one_day_prices_list)
    total_cost += cost
    print("Total cost", total_cost)

    # 从公司回家
    current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_company_location(), test_vehicle_1.get_home_location(), one_day_prices_list)
    total_cost += cost
    print("Total cost", total_cost)

    # 在家充电直到第二天上班
    next_day_start_time = test_vehicle_1.start_time + timedelta(days=1)
    reward = charge_policy(test_vehicle_1, env, actor, current_time, next_day_start_time)
    total_cost -= reward

    print("Total cost", total_cost)
    

In [39]:
def calculate_cost(test_vehicle_1, parking_lots, current_time, vehicle_location, return_time):
    # Initialize env
    env, actor = initialize_network()

    # 计算前往最近停车场的成本
    min_temp, min_cost, min_cost_parking_lot = select_parking_lot(test_vehicle_1, parking_lots, current_time, env, actor, vehicle_location, return_time)

    # 计算留在原地等待的成本
    wait_cost = 5 * math.ceil((return_time - current_time).total_seconds() /3600) / 7 # 此处除以7是汇率问题，停车费的单位是人民币

    print("前往停车场成本", min_cost)
    print("停在原地成本", wait_cost)

    return min_temp, min_cost, min_cost_parking_lot, wait_cost

In [40]:
import math

# 自定行程，即用户有额外的出行需求
def special_itinerary(test_vehicle_1, parking_lots, departure_time, departure_pickup, departure_destination, return_time, return_pickup, return_destination, one_day_prices_list):
    print("With user event")

    # 将字符串格式的出发和返回时间转化为datetime格式
    departure_time = "2023-01-01 " + departure_time
    return_time = "2023-01-01 " + return_time
    time_format = "%Y-%m-%d %H:%M"

    departure_time = datetime.strptime(departure_time, time_format)
    return_time = datetime.strptime(return_time, time_format)
    # print(departure_time)
    # print(return_time)

    # 将字符串格式的出发上车点，出发时目的地， 返回上车点，返回时目的地转化为tuple
    string_list = departure_pickup.split(', ')
    departure_pickup = tuple(map(float, string_list))
    string_list = departure_destination.split(', ')
    departure_destination = tuple(map(float, string_list))
    string_list = return_pickup.split(', ')
    return_pickup = tuple(map(float, string_list))
    string_list = return_destination.split(', ')
    return_destination = tuple(map(float, string_list))

    # print(departure_pickup)
    # print(departure_destination)
    # print(return_pickup)
    # print(return_destination)

    # 初始时间
    current_time = test_vehicle_1.start_time
    print("Current Time:", current_time)
    
    # 初始化总成本为0
    total_cost = 0

    # 比较用户需求的出发时间从而判断用户是否先去公司
    if departure_time > current_time:
        # 从车主家到公司
        current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_home_location(), test_vehicle_1.get_company_location(), one_day_prices_list)
        total_cost += cost

        # 比较前往停车场与等在原地的成本
        min_temp, min_cost, min_cost_parking_lot, wait_cost = calculate_cost(test_vehicle_1, parking_lots, current_time, test_vehicle_1.get_company_location(), departure_time)

        
        if min_cost < wait_cost:
            print("前往停车场成本更低, 为:", min_cost)
            # 前往最近停车场
            current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_company_location(), min_cost_parking_lot.get_location(), one_day_prices_list)
            total_cost += cost

            # Initialize env
            env, actor = initialize_network()

            # 进行充放电
            reward = charge_policy(test_vehicle_1, env, actor, current_time, departure_time)
            total_cost -= reward

            current_time = departure_time
            
        else:
            print("停在原地成本更低, 为:", wait_cost)
            #停在原地等待
            current_time = departure_time
            total_cost += wait_cost

        # 前往用户出发上车点
        current_time, cost = drive(current_time, test_vehicle_1, min_cost_parking_lot.get_location(), departure_pickup, one_day_prices_list)
        total_cost += cost

        # print("Total cost", total_cost)
    else:
        # 从用户家到用户指定上车点
        other_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_home_location(), departure_pickup, one_day_prices_list)
        total_cost += cost

        # 初始化现在时间为用户需求的出发时间
        current_time = departure_time

    # 从用户指定上车点到用户指定目的地
    current_time, cost = drive(current_time, test_vehicle_1, departure_pickup, departure_destination, one_day_prices_list)
    total_cost += cost

    min_temp, min_cost, min_cost_parking_lot, wait_cost = calculate_cost(test_vehicle_1, parking_lots, current_time, departure_destination, return_time)

    if min_cost < wait_cost:
        print("前往停车场成本更低, 为:", min_cost)
        # 前往最近停车场
        current_time, cost = drive(current_time, test_vehicle_1, departure_destination, min_cost_parking_lot.get_location(), one_day_prices_list)
        total_cost += min_temp

        # Initialize env
        env, actor = initialize_network()

        # 进行充放电
        reward = charge_policy(test_vehicle_1, env, actor, current_time, return_time)
        total_cost -= reward

        # 从停车场前往返回时用户出发点
        current_time, cost = drive(current_time, test_vehicle_1, min_cost_parking_lot.get_location(), return_pickup, one_day_prices_list)
        current_time = return_time

    else:
        print("停在原地成本更低, 为:", wait_cost)
        # 停在路边等待
        total_cost += wait_cost

        # 从出发时用户目的地前往返回时用户出发点
        current_time, cost = drive(current_time, test_vehicle_1, departure_destination, return_pickup, one_day_prices_list)
        total_cost += cost
        current_time = return_time

    # 从返回时用户上车点前往返回时用户目的地
    current_time, cost = drive(current_time, test_vehicle_1, return_pickup, return_destination, one_day_prices_list)

    # 判断用户返程目的地的位置
    if return_destination == test_vehicle_1.get_home_location():
        # 用户回家，结束一天行程，开始在家充电
        # 在家充电直到第二天上班
        next_day_start_time = test_vehicle_1.start_time + timedelta(days=1)
        reward = charge_policy(test_vehicle_1, env, actor, current_time, next_day_start_time)
        total_cost -= reward

    elif return_destination == test_vehicle_1.get_company_location():
        # 用户回公司，到下班时间再回家
        min_temp, min_cost, min_cost_parking_lot, wait_cost = calculate_cost(test_vehicle_1, parking_lots, current_time, return_destination, test_vehicle_1.end_time)
        if min_cost < wait_cost:
            print("前往停车场成本更低, 为:", min_cost)
            # 前往最近停车场
            current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_company_location(), min_cost_parking_lot.get_location(), one_day_prices_list)
            total_cost += min_temp

            # Initialize env
            env, actor = initialize_network()

            # 进行充放电
            reward = charge_policy(test_vehicle_1, env, actor, current_time, test_vehicle_1.end_time)
            total_cost -= reward

            # 从停车场返回公司
            current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_company_location(), min_cost_parking_lot.get_location(), one_day_prices_list)
            total_cost += cost

        else:
            print("停在原地成本更低, 为:", wait_cost)
            # 停在原地等待车主
            current_time = test_vehicle_1.end_time
            total_cost += wait_cost

        # 从公司返回家
            current_time, cost = drive(current_time, test_vehicle_1, test_vehicle_1.get_company_location(), test_vehicle_1.get_home_location(), one_day_prices_list)

            # 在家充电直到第二天上班
            next_day_start_time = test_vehicle_1.start_time + timedelta(days=1)
            reward = charge_policy(test_vehicle_1, env, actor, current_time, next_day_start_time)
            total_cost -= reward
    else:
        # 待定
        print("待定")

    print("Total cost", total_cost)
        

    

In [41]:
import time
import ipywidgets as widgets
from IPython.display import display, clear_output, Markdown

# 在外部声明变量
departure_time = ""
departure_pickup = ""
departure_destination = ""
return_time = ""
return_pickup = ""
return_destination = ""

def user_random_event(test_vehicle_1, parking_lots, one_day_prices_list):
    # 定义一个函数，用于处理第一个下拉菜单的选择
    def handle_first_dropdown(change):
        if change.new == 'Yes':
            display_second_textboxes()
        else:
            clear_output()  # 在选择"No"后清除输出区域
            normal_itinerary(test_vehicle_1, parking_lots, one_day_prices_list)

    # 定义一个函数，用于显示六个文本输入框
    def display_second_textboxes():
        clear_output()
        # 定义一个函数，用于处理提交按钮的点击事件
        def handle_submit_button(button):
            # 声明全局变量
            global departure_time, departure_pickup, departure_destination, return_time, return_pickup, return_destination
            # 收集所有的错误信息
            errors = []
            # 获取文本输入框的值
            departure_time = departure_time_textbox.value
            departure_pickup = departure_pickup_textbox.value
            departure_destination = departure_destination_textbox.value
            return_time = return_time_textbox.value
            return_pickup = return_pickup_textbox.value
            return_destination = return_destination_textbox.value
            
            # 检查输入的值是否符合要求
            # if not is_valid_time(departure_time):
            #     errors.append("出发时间格式不正确，请输入类似于 14:00 的时间")
            # if not is_valid_coordinate(departure_pickup):
            #     errors.append("出发时上车点格式不正确，请输入类似于 120,30 的坐标")
            # if not is_valid_coordinate(departure_destination):
            #     errors.append("出发时目的地格式不正确，请输入类似于 120,30 的坐标")
            # if not is_valid_time(return_time):
            #     errors.append("返回时间格式不正确，请输入类似于 14:00 的时间")
            # if not is_valid_coordinate(return_pickup):
            #     errors.append("返回时上车点格式不正确，请输入类似于 120,30 的坐标")
            # if not is_valid_coordinate(return_destination):
            #     errors.append("返回时目的地格式不正确，请输入类似于 120,30 的坐标")
            
            # 如果有错误，则输出所有错误信息
            if errors:
                for error in errors:
                    print(error)
            else:
                # 清除输出
                clear_output()
                # 打印输入的值
                # print("出发时间：", departure_time)
                # print("出发时上车点：", departure_pickup)
                # print("出发时目的地：", departure_destination)
                # print("返回时间：", return_time)
                # print("返回时上车点：", return_pickup)
                # print("返回时目的地：", return_destination)
                special_itinerary(test_vehicle_1, parking_lots, departure_time, departure_pickup, departure_destination, return_time, return_pickup, return_destination, one_day_prices_list)

        # 创建六个文本输入框
        departure_time_textbox = widgets.Text(value='')
        departure_pickup_textbox = widgets.Text(value='')
        departure_destination_textbox = widgets.Text(value='')
        return_time_textbox = widgets.Text(value='')
        return_pickup_textbox = widgets.Text(value='')
        return_destination_textbox = widgets.Text(value='')

        # 创建提交按钮
        submit_button = widgets.Button(description='提交')

        # 将文本输入框和提交按钮显示在Jupyter Notebook中
        display(Markdown("### Please enter your travel details:"))
        display(Markdown("### 出发时间："))
        display(departure_time_textbox)
        display(Markdown("### 出发时上车点："))
        display(departure_pickup_textbox)
        display(Markdown("### 出发时目的地："))
        display(departure_destination_textbox)
        display(Markdown("### 返回时间："))
        display(return_time_textbox)
        display(Markdown("### 返回时上车点："))
        display(return_pickup_textbox)
        display(Markdown("### 返回时目的地："))
        display(return_destination_textbox)
        display(submit_button)

        # 将提交按钮的点击事件与处理函数关联起来
        submit_button.on_click(handle_submit_button)

    # 创建第一个下拉菜单，提供两个选项
    options = ['Yes', 'No']
    first_dropdown = widgets.Dropdown(options=options, value=None)

    # 将第一个下拉菜单显示在Jupyter Notebook中
    display(Markdown("### Are there any additional travel plans for today:"))
    display(first_dropdown)

    # 将第一个下拉菜单的选择与处理函数关联起来
    first_dropdown.observe(handle_first_dropdown, names='value')

# 检查输入的时间格式是否合法的函数
def is_valid_time(time_str):
    try:
        time.strptime(time_str, '%H:%M')
        return True
    except ValueError:
        return False

# 检查输入的坐标格式是否合法的函数
def is_valid_coordinate(coord_str):
    try:
        coordinates = coord_str.split(',')
        if len(coordinates) != 2:
            return False
        longitude = float(coordinates[0])
        latitude = float(coordinates[1])
        if longitude < -180 or longitude > 180 or latitude < -90 or latitude > 90:
            return False
        return True
    except ValueError:
        return False


In [42]:
from datetime import datetime, timedelta
import random
import os
import math

test_vehicle_1 = Vehicle((30.984089, 119.319236), (30.084089, 120.119236), 5000, datetime(2023, 1, 1, random.randint(7, 9)), datetime(2023, 1, 1, random.randint(16, 18)))

print("test_vehicle_1 start_time:", test_vehicle_1.start_time.strftime("%H:%M"))
print("test_vehicle_1 end_time:", test_vehicle_1.end_time.strftime("%H:%M"))

parking_lots = []

# 根据data.py中产生的停车场数据生成10个停车场
for i, location in enumerate(parking_locations):
    parking_lots.append(Parking_lot((location[0], location[1]), location[5], location[6], location[7], location[4], location[3], "Parking_lot " + str(i+1  )))
    
# 停车场信息
# for parking_lot in parking_lots:
#     location = parking_lot.get_location()
#     latitude, longitude = location
#     hourly_parking_rate = parking_lot.hourly_parking_rate
#     available_charging_stations = parking_lot.available_charging_stations
#     one_day_prices_list = parking_lot.one_day_prices_list
#     parking_type = parking_lot.parking_type
#     location_type = parking_lot.location_type
#     parking_lot_id = parking_lot.id

#     print(f"Parking Lot ID: {parking_lot_id}")
#     print(f"Location: Latitude - {latitude}, Longitude - {longitude}")
#     print(f"Hourly Parking Rate: {hourly_parking_rate}元/小时")
#     print(f"Available Charging Stations: {available_charging_stations}")
#     print(f"One Day Electricity Prices: {one_day_prices_list}")
#     print(f"Parking Type: {parking_type}")
#     print(f"Location Type: {location_type}")
#     print()

# TODO
one_day_prices_list = [59.52, 59.01, 59.01, 59.01, 59.01, 59.01, 47.59, 55.97, 59.01, 41.01, 39.01, 39.01, 39.01, 27.01, 41.01, 54.26, 59.01, 92.02, 59.52, 65.46, 59.52, 59.01, 52.46, 41.01]


# 询问车主是否有额外行程
user_random_event(test_vehicle_1, parking_lots, one_day_prices_list)


### Please enter your travel details:

### 出发时间：

Text(value='')

### 出发时上车点：

Text(value='')

### 出发时目的地：

Text(value='')

### 返回时间：

Text(value='')

### 返回时上车点：

Text(value='')

### 返回时目的地：

Text(value='')

Button(description='提交', style=ButtonStyle())