# Congestion detection
Atousa Zarindast

In [4]:
import numpy as np
import pandas as pd
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession


In [149]:
df_pd=pd.read_csv('./Cong_interstate/1485903433.csv',header=None)
df_pd.columns=['Segment',
                                      'C_value',
                                      'segco',
                                      'Score',
                                      'Speed',
                                      'Average',
                                          'Ref',
                                         'Travel',
                                          'time',
              'ct']

In [150]:
df_pd=df_pd[['Segment','C_value','Speed','Ref','time']]

In [151]:
df_pd.head()

Unnamed: 0,Segment,C_value,Speed,Ref,time
0,1485903433,,53.0,53.0,2018-07-01 00:09:10 CDT
1,1485903433,,53.0,53.0,2018-07-01 00:10:04 CDT
2,1485903433,59.0,53.0,53.0,2018-07-01 00:11:09 CDT
3,1485903433,59.0,53.0,53.0,2018-07-01 00:12:00 CDT
4,1485903433,59.0,53.0,53.0,2018-07-01 00:12:00 CDT


In [152]:
df = spark.createDataFrame(df_pd)


In [128]:
df.show(n=5)

+----------+-------+-----+----+--------------------+
|   Segment|C_value|Speed| Ref|                time|
+----------+-------+-----+----+--------------------+
|1485903433|    NaN| 53.0|53.0|2018-07-01 00:09:...|
|1485903433|    NaN| 53.0|53.0|2018-07-01 00:10:...|
|1485903433|   59.0| 53.0|53.0|2018-07-01 00:11:...|
|1485903433|   59.0| 53.0|53.0|2018-07-01 00:12:...|
|1485903433|   59.0| 53.0|53.0|2018-07-01 00:12:...|
+----------+-------+-----+----+--------------------+
only showing top 5 rows



In [13]:
from pyspark import SparkConf
conf = SparkConf()

spark = SparkSession.builder \
        .config(conf=conf) \
        .appName('Dataframe with Indexes') \
        .getOrCreate()

In [129]:
from pyspark.sql import Window
from pyspark.sql import SparkSession, functions as F
# the window is necessary here because row_number is a windowing function
# that means you can have row_number run over some amount of your data
# we'll be currently running it over the sorted by column1 data, row per row - our window will be of size 2 (rows),
# the whole dataframe that is.
window = Window.orderBy(F.col('time'))



In [131]:
df = df.withColumn('row_number', F.row_number().over(window))

In [132]:
df.show(5)

+----------+-------+-----+----+--------------------+----------+
|   Segment|C_value|Speed| Ref|                time|row_number|
+----------+-------+-----+----+--------------------+----------+
|1485903433|    NaN| 53.0|53.0|2018-01-10 00:02:...|         1|
|1485903433|    NaN| 53.0|53.0|2018-01-10 00:08:...|         2|
|1485903433|  100.0| 50.0|53.0|2018-01-10 00:09:...|         3|
|1485903433|  100.0| 50.0|53.0|2018-01-10 00:10:...|         4|
|1485903433|  100.0| 50.0|53.0|2018-01-10 00:14:...|         5|
+----------+-------+-----+----+--------------------+----------+
only showing top 5 rows



String date to timestamp

In [180]:
from pyspark.sql.functions import col, udf, to_timestamp
date_format = "yyyy-MM-dd HH:mm:ss"
df_date=df.withColumn("timestamp", to_timestamp("time", date_format))

Criteria of congestion with col D which is speed below 54

In [182]:
from pyspark.sql import functions as f
df_date=df_date.withColumn('D', f.when(f.col('Speed') < 54, 1).otherwise(0))

In [184]:
from pyspark.sql import Window as Window
w = Window.partitionBy("Segment").orderBy("timestamp")
from pyspark.sql import functions as func



In [206]:
df_date.select('timestamp','D').show(10)

+-------------------+---+
|          timestamp|  D|
+-------------------+---+
|2018-07-01 00:09:10|  1|
|2018-07-01 00:10:04|  1|
|2018-07-01 00:11:09|  1|
|2018-07-01 00:12:00|  1|
|2018-07-01 00:12:00|  1|
|2018-07-01 00:12:07|  1|
|2018-07-01 00:13:23|  0|
|2018-07-01 00:14:00|  0|
|2018-07-01 00:14:06|  1|
|2018-07-01 00:15:03|  1|
+-------------------+---+
only showing top 10 rows



make second wise timestamp

In [236]:
df_try = df_date.withColumn('timestamp_long',df_date.timestamp.astype('Timestamp').cast("long"))


+----------+-------+-----+----+--------------------+---+-------------------+--------------+
|   Segment|C_value|Speed| Ref|                time|  D|          timestamp|timestamp_long|
+----------+-------+-----+----+--------------------+---+-------------------+--------------+
|1485903433|    NaN| 53.0|53.0|2018-07-01 00:09:...|  1|2018-07-01 00:09:10|    1530403750|
|1485903433|    NaN| 53.0|53.0|2018-07-01 00:10:...|  1|2018-07-01 00:10:04|    1530403804|
|1485903433|   59.0| 53.0|53.0|2018-07-01 00:11:...|  1|2018-07-01 00:11:09|    1530403869|
|1485903433|   59.0| 53.0|53.0|2018-07-01 00:12:...|  1|2018-07-01 00:12:00|    1530403920|
|1485903433|   59.0| 53.0|53.0|2018-07-01 00:12:...|  1|2018-07-01 00:12:00|    1530403920|
+----------+-------+-----+----+--------------------+---+-------------------+--------------+
only showing top 5 rows



In [237]:
df_date.select('timestamp','D').show(20)

+-------------------+---+
|          timestamp|  D|
+-------------------+---+
|2018-07-01 00:09:10|  1|
|2018-07-01 00:10:04|  1|
|2018-07-01 00:11:09|  1|
|2018-07-01 00:12:00|  1|
|2018-07-01 00:12:00|  1|
|2018-07-01 00:12:07|  1|
|2018-07-01 00:13:23|  0|
|2018-07-01 00:14:00|  0|
|2018-07-01 00:14:06|  1|
|2018-07-01 00:15:03|  1|
|2018-07-01 00:16:00|  1|
|2018-07-01 00:16:01|  1|
|2018-07-01 00:17:05|  1|
|2018-07-01 00:18:00|  1|
|2018-07-01 00:18:00|  1|
|2018-07-01 00:18:01|  1|
|2018-07-01 00:19:05|  1|
|2018-07-01 00:20:00|  1|
|2018-07-01 00:20:00|  1|
|2018-07-01 00:20:01|  1|
+-------------------+---+
only showing top 20 rows



Between 5 min interval count the number of congestion criteria

In [238]:
w2 = Window.partitionBy('Segment').orderBy('timestamp_long').rangeBetween(-60*5,0)

df_try = df_try.withColumn('occurrences_in_5_min',F.count('D').over(w2))
df_try.select('timestamp','occurrences_in_5_min').show(10)

+-------------------+--------------------+
|          timestamp|occurrences_in_5_min|
+-------------------+--------------------+
|2018-01-10 00:02:08|                   1|
|2018-01-10 00:08:08|                   1|
|2018-01-10 00:09:07|                   2|
|2018-01-10 00:10:06|                   3|
|2018-01-10 00:14:07|                   3|
|2018-01-10 00:15:04|                   3|
|2018-01-10 00:16:04|                   3|
|2018-01-10 00:17:04|                   4|
|2018-01-10 00:18:04|                   5|
|2018-01-10 00:19:06|                   6|
+-------------------+--------------------+
only showing top 10 rows



if count of congestion criteria whitin 5 min is above 5 give me an indicator (this defines whether if all the minutes in that interval are congested continuesly)

In [239]:
from pyspark.sql.types import *
add_bool = udf(lambda col: 1 if col>5 else False, BooleanType())
df_try = df_try.withColumn('already_occured',add_bool('occurrences_in_5_min'))
df_try.select('timestamp','already_occured').show(10)

+-------------------+---------------+
|          timestamp|already_occured|
+-------------------+---------------+
|2018-01-10 00:02:08|          false|
|2018-01-10 00:08:08|          false|
|2018-01-10 00:09:07|          false|
|2018-01-10 00:10:06|          false|
|2018-01-10 00:14:07|          false|
|2018-01-10 00:15:04|          false|
|2018-01-10 00:16:04|          false|
|2018-01-10 00:17:04|          false|
|2018-01-10 00:18:04|          false|
|2018-01-10 00:19:06|           null|
+-------------------+---------------+
only showing top 10 rows



Turn the true/false to boolian as indicator col

In [240]:
add_bool_1 = udf(lambda col: 1 if col>5 else 0, IntegerType())
df_try = df_try.withColumn('indicator',add_bool_1('occurrences_in_5_min'))


+-------------------+---------+
|          timestamp|indicator|
+-------------------+---------+
|2018-01-10 00:02:08|        0|
|2018-01-10 00:08:08|        0|
|2018-01-10 00:09:07|        0|
|2018-01-10 00:10:06|        0|
|2018-01-10 00:14:07|        0|
|2018-01-10 00:15:04|        0|
|2018-01-10 00:16:04|        0|
|2018-01-10 00:17:04|        0|
|2018-01-10 00:18:04|        0|
|2018-01-10 00:19:06|        1|
+-------------------+---------+
only showing top 10 rows



Introduce col p which is the time that congestion started (start_time)

In [351]:
from pyspark.sql import functions as f
w4 = Window.partitionBy('Segment').orderBy('timestamp')
from pyspark.sql.functions import col, expr, when

In [353]:
df_try_2=df_try.withColumn("p",when((df_try.indicator==1),f.lag(df_try.timestamp,5).over(w4)).otherwise("null"))
                                
                                

In [354]:
df_try_2.select('timestamp','indicator','p').show(20)

+-------------------+---------+-------------------+
|          timestamp|indicator|                  p|
+-------------------+---------+-------------------+
|2018-01-10 00:02:08|        0|               null|
|2018-01-10 00:08:08|        0|               null|
|2018-01-10 00:09:07|        0|               null|
|2018-01-10 00:10:06|        0|               null|
|2018-01-10 00:14:07|        0|               null|
|2018-01-10 00:15:04|        0|               null|
|2018-01-10 00:16:04|        0|               null|
|2018-01-10 00:17:04|        0|               null|
|2018-01-10 00:18:04|        0|               null|
|2018-01-10 00:19:06|        1|2018-01-10 00:14:07|
|2018-01-10 00:20:06|        0|               null|
|2018-01-10 00:22:08|        0|               null|
|2018-01-10 00:23:13|        0|               null|
|2018-01-10 00:24:01|        0|               null|
|2018-01-10 00:25:00|        1|2018-01-10 00:19:06|
|2018-01-10 00:25:00|        1|2018-01-10 00:20:06|
|2018-01-10 

Puting start time and end time altogether please note that congestion criteria here is speed below 54 for consequative 5 min

In [362]:
df_try_2.filter(df_try_2.indicator==1).select('p','timestamp').show(10)


+-------------------+-------------------+
|                  p|          timestamp|
+-------------------+-------------------+
|2018-01-10 00:14:07|2018-01-10 00:19:06|
|2018-01-10 00:19:06|2018-01-10 00:25:00|
|2018-01-10 00:20:06|2018-01-10 00:25:00|
|2018-01-10 00:22:08|2018-01-10 00:25:00|
|2018-01-10 00:23:13|2018-01-10 00:25:01|
|2018-01-10 00:33:05|2018-01-10 00:37:07|
|2018-01-10 00:34:00|2018-01-10 00:38:05|
|2018-01-10 00:34:05|2018-01-10 00:39:05|
|2018-01-10 00:35:04|2018-01-10 00:40:04|
|2018-01-10 00:36:11|2018-01-10 00:41:06|
+-------------------+-------------------+
only showing top 10 rows



Algorithm done

# RC detection based on probabilities

Separating date and time

In [417]:
from pyspark.sql.functions import date_format
data=df_try_2.withColumn("date",date_format('timestamp', 'yyyy-MM-dd')).withColumn("time_separated",date_format('timestamp', 'HH:mm:ss'))
 

In [392]:
data.select('indicator','time','date','timestamp','time_separated').show(4)

+---------+--------------------+----------+-------------------+--------------+
|indicator|                time|      date|          timestamp|time_separated|
+---------+--------------------+----------+-------------------+--------------+
|        0|2018-01-10 00:02:...|2018-01-10|2018-01-10 00:02:08|      00:02:08|
|        0|2018-01-10 00:08:...|2018-01-10|2018-01-10 00:08:08|      00:08:08|
|        0|2018-01-10 00:09:...|2018-01-10|2018-01-10 00:09:07|      00:09:07|
|        0|2018-01-10 00:10:...|2018-01-10|2018-01-10 00:10:06|      00:10:06|
+---------+--------------------+----------+-------------------+--------------+
only showing top 4 rows



In [399]:
data.groupBy("time_separated").agg(f.sum('indicator').alias("occourance_p")).show(10)

+--------------+------------+
|time_separated|occourance_p|
+--------------+------------+
|      03:50:07|          20|
|      05:00:08|          14|
|      05:11:20|           0|
|      09:19:23|           1|
|      15:50:05|          23|
|      19:24:06|          19|
|      00:41:00|          21|
|      10:12:05|          18|
|      16:49:10|           7|
|      20:08:09|          15|
+--------------+------------+
only showing top 10 rows



In [420]:
data.select('time_separated','date','indicator').show(5)

+--------------+----------+---------+
|time_separated|      date|indicator|
+--------------+----------+---------+
|      00:02:08|2018-01-10|        0|
|      00:08:08|2018-01-10|        0|
|      00:09:07|2018-01-10|        0|
|      00:10:06|2018-01-10|        0|
|      00:14:07|2018-01-10|        0|
+--------------+----------+---------+
only showing top 5 rows



In [423]:
time_partition= Window.partitionBy('time_separated')
data1=data.withColumn('occurance', f.sum('indicator').over(time_partition))


In [424]:
data1.select('date','occurance').show(4)

+----------+---------+
|      date|occurance|
+----------+---------+
|2018-11-14|        0|
|2018-05-16|        1|
|2018-04-27|        6|
|2018-04-30|        6|
+----------+---------+
only showing top 4 rows



In [427]:
time_partition= Window.partitionBy('time_separated')
data1=data1.withColumn('occurance_all', f.count('date').over(time_partition))



In [431]:
data1.schema

StructType(List(StructField(Segment,LongType,true),StructField(C_value,DoubleType,true),StructField(Speed,DoubleType,true),StructField(Ref,DoubleType,true),StructField(time,StringType,true),StructField(D,IntegerType,false),StructField(timestamp,TimestampType,true),StructField(timestamp_long,LongType,true),StructField(occurrences_in_5_min,LongType,false),StructField(already_occured,BooleanType,true),StructField(indicator,IntegerType,true),StructField(p,StringType,true),StructField(date,StringType,true),StructField(time_separated,StringType,true),StructField(occurance,LongType,true),StructField(occurance_p,LongType,false),StructField(occurance_all,LongType,false)))

In [433]:
data1.withColumn('occurance_p',f.col('occurance')/f.col('occurance_all')).select('occurance','occurance_all','occurance_p','date','time_separated').show(5)

+---------+-------------+------------------+----------+--------------+
|occurance|occurance_all|       occurance_p|      date|time_separated|
+---------+-------------+------------------+----------+--------------+
|        0|            1|               0.0|2018-11-14|      00:01:48|
|        1|            1|               1.0|2018-05-16|      00:07:15|
|        6|           11|0.5454545454545454|2018-04-27|      00:29:10|
|        6|           11|0.5454545454545454|2018-04-30|      00:29:10|
|        6|           11|0.5454545454545454|2018-05-01|      00:29:10|
+---------+-------------+------------------+----------+--------------+
only showing top 5 rows



In [434]:
data2=data1.withColumn('occurance_p',f.col('occurance')/f.col('occurance_all'))

In [445]:
data2.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("spark.csv")

# finding RC
for timetamps with congested probability higher than 0.2 we identify continues congestion for concurrent 5 min and report start and end time
this would be combination of previous two step, as such that instead of just having congestion criteria we care for congested for continues 5 min and probability higher than 0.2 for that time period as well

Probability in 5 min

In [460]:
w2 = Window.partitionBy('Segment').orderBy('timestamp_long').rangeBetween(-60*5,0)

data3 = data2.withColumn('prob_in_5_min',F.sum('occurance_p').over(w2))

In [483]:
data4 = data3.withColumn('RC_indicator', f.when((data3.occurrences_in_5_min >5)& (data3.prob_in_5_min>1),1).otherwise(0) )




In [479]:
data4.select('occurrences_in_5_min','prob_in_5_min','time_separated','RC_indicator').show(10)

+--------------------+------------------+--------------+------------+
|occurrences_in_5_min|     prob_in_5_min|time_separated|RC_indicator|
+--------------------+------------------+--------------+------------+
|                   1|0.3333333333333333|      00:02:08|           0|
|                   1|              0.64|      00:08:08|           0|
|                   2| 1.281025641025641|      00:09:07|           0|
|                   3|1.6976923076923078|      00:10:06|           0|
|                   3|1.7984330484330484|      00:14:07|           0|
|                   3|2.0487117552334944|      00:15:04|           0|
|                   3| 2.401275857797597|      00:16:04|           0|
|                   4|3.0499245064462457|      00:17:04|           0|
|                   5|3.9165911731129124|      00:18:04|           0|
|                   6|4.5984093549310945|      00:19:06|           1|
+--------------------+------------------+--------------+------------+
only showing top 10 

showing the time that RC defition happend with start and end

In [485]:
w4 = Window.partitionBy('Segment').orderBy('timestamp')
data4=data4.withColumn("RC_start_time",when((data4.RC_indicator==1),f.lag(data4.timestamp,5).over(w4)).otherwise("null"))

data4.filter(data4.RC_indicator==1).select('RC_start_time','timestamp').show(10)





+-------------------+-------------------+
|      RC_start_time|          timestamp|
+-------------------+-------------------+
|2018-01-10 00:14:07|2018-01-10 00:19:06|
|2018-01-10 00:19:06|2018-01-10 00:25:00|
|2018-01-10 00:20:06|2018-01-10 00:25:00|
|2018-01-10 00:22:08|2018-01-10 00:25:00|
|2018-01-10 00:23:13|2018-01-10 00:25:01|
|2018-01-10 00:33:05|2018-01-10 00:37:07|
|2018-01-10 00:34:00|2018-01-10 00:38:05|
|2018-01-10 00:34:05|2018-01-10 00:39:05|
|2018-01-10 00:35:04|2018-01-10 00:40:04|
|2018-01-10 00:36:11|2018-01-10 00:41:06|
+-------------------+-------------------+
only showing top 10 rows



In [487]:
data4.when(data4.RC_indicator==1,agg(f.mean('Speed')).otherwise(0)

+---------+
|avg_speed|
+---------+
|      NaN|
+---------+

