In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import uuid
from datetime import datetime, timedelta
import json
import os, uuid
from savanhdatabricks import eventgridsinkwriter as eg

## Load variables from key vault

In [3]:
# Name of the secret scope that every credential below is read from.
kv_scope = 'key-vault-secret'


def _read_secret(secret_key):
  """Return the secret stored under `secret_key` in the `kv_scope` scope."""
  return dbutils.secrets.get(scope=kv_scope, key=secret_key)


# Storage account (name + access key) used further down to mount the config blob.
storage_account_name = _read_secret('traffic-storage-accountname')
storage_account_access_key = _read_secret('traffic-storage-accountkey')

# Event Grid access key and topic endpoint handed to the Event Grid sink writer.
eventgrid_accesskey = _read_secret('traffic-eventgrid-accesskey')
eventgrid_topic = _read_secret('traffic-eventgrid-topicendpoint')

## Mounting the segment configuration json from blob

- Using the mount functionality to load the blob file

In [5]:
# Mount the 'traffic-config' blob container under /mnt/ unless it is already mounted.
mount_name = 'traffic-config'
mount_point = '/mnt/' + mount_name
blob_host = storage_account_name + '.blob.core.windows.net'

# Compare against the registered mount points by exact path. Listing '/mnt/' and
# prefix-matching directory names would treat a sibling such as
# 'traffic-config-old' (or a plain, non-mounted directory) as "already mounted".
mounts = dbutils.fs.mounts()
to_be_mounted = all(mnt.mountPoint != mount_point for mnt in mounts)

if to_be_mounted:
  dbutils.fs.mount(
    source = 'wasbs://traffic-config@' + blob_host,
    mount_point = mount_point,
    # The storage account key authorises access to the container.
    extra_configs = {'fs.azure.account.key.' + blob_host: storage_account_access_key})
else:
  print('Traffic config already mounted')

## Parsing segment configuration

- Reading the json file (`multiLine=True` !!)
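- Adding a calculated field for the minimum duration in seconds (`(distance / speedlimit) * 3.6`): the time needed to cover the camera distance (meters) when driving exactly at the speed limit (km/h). The factor 3.6 converts km/h to meters/second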
- Only returning the relevant fields for the calculation query

In [7]:
# Static lookup table with one row per segment, read from the mounted
# configuration JSON (multi-line JSON documents).
segment_config = (
  spark.read.json('/mnt/' + mount_name, multiLine=True)
  .select(
    col('segmentId').alias('TrajectId'),
    # Shortest travel time between the cameras when driving at the speed limit.
    ((col('cameraDistance') / col('speedLimit')) * 3.6).alias('MinDuration'),
    'CameraDistance',
    'SpeedLimit',
  )
)

display(segment_config)

TrajectId,MinDuration,CameraDistance,SpeedLimit
dev,80.0,2000,90
edge01,80.0,2000,90
01,80.0,2000,90
02,150.0,5000,120
03,261.81818181818187,8000,110
04,49.09090909090909,1500,110


In [8]:
# Naive UTC timestamp 20 minutes before the moment this cell runs.
timestamp_from = datetime.utcnow() - timedelta(minutes=20)

In [9]:
# Resolve the date suffix once so the source and destination tables always refer
# to the same day; two separate datetime.today() calls could straddle midnight
# and yield mismatched table names.
table_date_suffix = datetime.today().strftime('%Y%m%d')
delta_src_table_name = 'CameraTelemetry' + table_date_suffix
delta_dest_table_name = 'SpeedMeasurements' + table_date_suffix

# Streaming read of today's camera telemetry Delta table, with a 10-second
# watermark declared on the EventTime column.
cameraStream = (
  spark.readStream.format('delta')
  .table(delta_src_table_name)
  .withWatermark('EventTime', '10 seconds')
)

TrajectId,CameraId,EventTime,Lane,Country,LicensePlate,Make,Color
2,Camera2,2019-12-12T19:00:52.050+0000,1,BE,1-VLF-518,Suzuki,DarkCyan
1,Camera2,2019-12-12T19:01:11.614+0000,2,BE,1-KNT-223,Toyota,DarkCyan
1,Camera1,2019-12-12T19:02:58.297+0000,2,FR,1-LJC-646,Toyota,DarkCyan
1,Camera1,2019-12-12T19:03:37.177+0000,2,NL,1-TPH-702,Opel,DarkYellow
2,Camera2,2019-12-12T19:01:32.030+0000,2,DE,1-ZSK-250,Toyota,DarkGray
1,Camera2,2019-12-12T19:01:18.719+0000,2,BE,1-FTL-591,Toyota,DarkBlue
2,Camera1,2019-12-12T19:01:24.630+0000,1,BE,1-YIG-953,BMW,DarkMagenta
1,Camera1,2019-12-12T19:02:41.523+0000,1,BE,1-JWO-455,BMW,DarkMagenta
1,Camera2,2019-12-12T19:03:39.427+0000,1,BE,1-JWO-455,BMW,DarkMagenta
1,Camera2,2019-12-12T19:01:40.541+0000,2,BE,1-HWN-239,Audi,Magenta


## Query that measures the time difference per license plate
- Loading data from the delta table
- Grouping by license plate and traject
- Taking count, earliest timestamp and latest timestamp
- Adding calculated field (max-min) for duration
- Selecting relevant fields as output

In [11]:
# Per vehicle and traject: number of sightings, first and last sighting, and the
# number of whole seconds between them. Groups with fewer than two sightings or
# a non-positive duration are discarded.
duration_calculation = (
  cameraStream
  .groupBy('TrajectId', 'LicensePlate', 'Make', 'Country')
  .agg(
    count('*').alias('count'),
    min('EventTime').alias('firstevent'),
    max('EventTime').alias('lastevent'),
  )
  # Casting a timestamp to long yields epoch seconds, so the difference is in seconds.
  .withColumn(
    'duration',
    col('lastevent').cast(LongType()) - col('firstevent').cast(LongType()),
  )
  .filter((col('duration') > 0) & (col('count') >= 2))
  .select(
    'TrajectId', 'LicensePlate', 'Make', 'Country',
    'FirstEvent', 'LastEvent', 'Count', 'Duration',
  )
)

## Join results with traject information and detect speeding cars
- Join on TrajectId
- Select cars with duration that is below the minimum duration of the traject
- Add a calculated column for speed

In [13]:
# Join every measured passage with its segment configuration, derive the average
# speed and continuously write the full result set to today's Delta table.
# NOTE(review): despite the `_df` suffix this name holds whatever
# `writeStream ... .table()` returns (presumably a streaming query handle) - confirm.
speed_measurements_df = (
  duration_calculation.join(segment_config, 'TrajectId')
  # distance / seconds * 3.6 -> presumably km/h, matching the SpeedLimit unit.
  .withColumn('speed', (col('CameraDistance') / col('duration')) * 3.6)
  .select(
    'TrajectId', 'LicensePlate', 'Speed', 'Make',
    'Country', 'LastEvent', 'SpeedLimit',
  )
  .withWatermark('LastEvent', '5 seconds')
  .writeStream
  .format('delta')
  .outputMode('complete')
  .option(
    'checkpointLocation',
    '/data/' + delta_dest_table_name + '/_checkpoints/data_file',
  )
  .table(delta_dest_table_name)
)


In [14]:
# Detect speeding cars: passages whose duration is shorter than the segment's
# minimum duration are pushed to Event Grid as 'SpeedingCarDetected' events.
# `speed_tickets_df` ends up holding the result of `.start()`, not a DataFrame.
speed_tickets_df = (
  duration_calculation.join(segment_config, 'TrajectId')
  # Event subject has the form '<TrajectId>/<LicensePlate>'.
  .withColumn('Subject', concat(col('TrajectId'), lit('/'), col('LicensePlate')))
  .withColumn('speed', (col('CameraDistance') / col('duration')) * 3.6)
  .filter(col('duration') < col('MinDuration'))
  .select(
    'TrajectId', 'LicensePlate', 'Speed',
    'Duration', 'Subject', 'SpeedLimit',
  )
  .writeStream
  .foreach(eg.EventGridSinkWriter(eventgrid_topic, eventgrid_accesskey, 'SpeedingCarDetected'))
  .outputMode('update')
  .start()
)

In [15]:
# Optional cleanup: uncomment to unmount the configuration container when finished.
#dbutils.fs.unmount('/mnt/' + mount_name)