-
Notifications
You must be signed in to change notification settings - Fork 8
/
extract_pos.py
149 lines (119 loc) · 3.71 KB
/
extract_pos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import logging
import mylogger
import subprocess
import time
import csv
class PlayerConfig:
    """Token layout used to recognise a position2d log line.

    Every attribute is an ``(index, expected_value)`` pair: ``index`` is the
    position of a token in the space-split line and ``expected_value`` is
    what that token is compared against.
    """

    def __init__(self):
        # position_token: token that must equal the string "position2d".
        # type_token / subtype_token: tokens that must equal the integer 1.
        (
            self.position_token,
            self.type_token,
            self.subtype_token,
        ) = (3, "position2d"), (5, 1), (6, 1)
class Position:
    """One position sample parsed from a space-separated log line.

    Attributes:
        time: Token 0 as a float.
        x_pos, y_pos, yaw_pos: Tokens 7-9 as floats.
        x_vel, y_vel, yaw_vel: Tokens 10-12 as floats.
        signal: Integer returned by ``get_wifi()`` when the sample is built.
    """

    def __init__(self, line):
        fields = line.split(" ")

        # Timestamp comes first in the line.
        self.time = float(fields[0])

        # Pose (tokens 7-9) followed by velocities (tokens 10-12), assigned
        # in token order.
        numeric_attrs = (
            "x_pos",
            "y_pos",
            "yaw_pos",
            "x_vel",
            "y_vel",
            "yaw_vel",
        )
        for index, attr in enumerate(numeric_attrs, start=7):
            setattr(self, attr, float(fields[index]))

        # Wireless reading taken at construction time.
        self.signal = int(get_wifi())

        # Token 13 (motor stall) is deliberately not parsed.

    def __repr__(self):
        # One (format, value) pair per output row; joined without separators
        # because every format already carries its own newline.
        rows = (
            ("Time: %f \n", self.time),
            ("\tX Pos: %f \n", self.x_pos),
            ("\tY Pos: %f \n", self.y_pos),
            ("\tYaw Pos: %f \n", self.yaw_pos),
            ("\tX vel: %f \n", self.x_vel),
            ("\tY vel: %f \n", self.y_vel),
            ("\tYaw vel: %f\n", self.yaw_vel),
            ("\tSignal: %f\n", self.signal),
        )
        return "".join(fmt % (value,) for fmt, value in rows)
def get_wifi(path="/proc/net/wireless"):
    """Return the link quality of the first interface listed in *path*.

    The file is expected to follow the ``/proc/net/wireless`` layout: two
    header lines, then one line per interface such as::

         wlan0: 0000   54.  -56.  -256        0      0      0 ...

    The value is taken from the third whitespace-separated field rather than
    from fixed character columns, so three-digit qualities and interface
    names of any length are read correctly.

    Args:
        path: Statistics file to read. Defaults to ``/proc/net/wireless``.

    Returns:
        The link-quality field of the first interface line as an ``int``.

    Raises:
        OSError: If *path* cannot be opened.
        ValueError: If there is no interface line, or its link-quality
            field is not an integer.
    """
    # The context manager closes the handle even if a read fails.
    with open(path, "r") as stats:
        # Skip the two header lines.
        stats.readline()
        stats.readline()
        first_interface = stats.readline()

    fields = first_interface.split()
    if len(fields) < 3:
        raise ValueError("no wireless interface line found in %s" % path)

    # The field may carry a trailing "." marker (e.g. "54."); drop it.
    return int(fields[2].rstrip("."))
def tail_file(filename, mapfile="maps.csv", min_interval=0.1):
    """Follow *filename* with ``tail -F`` and record position samples.

    Every line emitted by ``tail`` is handed to ``process_line``. When that
    yields a sample and at least *min_interval* seconds have elapsed since
    the previous recorded one, the sample is appended to the result list,
    passed to ``write_csv`` together with *mapfile*, and printed.

    The loop ends when ``tail`` closes its output (for example because the
    process was killed); the child is always terminated and reaped on the
    way out, including on ``KeyboardInterrupt``.

    Args:
        filename: Path of the log file to follow.
        mapfile: CSV file the recorded samples are appended to.
        min_interval: Minimum number of seconds between recorded samples.

    Returns:
        The list of samples recorded while following the file.
    """
    responses = []
    # stderr is discarded: it was never read, and an unread pipe can fill up
    # and stall the child.
    tail = subprocess.Popen(
        ["tail", "-F", filename],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    )
    # Monotonic clock so wall-clock adjustments cannot stall or burst the
    # throttle.
    next_sample_at = time.monotonic()
    try:
        while True:
            raw = tail.stdout.readline()
            if not raw:
                # EOF: tail has exited. Without this check the loop would
                # spin forever on empty reads.
                break
            try:
                # Decoding is inside the try so a single undecodable line is
                # logged instead of aborting the whole run.
                response = process_line(raw.decode())
                if response is not None and next_sample_at < time.monotonic():
                    responses.append(response)
                    write_csv(response, mapfile)
                    print(response)
                    next_sample_at = time.monotonic() + min_interval
            except Exception as e:
                # Best effort: one bad line must not stop the follower.
                logging.error("Error...%s", e)
    finally:
        tail.terminate()
        tail.stdout.close()
        tail.wait()
    return responses
def write_csv(data, filename):
    """Append one ``x_pos, y_pos, signal`` row for *data* to a CSV file.

    Args:
        data: Object exposing ``x_pos``, ``y_pos`` and ``signal`` attributes.
        filename: Path of the CSV file; created if it does not exist.
    """
    # newline="" lets the csv module control line endings itself, as its
    # documentation requires; otherwise platforms that translate "\n" emit
    # blank rows. Plain append mode is enough since the file is never read.
    with open(filename, "a", newline="") as csvfile:
        csv.writer(csvfile).writerow([data.x_pos, data.y_pos, data.signal])
def read_file(filename):
    """Process every line of *filename* and collect the resulting samples.

    Each line is passed to ``process_line``; lines for which it returns
    ``None`` are skipped, and lines that make it raise are logged and
    skipped.

    Args:
        filename: Path of the log file to read.

    Returns:
        A list of the non-``None`` results, in file order.
    """
    responses = []
    with open(filename, "r") as f:
        # Iterate the file object directly so the log is streamed line by
        # line instead of being loaded whole by readlines().
        for line in f:
            try:
                response = process_line(line)
            except Exception as e:
                # Best effort: a malformed line must not abort the read.
                logging.error("Error...%s", e)
                continue
            if response is not None:
                responses.append(response)
    return responses
def process_line(line) -> "Position | None":
    """Convert a log line into a ``Position`` when it is a position2d line.

    Args:
        line: One raw line of the log.

    Returns:
        ``Position(line)`` if ``is_position2d(line)`` is true, else ``None``.
    """
    # The annotation is a string on purpose: the expression
    # ``Position or None`` evaluates to plain ``Position`` and so hid the
    # ``None`` case.
    logging.debug(line)
    return Position(line) if is_position2d(line) else None
def is_position2d(line) -> bool:
    """Tell whether *line* is a 2d position line.

    The line is split on single spaces and checked against the
    ``(index, expected)`` pairs of ``PlayerConfig``: the position token must
    equal its expected string, and the type and subtype tokens must equal
    their expected integers.

    Args:
        line: One raw line of the log.

    Returns:
        ``True`` if all three tokens match. ``False`` otherwise, including
        for lines that are too short or whose type/subtype tokens are not
        integers.
    """
    tokens = line.split(" ")
    logging.debug(tokens)
    config = PlayerConfig()

    name_index, expected_name = config.position_token
    try:
        if tokens[name_index] != expected_name:
            return False
        return all(
            int(tokens[index]) == expected
            for index, expected in (config.type_token, config.subtype_token)
        )
    except (IndexError, ValueError):
        # Blank, truncated or non-numeric lines are simply not position2d
        # lines; report that instead of raising from a predicate.
        return False
if __name__ == "__main__":
    # Set up logging before any line is processed.
    mylogger.config_logs()

    log_path = "/run/shm/PLAYER_fprintf.log"

    # One-shot alternative to following the file: read_file(log_path)
    print(tail_file(log_path))