In [21]:
from copy import deepcopy
import itertools
import json
import math
import os

from dataclasses import dataclass
import defusedxml.ElementTree
import flatbuffers
import numpy as np
from pathlib import Path
import plotly.express as px
from pyproj import Proj
import scipy.stats
import toml

from ricetimer.proto import Map, MapHeader

In [22]:
# Load the TOML map specification and show the sections used further down.
spec_path = Path(r'../data/map/thunderhill.toml')
spec = toml.load(spec_path)
for section in ('header', 'osm'):
    print(spec[section])

{'name': 'Thunderhill', 'type': 'Circuit', 'origin': {'lat': 39.5384764, 'lon': -122.3311785}, 'timezone': 'America/Los_Angeles'}
{'file': 'thunderhill.osm'}


In [23]:
# The OSM file name in the spec is taken relative to the spec file's directory.
xml_path = spec_path.parent.joinpath(spec['osm']['file']).resolve()
print(xml_path)
xml = defusedxml.ElementTree.parse(xml_path)

E:\proj\rice-timer\data\map\thunderhill.osm


In [24]:
# Geographic origin of the map as declared in the spec header.
origin_lat, origin_lon = (spec['header']['origin'][key] for key in ('lat', 'lon'))
print(origin_lat, origin_lon)

39.5384764 -122.3311785


In [25]:
# Transverse Mercator projection whose natural origin is the map origin.
projection = Proj(
    proj='tmerc',
    lat_0=origin_lat,
    lon_0=origin_lon,
    k_0=1.0,
    ellps="WGS84",
)

In [26]:
# Project every OSM <node> into map coordinates and build lookups between
# OSM node ids and row indices of `nodes`:
#   nodes[i]         -> projected (x, y) of the i-th node
#   node_i_to_id[i]  -> OSM id of the i-th node
#   node_id_to_i[id] -> row index of the node with that OSM id
nodes = []
node_i_to_id = []
node_id_to_i = dict()
for node_i, node in enumerate(xml.iterfind("./node")):
    # Named `node_id`, not `id`: this is a module-level loop variable, so `id`
    # would shadow the builtin for the rest of the kernel session.
    node_id = int(node.get('id'))
    # XML attributes are strings; convert explicitly instead of relying on
    # the projection call to coerce them.
    lat = float(node.get('lat'))
    lon = float(node.get('lon'))
    x, y = projection(lon, lat)
    node_id_to_i[node_id] = node_i
    node_i_to_id.append(node_id)
    nodes.append((x, y))
node_i_to_id = np.array(node_i_to_id, dtype=np.int64)
# reshape keeps the (N, 2) layout even when the file contains no nodes.
nodes = np.array(nodes, dtype=float).reshape(-1, 2)

In [27]:
# Snap every checkpoint of the spec onto the closest projected OSM node.
MAX_CHECKPOINT_POSITION_ERROR_M = 10
checkpoints = spec['checkpoints']
num_checkpoints = len(checkpoints)
checkpoint_i_to_node_i = [None] * num_checkpoints
node_id_to_checkpoint_i = {}
for checkpoint_i, checkpoint in enumerate(checkpoints):
    lonlat = checkpoint['position']
    position = np.array(projection(lonlat['lon'], lonlat['lat']))
    dist = np.linalg.norm(nodes - position, axis=1)
    node_i = dist.argmin()
    # Written as `not (<=)` so that a NaN distance is rejected as well.
    if not (dist[node_i] <= MAX_CHECKPOINT_POSITION_ERROR_M):
        raise LookupError(f"checkpoint #{checkpoint_i} not found in map")
    checkpoint_i_to_node_i[checkpoint_i] = node_i
    node_id_to_checkpoint_i[node_i_to_id[node_i]] = checkpoint_i

In [28]:
@dataclass
class Track:
    """A stretch of road running from one checkpoint to another."""
    # Projected (x, y) points along the stretch, one row per point.
    ref_line: np.ndarray
    # Index (into the checkpoint list) of the checkpoint the stretch starts at.
    from_checkpoint_index: int
    # Index (into the checkpoint list) of the checkpoint the stretch ends at.
    to_checkpoint_index: int

In [29]:
# Walk every OSM way that carries at least one checkpoint and
#   1. record the direction of the segments adjacent to each checkpoint that
#      has no explicit 'heading_deg' and is not a 'Junction', and
#   2. emit one Track per pair of consecutive checkpoints on the way (plus the
#      wrap-around pair when the way is a closed loop).
# Afterwards the recorded directions are averaged into `checkpoint_mean_dirs`
# (degrees, or None where nothing was recorded).
tracks = []
checkpoint_dirs = [[] for _ in range(num_checkpoints)]
checkpoint_mean_dirs = [None] * num_checkpoints
for way in xml.iterfind("./way"):
    nodes_id = [int(node_ref.get('ref')) for node_ref in way.iterfind("./nd")]

    # A closed way repeats its first node as its last one. Drop the repeat so
    # the loop can be walked with wrap-around indexing; otherwise the closing
    # node would show up twice (duplicate vertex in the wrap-around track, a
    # checkpoint on it being listed twice, zero-length direction vectors).
    is_closed = len(nodes_id) > 1 and nodes_id[0] == nodes_id[-1]
    if is_closed:
        nodes_id = nodes_id[:-1]
    num_way_nodes = len(nodes_id)

    nodes_i = np.array([node_id_to_i[node_id] for node_id in nodes_id], dtype=np.intp)
    # (position along the way, checkpoint index) for each checkpoint on the way.
    checkpoints_on_way = [
        (i, node_id_to_checkpoint_i[node_id])
        for i, node_id in enumerate(nodes_id)
        if node_id in node_id_to_checkpoint_i]
    if not checkpoints_on_way:
        continue

    for seq, checkpoint_i in checkpoints_on_way:
        checkpoint = checkpoints[checkpoint_i]
        if 'heading_deg' in checkpoint or checkpoint['type'] == 'Junction':
            continue
        # Incoming and outgoing segment at the checkpoint. Bounds are checked
        # explicitly: a negative index would silently wrap to the end of the
        # array instead of raising IndexError.
        if is_closed:
            segments = [
                ((seq - 1) % num_way_nodes, seq),
                (seq, (seq + 1) % num_way_nodes),
            ]
        else:
            segments = []
            if seq > 0:
                segments.append((seq - 1, seq))
            if seq < num_way_nodes - 1:
                segments.append((seq, seq + 1))
        for start, end in segments:
            v = nodes[nodes_i[end]] - nodes[nodes_i[start]]
            # A zero-length segment has no direction; arctan2(0, 0) == 0 would
            # bias the circular mean.
            if v.any():
                checkpoint_dirs[checkpoint_i].append(np.arctan2(v[1], v[0]))

    for (i1, j1), (i2, j2) in zip(checkpoints_on_way[:-1], checkpoints_on_way[1:]):
        tracks.append(Track(
            ref_line=nodes[nodes_i[i1:i2+1]],
            from_checkpoint_index=j1,
            to_checkpoint_index=j2,
        ))
    if is_closed:
        # Close the loop: from the last checkpoint, through the end of the
        # way, and on to the first checkpoint.
        i1, j1 = checkpoints_on_way[-1]
        i2, j2 = checkpoints_on_way[0]
        wrapped_nodes_i = np.concatenate([nodes_i[i1:], nodes_i[:i2+1]])
        tracks.append(Track(
            ref_line=nodes[wrapped_nodes_i],
            from_checkpoint_index=j1,
            to_checkpoint_index=j2,
        ))

for checkpoint_i, dirs in enumerate(checkpoint_dirs):
    if dirs:
        mean = scipy.stats.circmean(dirs, low=-math.pi, high=math.pi)
        checkpoint_mean_dirs[checkpoint_i] = math.degrees(mean)

In [30]:
# Order the tracks by their (from, to) checkpoint indices.
tracks.sort(
    key=lambda t: (t.from_checkpoint_index, t.to_checkpoint_index),
)

In [31]:
# Display how many tracks were extracted.
len(tracks)

15

In [32]:
# Plot each track's reference line, preceded by its from/to checkpoint indices.
for track in tracks:
    print(track.from_checkpoint_index, track.to_checkpoint_index)
    xs, ys = track.ref_line.T
    fig = px.line(x=xs, y=ys)
    # Tie the y axis to the x axis at a 1:1 ratio.
    fig.update_yaxes(scaleanchor='x', scaleratio=1)
    fig.show()

0 1


1 2


2 3


2 4


3 5


4 5


5 6


6 7


7 8


8 9


9 10


10 11


11 12


12 13


13 0


In [33]:
## NOTE: abandoned, incomplete attempt at FlatBuffers serialization; the JSON export below is used instead.

# builder = flatbuffers.Builder(2048)
# name = builder.CreateString(spec['header']['name'])

# MapHeader.Start(builder)
# MapHeader.AddName(builder, )
# header = MapHeader.End(builder)
# Map.Start(builder)
# Map.AddHeader(builder, )

In [34]:
# Skeleton of the exported document; deep copies keep `spec` and `checkpoints` untouched.
output = dict(
    header=deepcopy(spec['header']),
    checkpoints=deepcopy(checkpoints),
    tracks=[],
)

In [35]:
# Rewrite each exported checkpoint: replace its position with the projected
# (x, y) of the node it was snapped to, drop the description, and add the
# heading derived from the map where one was computed.
for checkpoint_i, out_checkpoint in enumerate(output['checkpoints']):
    position = nodes[checkpoint_i_to_node_i[checkpoint_i]]
    out_checkpoint['position'] = {'x': float(position[0]), 'y': float(position[1])}
    # pop() rather than del: tolerates a missing key and a re-run of the cell.
    out_checkpoint.pop('description', None)
    heading_deg = checkpoint_mean_dirs[checkpoint_i]
    # Compare against None: a heading of exactly 0.0 degrees is valid but falsy.
    if heading_deg is not None:
        out_checkpoint['heading_deg'] = heading_deg

In [37]:
# Export every track as plain dicts and numbers. The list is rebuilt and
# assigned (not appended to) so that re-running the cell does not duplicate
# the tracks in `output`.
output['tracks'] = [
    {
        'ref_line': [{'x': float(x), 'y': float(y)} for x, y in track.ref_line],
        'from_checkpoint_index': int(track.from_checkpoint_index),
        'to_checkpoint_index': int(track.to_checkpoint_index),
    }
    for track in tracks
]

In [38]:
# Write the result next to the spec file, with the suffix replaced by .json.
with spec_path.with_suffix('.json').open('w', encoding='utf8') as out_file:
    json.dump(output, out_file, indent=2)