In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import udf , col, lag, datediff, unix_timestamp,lit,coalesce,concat,split, explode
from pyspark.sql.window import Window

In [2]:
schema_flow = StructType().add('Timestamp', TimestampType(), False) \
        .add('Ds_Reference', StringType(), False) \
        .add('Detector_Number', ShortType(), False) \
        .add('Traffic_Direction', ShortType(), False) \
        .add('Flow_In', ShortType(), False) \
        .add('Average_Speed', ShortType(), False) \
        .add('Sign_Aid_Det_Comms', ShortType(), False) \
        .add('Status', ShortType(), False) \
        .add('Legend_Group', ShortType(), False) \
        .add('Legend_Sign', ShortType(), False) \
        .add('Legend_SubSign', ShortType(), False) \
        .add('Protocol_Version', StringType(), False) 

In [3]:
df_raw = spark.read.csv('data/mcs_201606.csv', sep=';', schema=schema_flow, ignoreLeadingWhiteSpace=True, \
                    ignoreTrailingWhiteSpace=True, timestampFormat='yyyy-MM-dd HH:mm:ss.SSS')
df_raw.printSchema()

root
 |-- Timestamp: timestamp (nullable = true)
 |-- Ds_Reference: string (nullable = true)
 |-- Detector_Number: short (nullable = true)
 |-- Traffic_Direction: short (nullable = true)
 |-- Flow_In: short (nullable = true)
 |-- Average_Speed: short (nullable = true)
 |-- Sign_Aid_Det_Comms: short (nullable = true)
 |-- Status: short (nullable = true)
 |-- Legend_Group: short (nullable = true)
 |-- Legend_Sign: short (nullable = true)
 |-- Legend_SubSign: short (nullable = true)
 |-- Protocol_Version: string (nullable = true)



In [4]:
%%time
df_raw.count()

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 58.5 s


85255773

In [5]:
df_raw.show(5)

+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|          Timestamp|Ds_Reference|Detector_Number|Traffic_Direction|Flow_In|Average_Speed|Sign_Aid_Det_Comms|Status|Legend_Group|Legend_Sign|Legend_SubSign|Protocol_Version|
+-------------------+------------+---------------+-----------------+-------+-------------+------------------+------+------------+-----------+--------------+----------------+
|2016-06-01 00:00:00| E182N 2,015|             49|               78|      0|          252|                 0|     1|         255|          1|             1|               4|
|2016-06-01 00:00:00| E182N 2,015|             50|               78|      0|          252|                 0|     1|         255|          1|             1|               4|
|2016-06-01 00:00:00| E182N 2,015|             51|               78|      0|          252|                 0|     1|         255| 

In [6]:
df_raw.select('Detector_Number','Timestamp','Ds_Reference','Average_Speed','Flow_In','Status').show(10)

+---------------+-------------------+------------+-------------+-------+------+
|Detector_Number|          Timestamp|Ds_Reference|Average_Speed|Flow_In|Status|
+---------------+-------------------+------------+-------------+-------+------+
|             49|2016-06-01 00:00:00| E182N 2,015|          252|      0|     1|
|             50|2016-06-01 00:00:00| E182N 2,015|          252|      0|     1|
|             51|2016-06-01 00:00:00| E182N 2,015|          252|      0|     1|
|             52|2016-06-01 00:00:00| E182N 2,015|          252|      0|     1|
|             49|2016-06-01 00:00:00| E182N 2,325|          252|      0|     1|
|             50|2016-06-01 00:00:00| E182N 2,325|          252|      0|     1|
|             51|2016-06-01 00:00:00| E182N 2,325|          252|      0|     1|
|             49|2016-06-01 00:00:00| E182N 2,690|          252|      0|     1|
|             50|2016-06-01 00:00:00| E182N 2,690|          252|      0|     1|
|             51|2016-06-01 00:00:00| E1

In [7]:
df_raw.select('Timestamp').distinct.show(20)

AttributeError: 'function' object has no attribute 'show'

In [None]:
split_schema = StructType([
  StructField('Road', StringType(), False),
  StructField('Km_Ref', IntegerType(), False)
])


@udf(split_schema)
#@udf(StringType())
def split_ds_ref(s):
    try:
        r, km = s.split(' ')
        k, m = km.split(',')
        meter = int(k)*1000 + int(m)
        #var1[:] + meter 
        return r, meter
    except:
        return None
var1 = ''
#@udf(split_schema)
@udf(StringType())
def split_ds_ref2(s):
    try:
        r, km = s.split(' ')
        return r
    except:
        return None 
@udf(StringType())
def split_ds_ref3(s):
    try:
        r, km = s.split(' ')
        k, m = km.split(',')
        meter = int(k)*1000 + int(m)  
        return meter
    except:
        return None     
#def generate_sensor_ids1(*cols):
#    return concat(*[coalesce(c, lit("*")) for c in cols]) 
def generate_sensor_ids(s, d):
    r, km = s.split(' ')
    k, m = km.split(',')
    meter = int(k)*1000 + int(m)
    var1 = r, meter
    return var

funcConcatCols = udf(lambda x,y,z: x+'_'+y+'_'+z,StringType())
 

In [None]:
ascii_to_int = udf(lambda x : x - 48, ShortType())
df_cleanup1 = df_raw.withColumn('Detector_Number', ascii_to_int('Detector_Number'))


In [None]:
df_cleanup2 = df_cleanup1.withColumn('Ds_Ref_temp1',   split_ds_ref2('Ds_Reference')).withColumn('Ds_Ref_temp2',split_ds_ref3('Ds_Reference'))
df_cleanup3 = df_cleanup2.withColumn('Ds_Ref', funcConcatCols(col('Ds_Ref_temp1'), col('Ds_Ref_temp2'),col('Detector_Number').cast(StringType())))
df_cleanup3.show(2)

df_cleanup4 = df_cleanup3.withColumn('Ds_Reference',split_ds_ref('Ds_Reference'))
df_cleanup4.show(2)

In [None]:
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt

In [None]:
# plot data
speed_histogram = df_cleanup2.select('Status').rdd.flatMap(lambda x: x).histogram(5)
speed_histogram

In [None]:
pd.DataFrame(list(zip(list(speed_histogram)[0], list(speed_histogram)[1])), \
             columns=['Status','Status Distibution']).set_index('Status').plot(kind='bar')

 # Fundamental Diagram of Traffic Flow

![Traffic Flow Diagram](https://upload.wikimedia.org/wikipedia/commons/5/59/Fundamental_Diagram.PNG)

In [None]:
%%time
df_cleanup4.write.save('data/trafficData_E4N.parquet', format='parquet')

In [None]:
%%time
df_trafficData_E4N = spark.read.parquet('data/trafficData_E4N.parquet').select('Timestamp', 'Ds_Reference', 'Ds_Ref', 'Detector_Number', 'Flow_In', 'Average_Speed').where('Status == 3 AND Ds_Reference.Road == "E4N"')

In [None]:
df_trafficData_E4N.createOrReplaceTempView("NormalTrafficFlow")

In [None]:
df_trafficData_E4N.show(10)

In [None]:
# Add Density Column
df_trafficData_E4N = df_trafficData_E4N.withColumn('Density', col('Flow_In')*60/col('Average_Speed'))
df_trafficData_E4N.show(20)

In [None]:
w = Window.partitionBy('Ds_Reference', 'Detector_Number').orderBy('Timestamp')
densDiff = col('Density')- lag('Density', 1).over(w)
timeFmt = 'yyyy-MM-dd HH:mm:ss.SSS'
timeDiff = unix_timestamp('Timestamp', format=timeFmt) - lag(unix_timestamp('Timestamp', format=timeFmt)).over(w)
                              
df_diff = df_trafficData_E4N.withColumn('Density_Diff', densDiff).withColumn('timeDiff', timeDiff)
df_diff.show(20)

In [None]:
import numpy as np
sensor_ids = spark.sql('select distinct Ds_Ref from NormalTrafficFlow').rdd.map(lambda row: row).collect()
sensor_ids

In [None]:
import numpy as np
sendor_ids = np.array(spark.sql('select distinct Ds_Ref from NormalTrafficFlow').rdd.map(lambda row: row[0]).collect())
sendor_ids

In [None]:
file_ids = spark.sql('select distinct _c1 from NormalTrafficFlow').rdd.map(lambda row: row._c1).collect()

In [None]:
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.callbacks import Callback
from keras.models import Sequential
from keras.layers import LSTM, Dense, Activation
import time


In [None]:
class LossHistory(Callback):
    def on_train_begin(self, logs={}):
        self.losses = []

    def on_batch_end(self, batch, logs={}):
        self.losses.append(logs.get('loss'))

In [None]:
timesteps = 100
dim = 2
lossHistory = LossHistory()
# design network

model = Sequential()
model.add(LSTM(50,input_shape=(timesteps,dim),return_sequences=True))
model.add(Dense(2))
model.compile(loss='mae', optimizer='adam')

def train(data):
    model.fit(data, data, epochs=20, batch_size=72, validation_data=(data, data), verbose=1, shuffle=False,callbacks=[lossHistory])

def score(data):
    yhat =  model.predict(data)
    return yhat

In [None]:

def create_trimmed_recording(df,file_id):
    recording = np.array(df.orderBy(df['Timestamp']).where(df['Ds_Ref'] == file_id).select('Density','Density_Diff').rdd.map(lambda row: np.array([row.Density,row.Density_Diff])).collect())  
    samples = len(recording)
    print(samples)
    trim = samples % 100
    recording_trimmed = recording[:samples-trim]
    print(recording_trimmed.shape)
    recording_trimmed.shape = (samples//timesteps,timesteps,dim)
    return recording_trimmed

In [None]:
#sensor_ids = spark.sql('select distinct _c1 from df_healthy').rdd.map(lambda row: row._c1).collect()
start = time.time()
for sensor_id in sendor_ids:
    print(sensor_id)
    recording_trimmed = create_trimmed_recording(df_diff,sensor_id)
    print ("Staring training on:" + str(sensor_id))
    train(recording_trimmed)
    print ("Finished training on" + str(sensor_id) + " after " + str(time.time()-start) + " seconds")

print ("Finished job on after " + str(time.time()-start) + " seconds")
healthy_losses = lossHistory.losses

In [None]:
df_E4N = spark.sql('SELECT Timestamp, Ds_Reference,Ds_Ref, Detector_Number, Flow_In, Average_Speed ' 
                  'FROM NormalTrafficFlow WHERE Status == 3 AND Ds_Reference.Road == "E4N"')

In [None]:
#newDF = df_E4N.select("Ds_Reference", "Ds_Reference.*");


In [None]:
#sensor_ids = spark.sql('select Ds_Reference from NormalTrafficFlow').rdd.map(lambda x : x.Ds_Reference ).collect()
#sensor_ids

In [None]:
#df_E4N.count()
df_E4N.show(10)

In [None]:
# Add Density Column
df_E4N_D = df_E4N.withColumn('Density', col('Flow_In')*60/col('Average_Speed'))
df_E4N_D.show(20)

In [None]:

w = Window.partitionBy('Ds_Reference', 'Detector_Number').orderBy('Timestamp')
densDiff = col('Density')- lag('Density', 1).over(w)
timeFmt = 'yyyy-MM-dd HH:mm:ss.SSS'
timeDiff = unix_timestamp('Timestamp', format=timeFmt) - lag(unix_timestamp('Timestamp', format=timeFmt)).over(w)
                              
df_diff = df_E4N_D.withColumn('Density_Diff', densDiff).withColumn('timeDiff', timeDiff)
df_diff.show(20)


In [None]:
import numpy as np
#sensor_ids = np.array(df_diff.select('Ds_Ref').collect())
#file_ids = spark.sql('select distinct _c1 from df_healthy').rdd.map(lambda row: row._c1).collect()
sensor_ids = df_diff.select("Ds_Ref").rdd.flatMap(lambda row : row).collect()

In [None]:
#sensor_ids = spark.sql('select  distinct Ds_Ref from NormalTrafficFlow').rdd.map(lambda x : x._3).collect()
sensor_ids = df_diff.select("Ds_Ref").rdd.flatMap(lambda x: x).collect()

In [None]:
sensor_ids = np.array(df_diff.select('Ds_Ref').rdd.map(lambda row: np.array([row[0]])).collect())

In [None]:
import numpy as np
#sensor_ids = np.array(df_diff.select('Ds_Ref').collect())
#file_ids = spark.sql('select distinct _c1 from df_healthy').rdd.map(lambda row: row._c1).collect()
sensor_ids = df_diff.select("Ds_Ref").rdd.flatMap(lambda row : row).collect()

In [None]:
sensor_ids

In [None]:
sensor_ids = df_diff.select("Ds_Ref").rdd.map(lambda row : row).collect()

In [None]:
def create_trimmed_recording(df,file_id):
    recording = np.array(df.df_diff(df['_c0']).where(df['_c1'] == file_id).select('_c2','_c3').rdd.map(lambda row: np.array([row._c2,row._c3])).collect())  
    samples = len(recording)
    #Ds_Ref
    trim = samples % 100
    recording_trimmed = recording[:samples-trim]
    recording_trimmed.shape = (samples//timesteps,timesteps,dim)
    return recording_trimmed

In [None]:
for sensor_id from sensor_ids:
    trimmed_recording = create_trimmed_record(df_diff, sensor_id)
        recording_trimmed = create_trimmed_recording(df_healthy_read_parquet,file_id)
    print ("Staring training on:" + str(file_id))
    
    train(recording_trimmed)
    print ("Finished training on" + str(file_id) + " after " + str(time.time()-start) + " seconds")

print ("Finished job on after " + str(time.time()-start) + " seconds")
healthy_losses = lossHistory.losses

In [None]:
timesteps = 100
dim = 2
lossHistory = LossHistory()
# design network

model = Sequential()
model.add(LSTM(50,input_shape=(timesteps,dim),return_sequences=True))
model.add(Dense(2))
model.compile(loss='mae', optimizer='adam')

def train(data):
    model.fit(data, data, epochs=20, batch_size=72, validation_data=(data, data), verbose=1, shuffle=False,callbacks=[lossHistory])

def score(data):
    yhat =  model.predict(data)
    return yhat

In [None]:
df_E4N = spark.sql('SELECT Timestamp, Ds_Reference, Detector_Number, Flow_In, Average_Speed ' 
                  'FROM NormalTrafficFlow WHERE Status == 3 AND Ds_Reference.Road == "E4N"')

sensor_ids = spark.sql('select distinct Sensor_ID from df_diff').rdd.map(lambda x : x.Sensor_ID ).collect()


In [None]:
df_raw.withColumn('Ds_Reference', split_ds_ref('Ds_Reference'))

In [None]:
#%%spark -o hist_df
hist_df = spark.createDataFrame(list(zip(list(speed_histogram)[0], list(speed_histogram)[1])), \
             schema=['Status','Status distribution'])

In [None]:
hist_df.set_index('Status').plot(kind='bar')

In [None]:
df_cleanup2.createOrReplaceTempView("FlowData")
df_E4N = spark.sql('SELECT Timestamp, Ds_Reference, Detector_Number, Flow_In, Average_Speed ' 
                  'FROM FlowData WHERE Status == 3 ')
df_E4N = spark.sql('SELECT Timestamp, Ds_Reference, Detector_Number, Flow_In, Average_Speed ' 
                  'FROM FlowData WHERE Status == 1 ')
df_E4N = spark.sql('SELECT Timestamp, Ds_Reference, Detector_Number, Flow_In, Average_Speed ' 
                  'FROM FlowData WHERE Status == 2 ')