In [113]:
import csv
import glob
import json
import locale
import os
import re
import tempfile
import xml.etree.ElementTree as ET
import zipfile
from calendar import monthrange
from datetime import datetime, timedelta
from io import TextIOWrapper
from math import floor
from pathlib import Path

import boto3
import geojson
import geopandas
import numpy as np
import pandas as pd
import pyproj
import requests
from shapely.geometry import Point
from shapely.ops import transform

# DATA_PATH expects a working structure with mets10 data generated and city subfolders
# ├── loop_counters
# │   ├── berlin
# │   │   ├── downloads
# │   │   └── speed
# │   ├── london
# │   │   ├── downloads
# │   │   └── speed
# │   └── madrid
# │       ├── all
# │       └── downloads
# └── release20221026_residential_unclassified
#     ├── 2021
#     │   ├── road_graph
#     │   │   └── berlin
#     │   └── speed_classes
#     │       └── berlin
#     └── 2022
#         ├── road_graph
#         │   ├── london
#         │   └── madrid
#         └── speed_classes
#             ├── london
#             └── madrid
DATA_PATH = Path('/private/data/mets10') 
BERLIN_PATH = DATA_PATH / 'loop_counters' / 'berlin'
LONDON_PATH = DATA_PATH / 'loop_counters' / 'london'
MADRID_PATH = DATA_PATH / 'loop_counters' / 'madrid'
MELBOURNE_PATH = DATA_PATH / 'loop_counters' / 'melbourne'  # not used for MeTS-10 validations (no time overlap)

In [2]:
def get_gdf(df, id_field='id', bbox=None):
    df = df.copy()
    df['id'] = df[id_field].astype(str)
    if 'lat' not  in df.columns:
        df['lon'] = df.geometry.x
        df['lat'] = df.geometry.y
    if 'heading' not in df.columns:
        df['heading'] = -1
    df = df[['id', 'lat', 'lon', 'heading']]
    gdf = geopandas.GeoDataFrame(
        df, geometry=geopandas.points_from_xy(df.lon, df.lat))
    if bbox:
        ymin, ymax, xmin, xmax = tuple([v/100000 for v in bbox])
        gdf = gdf.cx[xmin:xmax, ymin:ymax]
    return gdf

# Berlin

Raw loop counter data was downloaded from
https://api.viz.berlin.de/daten/verkehrsdetektion/teu_standorte.json and https://api.viz.berlin.de/daten/verkehrsdetektion?path=2020%2FDetektoren+%28einzelne+Fahrspur%29%2F

In [3]:
berlin_locations_df = pd.read_excel(BERLIN_PATH / 'downloads' / 'Stammdaten_Verkehrsdetektion_2022_07_20.xlsx')
berlin_locations_df = berlin_locations_df[[
    'DET_NAME_NEU', 'DET_ID15', 'RICHTUNG', 'SPUR', 'LÄNGE (WGS84)', 'BREITE (WGS84)']]
berlin_locations_df = berlin_locations_df.rename(columns={
    'DET_NAME_NEU': 'detname', 'DET_ID15': 'detid_15', 'LÄNGE (WGS84)': 'lon', 'BREITE (WGS84)': 'lat',
    'SPUR': 'lane', 'RICHTUNG': 'heading'})
berlin_locations_df = berlin_locations_df.replace({'heading': {
    'Nord': 0, 'Nordost': 45, 'Ost': 90, 'Südost': 135,
    'Süd': 180, 'Südwest': 225, 'West': 270, 'Nordwest': 315
}})
berlin_locations_df = berlin_locations_df.groupby(by=['detname', 'detid_15', 'heading', 'lane', 'lon', 'lat']).count()
berlin_locations_df = berlin_locations_df.reset_index()
berlin_locations_df

Unnamed: 0,detname,detid_15,heading,lane,lon,lat
0,TE001_Det_HF1,100101010000167,225,HF_R,13.192578,52.433868
1,TE001_Det_HF2,100101010000268,225,HF_2vR,13.192578,52.433868
2,TE002_Det_HF1,100101010000369,45,HF_R,13.192747,52.433813
3,TE002_Det_HF2,100101010000470,45,HF_2vR,13.192747,52.433813
4,TE004_Det_HF1,100101010000874,180,HF_R,13.261301,52.436642
...,...,...,...,...,...,...
543,TE583_Det_HF2,100101010097975,0,HF_2vR,13.384196,52.457440
544,TE592_Det_HF1,100101010099692,180,HF_R,13.301719,52.509232
545,TE592_Det_HF2,100101010099793,180,HF_2vR,13.301719,52.509232
546,TE593_Det_HF1,100101010099894,0,HF_R,13.302183,52.508531


In [4]:
berlin_locations_df[berlin_locations_df['detid_15'].duplicated()]

Unnamed: 0,detname,detid_15,heading,lane,lon,lat


In [5]:
# Store the counter locations to geojson
get_gdf(berlin_locations_df, 'detid_15').to_file(BERLIN_PATH / 'counter_locations.geojson', driver="GeoJSON")

In [6]:
def get_counters_merged(year, month, locations_df):
    df = pd.read_csv(BERLIN_PATH / 'downloads' / f'det_val_hr_{year:04d}_{month:02d}.csv.gz', sep=';',
                     compression='gzip')
    print(f'Loaded {len(df)} rows with {len(df["detid_15"].unique())} unique counters')
    df = df.merge(locations_df, on=['detid_15'], how='left')
    df = df.rename(columns={'detid_15': 'id', 'detname': 'name'})
    df['time_bin'] = [f'{d[6:]}-{d[3:5]}-{d[:2]} {h:02d}:00' for d, h in zip(df['tag'], df['stunde'])]
    df.to_parquet(BERLIN_PATH / 'speed' / f'counters_{year:04d}-{month:02d}.parquet', compression="snappy")
    return df

# TODO: uncomment if processing data
# get_counters_merged(2021, 7, berlin_locations_df)
get_counters_merged(2019, 6, berlin_locations_df)

Loaded 373739 rows with 546 unique counters


Unnamed: 0,id,tag,stunde,qualitaet,q_kfz_det_hr,v_kfz_det_hr,q_pkw_det_hr,v_pkw_det_hr,q_lkw_det_hr,v_lkw_det_hr,name,heading,lane,lon,lat,time_bin
0,100101010000167,01.06.2019,0,1.0,227,82.8,207,83.9,20,71.6,TE001_Det_HF1,225,HF_R,13.192578,52.433868,2019-06-01 00:00
1,100101010000167,01.06.2019,1,1.0,131,78.4,115,80.5,16,63.7,TE001_Det_HF1,225,HF_R,13.192578,52.433868,2019-06-01 01:00
2,100101010000167,01.06.2019,2,1.0,94,77.0,74,80.6,20,63.3,TE001_Det_HF1,225,HF_R,13.192578,52.433868,2019-06-01 02:00
3,100101010000167,01.06.2019,3,1.0,83,76.4,68,76.2,15,77.3,TE001_Det_HF1,225,HF_R,13.192578,52.433868,2019-06-01 03:00
4,100101010000167,01.06.2019,4,1.0,131,76.9,118,78.8,13,59.9,TE001_Det_HF1,225,HF_R,13.192578,52.433868,2019-06-01 04:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
373734,100101010099995,30.06.2019,19,1.0,191,43.4,189,43.4,2,42.0,TE593_Det_HF2,0,HF_2vR,13.302183,52.508531,2019-06-30 19:00
373735,100101010099995,30.06.2019,20,1.0,219,42.2,216,42.4,3,27.3,TE593_Det_HF2,0,HF_2vR,13.302183,52.508531,2019-06-30 20:00
373736,100101010099995,30.06.2019,21,1.0,265,39.6,257,39.9,8,29.5,TE593_Det_HF2,0,HF_2vR,13.302183,52.508531,2019-06-30 21:00
373737,100101010099995,30.06.2019,22,1.0,158,43.2,157,43.3,1,42.0,TE593_Det_HF2,0,HF_2vR,13.302183,52.508531,2019-06-30 22:00


# London

Scraping counter data from the Highways England WebTRIS API https://webtris.highwaysengland.co.uk/

In [7]:
WEBTRIS = 'https://webtris.highwaysengland.co.uk/api/v1'
LONDON_BBOX = [5120500, 5170000, -36900, 6700]

def save_json_rows(data, file_name):
    if not data:
        return
    with open(LONDON_PATH / 'downloads' / f'{file_name}.json', 'w') as f:
        for row in data:
            json.dump(row, f)
            f.write('\n')

def get_webtris_sites(bbox):
    min_lat, max_lat, min_lon, max_lon = tuple(bbox)
    url = f'{WEBTRIS}/sites'
    req = requests.get(url)
    req_json = req.json()
    print(req_json['row_count'])
    for site in req_json['sites']:
        lat_bin = int(floor(site['Latitude'] * 1e3) / 1e3 * 1e5)
        lon_bin = int(floor(site['Longitude'] * 1e3) / 1e3 * 1e5)
        if lat_bin >= min_lat and lat_bin <= max_lat:
            site['lat_bin'] = lat_bin
            site['lon_bin'] = lon_bin
            yield site

def time_ceil(time, delta):
    mod = (time - datetime(1970, 1, 1)) % delta
    if mod:
        return time + (delta - mod)
    return time

def time_bin_format(time):
    return time.strftime('%Y-%m-%d %H:%M')

def get_time_bins(datetime_str):
    time_bins = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
    time_bins = time_ceil(time_bins, timedelta(minutes=5))
    return [
        time_bin_format(time_bins-timedelta(minutes=10)),
        time_bin_format(time_bins-timedelta(minutes=5)),
        time_bin_format(time_bins)
    ]

def join_site_info(data, sites_dict):
    for row in data:
        site = sites_dict[row['Site Name']]
        volume = row['Total Volume']
        if volume:
            volume = int(volume)
        else:
            volume = 0
        speed = row['Avg mph']
        if speed:
            speed = float(speed)*1.60934
        else:
            speed = 0
        time_bins = get_time_bins(f"{row['Report Date'][:10]} {row['Time Period Ending']}")
        yield {
            'id': int(site['Id']), 'name': row['Site Name'],
            'lat_bin': site['lat_bin'], 'lon_bin': site['lon_bin'],
            'lat': site['Latitude'], 'lon': site['Longitude'],
            'time_bins': time_bins,
            'volume': volume, 'speed': speed
        }

def get_chunk(id_chunk, date_from, date_to):
    date_from = datetime.strptime(date_from, '%Y-%m-%d').strftime('%d%m%Y')
    date_to = datetime.strptime(date_to, '%Y-%m-%d').strftime('%d%m%Y')
    req_ids_urlenc = ','.join(id_chunk)
    url = f'{WEBTRIS}/reports/{date_from}/to/{date_to}/Daily?sites={req_ids_urlenc}&page=1&page_size=10000'
    while True:
        print(url)
        req = requests.get(url)
        if req.status_code == 204:
            return
        req_json = req.json()
        yield from req_json['Rows']
        header = req_json['Header']
        print(header)
        if 'links' in header and len(header['links']) > 0 and header['links'][-1]['rel'] == 'nextPage':
            url =  header['links'][-1]['href']
            continue
        else:
            break

In [8]:
webtris_sites = list(get_webtris_sites(LONDON_BBOX))
save_json_rows(webtris_sites, 'webtris_sites')
print(len(webtris_sites))
webtris_sites[:2]

19355
4256


[{'Id': '1',
  'Name': 'MIDAS site at M4/2295A2 priority 1 on link 105009001; GPS Ref: 502816;178156; Westbound',
  'Description': 'M4/2295A2',
  'Longitude': -0.520379557723297,
  'Latitude': 51.4930115367112,
  'Status': 'Inactive',
  'lat_bin': 5149300,
  'lon_bin': -52100},
 {'Id': '5',
  'Name': 'MIDAS site at M25/5764B priority 1 on link 199056702; GPS Ref: 558308;188775; Anti-clockwise',
  'Description': 'M25/5764B',
  'Longitude': 0.283161593410359,
  'Latitude': 51.5756168053165,
  'Status': 'Active',
  'lat_bin': 5157500,
  'lon_bin': 28299}]

In [9]:
def download_webtris_chunks(webtris_sites, date_from, date_to, chunk_size=20):
    sites_dict = { s['Description']: s for s in webtris_sites }
    req_ids = [s['Id'] for s in webtris_sites]
    req_id_chunks = [req_ids[i:i + chunk_size] for i in range(0, len(req_ids), chunk_size)]
    print(len(req_id_chunks))
    for i in range(len(req_id_chunks)):
        print(f'Downloading chunk {i:03}...')
        chunk_data = get_chunk(req_id_chunks[i], date_from=date_from, date_to=date_to)
        joined_data = join_site_info(chunk_data, sites_dict)
        save_json_rows(joined_data, f'webtris_chunk_{date_from}_{date_to}_{i:03}')

# TODO: uncomment if processing data
# download_webtris_chunks(webtris_sites, '2019-07-01', '2019-07-31')
# download_webtris_chunks(webtris_sites, '2019-08-01', '2019-08-31')
# download_webtris_chunks(webtris_sites, '2019-09-01', '2019-09-30')
# download_webtris_chunks(webtris_sites, '2019-10-01', '2019-10-31')
# download_webtris_chunks(webtris_sites, '2019-11-01', '2019-11-30')
# download_webtris_chunks(webtris_sites, '2019-12-01', '2019-12-31')
# download_webtris_chunks(webtris_sites, '2020-01-01', '2020-01-31')

In [10]:
def t_from_time_bin(tb, do_floor=True):
    h, m = tb[10:].split(':')
    t = int(h) * 4 + int(float(m) / 15)
    if not do_floor:
        return t
    # T4c speed is floored, counters seem ceiled --> subtract 1
    t -= 1
    if t < 0:
        t = 95
    return t

def day_from_time_bin(tb, do_floor=True):
    if do_floor and t_from_time_bin(tb) == 95:
        d = datetime.strptime(tb, '%Y-%m-%d %H:%M') - timedelta(days=1)
        return d.strftime('%Y-%m-%d')
    return tb[:10]

def convert_webtris_chunks(do_floor=True):
    chunk_files = sorted((LONDON_PATH / 'downloads').glob('webtris_chunk_*.json'))
    df = pd.concat([pd.read_json(cf,  lines=True) for cf in chunk_files])
    df['id'] = df['id'].astype(str)
    df['heading'] = -1
    df['time_bin'] = [x[2] for x in df['time_bins']]
    df = df.rename(columns={'speed': 'speed_counter'})
    df['t'] = [t_from_time_bin(tb, do_floor) for tb in df['time_bin']]
    df['day'] = [day_from_time_bin(tb, do_floor) for tb in df['time_bin']]
    df = df[['id', 'name', 'lat', 'lon', 'heading', 'time_bin', 'volume', 'speed_counter', 't', 'day']]
    return df

# TODO: uncomment if processing data
# webtris_df = convert_webtris_chunks()
# webtris_df

In [11]:
# TODO: uncomment if processing data
# webtris_df['name'] = webtris_df['name'].astype(str)
# webtris_df.to_parquet(LONDON_PATH / 'speed' / 'webtris_london_201907-202001.parquet', compression="snappy")

Read TfL TIMS locations (additional data, currently not used for validations)

In [115]:
bucket = 'roads.data.tfl.gov.uk'
prefix = 'TIMS/'
s3_client = boto3.client('s3')
    
def get_tims_csv_files(limit=-1):
    tims_csv_files = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/'):
        pf = [c['Key'] for c in page["Contents"] if c['Key'].endswith('.csv')]
        tims_csv_files.extend(pf)
        if limit > 0 and len(tims_csv_files) >= limit:
            return tims_csv_files[:limit]
    return tims_csv_files

def read_tims_csv_file(csv_file):
    response = s3_client.get_object(Bucket=bucket, Key=csv_file)
    return pd.read_csv(response.get("Body"))

def read_tims_day(day):
    ts = datetime.strptime(day, '%Y-%m-%d')
    tims_day_file = LONDON_PATH / 'downloads' /  f'tims_{day}.parquet'
    if os.path.exists(tims_day_file):
        return pd.read_parquet(tims_day_file)
    file_prefix = f'detdata{ts.day:02d}{ts.month:02d}{ts.year:04d}'
    files = [fp for fp in tims_csv_files if file_prefix in fp]
    print(f'Downloading {len(files)} files for {day}')
    df = pd.concat([read_tims_csv_file(fp) for fp in files])
    tims_day_file.parent.mkdir(exist_ok=True, parents=True)
    df.to_parquet(tims_day_file, compression="snappy")
    return df

tims_csv_files = get_tims_csv_files()
len(tims_csv_files)

In [246]:
tims_df = read_tims_day('2019-01-04')
tims_df

Unnamed: 0,TIMESTAMP,NODE,EASTING,NORTHING,FLOW_ACTUAL_15M,SAT_BANDINGS,DETECTOR_NO,TOTAL_DETECTOR_NO,DETECTOR_RATE
0,2019-01-03T23:44:00Z,14/013,544386.20,186690.59,193,0-79%,3,6,0.50
1,2019-01-03T23:44:00Z,14/015,543595.64,186425.27,297,0-79%,3,4,0.75
2,2019-01-03T23:44:00Z,14/016,540276.96,188758.87,61,0-79%,4,6,0.67
3,2019-01-03T23:44:00Z,14/019,542027.01,189905.20,139,0-79%,6,6,1.00
4,2019-01-03T23:44:00Z,14/020,540064.79,189082.82,129,0-79%,3,4,0.75
...,...,...,...,...,...,...,...,...,...
47027,2019-01-04T23:43:00Z,26/216,510671.00,185236.00,157,0-79%,3,3,1.00
47028,2019-01-04T23:43:00Z,27/001,521599.00,180897.40,391,0-79%,10,10,1.00
47029,2019-01-04T23:43:00Z,27/002,520569.57,181719.59,277,0-79%,5,8,0.63
47030,2019-01-04T23:43:00Z,27/004,518540.00,182590.00,372,0-79%,10,10,1.00


In [245]:
gbng = pyproj.CRS('EPSG:27700')
wgs84 = pyproj.CRS('EPSG:4326')
project = pyproj.Transformer.from_crs(gbng, wgs84, always_xy=True).transform

def en2ll(e, n):
    try:
        return transform(project, Point(e, n))
    except Exception as e:
        print(e)
        return Point(0, 0)

def get_projected_point(r):
    return en2ll(float(r['EASTING']), float(r['NORTHING']))

def get_tims_locations(df):
    df['id'] = df['NODE']
    print(f'Aggregating {len(df)} tims rows')
    df = df[['id', 'EASTING', 'NORTHING']].groupby(['id', 'EASTING', 'NORTHING']).min().reset_index()
    print(f'Found {len(df)} unique locations')
    df['geometry'] = df.apply(get_projected_point, axis=1)
    df = geopandas.GeoDataFrame(df, geometry='geometry')
    df['lon'] = df.geometry.x
    df['lat'] = df.geometry.y
    return df[['id', 'lat', 'lon']]

# Reading locations from different months to capture newly added or relevant temporary locations.
tims_locations = get_tims_locations(pd.concat([
    read_tims_day('2019-01-04'),  read_tims_day('2019-07-20'), read_tims_day('2020-02-10')
])) 
tims_locations

Aggregating 13045333 tims rows
Found 3882 unique locations


  arr = construct_1d_object_array_from_listlike(values)


Unnamed: 0,id,lat,lon
0,00/002,51.514163,-0.104402
1,00/003,51.511618,-0.075350
2,00/004,51.517597,-0.107617
3,00/005,51.511020,-0.108040
4,00/006,51.511665,-0.104276
...,...,...,...
3877,32/224,51.632029,-0.073554
3878,32/225,51.631973,-0.073398
3879,32/228,51.629826,-0.097257
3880,32/229,51.631823,-0.095122


In [94]:
# Create a dataframe with all london locations (WEBTRIS and TIMS)
webtris_locations = pd.DataFrame.from_records(webtris_sites).rename(
    columns={'Id': 'id', 'Latitude': 'lat', 'Longitude': 'lon'})
webtris_locations

Unnamed: 0,id,Name,Description,lon,lat,Status,lat_bin,lon_bin
0,1,MIDAS site at M4/2295A2 priority 1 on link 105...,M4/2295A2,-0.520380,51.493012,Inactive,5149300,-52100
1,5,MIDAS site at M25/5764B priority 1 on link 199...,M25/5764B,0.283162,51.575617,Active,5157500,28299
2,8,MIDAS site at M25/4876A priority 1 on link 199...,M25/4876A,-0.538796,51.433749,Active,5143300,-53900
3,14,MIDAS site at A2/8392M priority 1 on link 2000...,A2/8392M,0.381381,51.408466,Active,5140800,38100
4,16,MIDAS site at M2/8460A priority 1 on link 2000...,M2/8460A,0.465564,51.386570,Inactive,5138600,46500
...,...,...,...,...,...,...,...,...
4251,19757,MIDAS site at M4/2443B priority 1 on link 1050...,M4/2443B,-0.713142,51.499332,Active,5149900,-71400
4252,19758,MIDAS site at M4/2390A priority 1 on link 1050...,M4/2390A,-0.643031,51.508811,Active,5150800,-64400
4253,19759,MIDAS site at M4/2241A priority 1 on link 1991...,M4/2241A,-0.442319,51.494644,Active,5149400,-44300
4254,19761,MIDAS site at M4/2433A priority 1 on link 1050...,M4/2433A,-0.699329,51.500735,Active,5150000,-70000


In [95]:
london_locations = pd.concat([webtris_locations[['id', 'lat', 'lon']], tims_locations[['id', 'lat', 'lon']]])
london_locations = get_gdf(london_locations, bbox=LONDON_BBOX)
london_locations

Unnamed: 0,id,lat,lon,heading,geometry
7,28,51.268629,-0.166750,-1,POINT (-0.16675 51.26863)
13,46,51.264317,-0.132305,-1,POINT (-0.13230 51.26432)
17,57,51.273244,0.063816,-1,POINT (0.06382 51.27324)
22,75,51.259332,-0.108098,-1,POINT (-0.10810 51.25933)
25,80,51.258246,-0.053938,-1,POINT (-0.05394 51.25825)
...,...,...,...,...,...
3683,32/210,51.612580,-0.113407,-1,POINT (-0.11341 51.61258)
3684,32/224,51.632029,-0.073554,-1,POINT (-0.07355 51.63203)
3685,32/225,51.631973,-0.073398,-1,POINT (-0.07340 51.63197)
3686,32/228,51.629826,-0.097257,-1,POINT (-0.09726 51.62983)


In [96]:
# Store the counter locations to geojson
london_locations.to_file(LONDON_PATH / 'counter_locations.geojson', driver="GeoJSON")

In [172]:
def normalize_volumes(time_bins, volumes, num_bins=96):
    result = []
    for ts, vs in zip(time_bins, volumes):
        res_volumes = [-1 for _ in range(num_bins)]
        for idx, v in zip(ts, vs):
            assert(0 <= idx < num_bins)
            res_volumes[idx] = int(round(float(v)))
        result.append(res_volumes)
    return result


def process_volume_normalization(df):
    df = df.groupby(by=['day', 'id', 'lat', 'lon', 'heading']).agg(list).reset_index()
    df['volume'] = normalize_volumes(df['t'], df['volume'])
    print(f'Aggregated {len(df)} counter volume lists')
    return df #[['id', 'lat', 'lon', 'heading', 'day', 't', 'volume']]


def process_tims_day(day):
    day_ts = datetime.strptime(day, '%Y-%m-%d')
    dfa =pd.concat([
        read_tims_day((day_ts - timedelta(days=1)).strftime('%Y-%m-%d')),
        read_tims_day(day),
        read_tims_day((day_ts + timedelta(days=1)).strftime('%Y-%m-%d'))
    ])
    raw_num_records = len(dfa)
    dfa = dfa[dfa['TIMESTAMP'].str.startswith(day)]
    print(f'Read {len(dfa)}/{raw_num_records} records for {day}')
    raw_num_records = len(dfa)
    
    # Filter invalid records, convert the fields and generate time bins
    dfa['ts'] = pd.to_datetime(dfa['TIMESTAMP'], infer_datetime_format=True, errors='coerce')
    dfa = dfa[dfa['ts'].notna()]
    dfa['t'] = dfa['ts'].dt.hour * 4 + (dfa['ts'].dt.minute/15).astype(int)
    dfa['day'] = day
    dfa = dfa.drop(columns=['TIMESTAMP'])
    dfa = dfa.rename(columns={
        'NODE': 'id', 'EASTING': 'east', 'NORTHING': 'north', 'FLOW_ACTUAL_15M': 'flow_15m',
        'SAT_BANDINGS': 'sat_bandings', 'DETECTOR_NO': 'det_no', 'TOTAL_DETECTOR_NO': 'num_det',
        'DETECTOR_RATE': 'detector_rate'
    })
    dfa['east'] = pd.to_numeric(dfa['east'], errors='coerce')
    dfa['north'] = pd.to_numeric(dfa['north'], errors='coerce')
    dfa = dfa[dfa['east'].notna() & dfa['north'].notna()]
    if len(dfa) < raw_num_records:
        print(f'  filtered {raw_num_records - len(dfa)} invalid records')
    dfa['flow_15m'] = pd.to_numeric(dfa['flow_15m'], errors='coerce')
    dfa['det_no'] = pd.to_numeric(dfa['det_no'], errors='coerce')
    dfa['num_det'] = pd.to_numeric(dfa['num_det'], errors='coerce')
    dfa['detector_rate'] = pd.to_numeric(dfa['detector_rate'], errors='coerce')
    
    # Aggregate to average 15 minute volumes and generate one row per day
    df = dfa[['day', 't', 'id', 'east', 'north', 'flow_15m']].groupby(
        by=['day', 't', 'id', 'east', 'north']).mean().reset_index()
    df = df.rename(columns={'flow_15m': 'volume'})
    print(f'Collected {len(df)} time bin volumes for {day}')
    df['lat'] = [en2ll(float(e), float(n)).y for e, n in zip(df['east'], df['north'])]
    df['lon'] = [en2ll(float(e), float(n)).x for e, n in zip(df['east'], df['north'])]
    df['heading'] = -1.0
    return df[['id', 'lat', 'lon', 'heading', 'day', 't', 'volume']]


def process_tims_month(year, month):
    tims_month_file = LONDON_PATH / 'flow' / f'counters_tims_{year:04d}-{month:02d}.parquet'
    if os.path.exists(tims_month_file):
        print(f'File {tims_month_file} exists already')
        return
    
    day_dfs = []
    _, n = monthrange(year, month)
    for i in range(1, n + 1):
        day = f'{year:04d}-{month:02d}-{i:02d}'
        df = process_tims_day(day)
        if len(df) == 0:
            continue
        day_dfs.append(df)
    month_df = pd.concat(day_dfs)
    
    lat_min, lat_max, lon_min, lon_max = tuple([ll/100000 for ll in LONDON_BBOX])
    month_df = month_df[
        (month_df['lat'] >= lat_min) & (month_df['lat'] <= lat_max) &
        (month_df['lon'] >= lon_min) & (month_df['lon'] <= lon_max)]
    
    tims_month_file.parent.mkdir(exist_ok=True, parents=True)
    month_df.to_parquet(tims_month_file, compression="snappy")
    return month_df


# process_tims_day('2019-07-04')
# TODO: uncomment if processing data
process_tims_month(2019, 7)

Read 4578603/13710719 records for 2019-07-01
Collected 327551 time bin volumes for 2019-07-01
Read 4610633/13796313 records for 2019-07-02
Collected 329353 time bin volumes for 2019-07-02
Read 4618687/13837389 records for 2019-07-03
Collected 330089 time bin volumes for 2019-07-03
Read 4609916/13840808 records for 2019-07-04
Collected 329579 time bin volumes for 2019-07-04
Downloading 96 files for 2019-07-06
Read 4624316/13782294 records for 2019-07-05
Collected 330196 time bin volumes for 2019-07-05
Downloading 96 files for 2019-07-07
Read 4553215/12969267 records for 2019-07-06
Collected 326348 time bin volumes for 2019-07-06
Downloading 96 files for 2019-07-08
Read 3737830/12675517 records for 2019-07-07
Collected 269946 time bin volumes for 2019-07-07
Downloading 96 files for 2019-07-09
Read 4373676/12786737 records for 2019-07-08
Collected 309826 time bin volumes for 2019-07-08
Downloading 96 files for 2019-07-10
Read 4667886/13667681 records for 2019-07-09
Collected 330227 time b

Unnamed: 0,id,lat,lon,heading,day,t,volume
0,00/002,51.514163,-0.104402,-1.0,2019-07-01,0,96.866667
1,00/003,51.511618,-0.075350,-1.0,2019-07-01,0,40.933333
2,00/004,51.517597,-0.107617,-1.0,2019-07-01,0,144.866667
3,00/005,51.511020,-0.108040,-1.0,2019-07-01,0,313.333333
4,00/006,51.511665,-0.104276,-1.0,2019-07-01,0,81.266667
...,...,...,...,...,...,...,...
334158,32/205,51.613728,-0.124565,-1.0,2019-07-31,95,437.571429
334159,32/207,51.612491,-0.116930,-1.0,2019-07-31,95,534.000000
334160,32/209,51.612679,-0.113519,-1.0,2019-07-31,95,580.000000
334161,32/210,51.612580,-0.113407,-1.0,2019-07-31,95,261.928571


Normalize Webtris and merge with TIMS data for a single merged volume file.

In [173]:
def merge_london(month):
    webtris_normalized_df = pd.read_parquet(LONDON_PATH / 'speed' / 'webtris_london_201907-202001.parquet')
    webtris_normalized_df = webtris_normalized_df[webtris_normalized_df['day'].str.startswith(month)]
    webtris_normalized_df = process_volume_normalization(webtris_normalized_df)
    webtris_normalized_df = webtris_normalized_df[['id', 'lat', 'lon', 'heading', 'day', 'volume']]
    
    tims_normalized_df = pd.read_parquet(LONDON_PATH / 'flow' / f'counters_tims_{month}.parquet')
    tims_normalized_df = tims_normalized_df[tims_normalized_df['day'].str.startswith(month)]
    tims_normalized_df['heading'] = -1
    tims_normalized_df = process_volume_normalization(tims_normalized_df)
    tims_normalized_df = tims_normalized_df[['id', 'lat', 'lon', 'heading', 'day', 'volume']]
    
    london_normalized_df = pd.concat([webtris_normalized_df, tims_normalized_df])
    london_normalized_df.to_parquet(LONDON_PATH / f'counters_normalized_{month}', compression="snappy")
    return london_normalized_df

london_normalized_df = merge_london('2019-07')
london_normalized_df

Aggregated 1054 counter volume lists
Aggregated 100122 counter volume lists


Unnamed: 0,id,lat,lon,heading,day,volume
0,101,51.562738,-2.532792,-1,2019-07-01,"[138, 138, 106, 86, 84, 79, 60, 49, 62, 65, 56..."
1,104,51.426911,0.238597,-1,2019-07-01,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
2,106,51.467634,-0.508905,-1,2019-07-01,"[346, 293, 279, 278, 256, 216, 201, 200, 223, ..."
3,109,51.329145,0.516358,-1,2019-07-01,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
4,110,51.489845,-0.369827,-1,2019-07-01,"[263, 241, 181, 145, 119, 118, 72, 83, 71, 76,..."
...,...,...,...,...,...,...
100117,32/210,51.612580,-0.113407,-1,2019-07-31,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -..."
100118,32/224,51.632029,-0.073554,-1,2019-07-31,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -..."
100119,32/225,51.631973,-0.073398,-1,2019-07-31,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -..."
100120,32/228,51.629826,-0.097257,-1,2019-07-31,"[-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 3..."


# Madrid

The raw files were downloaded from https://datos.madrid.es/ in zipped CSV files.

In [17]:
def download_dcat_zips(dcat_file, prefix, month_names=False, months=None):
    root = ET.parse(MADRID_PATH / 'downloads' / dcat_file).getroot()
    ns = {'dcat': 'http://www.w3.org/ns/dcat#',
          'dct': 'http://purl.org/dc/terms/'}
    data_urls = {}
    for entry in root.findall('.//dcat:Distribution', ns):
        title = entry.find('dct:title', ns).text
        url = entry.find('dcat:accessURL', ns).text
        if url.endswith('.zip'):
            try:
                if month_names:
                    locale.setlocale(locale.LC_TIME, 'es_ES.UTF-8')
                    title = title.replace('diciiembre', 'diciembre')
                    month = datetime.strptime(title, '%Y. %B')
                else:
                    month = datetime.strptime(title[6:], '%d/%m/%Y')
            except Exception as e:
                print(e)
                continue
            month = datetime.strftime(month, '%Y-%m')
            if months:
                if not month in months:
                    continue
            data_urls[month] = url
            print(f'{month}: {url}')
            r = requests.get(url, allow_redirects=True)
            open(MADRID_PATH / 'downloads' / f"{prefix}-{month}.zip", 'wb').write(r.content)
    return data_urls

In [None]:
# TODO: uncomment if processing data
# url = "https://datos.madrid.es/egob/catalogo/202468-0-intensidad-trafico.dcat"
# out_file = MADRID_PATH / 'downloads' / '202468-0-intensidad-trafico.dcat'
# os.system(f"wget -O {out_file} {url}")
# download_dcat_zips('202468-0-intensidad-trafico.dcat', prefix='locations', month_names=False,
#                    months=['2021-06', '2021-07', '2021-08', '2021-09', '2021-10', '2021-11', '2021-12'])

In [None]:
# TODO: uncomment if processing data
# url = "https://datos.madrid.es/egob/catalogo/208627-0-transporte-ptomedida-historico.dcat"
# out_file = MADRID_PATH / 'downloads' / '208627-0-transporte-ptomedida-historico.dcat'
# os.system(f"wget -O {out_file} {url}")
# download_dcat_zips('208627-0-transporte-ptomedida-historico.dcat', prefix='data', month_names=True,
#                    months=['2021-06', '2021-07', '2021-08', '2021-09', '2021-10', '2021-11', '2021-12'])

In [20]:
from math import atan2, cos, sin, degrees
from shapely.geometry import LineString, Point, MultiPoint
from shapely.ops import transform
from scipy.spatial import ConvexHull
import pyproj

ETRS89 = pyproj.CRS('EPSG:25830')
WGS84 = pyproj.CRS('EPSG:4326')
project_wgs84 = pyproj.Transformer.from_crs(ETRS89, WGS84, always_xy=True).transform

def get_heading(lon1, lat1, lon2, lat2):
    angle = atan2(lon2-lon1, lat2-lat1)
    angle = degrees(angle)
    if angle < 0:
        angle += 360
    return angle

def convert_locations_shp(zip_file):
    hdiff = 0
    features = []
    zf = zipfile.ZipFile(zip_file)
    with tempfile.TemporaryDirectory() as tempdir:
        zf.extractall(tempdir)
        shp_path = glob.glob(f'{tempdir}/*.shp')[0]
        print(shp_path)
        shapefile = geopandas.read_file(shp_path)
        for index, row in shapefile.iterrows():
            arrow_polygon = transform(project_wgs84, row['geometry'])
            from_point = LineString([arrow_polygon.exterior.coords[2], arrow_polygon.exterior.coords[3]]).centroid
            to_point = Point(arrow_polygon.exterior.coords[6])
            heading = get_heading(from_point.x, from_point.y, to_point.x, to_point.y)
            points = np.array(list(arrow_polygon.exterior.coords))
            hull_points = points[ConvexHull(points).vertices]
            # find shortest segment index
            shortest_segment_len = 1e9
            shortest_segment_idx = 0
            shortest_segment = None
            idxs = list(range(len(hull_points)))
            idxs.append(0)
            for i1, i2 in zip(idxs[:-1], idxs[1:]):
                s = LineString([hull_points[i1], hull_points[i2]])
                if s.length < shortest_segment_len:
                    shortest_segment_len = s.length
                    shortest_segment = s
                    shortest_segment_idx = i2
            from_point = shortest_segment.centroid
            to_idx = shortest_segment_idx
            for i in range(2):
                to_idx += 1
                if to_idx == len(hull_points):
                    to_idx = 0
            to_point = Point(hull_points[to_idx])
            heading_hull = get_heading(from_point.x, from_point.y, to_point.x, to_point.y)
            if abs(heading_hull - heading) > 0.5:
                hdiff += 1
                heading = heading_hull
            features.append(geojson.Feature(geometry=geojson.Point((row['longitud'], row['latitud'])),
            properties={
                "heading": heading, "name": row['nombre'], "tipo_elem": row['tipo_elem'],
                "id": int(row['id']), "distrito": str(row['distrito']), "cod_cent": str(row['cod_cent'])
            }))
    print(f'{hdiff} / {len(features)} headings differ')
    feature_collection = geojson.FeatureCollection(features)
    geojson_name = os.path.splitext(os.path.basename(zip_file))[0]
    with open(MADRID_PATH / 'downloads' / f'{geojson_name}.geojson', 'w') as f:
        geojson.dump(feature_collection, f)
    return feature_collection

def diff_dicts(a, b, drop_similar=True):
    res = a.copy()
    for k in res:
        if k not in b:
            res[k] = (res[k], None)
    for k in b:
        if k in res:
            res[k] = (res[k], b[k])
        else:
            res[k] = (None, b[k])
    if drop_similar:
        res = {k:v for k,v in res.items() if v[0] != v[1]}
    return res

def check_diff(diff, key, tolerance):
    if key not in diff:
        return False
    a, b = diff[key]
    return abs(a-b) > tolerance

def open_locations(locations_file):
    locations_by_id = {}
    with open(locations_file, 'r') as f:
        fc = geojson.load(f)
        for f in fc['features']:
            id = f['properties']['id']
            heading = f['properties']['heading']
            lat = f['geometry']['coordinates'][1]
            lon = f['geometry']['coordinates'][0]
            name = f['properties']['name']
            tipo_elem = f['properties']['tipo_elem']
            distrito = f['properties']['distrito']
            cod_cent = f['properties']['cod_cent']
            yield {"id": id, "name":name, "lat":lat, "lon":lon, "heading":heading,
                   "tipo_elem":tipo_elem, "distrito":distrito, "cod_cent":cod_cent}

def get_merged_locations():
    merged_locations = {}
    for locations_zip in (MADRID_PATH / 'downloads').glob('locations-*.zip'):
        convert_locations_shp(locations_zip)
        month = str(locations_zip)[-11:-4]
        print(month)
        for l in open_locations(str(locations_zip).replace('.zip', '.geojson')):
            id = l["id"]
            if id in merged_locations:
                matches = False
                for i, lo in enumerate(merged_locations[id]):
                    diff = diff_dicts(lo['data'], l)
                    if (check_diff(diff, 'lat', 0.0001) or 
                        check_diff(diff, 'lon', 0.0001) or 
                        check_diff(diff, 'heading', 5.0)):
                        pass
                    else:
                        matches = True
                        break
                if matches:
                    merged_locations[id][i]['months'].append(month)
                else:
                    merged_locations[id].append({'months': [month], 'data': l})
            else:
                merged_locations[id] = [{'months': [month], 'data': l}]
    return merged_locations


merged_locations = get_merged_locations()
len(merged_locations)

/var/folders/dr/jy4_0c5d3q37snprz6rhwwt40000gn/T/tmpoh5uff91/pmed_ubicacion_07-2021.shp
502 / 4519 headings differ
2021-07
/var/folders/dr/jy4_0c5d3q37snprz6rhwwt40000gn/T/tmphaxuqyq9/pmed_ubicacion_12-2021.shp
502 / 4571 headings differ
2021-12
/var/folders/dr/jy4_0c5d3q37snprz6rhwwt40000gn/T/tmpsgga1p9r/pmed_ubicacion_06-2021.shp
502 / 4489 headings differ
2021-06
/var/folders/dr/jy4_0c5d3q37snprz6rhwwt40000gn/T/tmpqz91hwln/pmed_ubicacion_10-2021.shp
503 / 4538 headings differ
2021-10
/var/folders/dr/jy4_0c5d3q37snprz6rhwwt40000gn/T/tmpijnm9krf/pmed_ubicacion_11-2021.shp
503 / 4567 headings differ
2021-11
/var/folders/dr/jy4_0c5d3q37snprz6rhwwt40000gn/T/tmpfgcawauu/pmed_ubicacion_08-2021.shp
502 / 4529 headings differ
2021-08
/var/folders/dr/jy4_0c5d3q37snprz6rhwwt40000gn/T/tmpq03mu1zb/pmed_ubicacion_09-2021.shp
502 / 4527 headings differ
2021-09


4593

In [21]:
features = []
for id, mll in merged_locations.items():
    for ml in mll:
        props = ml['data']
        props['valid_months'] = ','.join(ml['months'])
        pt = geojson.Point([props['lon'], props['lat']])
        features.append(geojson.Feature(geometry=pt, properties=props))

feature_collection = geojson.FeatureCollection(features)
with open(MADRID_PATH / 'downloads' / 'counter_locations_merged.geojson', 'w') as f:
    geojson.dump(feature_collection, f)

In [22]:
# Store the counter locations to geojson
get_gdf(geopandas.read_file(
    MADRID_PATH / 'downloads' / 'counter_locations_merged.geojson')[['id', 'lat', 'lon', 'heading']]
       ).to_file(MADRID_PATH / 'counter_locations.geojson', driver="GeoJSON")

In [23]:
import csv
from io import TextIOWrapper

def time_bin_format(time):
    return time.strftime('%Y-%m-%d %H:%M')

def get_locations_by_id_and_month():
    locations_by_id_and_month = {}
    with open(MADRID_PATH / 'downloads' / 'counter_locations_merged.geojson', 'r') as f:
        fc = geojson.load(f)
        for f in fc['features']:
            id = f['properties']['id']
            heading = f['properties']['heading']
            lat = f['geometry']['coordinates'][1]
            lon = f['geometry']['coordinates'][0]
            for valid_month in f['properties']['valid_months'].split(','):
                if id not in locations_by_id_and_month:
                    locations_by_id_and_month[id] = {}
                locations_by_id_and_month[id][valid_month] = (lat, lon, heading)
    return locations_by_id_and_month

def generate_data_by_counter(month, locations_by_id_and_month):
    count = 0
    cf = MADRID_PATH / 'downloads'/ f'data-{month}.zip'
    print(cf)
    cfa = zipfile.ZipFile(cf, 'r')
    f = cfa.open(f'{(datetime.strptime(month, "%Y-%m")).strftime("%m-%Y")}.csv', 'r')
    csvreader = csv.reader(TextIOWrapper(f, 'utf-8'), delimiter=';')
    header = next(csvreader)
    # print(header)
    for row in csvreader:
        # https://datos.madrid.es/FWProjects/egob/Catalogo/Transporte/Trafico/ficheros/PuntosMedidaTraficoMdrid.pdf
        # ['id', 'fecha', 'tipo_elem', 'intensidad', 'ocupacion', 'carga', 'vmed', 'error', 'periodo_integracion']
        # fecha --> collection time --> time bin
        # tipo_elem --> counter type ('URB' or 'M30')
        # intensidad --> number of vehicles in 15 minutes --> volume
        # occupacion --> occupation time in percent [0..100] of 15 minutes
        # carga --> congestion level in percent [0..100]
        # vmed --> average speed (only on M30 counters)
        locid = int(row[0])
        if locid not in locations_by_id_and_month:
            print(f'WARNING: unknown ID {locid}')
            continue
        valid_month = month
        if valid_month not in locations_by_id_and_month[locid]:
            # Try the months before, stupid logic but sufficient here
            while valid_month not in locations_by_id_and_month[locid]:
                valid_month = (pd.to_datetime(valid_month) - pd.Timedelta("1 day")).strftime("%Y-%m")
                if valid_month[:4] == '2018':
                    break
            if valid_month not in locations_by_id_and_month[locid]:
                print(f'WARNING: no valid month {month} for ID {locid}: {locations_by_id_and_month[locid]}')
                continue
        lat, lon, heading = locations_by_id_and_month[locid][valid_month]
        volume = int(row[3])
        occupation = float(row[4])
        congestion_level = float(row[5])
        speed_avg = float(row[6])
        collection_time = row[1]
        time_bin = time_bin_format(datetime.strptime(collection_time, '%Y-%m-%d %H:%M:%S'))
        count += 1
        if count % 1000000 == 0:
            print(count)
        yield {'id': locid, 'lat': lat, 'lon': lon, 'heading': heading, 'time_bin': time_bin, 'type': row[2],
               'volume': volume, 'occupation': occupation, 'congestion_level': congestion_level,
               'speed_avg': speed_avg}

def process_madrid_month(month, output_path):
    locations_by_id_and_month = get_locations_by_id_and_month()
    month_df = pd.DataFrame(generate_data_by_counter(month, locations_by_id_and_month))
    month_df.to_parquet(output_path / 'all' / f'counters_{month}.parquet', compression="snappy")
    return month_df

In [24]:
# TODO: uncomment if processing data
# process_madrid_month('2021-06', MADRID_PATH)
# process_madrid_month('2021-07', MADRID_PATH)
# process_madrid_month('2021-08', MADRID_PATH)
# process_madrid_month('2021-09', MADRID_PATH)
# process_madrid_month('2021-10', MADRID_PATH)
# process_madrid_month('2021-11', MADRID_PATH)
# process_madrid_month('2021-12', MADRID_PATH)

In [181]:
def normalize_madrid(month):
    df = pd.read_parquet(MADRID_PATH / 'all' / f'counters_{month}.parquet')
    df['t'] = [t_from_time_bin(tb, False) for tb in df['time_bin']]
    df['day'] = [day_from_time_bin(tb, False) for tb in df['time_bin']]
    df = df[df['day'].str.startswith(month)]
    df = process_volume_normalization(df)
    df = df[['id', 'lat', 'lon', 'heading', 'day', 'volume']]
    df.to_parquet(MADRID_PATH / f'counters_normalized_{month}.parquet', compression="snappy")
    return df

# TODO: uncomment if processing data
normalize_madrid('2021-06')
# normalize_madrid('2021-07')
# normalize_madrid('2021-08')
# normalize_madrid('2021-09')
# normalize_madrid('2021-10')
# normalize_madrid('2021-11')
# normalize_madrid('2021-12')

Aggregated 124525 counter volume lists


Unnamed: 0,id,lat,lon,heading,day,volume
0,1001,40.409729,-3.740786,62.428189,2021-06-01,"[372, 360, 228, 240, 120, 72, 144, 132, 132, 7..."
1,1002,40.408030,-3.743760,66.768303,2021-06-01,"[432, 372, 372, 228, 72, 12, 144, 132, 84, 60,..."
2,1003,40.406824,-3.746834,67.775190,2021-06-01,"[492, 528, 456, 360, 168, 132, 252, 204, 72, 6..."
3,1006,40.411894,-3.736324,69.955706,2021-06-01,"[396, 372, 288, 228, 96, 108, 156, 108, 180, 1..."
4,1009,40.416234,-3.724909,68.506837,2021-06-01,"[252, 264, 60, 132, 240, 240, 72, 60, 12, 144,..."
...,...,...,...,...,...,...
124520,10809,40.467826,-3.690572,221.955517,2021-06-30,"[40, 60, 0, 120, 17, -1, -1, -1, -1, -1, -1, -..."
124521,10810,40.402183,-3.674159,223.257682,2021-06-30,"[37, 25, 12, 31, 33, 26, 12, 7, 7, 24, 14, 8, ..."
124522,10812,40.425322,-3.686722,272.307127,2021-06-30,"[162, 190, 210, 161, 144, 121, 65, 103, 58, 64..."
124523,10813,40.472008,-3.700839,67.695344,2021-06-30,"[74, 85, 82, 53, 55, 67, 34, 49, -1, 37, -1, 6..."


In [150]:
# TODO: uncomment if processing data
m30_madrid_df = pd.concat([
    pd.read_parquet(MADRID_PATH / 'all' / 'counters_2021-06.parquet', filters=[('type','=','M30')]),
    pd.read_parquet(MADRID_PATH / 'all' / 'counters_2021-07.parquet', filters=[('type','=','M30')]),
    pd.read_parquet(MADRID_PATH / 'all' / 'counters_2021-08.parquet', filters=[('type','=','M30')]),
    pd.read_parquet(MADRID_PATH / 'all' / 'counters_2021-09.parquet', filters=[('type','=','M30')]),
    pd.read_parquet(MADRID_PATH / 'all' / 'counters_2021-10.parquet', filters=[('type','=','M30')]),
    pd.read_parquet(MADRID_PATH / 'all' / 'counters_2021-11.parquet', filters=[('type','=','M30')]),
    pd.read_parquet(MADRID_PATH / 'all' / 'counters_2021-12.parquet', filters=[('type','=','M30')])
])
m30_madrid_df.to_parquet(MADRID_PATH / 'm30_madrid_202106-202112.parquet', compression="snappy")
m30_madrid_df

Unnamed: 0,id,lat,lon,heading,time_bin,type,volume,occupation,congestion_level,speed_avg
0,1001,40.409729,-3.740786,62.428189,2021-06-01 00:00,M30,372,1.0,0.0,51.0
1,1001,40.409729,-3.740786,62.428189,2021-06-01 00:15,M30,360,1.0,0.0,63.0
2,1001,40.409729,-3.740786,62.428189,2021-06-01 00:30,M30,228,1.0,0.0,58.0
3,1001,40.409729,-3.740786,62.428189,2021-06-01 00:45,M30,240,1.0,0.0,60.0
4,1001,40.409729,-3.740786,62.428189,2021-06-01 01:00,M30,120,,0.0,60.0
...,...,...,...,...,...,...,...,...,...,...
1228462,10662,40.419298,-3.658963,199.072758,2021-12-31 22:45,M30,80,0.0,5.0,73.0
1228463,10662,40.419298,-3.658963,199.072758,2021-12-31 23:00,M30,48,0.0,3.0,51.0
1228464,10662,40.419298,-3.658963,199.072758,2021-12-31 23:15,M30,67,0.0,5.0,50.0
1228465,10662,40.419298,-3.658963,199.072758,2021-12-31 23:30,M30,48,0.0,2.0,62.0


# Melbourne (additional data, not used for MeTS-10 validations)

The raw files are getting downloaded from https://discover.data.vic.gov.au/dataset/traffic-signal-volume-data

In [25]:
VIC_OPENDATA = 'https://vicroadsopendatastorehouse.vicroads.vic.gov.au/opendata'

def scrape_vicroads():
    for date in pd.date_range('2020-05-01', '2021-01-31', freq='M'):
        month = date.strftime("%Y%m")
        zip_file = MELBOURNE_PATH / 'downloads' / f'VSDATA_{month}.zip'
        if os.path.exists(zip_file):
            continue
        zip_file.parent.mkdir(exist_ok=True, parents=True)
        url = f'{VIC_OPENDATA}/Traffic_Measurement/SCATS/VSDATA/VSDATA_{month}.zip'
        print(f'Downloading {url}')
        r = requests.get(url, allow_redirects=True)
        open(zip_file, 'wb').write(r.content)
        
scrape_vicroads()

In [26]:
#Download https://discover.data.vic.gov.au/dataset/traffic-lights1 as geojson
locations_url = (
    'https://vicroadsopendata-vicroadsmaps.opendata.arcgis.com/datasets/'
    '1f3cb954526b471596dbffa30e56bb32_0.geojson?outSR=%7B%22latestWkid%22%3A3111%2C%22wkid%22%3A102171%7D'
)
r = requests.get(locations_url, allow_redirects=True)
open(MELBOURNE_PATH / 'downloads' / 'Traffic_Lights.geojson', 'wb').write(r.content)

2342645

In [27]:
MELBOURNE_BBOX = [-3810600, -3761100, 14475700, 14519300]

melbourne_locations = geopandas.read_file(MELBOURNE_PATH / 'downloads' / 'Traffic_Lights.geojson')
print(f'Loaded {len(melbourne_locations)} site locations')

Loaded 4852 site locations


In [28]:
def read_vsdata_site_ids(zip_file, locations_df):
    all_site_ids = set()
    cfa = zipfile.ZipFile(zip_file, 'r')
    for csv_file in cfa.namelist():
        f = cfa.open(csv_file, 'r')
        csvreader = csv.reader(TextIOWrapper(f, 'utf-8'), delimiter=',')
        header = next(csvreader)
        for row in csvreader:
            try:
                nb_scats_site = int(row[0])
                all_site_ids.add(nb_scats_site)
            except Exception:
                continue
    print(f'Read {len(all_site_ids)} unique locations')
    return list(all_site_ids)

melbourne_site_ids = read_vsdata_site_ids(MELBOURNE_PATH / 'downloads' / 'VSDATA_202006.zip')

Read 4229 unique locations


In [29]:
# Filter only the used counter locations in the bounding box
melbourne_locations = melbourne_locations[melbourne_locations['SITE_NO'].isin(melbourne_site_ids)]
ymin, ymax, xmin, xmax = tuple([v/100000 for v in MELBOURNE_BBOX])
melbourne_locations = melbourne_locations.cx[xmin:xmax, ymin:ymax]

melbourne_locations = melbourne_locations.sort_values(by='SITE_NO')
melbourne_locations

Unnamed: 0,OBJECTID,TLIGHTS_,TLIGHTS_ID,SITE_NO,SITE_NAME,SITE_TYPE,DIRECTORY,DIR_REF,D_ADDED,D_TOWNS,D_EDITED,D_REMOVED,LINK_MODE,STATUS,COMMENTS,MULTI,UFI,ARC_UFI,geometry
3400,48442,,,100,BURWOOD HWY/LIVINGSTONE,INT,,,,,,,,OPERATIONAL,,,,,POINT (145.18105 -37.85642)
4133,49175,,,101,WELLS/CHELSEA HEIGHTS HOTEL,INT,,,,,,,,OPERATIONAL,MICROCEL,,,,POINT (145.13197 -38.02945)
387,45429,,,111,MAROONDAH HWY/METROPOLITAN,INT,,,,,,,,OPERATIONAL,Bluetooth,,,,POINT (145.16846 -37.81827)
2964,48006,,,112,MAROONDAH/STATION,INT,,,,,,,,OPERATIONAL,,,,,POINT (145.19231 -37.81656)
2965,48007,,,113,MAROONDAH/STATION/WILLIAMS,INT,,,,,,,,OPERATIONAL,,,,,POINT (145.15037 -37.81741)
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3509,48551,,,5297,POINT COOK NR CATHERINE (NORTH),POS,,,,,,,,OPERATIONAL,,,,,POINT (144.75773 -37.87780)
3615,48657,,,5298,POINT COOK NR CATHERINE (SOUTH),POS,,,,,,,,OPERATIONAL,,,,,POINT (144.75747 -37.87917)
4531,49573,,,5329,RIVERSDALE ROAD EAST OF WESTBOURNE GROVE,POS,,,,,,,,OPERATIONAL,"RAIL LNK,UPS",,,,POINT (145.06925 -37.83280)
660,45702,,,7047,SWALLOW/LIGHT RAIL CROSSING,INT,,,,,,,,OPERATIONAL,TRAM PRI,,,,POINT (144.93628 -37.83769)


In [30]:
# Store the counter locations to geojson
get_gdf(melbourne_locations, 'SITE_NO').to_file(MELBOURNE_PATH / 'counter_locations.geojson', driver="GeoJSON")

In [230]:
def process_vsdata(year, month, locations_df):
    unique_sites = set(locations_df['SITE_NO'].values)
    zip_file = MELBOURNE_PATH / 'downloads' / f'VSDATA_{year:04d}{month:02d}.zip'
    count = 0
    data = []
    cfa = zipfile.ZipFile(zip_file, 'r')
    for csv_file in cfa.namelist():
        day_bin = csv_file[7:15]
        day_bin = f'{day_bin[:4]}-{day_bin[4:6]}-{day_bin[6:]}'
        f = cfa.open(csv_file, 'r')
        csvreader = csv.reader(TextIOWrapper(f, 'utf-8'), delimiter=',')
        header = next(csvreader)
        for row in csvreader:
            try:
                nb_scats_site = int(row[0])
                nb_detector = int(row[2])
                volumes = [int(row[i]) for i in range(3,99)]
            except Exception:
                continue
            if not np.any(np.array(volumes)):
                continue
            if nb_scats_site not in unique_sites:
                continue
            site_df = locations_df[locations_df['SITE_NO'] == nb_scats_site]
            point = site_df['geometry'].iloc[0]
            lat = point.y
            lon = point.x
            volumes = [max(v, 0) for v in volumes]
            heading = -1
            data.append({
                'id': nb_scats_site, 'sub_id': nb_detector, 'lat': lat, 'lon': lon, 'heading': heading, 
                'day': day_bin, 'volume': volumes
            })
            count += 1
            if count % 20000 == 0:
                print(f'{day_bin}: {count}')
    df = pd.DataFrame.from_records(data)
    df.to_parquet(MELBOURNE_PATH / f'counters_{year:04d}-{month:02d}.parquet', compression="snappy")
    return df

# TODO: uncomment if processing data
process_vsdata(2020, 6, melbourne_locations)
# process_vsdata(2020, 7, melbourne_locations)
# process_vsdata(2020, 8, melbourne_locations)
# process_vsdata(2020, 9, melbourne_locations)
# process_vsdata(2020, 10, melbourne_locations)
# process_vsdata(2020, 11, melbourne_locations)
# process_vsdata(2020, 12, melbourne_locations)

2020-06-01: 20000
2020-06-02: 40000
2020-06-04: 60000
2020-06-04: 80000
2020-06-05: 100000
2020-06-06: 120000
2020-06-07: 140000
2020-06-07: 160000
2020-06-08: 180000
2020-06-09: 200000
2020-06-10: 220000
2020-06-11: 240000
2020-06-11: 260000
2020-06-12: 280000
2020-06-13: 300000
2020-06-14: 320000
2020-06-14: 340000
2020-06-15: 360000
2020-06-16: 380000
2020-06-17: 400000
2020-06-17: 420000
2020-06-18: 440000
2020-06-19: 460000
2020-06-20: 480000
2020-06-21: 500000
2020-06-21: 520000
2020-06-22: 540000
2020-06-23: 560000
2020-06-24: 580000
2020-06-24: 600000
2020-06-25: 620000
2020-06-26: 640000
2020-06-27: 660000
2020-06-27: 680000
2020-06-28: 700000
2020-06-29: 720000
2020-06-30: 740000
2020-06-30: 760000


Unnamed: 0,id,sub_id,lat,lon,heading,day,volume
0,111,1,-37.818267,145.168460,-1,2020-06-01,"[3, 0, 1, 0, 1, 0, 1, 1, 0, 0, 2, 1, 0, 2, 0, ..."
1,111,2,-37.818267,145.168460,-1,2020-06-01,"[7, 7, 4, 4, 4, 4, 3, 3, 3, 3, 2, 1, 0, 2, 4, ..."
2,111,3,-37.818267,145.168460,-1,2020-06-01,"[1, 1, 1, 2, 3, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, ..."
3,111,4,-37.818267,145.168460,-1,2020-06-01,"[0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, ..."
4,111,5,-37.818267,145.168460,-1,2020-06-01,"[2, 1, 2, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, ..."
...,...,...,...,...,...,...,...
762994,4705,20,-37.913666,145.132195,-1,2020-06-30,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
762995,4705,21,-37.913666,145.132195,-1,2020-06-30,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
762996,4705,22,-37.913666,145.132195,-1,2020-06-30,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
762997,4705,23,-37.913666,145.132195,-1,2020-06-30,"[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."


In [233]:
def normalize_volumes_melbourne(x):
    volumes = x.volume
    sub_ids = x.sub_id
    assert(len(volumes) == len(sub_ids))
    res_volumes = []
    for v in volumes:
        if len(res_volumes) == 0:
            res_volumes = v
        else:
            res_volumes = np.add(res_volumes, v)
    assert(len(res_volumes) == 96)
    return pd.Series([res_volumes], index=['volume'])

def normalize_melbourne(month):
    df = pd.read_parquet(MELBOURNE_PATH / f'counters_{month}.parquet')
    df = df.sort_values(['id','lat','lon','heading'], ascending=False).groupby(
        ['id','lat','lon','heading', 'day']).agg({'volume':lambda x: list(x), 'sub_id':lambda x: list(x)})
    df = df.apply(normalize_volumes_melbourne, axis=1, result_type='expand').reset_index()
    df.to_parquet(MELBOURNE_PATH / f'counters_normalized_{month}.parquet', compression="snappy")
    return df

# TODO: uncomment if processing data
normalize_melbourne('2020-06')
# normalize_melbourne('2020-07')
# normalize_melbourne('2020-08')
# normalize_melbourne('2020-09')
# normalize_melbourne('2020-10')
# normalize_melbourne('2020-11')
# normalize_melbourne('2020-12')

Unnamed: 0,id,lat,lon,heading,day,volume
0,100,-37.856420,145.181054,-1,2020-06-01,"[27, 23, 22, 15, 17, 16, 13, 15, 16, 13, 6, 12..."
1,100,-37.856420,145.181054,-1,2020-06-02,"[38, 26, 23, 32, 19, 12, 9, 10, 15, 6, 15, 8, ..."
2,100,-37.856420,145.181054,-1,2020-06-04,"[31, 37, 33, 16, 18, 18, 13, 16, 16, 11, 7, 11..."
3,100,-37.856420,145.181054,-1,2020-06-05,"[30, 47, 21, 23, 24, 13, 18, 8, 16, 14, 7, 6, ..."
4,100,-37.856420,145.181054,-1,2020-06-06,"[60, 66, 49, 38, 38, 34, 31, 26, 15, 18, 21, 2..."
...,...,...,...,...,...,...
73719,7047,-37.837689,144.936283,-1,2020-06-26,"[4, 11, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,..."
73720,7047,-37.837689,144.936283,-1,2020-06-27,"[4, 6, 8, 2, 1, 14, 9, 5, 3, 3, 3, 3, 3, 3, 5,..."
73721,7047,-37.837689,144.936283,-1,2020-06-28,"[6, 4, 6, 4, 7, 4, 3, 4, 12, 0, 3, 3, 6, 0, 5,..."
73722,7047,-37.837689,144.936283,-1,2020-06-29,"[5, 6, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 1, ..."
