This repository has been archived by the owner on Feb 6, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
inverted_pendulum.py
119 lines (94 loc) · 4.09 KB
/
inverted_pendulum.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
A Gym environment for assigning a frequency band to either a sensor or a
controller in the wireless networked control of an inverted pendulum.
"""
from math import degrees
from typing import Dict, List
import gym
import numpy as np
from gym import error, spaces, utils
from gym.utils import seeding
from simpy.rt import RealtimeEnvironment
from gymwipe.control.inverted_pendulum import InvertedPendulumPidController
from gymwipe.envs.core import BaseEnv, Interpreter
from gymwipe.networking.attenuation_models import FsplAttenuation
from gymwipe.networking.devices import SimpleRrmDevice
from gymwipe.networking.messages import (FakeTransmittable, Packet,
Transmittable)
from gymwipe.networking.physical import FrequencyBand
from gymwipe.plants.sliding_pendulum import (AngleSensor, SlidingPendulum,
WagonActuator)
from gymwipe.simtools import SimMan
class InvertedPendulumInterpreter(Interpreter):
    """
    Interpreter for :class:`InvertedPendulumEnv` that derives observation,
    reward, and info directly from the angle reported by the environment's
    plant object instead of from received packets.
    """

    def __init__(self, env: "InvertedPendulumEnv"):
        """
        Args:
            env: The environment whose ``plant`` attribute is queried for the
                pendulum angle
        """
        self._env = env
        self.reset()

    def onPacketReceived(self, senderIndex: int, receiverIndex: int, payload: Transmittable):
        """
        Intentionally does nothing: received packets are ignored because the
        angle is read straight from the plant object.
        """

    def onFrequencyBandAssignment(self, deviceIndex: int, duration: int):
        """
        Intentionally does nothing: assignments are not tracked by this
        interpreter.
        """

    def getReward(self):
        """
        Returns :math:`\\lvert 180 - \\alpha \\rvert` as a float, with
        :math:`\\alpha` being the plant angle converted to degrees.
        """
        angleDeg = degrees(self._env.plant.getAngle())
        deviation = abs(180 - angleDeg)
        return float(deviation)

    def getObservation(self):
        """
        Returns the plant angle in degrees, truncated to an ``int``.
        """
        angleDeg = degrees(self._env.plant.getAngle())
        return int(angleDeg)

    def getDone(self):
        """
        Always returns ``False``; this interpreter never ends an episode.
        """
        return False

    def getInfo(self):
        """
        Returns a dict holding the plant angle in degrees under the key
        ``"Sensor angle"``.
        """
        angleDeg = degrees(self._env.plant.getAngle())
        return {"Sensor angle": angleDeg}
class InvertedPendulumEnv(BaseEnv):
    """
    An environment in which an agent assigns a frequency band to either a
    sliding pendulum's :class:`~gymwipe.plants.sliding_pendulum.AngleSensor`
    or an
    :class:`~gymwipe.control.inverted_pendulum.InvertedPendulumPidController`.

    Device index 0 refers to the sensor, device index 1 to the controller.

    Note:
        This environment is yet untested!
    """

    def __init__(self):
        band = FrequencyBand([FsplAttenuation])
        super().__init__(band, deviceCount=2)

        # The observation is derived from the plant angle.
        # NOTE(review): the interpreter returns int(degrees(angle)); confirm
        # that this always lies within the range Discrete(180) accepts.
        self.observation_space = spaces.Discrete(180)

        # A real-time simulation environment is used for visualization. It is
        # installed before the plant and the devices are created.
        SimMan.env = RealtimeEnvironment()

        # Plant and devices. The controller is created first because the
        # sensor needs the controller's MAC address.
        self.plant = SlidingPendulum(visualized=True)
        self.controller = InvertedPendulumPidController("Controller", 0, -1, band)
        self.sensor = AngleSensor(
            "Sensor",
            band,
            self.plant,
            self.controller.macAddr,
            0.001,  # 1 ms sample interval
        )
        self.controller.sensorAddr = self.sensor.macAddr
        self.actuator = WagonActuator("Actuator", band, self.plant)
        self.controller.actuatorAddr = self.actuator.macAddr

        # Maps the device indices used in actions to MAC addresses.
        self.deviceIndexToMacDict = {
            0: self.sensor.macAddr,
            1: self.controller.macAddr,
        }

        # The RRM device performs the assignments and holds the interpreter.
        self.rrm = SimpleRrmDevice(
            "RRM",
            0,
            1,
            self.frequencyBand,
            self.deviceIndexToMacDict,
            InvertedPendulumInterpreter(self),
        )

    def reset(self):
        """
        Returns the interpreter's current observation as the initial
        observation. No plant or simulation state is modified here.
        """
        interpreter = self.rrm.interpreter
        return interpreter.getObservation()

    def step(self, action):
        """
        Assigns the frequency band as requested by `action` and runs the
        simulation until that assignment has been processed.

        Args:
            action: A member of ``self.action_space`` providing the keys
                ``"device"`` (device index) and ``"duration"`` (multiplied by
                ``ASSIGNMENT_DURATION_FACTOR`` before being passed on)

        Returns:
            The result of the interpreter's ``getFeedback()`` call, expected
            to be an (observation, reward, done, info) tuple
        """
        assert self.action_space.contains(action)

        targetDevice = action["device"]
        scaledDuration = action["duration"] * self.ASSIGNMENT_DURATION_FACTOR

        # Request the assignment and simulate until it has been processed
        signal = self.rrm.assignFrequencyBand(targetDevice, scaledDuration)
        SimMan.runSimulation(signal.eProcessed)

        return self.rrm.interpreter.getFeedback()

    def render(self, mode='human', close=False):
        """
        Does nothing; both arguments are ignored.
        """