In [1]:
import requests
import matplotlib.pyplot as plt
import numpy as np
from OSMPythonTools.overpass import overpassQueryBuilder, Overpass
from pathlib import Path
import json
from scipy.interpolate import splprep, splev
from geopy import distance
import beamngpy
import uuid

# CONSTANTS

In [2]:
SIM_ROAD_MINIMAL_HEIGHT = 0
DECAL_ROAD_WIDTH = 8
MESH_ROAD_WIDTH = DECAL_ROAD_WIDTH * 3
MESH_ROAD_DEPTH = 5
RISK_VALUE = 0.7
MAX_SPEED = 13.411194443293 # 30 miles/hour

# level_path = Path(r'C:\Users\tupol\Documents\BeamNG.tech\levels\smallgrid')

BEAMNG_HOME_PATH = Path('C:\\Users\\tupol\\Documents\\Dissertation\\BeamNG.tech.v0.21.3.0')
BEAMNG_USER_PATH = Path('C:\\Users\\tupol\\Documents\\Dissertation\\beamng_user\\0.21')
LEVEL_NAME = "smallgrid" 
PARENT_DIRECTORY = "Roads"

SHEFFIELD_BBOX = [53.356987, -1.510101, 53.402656, -1.433196]
KETY_BBOX = [49.839258, 19.153293, 49.920978, 19.305325]
KRAKOW_BBOX = [49.973493, 19.807804, 50.123627, 20.097225]


### CHOOSE STREET

In [3]:
# street_name = "Abney Street"
# bbox = SHEFFIELD_BBOX
street_name = "Czaniecka"
bbox = KETY_BBOX

# GEO DATA

In [4]:
def enrich_with_elevation(points, dataset="eudem25m"):
    
    points_str = [f"{lat},{lon}" for lon, lat in points]
    locations = "|".join(points_str)
    response = requests.get(f"https://api.opentopodata.org/v1/{dataset}?locations={locations}")
    elevations = [p['elevation'] for p in response.json()['results']]

    lons = [x for (x,_) in points]
    lats = [y for (_,y) in points]

    ret = zip(lons, lats, elevations)
    return list(ret)
    
def get_street_points(street_name,bbox):
    overpass = Overpass()
    query = overpassQueryBuilder(bbox=bbox,
    elementType='way',selector=f'"name"="{street_name}"')

    road_points = []
    for e in overpass.query(query).elements():
        for node in e.nodes():
            road_points.append((node.lon(), node.lat()))

    

    road_points = enrich_with_elevation(road_points)
    return road_points


In [5]:
geo_points = get_street_points(street_name, bbox)
print(geo_points)

[(19.257576, 49.8622271, 319.13616943359375), (19.2576442, 49.8623352, 318.88177490234375), (19.2577738, 49.8626507, 317.9229736328125), (19.2578349, 49.8629234, 316.8909912109375), (19.2578477, 49.8630905, 316.21990966796875), (19.2578269, 49.863242, 315.61163330078125), (19.2576378, 49.8638651, 313.501220703125), (19.2576076, 49.8640435, 313.29254150390625), (19.2576014, 49.8646039, 314.5698547363281), (19.2576184, 49.8649705, 314.90338134765625), (19.2575971, 49.865054, 314.7142639160156), (19.2575328, 49.8651341, 314.4497375488281), (19.2574267, 49.8651919, 314.0150451660156), (19.2571815, 49.8652445, 313.13604736328125), (19.2567054, 49.8653025, 311.5503234863281), (19.2564297, 49.8653302, 310.6714782714844), (19.2562537, 49.8653713, 310.0884704589844), (19.2561541, 49.8654394, 309.7017822265625), (19.2560951, 49.8655154, 309.447509765625), (19.2560676, 49.8656287, 309.22271728515625), (19.256082, 49.8663528, 308.3063049316406), (19.2561104, 49.8665442, 308.07684326171875), (19.25

# CONVERTING TO BEAMNG COORDINATE SYSTEM

In [6]:
def middle(points):
    return (max(points) + min(points)) / 2


def _geo_to_simulation_points(geo_points):


    # SIM_ROAD_MINIMAL_HEIGHT = 1

    lons = [x for x,_,_ in geo_points] #geo x's
    lats = [y for _,y,_ in geo_points] #geo y's
    els = [z for _,_,z in geo_points] 

    geo_center = np.array((middle(lons), middle(lats)))
    z_offset = min(els)
    sim_points = []

    for x, y, z in geo_points:
        geo_point = np.array([x,y])

        relative_to_geo_center = geo_point-geo_center
        dist = distance.distance(geo_center, geo_point).m
        theta = distance.atan2(relative_to_geo_center[1], relative_to_geo_center[0])

        sim_x = dist * np.cos(theta)
        sim_y = dist * np.sin(theta)
        
        sim_z = z-z_offset+SIM_ROAD_MINIMAL_HEIGHT

        sim_points.append((sim_x, sim_y, sim_z))

    return sim_points

In [7]:
def _get_simulation_size(points):
    points = np.array(points)
    xs = points[:, 0]
    ys = points[:, 1]
    x_range = np.max(xs)-np.max(xs)
    y_range = np.max(ys)-np.max(ys)
    sim_size = max(x_range, y_range)

    return sim_size

In [8]:
sim_points = _geo_to_simulation_points(geo_points)
sim_size = _get_simulation_size(sim_points)

In [9]:
def _recenter_points(points):
    points = np.array(points)
    points[:, 0:2] += sim_size/2 # x and y values
    return points


In [10]:

sim_points = _recenter_points(sim_points)

# INTERPOLATE POINTS

In [11]:
def interpolate(points):
    
    SPLINE_DEGREE = 2
    INTERPOLATED_POINTS_FOR_EACH_POINT = 10
    
    pos_tck, _ = splprep(points.T, s=1, k=SPLINE_DEGREE)

    N_POINTS = points.shape[0] * INTERPOLATED_POINTS_FOR_EACH_POINT
    unew = np.linspace(0, 1, N_POINTS)

    interpolated = splev(unew, pos_tck) #retured as list of ND arrays
    return np.array(interpolated).T



In [12]:
sim_points = interpolate(sim_points)
sim_points.shape

(360, 3)

# WRITE TO SCENARIO TEMPLATE

### READ ROAD DATA FROM SCENARIO

In [13]:
road_file_path = BEAMNG_USER_PATH / 'levels' / LEVEL_NAME / 'main' / 'MissionGroup' / 'Roads' / 'items.level.json'
print(road_file_path)
with open(road_file_path, "r") as f:
    decal_road_data = json.loads(f.readline())
    mesh_road_data = json.loads(f.readline())


assert decal_road_data['class'] == 'DecalRoad', "Invalid decal road data"
assert mesh_road_data['class'] == 'MeshRoad', "Invalid mesh road data"

C:\Users\tupol\Documents\Dissertation\beamng_user\0.21\levels\smallgrid\main\MissionGroup\Roads\items.level.json


### PREPARE NEW DECAL ROAD NODES

In [14]:
n_points = sim_points.shape[0]

suffix = np.array([DECAL_ROAD_WIDTH])

#append road width
suffix_matrix = np.tile(suffix, (n_points, 1))
decal_new_nodes = np.hstack((sim_points, suffix_matrix)).tolist()

# decal_new_nodes

In [15]:
decal_road_data['nodes'] = decal_new_nodes

### PREPARE NEW MESH ROD NODES

In [16]:
suffix = np.array([MESH_ROAD_WIDTH, MESH_ROAD_DEPTH, 0, 0, 1])

n_points = sim_points.shape[0]

#repeat suffix data for each node
suffix_matrix = np.tile(suffix, (n_points, 1))

#append every node with suffix data
mesh_new_nodes = np.hstack((sim_points, suffix_matrix)).tolist()

In [17]:
mesh_road_data['nodes'] = mesh_new_nodes

### SET WAYPOINT

In [18]:
goal_waypoint_data = {
    'name': 'GoalWaypoint',
    'class': 'BeamNGWaypoint',
    'persistentId': str(uuid.uuid4()),
    '__parent': 'Roads',
    'position': list(sim_points[-1]),
    'scale': [DECAL_ROAD_WIDTH]*3, #x y z size
}


### WRITE NEW ROAD NODES TO FILE

In [19]:
def write_json(json_data, file_handler):
    json.dump(json_data, file_handler) #dump json in one line
    file_handler.write("\n") #go to new line

In [20]:
with open(road_file_path, 'w') as f:
    write_json(decal_road_data, f)
    write_json(mesh_road_data, f)
    write_json(goal_waypoint_data, f)
    

# TEST ON BEAMNG

### BRING UP BEAMNG

In [28]:
from beamngpy import BeamNGpy, Scenario, Vehicle

# Instantiate BeamNGpy instance running the simulator from the given path,
# communicating over localhost:64256
bng = BeamNGpy('localhost', 64256, home=BEAMNG_HOME_PATH, user=BEAMNG_USER_PATH)
# Launch BeamNG.tech
bng.open(launch=True)

<beamngpy.beamng.BeamNGpy at 0x19bdaf00888>

### CREATE SCENATIO

In [22]:
def calc_point_edges(p1, p2):
    
    height = p1[2]
    origin = np.array(p1[0:2])
    next = np.array(p2[0:2])

    direction_v = np.subtract(next, origin)

    # calculate the vector which length is half the road width
    v = (direction_v / np.linalg.norm(direction_v)) * DECAL_ROAD_WIDTH / 2
    # add normal vectors
    left_point = origin + np.array([-v[1], v[0]])
    right_point = origin + np.array([v[1], -v[0]])

    #add origin height
    left_point = np.append(left_point, height)
    right_point = np.append(right_point, height)

    return left_point, right_point

In [23]:
def vehicle_start_pose(meters_from_road_start=2.5):

    p1 = sim_points[0]
    p2 = sim_points[1]

    _, p1r = calc_point_edges(p1, p2)
    p1r = p1r[0:2]

    
    direction = np.subtract(p2[0:2], p1[0:2])
    v = (direction / np.linalg.norm(direction)) * meters_from_road_start
    middle_of_lane = np.add(p1[0:2], p1r[0:2]) / 2 #making car spawn in the middle of right lane
    deg = np.degrees(np.arctan2([-v[0]], [-v[1]]))
    rot = (0, 0, deg[0])
    pos = tuple(middle_of_lane + v) + (p1[2],)

    return pos, rot


In [29]:
beamngpy.logging.basicConfig(filename="beamng.log")
vehicle = Vehicle('car', model='etk800', licence='OSM Testing', color='Blue')
# Create a scenario in smallgrid called 'osm_testing'
scenario = Scenario('smallgrid', 'osm_testing')

# Add it to our scenario at this position and rotation


pos, rot = vehicle_start_pose()

scenario.add_vehicle(vehicle, pos=pos, rot=rot)
# Place files defining our scenario for the simulator to read
scenario.make(bng)

bng.load_scenario(scenario)
try:
    scenario.start()
except Exception:
    pass

In [25]:
vehicle.ai_set_aggression(RISK_VALUE)
vehicle.ai_set_speed(1000, mode='limit')
vehicle.ai_drive_in_lane(True)
vehicle.ai_set_waypoint(goal_waypoint_data["name"])

In [30]:
bng.close()