-
Notifications
You must be signed in to change notification settings - Fork 236
/
simulation.py
226 lines (175 loc) · 6.75 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import time, datetime
import bluesky as bs
from bluesky.tools import datalog, areafilter, plugin
from bluesky.tools.misc import txt2tim,tim2txt
from bluesky import stack
from bluesky.traffic.metric import Metric
onedayinsec = 24*3600 # [s] time of one day in seconds for clock time
class Simulation:
"""
Simulation class definition : simulation control (time, mode etc.) class
Methods:
Simulation() : constructor
update() : update sim variables (like time)
op() : go from IC/HOLD to OPERATE mode
pause() : go to HOLD mode (pause)
stop() : quit function
Created by : Jacco M. Hoekstra (TU Delft)
"""
# simulation modes
init, hold, op, end = list(range(4))
def __init__(self, detached):
# simmode
self.mode = self.init
self.simt = 0.0 # Runtime
self.tprev = 0.0
self.syst0 = 0.0
self.simdt = 0.0
self.syst = 0.0 # system time
# Simulated UTC clock time
self.utc = datetime.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
# Directories
self.datadir = "./data/"
self.dts = []
# Fixed dt mode for fast forward
self.ffmode = False # Default FF off
self.fixdt = 0.1 # Default time step
self.ffstop = -1. # Indefinitely
# Simulation objects
print("Setting up Traffic simulation")
self.metric = Metric()
def update(self):
self.syst = time.clock()
if self.mode == Simulation.init:
self.operate()
# Closk for run(=op) mode
if self.mode == Simulation.op:
# Not fast forward: variable dt
if not self.ffmode:
self.tprev = self.simt
self.simt = self.syst - self.syst0
self.simdt = self.simt - self.tprev
# Protect against incidental dt's larger than 1 second,
# due to window moving, switch to/from full screen, etc.
if self.simdt > 1.0:
extra = self.simdt-1.0
self.simdt = 1.0
self.simt = self.simt - extra
self.syst0 = self.syst-self.simt
# Fast forward: fixed dt until ffstop time, goto pause
else:
self.simdt = self.fixdt
self.simt = self.simt+self.fixdt
self.syst0 = self.syst - self.simt
if self.ffstop > 0. and self.simt >= self.ffstop:
self.ffmode = False
self.mode = self.hold
# Update UTC time
self.utc += datetime.timedelta(seconds=self.simdt)
# Datalog pre-update (communicate current sim time to loggers)
datalog.preupdate(self.simt)
# Plugins pre-update
plugin.preupdate(self.simt)
# For measuring game loop frequency
self.dts.append(self.simdt)
if len(self.dts) > 20:
del self.dts[0]
stack.checkscen()
# Always process stack
stack.process()
if self.mode == Simulation.op:
bs.traf.update(self.simt, self.simdt)
# Update metrics
self.metric.update()
# Update plugins
plugin.update(self.simt)
# Update loggers
datalog.postupdate()
# HOLD/Pause mode
else:
self.syst0 = self.syst-self.simt
self.simdt = 0.0
return
def scenarioInit(self, name):
self.reset()
return
def benchmark(self, fname='IC', tend=60.0):
return False, "Benchmark command not available in Pygame version."
def batch(self, filename):
return False, "Batch comand not available in Pygame version," + \
"use Qt-version for batch simulations"
def addnodes(self, count):
return
def pause(self): # Hold mode
self.mode = self.hold
self.syst0 = self.syst-self.simt
self.simdt = 0.0
return
def stop(self): # Quit mode
self.mode = self.end
datalog.reset()
bs.stack.saveclose() # Save close configuration
# datalog.save()
return
def operate(self): # Back to op-mode: run after HOLD/PAUSE
self.mode = self.op
self.syst = time.clock()
self.syst0 = self.syst-self.simt
self.tprev = self.simt-0.001 # allow 1 msec step rto avoid div by zero
return
def setdt(self, dt):
self.fixdt = abs(dt)
def setDtMultiplier(self, mult=None):
return False, "Dt multiplier not available in Pygame version."
def setFixdt(self, flag=None, nsec=None):
if flag is not None:
if flag:
self.fastforward(nsec)
else:
self.ffmode = False
def fastforward(self, nsec=None):
self.ffmode = True
if nsec is not None:
self.ffstop = self.simt + nsec
else:
self.ff_end = -1.0
def reset(self):
self.simt = 0.0
self.mode = self.init
plugin.reset()
bs.navdb.reset()
bs.traf.reset()
datalog.reset()
areafilter.reset()
self.utc = datetime.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
def setutc(self, *args):
""" Set simulated clock time offset"""
if not args:
pass # avoid error message, just give time
elif len(args) == 1:
if args[0].upper() == "RUN":
self.utc = datetime.datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
elif args[0].upper() == "REAL":
self.utc = datetime.datetime.today().replace(microsecond=0)
elif args[0].upper() == "UTC":
self.utc = datetime.datetime.utcnow().replace(microsecond=0)
else:
try:
self.utc = datetime.datetime.strptime(args[0], '%H:%M:%S.%f')
except ValueError:
return False, "Input time invalid"
elif len(args) == 3:
day, month, year = args
try:
self.utc = datetime.datetime(year, month, day)
except ValueError:
return False, "Input date invalid."
elif len(args) == 4:
day, month, year, timestring = args
try:
self.utc = datetime.datetime.strptime(f'{year},{month},{day},{timestring}', '%Y,%m,%d,%H:%M:%S.%f')
except ValueError:
return False, "Input date invalid."
else:
return False, "Syntax error"
return True, "Simulation UTC " + str(self.utc)