In [None]:
import pandas as pd
import numpy as np
import json
import math
import urllib.request
import dateutil.parser
import dateutil.rrule
import dateutil.tz
import datetime
import re
import gc
import pickle

In [None]:
# Time zones: API timestamps are parsed as UTC; the assembled series are indexed in UK local time.
tzUTC = dateutil.tz.gettz('UTC')
tzLocal = dateutil.tz.gettz('Europe/London')

# Used across all of the plots.
# Midnight at the start of today's date (taken from the system clock), labelled as UTC.
dateToday = datetime.datetime.combine(datetime.date.today(), datetime.time(0, 0), tzinfo=tzUTC)

# Default end of the baseline period; the resume cell below replaces it with the end
# of the cached data when a previous run's cache can be loaded.
baselineEnd = datetime.datetime(2020, 3, 16, 23, 59, 59, tzinfo=tzUTC)

# Resample bucket width; presumably seconds (900 s = 15 minutes), matching the
# '900s' resample rule used when fetching.
resampleFrequency = 900

In [None]:
# Try to resume from the previously cached recent volumes. Only rows older than
# 2.5 days before local midnight today are kept, presumably so that the most recent
# (possibly still-updating) values get re-fetched. baselineEnd is moved forward to
# the last retained timestamp so that fetching restarts just after it.
try:
    dateTodayLocal = datetime.datetime.combine(datetime.date.today(), datetime.datetime.min.time()).replace(tzinfo=tzLocal)
    dfPointInterpTsOld = pd.read_pickle('../cache/recent-traffic-volumes-pd.pkl')
    dfPointInterpTsOld = dfPointInterpTsOld[dfPointInterpTsOld.index < dateTodayLocal - pd.Timedelta(days=2.5)]
    # np.max raises ValueError if nothing is left after filtering; handled below
    baselineEnd = np.max(dfPointInterpTsOld.index).replace(tzinfo=tzLocal).astimezone(tzUTC)
    print('Loaded previous data up to 2.5 days ago.')
    print('  %s' % baselineEnd)
except Exception as loadError:
    # Catch ordinary errors only (missing file, unreadable pickle, empty cache) so a
    # KeyboardInterrupt still stops the notebook, and report why the cache was skipped.
    dfPointInterpTsOld = None
    print('No existing data could be loaded.')
    print('  %s' % loadError)

In [None]:
# Identify all of the journey time pair links (ANPR vehicle monitoring pairs) and record,
# per link system code number, the link metadata plus the archive IRIs of its timeseries.

print('Last updated %s' % (datetime.datetime.now(tzLocal).strftime('%d %B %Y %H:%M')))

anprRequestBase = 'https://api.newcastle.urbanobservatory.ac.uk/api/v2/sensors/entity'

# Feed metric name -> key under which its archive IRI is stored in 'timeseriesIRIs'.
# Feeds with any other metric contribute no IRI.
anprMetricTypes = {
    'Journey time': 'timeseriesJourneyTime',
    'Number plates at start of link': 'timeseriesPlatesIn',
    'Number plates at end of link': 'timeseriesPlatesOut',
}

# Long names are expected to look like "<highway> - <start> to <end>"
anprNameMatcher = re.compile('^(.*) - (.*) to (.*)$')

# Fetch every page of vehicle monitoring pair entities...
anprLinks = {}
anprRequestPage = 1
anprResponse = None

# Keep requesting pages until one comes back with at most one item
# (presumably the final, partial page).
while anprResponse is None or len(anprResponse) > 1:
    # Close the HTTP response once its body has been read
    with urllib.request.urlopen(
        '%s?name="Vehicle%%20monitoring%%20pair%%20"&page=%u' % (anprRequestBase, anprRequestPage)
    ) as anprHttpResponse:
        anprResponse = json.loads(anprHttpResponse.read().decode('utf-8'))['items']

    anprRequestPage = anprRequestPage + 1

    for journeyTimeLink in anprResponse:
        for feed in journeyTimeLink['feed']:
            systemCodeNumber = feed['brokerage'][0]['sourceId'].split(':')[0]

            # Some links are set up to only count bus numberplates for public transport journey times.
            # Feeds with no timeseries, or whose first timeseries has no 'latest' entry, are skipped too.
            if systemCodeNumber.endswith('_BUS') or \
               not feed['timeseries'] or \
               'latest' not in feed['timeseries'][0]:
                continue

            if systemCodeNumber not in anprLinks:
                newLink = journeyTimeLink['meta'].copy()
                newLink['timeseriesIRIs'] = {}
                anprLinks[systemCodeNumber] = newLink
                print('Discovered monitoring link "%s"' % newLink['longName'].strip())

            linkDescription = anprLinks[systemCodeNumber]
            linkDescription['systemCodeNumber'] = systemCodeNumber

            timeseriesType = anprMetricTypes.get(feed['metric'])
            if timeseriesType is not None:
                for ts in feed['timeseries']:
                    for link in ts['links']:
                        if link['rel'] == 'archives':
                            linkDescription['timeseriesIRIs'][timeseriesType] = link['href']

            nameElements = anprNameMatcher.match(linkDescription['longName'])

            if nameElements is None:
                print('Unable to match name "%s". Skipping.' % linkDescription['longName'])
                del anprLinks[systemCodeNumber]
                continue

            linkDescription['highwayDescription'] = nameElements[1]
            linkDescription['startDescription'] = nameElements[2]
            linkDescription['endDescription'] = nameElements[3]

anprLinks = pd.DataFrame.from_records(list(anprLinks.values()), index=['systemCodeNumber'])
print('Discovered %u ANPR pairs.' % len(anprLinks.index))

anprLinks

In [None]:
# Convert links into a list of measurement points: every link contributes a 'start' and
# an 'end' point. Link ends sharing identical (easting, northing) coordinates are merged
# into one point, and 'linkCount' records how many link ends that point represents.

anprPoints = {}

for systemCodeNumber in anprLinks.index:
    # Direct label lookups instead of a boolean-mask scan per link; assumes the index
    # holds unique system code numbers.
    highwayDescription = anprLinks.at[systemCodeNumber, 'highwayDescription'].strip()

    for end in ['start', 'end']:
        easting = anprLinks.at[systemCodeNumber, end + 'Easting']
        northing = anprLinks.at[systemCodeNumber, end + 'Northing']
        coordinates = (easting, northing)

        if coordinates in anprPoints:
            anprPoints[coordinates]['linkCount'] += 1
            continue

        anprPoints[coordinates] = {
            'systemCodeNumber': systemCodeNumber,
            'end': end,
            'timeseriesName': '%s.%s' % (systemCodeNumber, end),
            'pointDescription': anprLinks.at[systemCodeNumber, end + 'Description'].strip(),
            'highwayDescription': highwayDescription,
            'easting': easting,
            'northing': northing,
            'linkCount': 1
        }

anprPoints = pd.DataFrame.from_records(list(anprPoints.values()))

print('Found %u unique monitoring points.' % len(anprPoints.index))
anprPoints.head(40)

In [None]:
# dfLinkHistoric stays None when a recent cache was loaded above; on a cold start
# (no cache) the baseline traffic volumes are loaded instead.
dfLinkHistoric = None

if dfPointInterpTsOld is None:
    dfLinkHistoric = pd.read_pickle('../cache/baseline-traffic-volumes-pd.pkl')
    dfLinkHistoric.index = (
        dfLinkHistoric.index
        .tz_localize(tzUTC)    # tz_localize needs a naive index; treat the stored times as UTC...
        .tz_convert(tzLocal)   # ...then express them in local time like the other series
    )

gc.collect()

In [None]:
# Fetch recent vehicle counts for every monitoring point, splice them onto the previously
# cached series (or, on a cold start, the baseline volumes), resample each point into
# fixed-width buckets of `resampleFrequency` seconds and outer-join everything into dfPointTs.

daysPerRequest = 15


def _fetchJson(url):
    """Fetch ``url`` and decode its UTF-8 body as JSON, closing the connection afterwards."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


def _formatApiTime(moment):
    """Render an aware datetime in ISO 8601, writing a +00:00 offset as 'Z'."""
    return moment.isoformat().replace('+00:00', 'Z')


def _fetchPointValues(timeseriesIRI):
    """Download archived records for one timeseries from just after ``baselineEnd`` to the end of today.

    The range is split into windows of ``daysPerRequest`` days with one request each, and a
    dot is printed per window as progress. Returns the concatenated ``historic.values``
    records. The list is empty when no window was requested.
    """
    windowLimit = dateToday + pd.Timedelta(hours=24)
    values = []

    print('  [', end='')
    for windowStart in dateutil.rrule.rrule(
        dateutil.rrule.DAILY,
        interval=daysPerRequest,
        dtstart=baselineEnd + pd.Timedelta(seconds=1),
        until=windowLimit
    ):
        windowEnd = windowStart + pd.Timedelta(days=daysPerRequest) - pd.Timedelta(seconds=1)
        if windowEnd > windowLimit:
            windowEnd = windowLimit
        windowEnd = windowEnd.replace(tzinfo=tzUTC)

        values.extend(_fetchJson('%s?startTime=%s&endTime=%s' % (
            timeseriesIRI, _formatApiTime(windowStart), _formatApiTime(windowEnd)
        ))['historic']['values'])
        print('.', end='')
    print(']')

    return values


def _toPointFrame(values, columnName):
    """Build a one-column frame named ``columnName``, indexed by local time, from API records.

    Returns None when the summed ``value`` fields are below 1 (this includes an empty
    list). The caller treats None as "no new data".
    """
    if np.sum([record['value'] for record in values]) < 1:
        return None

    frame = pd.DataFrame \
        .from_records(values, exclude=['duration']) \
        .rename(columns={'value': columnName})
    frame['time'] = frame['time'].apply(
        lambda t: datetime.datetime.strptime(t, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=tzUTC).astimezone(tzLocal)
    )
    return frame.set_index('time', drop=True)


dfPointTs = None

for point in anprPoints.to_dict(orient='records'):
    systemCodeNumber = point['systemCodeNumber']
    linkIRIs = anprLinks.at[systemCodeNumber, 'timeseriesIRIs']
    isLinkStart = point['end'] == 'start'

    vehicleCountName = '%s.%s' % (systemCodeNumber, point['end'])
    vehicleCountIRIRequired = 'timeseriesPlatesIn' if isLinkStart else 'timeseriesPlatesOut'
    # Column name this point uses in the baseline (historic) frame
    historicColumn = '%s.%s' % (systemCodeNumber, 'platesIn' if isLinkStart else 'platesOut')

    if vehicleCountIRIRequired not in linkIRIs:
        print('No data available for %s' % vehicleCountName)
        continue

    print(vehicleCountName)
    dfPoint = _toPointFrame(_fetchPointValues(linkIRIs[vehicleCountIRIRequired]), vehicleCountName)
    if dfPoint is None:
        print('Empty timeseries returned for %s' % vehicleCountName)

    # dfPointInterpTsOld is None on a cold start, so guard before touching its columns
    hasOldColumn = dfPointInterpTsOld is not None and vehicleCountName in dfPointInterpTsOld.columns

    if dfPoint is None:
        if hasOldColumn:
            # No new data, but keep what we had previously
            dfPoint = dfPointInterpTsOld[vehicleCountName].to_frame()
        else:
            # No data in previous dataset either
            print('No previous timeseries for %s, will drop column' % vehicleCountName)
            continue
    elif hasOldColumn:
        # Extend previous record
        dfPoint = pd.concat([
            dfPointInterpTsOld[vehicleCountName],
            dfPoint[vehicleCountName]
        ]).to_frame()
    elif dfLinkHistoric is None:
        # dfLinkHistoric is only None when a previous dataset was loaded, and that
        # dataset has no column for this point.
        # NOTE(review): the freshly fetched data is discarded here, so a point missing
        # from the cache is never added on later runs -- confirm this is intended.
        continue
    elif historicColumn in dfLinkHistoric.columns:
        # Extend against baseline data only (select the one column rather than
        # renaming a copy of the whole historic frame)
        dfPoint = pd.concat([
            dfLinkHistoric[historicColumn].rename(vehicleCountName),
            dfPoint[vehicleCountName]
        ]).to_frame()
    # Otherwise only the newly fetched data is kept for this point

    dfPoint = dfPoint.resample('%us' % resampleFrequency).sum()

    if dfPointTs is None:
        dfPointTs = dfPoint
    else:
        dfPointTs = dfPointTs.join(
            dfPoint,
            how='outer',
            rsuffix=' (%s)' % systemCodeNumber
        )
    dfPoint = None
    gc.collect()

if dfPointTs is None:
    raise RuntimeError('No vehicle count timeseries could be assembled.')

dfPointTs = dfPointTs.sort_index()

In [None]:
# Display the combined per-point vehicle count frame (one column per monitoring point)
dfPointTs

In [None]:
# Cache the combined volumes (re-read by the resume cell near the top on the next run)
# together with the link and point metadata.
for cacheFrame, cachePath in [
    (dfPointTs, '../cache/recent-traffic-volumes-pd.pkl'),
    (anprLinks, '../cache/recent-traffic-volumes-link-metadata-pd.pkl'),
    (anprPoints, '../cache/recent-traffic-volumes-point-metadata-pd.pkl'),
]:
    cacheFrame.to_pickle(cachePath)

In [None]:
# Export the combined volumes and the link/point metadata as CSV files under ../output
for exportFrame, exportPath in [
    (dfPointTs, '../output/t&w-anpr-volumes-pd.csv'),
    (anprLinks, '../output/t&w-anpr-volumes-link-metadata-pd.csv'),
    (anprPoints, '../output/t&w-anpr-volumes-point-metadata-pd.csv'),
]:
    exportFrame.to_csv(exportPath)