-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulator.py
113 lines (92 loc) · 3.43 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8 -*-
import subprocess, sys, os, time, constants, optparse, config
import traci
import networkx as nx
import networkutility as netutil
from sumolib import checkBinary
from sumolib import net
class Simulator:
    """Run one SUMO simulation as a child process and drive it over TraCI.

    Construction reads the project configuration, parses the command line
    (``--gui`` and ``--port``), loads the SUMO network and fills
    ``original_network`` (a ``networkx.DiGraph``) through ``networkutility``.

    Subclasses customise a run by overriding the three hooks
    ``initIteration``, ``stepProcess`` and ``endIteration``.
    """

    # Seconds to sleep after launching SUMO before connecting with TraCI.
    # Kept at the historical value; subclasses may override it.
    STARTUP_WAIT_SEC = 20

    def __init__(self):
        """Load configuration, options and network; ensure the output dir."""
        self.conf = config.Config()
        self.conf.readConfig(constants.CONFIG_FILE)
        self._options = self.__getOptions__()
        self.sumo_net = net.readNet(self.conf.network_file)
        self.original_network = nx.DiGraph()

        # Check the output directory. makedirs also creates missing parent
        # directories, which a plain mkdir would fail on.
        if not os.path.isdir(self.conf.output_dir):
            print("there is not output directory...")
            os.makedirs(self.conf.output_dir)
            print("create output directory.")

        # The explicit comparison is kept on purpose: the type of real_net is
        # decided by the config module, which is not visible from here.
        if self.conf.real_net == True:
            netutil.readRealNetwork(self.sumo_net, self.original_network)
        else:
            netutil.readNetwork(self.sumo_net, self.original_network)

    def __getOptions__(self):
        """Parse the command line and return the resulting option values.

        Recognised options:
            --gui   run ``sumo-gui`` instead of ``sumo`` (default: off).
            --port  TraCI port (default: ``self.conf.port``).
        """
        self.optParser = optparse.OptionParser()
        self.optParser.add_option(
            "--gui",
            action="store_true",
            default=False,
            help="run sumo with gui")
        self.optParser.add_option(
            "--port",
            type="int",
            default=self.conf.port,
            help="run sumo with port")
        options, _args = self.optParser.parse_args()
        return options

    def run(self, offset):
        """Run one simulation and wait for the SUMO process to exit.

        Args:
            offset: base name (string) of the trip-info file; the output is
                written to ``<conf.output_dir>/<offset>.xml``.
        """
        output_file = os.path.join(self.conf.output_dir, offset + ".xml")
        self.__inner_run__(output_file)
        self.__sumoProcess.wait()

    def __inner_run__(self, output_file):
        """Launch SUMO, step the simulation to completion and disconnect.

        Args:
            output_file: path handed to SUMO's ``--tripinfo-output``.

        The TraCI connection is closed even when a step or a hook raises,
        and in that case the SUMO child process is stopped before the
        exception propagates, so a failed run does not leave it behind.
        """
        use_gui = self._options.gui
        sumoBinary = checkBinary('sumo-gui' if use_gui else 'sumo')
        self.__sumoProcess = subprocess.Popen(
            self._build_sumo_command(sumoBinary, output_file),
            stdout=sys.stdout, stderr=sys.stderr)
        try:
            time.sleep(self.STARTUP_WAIT_SEC)
            # Connect on the same port that was given to SUMO above, so the
            # --port command-line option is honoured on both sides.
            traci.init(self._options.port)
            try:
                self._simulate()
            finally:
                traci.close()
        except Exception:
            self._stop_sumo()
            raise

        if use_gui:
            # The GUI process is stopped explicitly once the run is over.
            # Only our own child is stopped, not every sumo-gui on the host.
            self._stop_sumo()
        sys.stdout.flush()

    def _build_sumo_command(self, sumo_binary, output_file):
        """Return the SUMO argument list for this run (GUI flag aware)."""
        command = [
            sumo_binary, "-W",
            "-n", self.conf.network_file,
            "-r", self.conf.route_file,
            "--tripinfo-output", output_file,
            "--remote-port", str(self._options.port),
        ]
        if self._options.gui:
            command += ["--gui-settings-file", self.conf.gui_setting_file]
        command += [
            "--step-length", "1",
            "-v", "true",
            "--time-to-teleport", "-1",
        ]
        return command

    def _simulate(self):
        """Step the connected simulation until no vehicles are expected."""
        self.initIteration()
        while True:
            traci.simulationStep()
            if traci.simulation.getMinExpectedNumber() <= 0:
                break
            self.stepProcess()
            # NOTE(review): TraCI documents getCurrentTime() as milliseconds;
            # confirm conf.short_term_sec is expressed in the same unit.
            # NOTE(review): these two lists are first assigned here, after
            # stepProcess(); hooks that read them earlier must create them
            # themselves (e.g. in initIteration).
            if traci.simulation.getCurrentTime() % self.conf.short_term_sec == 0:
                self.can_change_lane_list = []
                self.want_chage_vehicle_list = []
        self.endIteration()

    def _stop_sumo(self):
        """Terminate the SUMO child process if it is still running."""
        process = self.__sumoProcess
        if process.poll() is None:
            process.terminate()

    def initIteration(self):
        """Hook: called once after TraCI connects, before the first step."""
        pass

    def stepProcess(self):
        """Hook: called after each simulation step while vehicles remain."""
        pass

    def endIteration(self):
        """Hook: called once after the last step, before TraCI is closed."""
        pass
if __name__ == "__main__":
    # Script entry point: build a simulator from the configured files and
    # command line, then perform a single run whose trip-info output is
    # named "Simulation.xml" inside the configured output directory.
    Simulator().run("Simulation")