In [1]:
from __future__ import annotations
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import pickle
import time
import gymnasium as gym
from segway_sim.envs import SegwayEnv

  from .autonotebook import tqdm as notebook_tqdm


In [35]:
class PID:
    """Discrete PID controller driven by the first component of an observation.

    The error is ``setpoint - obs[0]``. The integral term is a running sum of
    errors and the derivative term is the difference between consecutive
    errors; neither is scaled by a time step, so the gains implicitly assume
    a fixed sampling interval.

    Args:
        kp: Proportional gain.
        ki: Integral gain.
        kd: Derivative gain.
        setpoint: Target value for ``obs[0]``.
        output_limit: Non-negative bound; the action is clipped to
            ``[-output_limit, output_limit]``.

    Raises:
        ValueError: If ``output_limit`` is negative.
    """

    def __init__(self, kp=1, ki=0, kd=0, setpoint=0, output_limit=0.7):
        if output_limit < 0:
            raise ValueError("output_limit must be non-negative")
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.setpoint = setpoint
        self.output_limit = output_limit
        self.reset()

    def reset(self) -> None:
        """Clear the accumulated integral and the remembered previous error."""
        self.integral = 0
        self.previous_error = 0

    def sample_action(self, obs: np.ndarray) -> list[float]:
        """Compute the clipped control output for one observation.

        Updates the controller's internal state (integral and previous error)
        as a side effect, so calls must be made once per step, in order.

        Args:
            obs: Indexable observation; only ``obs[0]`` is read.

        Returns:
            A one-element list holding the action as a Python ``float``.
        """
        error = self.setpoint - obs[0]
        self.integral += error
        # previous_error starts at 0, so the very first derivative equals the
        # first error itself.
        derivative = error - self.previous_error
        self.previous_error = error

        action = self.kp * error + self.ki * self.integral + self.kd * derivative
        limit = self.output_limit
        # float() gives the same element type whether or not clipping occurred
        # (NumPy scalar vs. Python float in the unclipped/clipped cases).
        return [float(max(min(action, limit), -limit))]

    def update(self):
        """No-op: this controller has nothing to learn between episodes."""
        return

    def save(self, nn_file_path: str):
        """No-op: there are no parameters to write; ``nn_file_path`` is ignored."""
        return

# Roll out a PID controller on the Segway simulation and record episode returns.
MAX_EP_LEN = 1000     # value handed to SegwayEnv(max_ep_len=...)
NUM_EPISODES = 1      # number of rollouts to run
LOG_INTERVAL = 1000   # print the running average every this many episodes
RETURN_WINDOW = 50    # second positional argument of RecordEpisodeStatistics

env = SegwayEnv(max_ep_len=MAX_EP_LEN)
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, RETURN_WINDOW)  # Records episode-reward

obs_space_dims = env.observation_space.shape[0]
action_space_dims = env.action_space.shape[0]
print('ob space:', obs_space_dims)
print('ac space:', action_space_dims)

# The seed is derived from the clock, so print it: without the value the
# numbers reported below cannot be reproduced.
seed = int(time.time())
print('seed:', seed)
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)

agent = PID(15, 0, 0)
reward_over_episodes = []

try:
    for episode in range(NUM_EPISODES):
        # Seed only the first reset. Passing the same seed on every reset
        # would restart the environment's RNG identically each episode.
        obs, info = wrapped_env.reset(seed=seed if episode == 0 else None)
        done = False
        while not done:
            action = agent.sample_action(obs)
            obs, reward, terminated, truncated, info = wrapped_env.step(action)
            done = terminated or truncated

        reward_over_episodes.append(wrapped_env.return_queue[-1])
        agent.update()

        if episode % LOG_INTERVAL == 0:
            avg_reward = int(np.mean(wrapped_env.return_queue))
            print("Episode:", episode, "Average Reward:", avg_reward)
finally:
    # Release the environment even if a step raises part-way through.
    wrapped_env.close()

ob space: 6
ac space: 1
Episode: 0 Average Reward: 275


In [36]:
from gymnasium.wrappers import RecordVideo

# Record one PID-controlled episode as a video under the "videos" folder.
VIDEO_MAX_STEPS = 1000  # used both as max_ep_len and as the step budget below

env = SegwayEnv(max_ep_len=VIDEO_MAX_STEPS, render_mode="rgb_array")
video_env = RecordVideo(env, video_folder="videos", name_prefix="SegwayPID")

# `agent` comes from the evaluation cell and still carries the integral and
# previous error it accumulated there; clear them so this episode starts clean.
agent.integral = 0
agent.previous_error = 0

try:
    observation, info = video_env.reset()
    print("observation", observation)
    print(video_env.action_space)
    for i in range(VIDEO_MAX_STEPS):
        action = agent.sample_action(observation)
        observation, reward, terminated, truncated, info = video_env.step(action)
        if terminated or truncated:
            print("terminated at", i)
            break
finally:
    # close() must run even if a step raises, otherwise it is skipped and the
    # recording may be left unfinalized.
    video_env.close()

  logger.warn(


observation [ 1.31828900e-01 -2.11840573e+00  2.39865870e+01 -1.42868453e-03
 -6.34104121e-03  9.02506296e-03]
Box(-1.0, 1.0, (1,), float32)
Moviepy - Building video /Users/robertli/github/tinySegway/videos/SegwayPID-episode-0.mp4.
Moviepy - Writing video /Users/robertli/github/tinySegway/videos/SegwayPID-episode-0.mp4



                                                               

Moviepy - Done !
Moviepy - video ready /Users/robertli/github/tinySegway/videos/SegwayPID-episode-0.mp4
terminated at 448
