# get_gps_trace.py (forked from jiawlu/MapMatching)
# @author Jiawei Lu (jiaweil9@asu.edu)
# @time 2021/4/29 12:11
# @desc    Download GPS trace points from the OpenStreetMap API and save them as a CSV file
"""
Download and format GPS trace data directly from OpenStreetMap.

Usage:
    python get_gps_trace.py

Note: the larger the bounding box, the more trace data is returned and the
longer the download and formatting take.
"""
import multiprocessing as mp
import re
from urllib.request import urlopen

try:
    import xml.etree.cElementTree as ET
except ImportError:  # xml.etree.cElementTree was removed in Python 3.9
    import xml.etree.ElementTree as ET

import numpy as np
import pandas as pd
# Bounding box of the ASU area (longitude / latitude)
min_lon = -111.947725
min_lat = 33.413345
max_lon = -111.915025
max_lat = 33.432364

# Upper bound on the number of worker processes used to download subregions
processors = 8

# When True, every downloaded page is also written to an XML file
# (after the namespace declaration has been stripped)
output_xml = False

# Request URL; placeholders are min_lon, min_lat, max_lon, max_lat, page
url_template = 'https://api.openstreetmap.org/api/0.6/trackpoints?bbox={},{},{},{}&page={}'

# Matches a default-namespace declaration so it can be removed before parsing.
# The value is matched with [^"]* rather than .* so that attributes following
# the declaration on the same line are left untouched.
ns_pattern = re.compile(r' xmlns="([^"]*)"')
def getPageData(bbox, page):
    """Download one page of track points for a bounding box, grouped by trace.

    Args:
        bbox: ``(min_lon, min_lat, max_lon, max_lat)`` of the area to query.
        page: Page index substituted into the request URL.

    Returns:
        A list with one entry per ``trk`` element that yielded at least one
        timestamped point. Each entry is a list of ``(time_stamp, lon, lat)``
        tuples holding the values exactly as they appear in the response.
        The list is empty when the page contains no such trace.

    Errors raised by ``urlopen`` or by the XML parser are not caught here and
    propagate to the caller.
    """
    min_lon_, min_lat_, max_lon_, max_lat_ = bbox
    request_url = url_template.format(min_lon_, min_lat_, max_lon_, max_lat_, page)

    # Use the response as a context manager so the connection is always closed.
    with urlopen(request_url) as response:
        res = response.read().decode()

    # Drop the first namespace declaration so that elements can be looked up
    # by their bare tag names ('trkseg', 'time').
    res_nons = re.sub(ns_pattern, '', res, count=1)
    root = ET.fromstring(res_nons)

    vehicle_trace_list_page = []
    for trk in root:
        trkseg = trk.find('trkseg')
        if trkseg is None:
            continue

        gps_points_list_veh = []
        for trkpt in trkseg:
            pttime = trkpt.find('time')
            if pttime is None:
                # No timestamp on this point: stop reading the segment and
                # keep only the points collected so far.
                break
            gps_points_list_veh.append(
                (pttime.text, trkpt.attrib['lon'], trkpt.attrib['lat'])
            )

        if gps_points_list_veh:
            vehicle_trace_list_page.append(gps_points_list_veh)

    if output_xml:
        # Optional dump of the namespace-stripped response, one file per page.
        file_name = f'download_{min_lon_}-{min_lat_}-{max_lon_}-{max_lat_}_{page}.xml'
        with open(file_name, 'w', encoding='utf-8') as xml_file:
            xml_file.write(res_nons)

    return vehicle_trace_list_page
def getRegionData(bbox):
    """Collect the traces of every page available for one subregion.

    Pages are requested in increasing order starting at 0; the first page for
    which ``getPageData`` returns an empty list ends the download.

    Args:
        bbox: ``(min_lon, min_lat, max_lon, max_lat)`` of the subregion.

    Returns:
        The concatenation of the per-page trace lists, in page order.
    """
    print(f'getting gps data on subregion {bbox}')

    region_traces = []
    page_index = 0
    page_traces = getPageData(bbox, page_index)
    while page_traces:
        region_traces.extend(page_traces)
        page_index += 1
        page_traces = getPageData(bbox, page_index)

    return region_traces
# Captures the clock part between 'T' and 'Z' of a timestamp such as
# '2021-04-29T12:11:00Z'.
_CLOCK_TIME_PATTERN = re.compile(r'T(.*)Z')


def _parseClockTime(time_stamp):
    """Return ``(hh, mm, ss)`` strings taken from a ``...THH:MM:SSZ`` timestamp.

    Three empty strings are returned when the timestamp is missing, has no
    ``T...Z`` section, or that section does not split into exactly three
    colon-separated parts.
    """
    match = _CLOCK_TIME_PATTERN.search(time_stamp or '')
    if match is None:
        return '', '', ''
    parts = match.group(1).split(':')
    if len(parts) != 3:
        return '', '', ''
    return parts[0], parts[1], parts[2]


def _buildTraceRows(vehicle_trace_list_regions):
    """Flatten per-region traces into output rows.

    Traces are numbered consecutively from 0 across all regions (``agent_id``)
    and the points of each trace are numbered from 0 (``trace_point_no``).
    Each row is ``(trace_point_no, agent_id, lon, lat, time, hh, mm, ss)``.
    """
    rows = []
    agent_id = 0
    for region_traces in vehicle_trace_list_regions:
        for trace in region_traces:
            for point_no, (time_stamp, lon, lat) in enumerate(trace):
                hh, mm, ss = _parseClockTime(time_stamp)
                rows.append((point_no, agent_id, lon, lat, time_stamp, hh, mm, ss))
            agent_id += 1
    return rows


def downloadGPSData():
    """Download all traces inside the configured bounding box into ``osm_gps.csv``.

    The bounding box given by the module-level ``min_lon``/``min_lat``/
    ``max_lon``/``max_lat`` is cut into a grid of subregions, each subregion is
    downloaded by ``getRegionData`` in a pool of worker processes, and the
    resulting points are written to ``osm_gps.csv`` with the columns
    ``trace_point_no, agent_id, x_coord, y_coord, time, hh, mm, ss``
    (``x_coord`` is the longitude, ``y_coord`` the latitude).

    Returns:
        None. The result is the CSV file written to the working directory.
    """
    # Each subregion spans at most 0.49 degrees per side.
    # NOTE(review): presumably chosen to stay below the API's bounding-box
    # size limit -- confirm against the OSM API documentation.
    num_of_lon_inters = int(np.ceil((max_lon - min_lon) / 0.49))
    num_of_lat_inters = int(np.ceil((max_lat - min_lat) / 0.49))

    lons = np.linspace(min_lon, max_lon, num_of_lon_inters + 1, endpoint=True).round(7)
    lats = np.linspace(min_lat, max_lat, num_of_lat_inters + 1, endpoint=True).round(7)
    sub_bbox_list = [
        (lons[m], lats[n], lons[m + 1], lats[n + 1])
        for m in range(num_of_lon_inters)
        for n in range(num_of_lat_inters)
    ]
    print(f'number of subregions: {len(sub_bbox_list)}')

    # Never start more workers than there are subregions, and release the
    # worker processes as soon as the (blocking) map call has returned.
    worker_count = max(1, min(processors, len(sub_bbox_list)))
    with mp.Pool(processes=worker_count) as pool:
        vehicle_trace_list_regions = pool.map(getRegionData, sub_bbox_list)

    vehicle_trace_complete = pd.DataFrame(
        _buildTraceRows(vehicle_trace_list_regions),
        columns=['trace_point_no', 'agent_id', 'x_coord', 'y_coord', 'time', 'hh', 'mm', 'ss'],
    )
    vehicle_trace_complete.to_csv('osm_gps.csv', index=False)
# Start the download only when the file is executed as a script. The guard
# also prevents multiprocessing workers that re-import this module (e.g. under
# the 'spawn' start method) from launching a download of their own.
if __name__ == '__main__':
    downloadGPSData()