<a href="https://colab.research.google.com/github/Perfect-Cube/Volkswagon-imobilothon-4.0/blob/main/Fuzzy_vs_PPO.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install scikit-fuzzy


Collecting scikit-fuzzy
  Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl.metadata (2.6 kB)
Downloading scikit_fuzzy-0.5.0-py2.py3-none-any.whl (920 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/920.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m153.6/920.8 kB[0m [31m4.8 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m911.4/920.8 kB[0m [31m14.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m920.8/920.8 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: scikit-fuzzy
Successfully installed scikit-fuzzy-0.5.0


In [2]:
import numpy as np
import skfuzzy as fuzz
from skfuzzy import control as ctrl

# Define fuzzy variables for temperature, charge, and cooling power
temperature = ctrl.Antecedent(np.arange(0, 101, 1), 'temperature')
charge = ctrl.Antecedent(np.arange(0, 101, 1), 'charge')
cooling_power = ctrl.Consequent(np.arange(0, 101, 1), 'cooling_power')

# Define fuzzy sets and membership functions
temperature['low'] = fuzz.trimf(temperature.universe, [0, 0, 40])
temperature['medium'] = fuzz.trimf(temperature.universe, [30, 50, 70])
temperature['high'] = fuzz.trimf(temperature.universe, [60, 100, 100])

charge['low'] = fuzz.trimf(charge.universe, [0, 0, 50])
charge['medium'] = fuzz.trimf(charge.universe, [30, 50, 80])
charge['high'] = fuzz.trimf(charge.universe, [70, 100, 100])

cooling_power['low'] = fuzz.trimf(cooling_power.universe, [0, 0, 50])
cooling_power['medium'] = fuzz.trimf(cooling_power.universe, [25, 50, 75])
cooling_power['high'] = fuzz.trimf(cooling_power.universe, [50, 100, 100])

# Define fuzzy rules
rule1 = ctrl.Rule(temperature['high'] & charge['high'], cooling_power['high'])
rule2 = ctrl.Rule(temperature['medium'] & charge['high'], cooling_power['medium'])
rule3 = ctrl.Rule(temperature['low'] | charge['low'], cooling_power['low'])

# Control system
cooling_ctrl = ctrl.ControlSystem([rule1, rule2, rule3])
cooling_sim = ctrl.ControlSystemSimulation(cooling_ctrl)

# Test the fuzzy controller
temp_input = 60  # Example temperature
charge_input = 80  # Example charge

cooling_sim.input['temperature'] = temp_input
cooling_sim.input['charge'] = charge_input

# Compute output
cooling_sim.compute()
print(f"Temperature: {temp_input}, Charge: {charge_input}, Cooling Power: {cooling_sim.output['cooling_power']}")


Temperature: 60, Charge: 80, Cooling Power: 49.99999999999997


In [3]:
pip install stable-baselines3 gym


Collecting stable-baselines3
  Downloading stable_baselines3-2.3.2-py3-none-any.whl.metadata (5.1 kB)
Collecting gymnasium<0.30,>=0.28.1 (from stable-baselines3)
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium<0.30,>=0.28.1->stable-baselines3)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading stable_baselines3-2.3.2-py3-none-any.whl (182 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m182.3/182.3 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m24.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium, stable-baselines3
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1 stable-baselines3-2.3

In [6]:
import gym
from stable_baselines3 import PPO

# Define a custom environment (simple example for demonstration purposes)
class BatteryEnv(gym.Env):
    def __init__(self):
        super(BatteryEnv, self).__init__()
        # Define action and observation space
        # Action: Cooling power (continuous between 0 and 1)
        self.action_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=float)
        # Observation: Battery temperature and state of charge
        self.observation_space = gym.spaces.Box(low=0, high=100, shape=(2,), dtype=float)
        self.state = [50, 80]  # Example: [temperature, charge]

    def reset(self):
        self.state = [50, 80]  # Reset to initial conditions
        return self.state

    def step(self, action):
        temp, charge = self.state
        cooling_power = action[0]  # Action is cooling power (0 to 1)

        # Update battery temperature and charge (simple dynamics for demonstration)
        temp -= cooling_power * 5  # Cooling reduces temperature
        charge -= 0.1 * (1 - cooling_power)  # Charge decreases with less cooling

        # Update state
        self.state = [temp, charge]

        # Calculate reward (e.g., keeping temp between 20 and 60 and maintaining charge)
        reward = -abs(temp - 40) - abs(charge - 80)

        # Done if battery reaches critical levels
        done = temp < 20 or temp > 80 or charge < 10

        return self.state, reward, done, {}

env = BatteryEnv()

# Initialize PPO model
model = PPO("MlpPolicy", env, verbose=1)
# Train the model
model.learn(total_timesteps=10000)

# Test the model
obs = env.reset()
for _ in range(100):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    print(f"Obs: {obs}, Reward: {reward}, Done: {done}")
    if done:
        break


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 19.8     |
|    ep_rew_mean     | -193     |
| time/              |          |
|    fps             | 1292     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 18.4         |
|    ep_rew_mean          | -175         |
| time/                   |              |
|    fps                  | 818          |
|    iterations           | 2            |
|    time_elapsed         | 5            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0080325045 |
|    clip_fraction        | 0.109        |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.41        |
|    explained_variance   | -0.00199     |
|    learning_r

In [5]:
pip install shimmy

  and should_run_async(code)


Collecting shimmy
  Downloading Shimmy-2.0.0-py3-none-any.whl.metadata (3.5 kB)
Collecting gymnasium>=1.0.0a1 (from shimmy)
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Downloading Shimmy-2.0.0-py3-none-any.whl (30 kB)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m16.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: gymnasium, shimmy
  Attempting uninstall: gymnasium
    Found existing installation: gymnasium 0.29.1
    Uninstalling gymnasium-0.29.1:
      Successfully uninstalled gymnasium-0.29.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
stable-baselines3 2.3.2 requires gymnasium<0.30,>=0.28.1, but you have gymnasium 1.0.0 which is incompatible.[0m[31m
[0mSuccessfully installed gymnasium-1.0.0 shimmy-2.0.0


In [7]:
import numpy as np
import gym
from stable_baselines3 import PPO
import skfuzzy as fuzz
from skfuzzy import control as ctrl

# 1. Define a Battery Simulation Environment (common for PPO and Fuzzy Logic)
class BatteryEnv(gym.Env):
    def __init__(self):
        super(BatteryEnv, self).__init__()
        self.action_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=0, high=100, shape=(2,), dtype=np.float32)
        self.state = [50, 80]  # Initialize with [temperature, charge]

    def reset(self):
        self.state = [50, 80]
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        temp, charge = self.state
        cooling_power = action[0]
        temp -= cooling_power * 5  # Cooling reduces temperature
        charge -= 0.1 * (1 - cooling_power)
        self.state = [temp, charge]
        reward = -abs(temp - 40) - abs(charge - 80)
        done = temp < 20 or temp > 80 or charge < 10
        return np.array(self.state, dtype=np.float32), reward, done, {}

# Initialize the environment
env = BatteryEnv()

# 2. Train PPO on the Environment
print("Training PPO...")
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=5000)

# 3. Define the Fuzzy Logic System
temperature = ctrl.Antecedent(np.arange(0, 101, 1), 'temperature')
charge = ctrl.Antecedent(np.arange(0, 101, 1), 'charge')
cooling_power = ctrl.Consequent(np.arange(0, 101, 1), 'cooling_power')

temperature['low'] = fuzz.trimf(temperature.universe, [0, 0, 40])
temperature['medium'] = fuzz.trimf(temperature.universe, [30, 50, 70])
temperature['high'] = fuzz.trimf(temperature.universe, [60, 100, 100])
charge['low'] = fuzz.trimf(charge.universe, [0, 0, 50])
charge['medium'] = fuzz.trimf(charge.universe, [30, 50, 80])
charge['high'] = fuzz.trimf(charge.universe, [70, 100, 100])
cooling_power['low'] = fuzz.trimf(cooling_power.universe, [0, 0, 50])
cooling_power['medium'] = fuzz.trimf(cooling_power.universe, [25, 50, 75])
cooling_power['high'] = fuzz.trimf(cooling_power.universe, [50, 100, 100])

rule1 = ctrl.Rule(temperature['high'] & charge['high'], cooling_power['high'])
rule2 = ctrl.Rule(temperature['medium'] & charge['high'], cooling_power['medium'])
rule3 = ctrl.Rule(temperature['low'] | charge['low'], cooling_power['low'])

cooling_ctrl = ctrl.ControlSystem([rule1, rule2, rule3])
cooling_sim = ctrl.ControlSystemSimulation(cooling_ctrl)

# 4. Test and Compare PPO and Fuzzy Logic
def test_policy(policy, env, policy_type="PPO"):
    obs = env.reset()
    total_reward = 0
    steps = 0
    while True:
        if policy_type == "PPO":
            action, _ = policy.predict(obs, deterministic=True)
        elif policy_type == "Fuzzy":
            temp, charge = obs
            cooling_sim.input['temperature'] = temp
            cooling_sim.input['charge'] = charge
            cooling_sim.compute()
            action = [cooling_sim.output['cooling_power'] / 100.0]  # Normalize to 0-1
        obs, reward, done, _ = env.step(action)
        total_reward += reward
        steps += 1
        if done or steps > 100:  # Limit to 100 steps to avoid infinite loops
            break
    return total_reward

# Run tests
ppo_reward = test_policy(model, env, policy_type="PPO")
fuzzy_reward = test_policy(None, env, policy_type="Fuzzy")

print("\nResults:")
print(f"PPO Total Reward: {ppo_reward}")
print(f"Fuzzy Logic Total Reward: {fuzzy_reward}")

# Comparison based on average reward and overall performance
if ppo_reward > fuzzy_reward:
    print("PPO outperformed Fuzzy Logic for this battery optimization task.")
else:
    print("Fuzzy Logic outperformed PPO for this battery optimization task.")


Training PPO...
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




---------------------------------
| rollout/           |          |
|    ep_len_mean     | 20.7     |
|    ep_rew_mean     | -206     |
| time/              |          |
|    fps             | 1134     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 17.9        |
|    ep_rew_mean          | -169        |
| time/                   |             |
|    fps                  | 843         |
|    iterations           | 2           |
|    time_elapsed         | 4           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.005569823 |
|    clip_fraction        | 0.084       |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.41       |
|    explained_variance   | 0.00396     |
|    learning_rate        | 0.

In [11]:
import numpy as np
import gym
from stable_baselines3 import PPO
import skfuzzy as fuzz
from skfuzzy import control as ctrl

# 1. Define Battery Consumption Environment
class BatteryEnv(gym.Env):
    def __init__(self):
        super(BatteryEnv, self).__init__()
        # Action: Power consumption level, continuous between 0 (low) and 1 (high)
        self.action_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
        # Observation: Battery level, speed, and incline angle
        self.observation_space = gym.spaces.Box(low=np.array([0, 0, -10]), high=np.array([100, 120, 10]), dtype=np.float32)
        self.state = [100, 50, 0]  # Initial state: [battery level, speed, incline angle]

    def reset(self):
        self.state = [100, 50, 0]  # Reset to full battery, moderate speed, flat road
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        battery, speed, incline = self.state
        power_consumption = action[0]

        # Battery drain: Increased by speed and incline
        battery -= power_consumption * (1 + speed / 100 + incline / 10)
        battery = max(battery, 0)  # Ensure battery does not go below 0

        # Simulate changes in speed and incline
        speed = np.clip(speed + np.random.uniform(-5, 5), 0, 120)
        incline = np.clip(incline + np.random.uniform(-2, 2), -10, 10)

        self.state = [battery, speed, incline]
        reward = -power_consumption * (1 + speed / 100 + incline / 10)  # Penalize high consumption
        done = battery <= 0
        return np.array(self.state, dtype=np.float32), reward, done, {}

# Initialize the environment
env = BatteryEnv()

# 2. Train PPO on the Environment
print("Training PPO...")
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=5000)

# 3. Define the Fuzzy Logic System
battery_level = ctrl.Antecedent(np.arange(0, 101, 1), 'battery_level')
speed = ctrl.Antecedent(np.arange(0, 121, 1), 'speed')
incline = ctrl.Antecedent(np.arange(-10, 11, 1), 'incline')
power = ctrl.Consequent(np.arange(0, 101, 1), 'power')

# Define fuzzy sets
battery_level['low'] = fuzz.trimf(battery_level.universe, [0, 0, 40])
battery_level['medium'] = fuzz.trimf(battery_level.universe, [30, 50, 70])
battery_level['high'] = fuzz.trimf(battery_level.universe, [60, 100, 100])

speed['slow'] = fuzz.trimf(speed.universe, [0, 0, 60])
speed['medium'] = fuzz.trimf(speed.universe, [40, 60, 80])
speed['fast'] = fuzz.trimf(speed.universe, [70, 120, 120])

incline['downhill'] = fuzz.trimf(incline.universe, [-10, -10, 0])
incline['flat'] = fuzz.trimf(incline.universe, [-2, 0, 2])
incline['uphill'] = fuzz.trimf(incline.universe, [0, 10, 10])

power['low'] = fuzz.trimf(power.universe, [0, 0, 50])
power['medium'] = fuzz.trimf(power.universe, [25, 50, 75])
power['high'] = fuzz.trimf(power.universe, [50, 100, 100])

# Define rules
rule1 = ctrl.Rule(battery_level['high'] & incline['downhill'], power['low'])
rule2 = ctrl.Rule(battery_level['medium'] & incline['flat'], power['medium'])
rule3 = ctrl.Rule(battery_level['low'] & incline['uphill'], power['high'])
rule4 = ctrl.Rule(speed['fast'] & incline['uphill'], power['high'])
rule5 = ctrl.Rule(speed['slow'] & incline['downhill'], power['low'])

# Create the fuzzy control system
power_ctrl = ctrl.ControlSystem([rule1, rule2, rule3, rule4, rule5])
power_sim = ctrl.ControlSystemSimulation(power_ctrl)

# 4. Test and Compare PPO and Fuzzy Logic
def test_policy(policy, env, policy_type="PPO"):
    obs = env.reset()
    total_reward = 0
    steps = 0
    while True:
        if policy_type == "PPO":
            action, _ = policy.predict(obs, deterministic=True)
        elif policy_type == "Fuzzy":
            battery, speed, incline = obs
            power_sim.input['battery_level'] = battery
            power_sim.input['speed'] = speed
            power_sim.input['incline'] = incline
            power_sim.compute()  # Compute the fuzzy logic output
            action = [power_sim.output['power'] / 100.0]  # Normalize to 0-1 range

        obs, reward, done, _ = env.step(action)
        total_reward += reward
        steps += 1
        if done or steps > 100:  # Limit to 100 steps
            break
    return total_reward

# Run tests
ppo_reward = test_policy(model, env, policy_type="PPO")
fuzzy_reward = test_policy(None, env, policy_type="Fuzzy")

print("\nResults:")
print(f"PPO Total Reward: {ppo_reward}")
print(f"Fuzzy Logic Total Reward: {fuzzy_reward}")

# Display which approach performed better
if ppo_reward > fuzzy_reward:
    print("PPO outperformed Fuzzy Logic for battery consumption optimization.")
else:
    print("Fuzzy Logic outperformed PPO for battery consumption optimization.")


  and should_run_async(code)
  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


Training PPO...
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 208      |
|    ep_rew_mean     | -100     |
| time/              |          |
|    fps             | 1237     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 254        |
|    ep_rew_mean          | -100       |
| time/                   |            |
|    fps                  | 829        |
|    iterations           | 2          |
|    time_elapsed         | 4          |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.01195502 |
|    clip_fraction        | 0.129      |
|    clip_range           | 0.2        |
|    entropy_loss

UnboundLocalError: local variable 'action' referenced before assignment

In [12]:
import numpy as np
import gym
from stable_baselines3 import PPO
import random
import math

# 1. Define Battery Consumption Environment
class BatteryEnv(gym.Env):
    def __init__(self):
        super(BatteryEnv, self).__init__()
        self.action_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
        self.observation_space = gym.spaces.Box(low=np.array([0, 0, -10]), high=np.array([100, 120, 10]), dtype=np.float32)
        self.state = [100, 50, 0]  # Initial state: [battery level, speed, incline]

    def reset(self):
        self.state = [100, 50, 0]  # Reset to full battery, moderate speed, flat road
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        battery, speed, incline = self.state
        power_consumption = action[0]

        battery -= power_consumption * (1 + speed / 100 + incline / 10)
        battery = max(battery, 0)  # Ensure battery does not go below 0

        speed = np.clip(speed + np.random.uniform(-5, 5), 0, 120)
        incline = np.clip(incline + np.random.uniform(-2, 2), -10, 10)

        self.state = [battery, speed, incline]
        reward = -power_consumption * (1 + speed / 100 + incline / 10)
        done = battery <= 0
        return np.array(self.state, dtype=np.float32), reward, done, {}

# Initialize the environment
env = BatteryEnv()

# 2. Train PPO on the Environment
print("Training PPO...")
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=5000)

# 3. Implement Sidewinder Snake Algorithm
class SidewinderSnake:
    def __init__(self, env):
        self.env = env

    def choose_action(self, state):
        # A simple heuristic: decide the action based on battery level and incline
        battery, speed, incline = state

        if battery > 80:
            action = random.uniform(0.1, 0.3)  # Low consumption
        elif battery > 40:
            action = random.uniform(0.3, 0.7)  # Medium consumption
        else:
            action = random.uniform(0.7, 1.0)  # High consumption

        # "Snake" behavior: introduce a random component to simulate exploration
        action += random.uniform(-0.05, 0.05)
        action = np.clip(action, 0, 1)

        return [action]

    def test_policy(self, env):
        state = env.reset()
        total_reward = 0
        done = False
        while not done:
            action = self.choose_action(state)
            state, reward, done, _ = env.step(action)
            total_reward += reward
        return total_reward

# 4. Test function for both PPO and Sidewinder Snake Algorithm
def test_policy(model, env, policy_type="PPO"):
    if policy_type == "PPO":
        state = env.reset()
        total_reward = 0
        done = False
        while not done:
            action, _states = model.predict(state, deterministic=True)
            state, reward, done, _ = env.step(action)
            total_reward += reward
        return total_reward
    elif policy_type == "Fuzzy":
        snake_algorithm = SidewinderSnake(env)
        return snake_algorithm.test_policy(env)

# 5. Compare PPO and Sidewinder Snake Algorithm Performance
def compare_algorithms(env):
    # Test PPO
    ppo_reward = test_policy(model, env, policy_type="PPO")

    # Test Sidewinder Snake Algorithm
    snake_algorithm = SidewinderSnake(env)
    sidewinder_reward = snake_algorithm.test_policy(env)

    # Display Results
    print("\nResults:")
    print(f"PPO Total Reward: {ppo_reward}")
    print(f"Sidewinder Snake Algorithm Total Reward: {sidewinder_reward}")

    if ppo_reward > sidewinder_reward:
        print("PPO outperformed Sidewinder Snake Algorithm.")
    else:
        print("Sidewinder Snake Algorithm outperformed PPO.")

# Run comparison
compare_algorithms(env)


Training PPO...
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 208      |
|    ep_rew_mean     | -101     |
| time/              |          |
|    fps             | 888      |
|    iterations      | 1        |
|    time_elapsed    | 2        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 225         |
|    ep_rew_mean          | -101        |
| time/                   |             |
|    fps                  | 760         |
|    iterations           | 2           |
|    time_elapsed         | 5           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.014373256 |
|    clip_fraction        | 0.182       |
|    clip_range           | 0.2         |
|   

KeyboardInterrupt: 

In [15]:
import numpy as np
import gym
from stable_baselines3 import PPO
import random

# 1. Define Battery Consumption Environment
class BatteryEnv(gym.Env):
    def __init__(self):
        super(BatteryEnv, self).__init__()
        self.action_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)  # Power consumption action space
        self.observation_space = gym.spaces.Box(low=np.array([0, 0, -10]), high=np.array([100, 120, 10]), dtype=np.float32)  # Battery, speed, incline
        self.state = [100, 50, 0]  # Initial state: [battery level, speed, incline]

    def reset(self):
        self.state = [100, 50, 0]  # Reset to full battery, moderate speed, flat road
        return np.array(self.state, dtype=np.float32)

    def step(self, action):
        battery, speed, incline = self.state
        power_consumption = action[0] * 100  # Scale power consumption to a reasonable range (e.g., 0-100)

        # Update battery based on power consumption and the effect of speed and incline
        battery -= power_consumption * (1 + speed / 100 + incline / 10)
        battery = max(battery, 0)  # Ensure battery does not go below 0

        # Random changes to speed and incline
        speed = np.clip(speed + np.random.uniform(-5, 5), 0, 120)
        incline = np.clip(incline + np.random.uniform(-2, 2), -10, 10)

        self.state = [battery, speed, incline]

        # Reward is negative because we want to minimize battery usage
        reward = -power_consumption * (1 + speed / 100 + incline / 10)

        # Episode ends when battery is drained
        done = battery <= 0
        return np.array(self.state, dtype=np.float32), reward, done, {}

# Initialize the environment
env = BatteryEnv()

# 2. Train PPO on the Environment
print("Training PPO...")
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=5000)

# 3. Test function for both PPO and Normal Agent
def test_policy(model, env, policy_type="PPO"):
    state = env.reset()
    total_reward = 0
    done = False
    c=0
    while not done:
        if policy_type == "PPO":
            # PPO uses its learned policy to choose the action
            action, _states = model.predict(state, deterministic=True)
        elif policy_type == "Normal":
            # Normal agent takes random actions
            action = np.random.uniform(0, 1, size=(1,))
        c+=1
        state, reward, done, _ = env.step(action)
        total_reward += reward
        if done or c>100:
          break
    return total_reward

# 4. Compare PPO and Normal Agent Performance
def compare_algorithms(env):
    # Test PPO
    ppo_reward = test_policy(model, env, policy_type="PPO")

    # Test Normal Agent
    normal_reward = test_policy(None, env, policy_type="Normal")

    # Display Results
    print("\nResults:")
    print(f"PPO Total Reward: {ppo_reward}")
    print(f"Normal Agent Total Reward: {normal_reward}")

    if ppo_reward > normal_reward:
        print("PPO outperformed Normal Agent.")
    else:
        print("Normal Agent outperformed PPO.")

# Run comparison
compare_algorithms(env)


  and should_run_async(code)
  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


Training PPO...
Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 2.87     |
|    ep_rew_mean     | -157     |
| time/              |          |
|    fps             | 1224     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
----------------------------------------
| rollout/                |            |
|    ep_len_mean          | 4.53       |
|    ep_rew_mean          | -158       |
| time/                   |            |
|    fps                  | 862        |
|    iterations           | 2          |
|    time_elapsed         | 4          |
|    total_timesteps      | 4096       |
| train/                  |            |
|    approx_kl            | 0.01892561 |
|    clip_fraction        | 0.214      |
|    clip_range           | 0.2        |
|    entropy_loss