In [None]:
# Import relevant libraries

import math
import csv
import requests
from enum import Enum
import folium
import xml.etree.ElementTree as ET
import utm

# **Part 1**: get data and filter

In [None]:

# Initialise variables

m = folium.Map(location=[51.380001, -2.360000])

class p(Enum):
    count_point_id = 0
    direction_of_travel = 1
    year = 2
    count_date = 3
    hour = 4
    region_id = 5
    region_name = 6
    local_authority_id = 7
    local_authority_name = 8
    road_name = 9
    road_type = 10
    start_junction_road_name = 11
    end_junction_road_name = 12
    easting = 13
    northing = 14
    latitude = 15
    longitude = 16
    link_length_km = 17
    link_length_miles = 18
    pedal_cycles = 19
    two_wheeled_motor_vehicles = 20
    cars_and_taxis = 21
    buses_and_coaches = 22
    lgvs = 23
    hgvs_2_rigid_axle = 24
    hgvs_3_rigid_axle = 25
    hgvs_4_or_more_rigid_axle = 26
    hgvs_3_or_4_articulated_axle = 27
    hgvs_5_articulated_axle = 28
    hgvs_6_articulated_axle = 29
    all_hgvs = 30
    all_motor_vehicles = 31

# town_name = input("Enter town name : ");

In [None]:
# Get count point data for relevant county

x = requests.get('https://storage.googleapis.com/dft-statistics/road-traffic/downloads/rawcount/local_authority_id/dft_rawcount_local_authority_id_115.csv');
raw_counts = x.text.split('\n');
raw_counts = list(csv.reader(raw_counts));
list.pop(raw_counts);
raw_counts.pop(0);

In [None]:
# Reduce each count for a point at a certain time of day into one average value

# first only include points from 2018
raw_counts = [point for point in raw_counts if point[p.year.value]=='2018'];

# now reduce all values to single averages for each time of day
point_counts = []
points = []

def find_point(point_id):
    
    for i, point_count in enumerate(point_counts):
        if (point_count['point']['id'] == point_id):
            return i;
    
    return -1;


def add_count(averaged_counts, count):

    for averaged_count in averaged_counts:
        if (averaged_count['hour'] == int(count[p.hour.value])):
            # add count to average
            averaged_count['value_count'] += 1;
            averaged_count['value_total'] += int(count[p.cars_and_taxis.value]);
            return;
    
    averaged_counts.append({
        'hour': int(count[p.hour.value]), 
        'value_count': 1, 
        'value_total': int(count[p.cars_and_taxis.value])
    });

    return averaged_counts;


# reduce raw counts to average count at each count point
for count in raw_counts:

    point_id = count[p.count_point_id.value];
    point_index = find_point(point_id);
    
    # if point_index is -1, then it hasn't been encountered yet so add new entry
    if (point_index == -1):
        points.append({
            'id': point_id, 
            'road_name': count[p.road_name.value],
            'latitude': float(count[p.latitude.value]),
            'longitude': float(count[p.longitude.value]),
            'utm': utm.from_latlon(float(count[p.latitude.value]), float(count[p.longitude.value]))
        });
        point_counts.append({
            'point': points[-1],
            'counts': add_count([], count)
        });
    else:
        for point_count in point_counts:
            if (point_count['point']['id'] == point_id):
                add_count(point_count['counts'], count);

# **Part 2**: associate count points with lanes

In [None]:
# Extract all edges and their UTM position

tree = ET.parse('bath.net.xml');
root = tree.getroot();

# get origin UTM position
temp1 = root[0].attrib['projParameter'].split(' ');
utm_zone = int(temp1[1].split('=')[1]);

origin = root[0].attrib['netOffset'].split(',');
origin = [-1*float(coord) for coord in origin];

lanes = [];

def normalise_shape(shape_str):
     shape = [list(map(float, point.split(","))) for point in shape_str.split(" ")];
     for point in shape:
          point[0] += origin[0];
          point[1] += origin[1];

     return shape;


for element in root:
     if (element.tag == 'edge'):
          for child in element:
               if (child.tag == 'lane'):
                    lanes.append({
                         'id': child.attrib['id'],
                         'speed': float(child.attrib['speed']),
                         'shape': normalise_shape(child.attrib['shape'])
                    });

In [None]:
# For each count point, work out the closest lane from the utm position

def min_dist_to_lane(lane, point):

    min_dist = -1;

    for i in range(1, len(lane['shape'])):
        x = point['utm'][0];
        y = point['utm'][1];
        x1 = lane['shape'][i-1][0];
        y1 = lane['shape'][i-1][1];
        x2 = lane['shape'][i][0];
        y2 = lane['shape'][i][1];

        A = x - x1;
        B = y - y1;
        C = x2 - x1;
        D = y2 - y1;

        dot = A * C + B * D;
        len_sq = C * C + D * D;
        param = -1;
        if (len_sq != 0):
            param = dot / len_sq;

        xx = 0.0;
        yy = 0.0;

        if (param < 0):
            xx = x1;
            yy = y1;
        elif (param > 1):
            xx = x2;
            yy = y2;
        else:
            xx = x1 + param * C;
            yy = y1 + param * D;

        dx = x - xx;
        dy = y - yy;
        dist = math.sqrt(dx * dx + dy * dy);

        if (min_dist == -1 or dist<min_dist):
            min_dist = dist;
    
    return min_dist;

# find closest lane for each count_point
for point in points:
    closest_lane = (-1, None);
    
    for lane in lanes:
        dist = min_dist_to_lane(lane, point);
        if (closest_lane[0] == -1 or dist<closest_lane[0]):
            closest_lane = (dist, lane);
    
    point['lane'] = closest_lane;

    lane_start_utm = closest_lane[1]['shape'][0];
    lane_start = utm.to_latlon(lane_start_utm[0], lane_start_utm[1], utm_zone, northern=True);

# filter out count points which are more that 10 metres away from the closest lane
points = [point for point in points if point['lane'][0]<10];

# place markers for each counting point on map
for point in points:
    # print("Point " + point['id'] + " is matched with lane " + point['lane'][1]['id'])
    folium.Marker([float(point['latitude']), float(point['longitude'])], popup=point['id'], icon=folium.Icon(color="red")).add_to(m)

m


# **Part 3**: Generate SUMO files

In [None]:
# Generate detector XML file

root = ET.Element("detectors")

for point in points:
    ET.SubElement(root, "detectorDefinition", id=str(point['id']), lane=str(point['lane'][1]['id']), pos='0');

tree = ET.ElementTree(root)
tree.write("detectors.xml")

In [None]:
# Generate detector counts csv file

f = open('detector_counts.csv', 'w')
writer = csv.writer(f)

writer.writerow(['Detector', 'Time', 'qPKW', 'qLKW', 'vPKW', 'vLKW']);
for point_count in point_counts:
    for count in point_count['counts']:
        writer.writerow([
            point_count['point']['id'],
            str(count['hour']*60),
            str(int(count['value_total']/count['value_count'])),
            str(int(point_count['point']['lane'][1]['speed']*3.6)),
            '',
            ''
        ]);

# dfrouter -n bath.net.xml -d detectors.xml -f detector_counts.csv -o test.rou.xml --vtype t --routes-for-all t --guess-empty-flows t