In [291]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, countDistinct
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

In [292]:
# def init_spark():
#     mongo_conn = "mongodb://mongo:27017"
#     conf = SparkConf()
#     conf.set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
#     conf.set("spark.write.connection.uri", mongo_conn)
#     conf.set("spark.mongodb.write.database", "roadtracker")
#     conf.set("spark.mongodb.write.collection", "historical1")
    
#     sc = SparkContext.getOrCreate(conf=conf)
    
#     return SparkSession() \
#         .builder \
#         .appName("RoadTracker") \
#         .getOrCreate()

# # spark = init_spark()

In [293]:
# Create (or reuse) the Spark session used by every cell below.
# NOTE(review): a plain "local" master usually means a single worker thread,
# in which case "spark.executor.cores" may have no effect -- confirm whether
# "local[4]" was intended.
spark = (
    SparkSession.builder
    .appName("HistoricalRoadTracker")
    .master("local")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

In [294]:
# Load the road observations from CSV: the first line is treated as the
# header and column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("all_roads.csv")
)

# Alternative input kept for reference: read a parquet file instead
# df = spark.read.parquet("1686522436.8648.parquet", header=True, inferSchema=True)

In [295]:
# historical 1: number of distinct roads on which each plate was observed.
# The aggregate is aliased directly instead of renaming the auto-generated
# column afterwards: that generated name differs between Spark versions
# (e.g. "count(DISTINCT road)"), and withColumnRenamed silently does nothing
# when the source column is missing, which would break the orderBy below.
dfRoadCount = df.groupBy("plate").agg(countDistinct("road").alias("road_count"))

# keep the 100 plates with the highest number of distinct roads
dfRoadCount = dfRoadCount.orderBy(col("road_count").desc()).limit(100)
dfRoadCount.show()

+-----+----------+
|plate|road_count|
+-----+----------+
|EK016|         1|
|OB699|         1|
|EJ199|         1|
|KB351|         1|
|NN265|         1|
|GB035|         1|
|BH673|         1|
|IA523|         1|
|LW495|         1|
|PA873|         1|
|WE779|         1|
|WK061|         1|
|JK992|         1|
|UD451|         1|
|GN979|         1|
|AZ064|         1|
|UH029|         1|
|JY708|         1|
|ED249|         1|
|ZI913|         1|
+-----+----------+
only showing top 20 rows



In [296]:
# CALCULATE SPEED AND ACCELERATION

# Per-plate window with the newest observation first. With this ordering,
# lead(..., 1) reads the observation that immediately precedes the current
# one in time.
windowDept = Window.partitionBy("plate").orderBy(col("time").desc())

# speed: absolute change in x since the previous observation of the same plate
dfCalcs = df.withColumn(
    "speed",
    F.abs(F.col("x") - F.lead("x", 1).over(windowDept)),
)

# acc: change in speed since the previous observation of the same plate
dfCalcs = dfCalcs.withColumn(
    "acc",
    F.col("speed") - F.lead("speed", 1).over(windowDept),
)

# Drop every row that has a null in any column. This removes, for each plate,
# the two oldest observations: the oldest has no previous position (speed is
# null) and the second oldest has no previous speed (acc is null).
dfCalcs = dfCalcs.na.drop()

dfCalcs.show()

+-----+----------+---------+-----+---+-----+--------------------+---------+-----+----+
| road|road_speed|road_size|    x|  y|plate|                time|direction|speed| acc|
+-----+----------+---------+-----+---+-----+--------------------+---------+-----+----+
|road4|       170|    87097|86876| 11|AB443|1.6872867809014084E9|       -1|   75|   2|
|road6|       135|    86558|  305|  3|AH354|1.6872867894516697E9|        1|    0|-101|
|road6|       135|    86558|  305|  4|AH354|1.6872867851617193E9|        1|  101|  -1|
|road6|       135|    86558|  530|  0|AI125|1.6872867889800618E9|        1|  106|   1|
|road6|       135|    86558|  424|  0|AI125|1.6872867582297566E9|        1|  105|   2|
|road6|       135|    86558|  319|  0|AI125| 1.687286732575757E9|        1|  103|  -5|
|road3|       130|    70900|  120|  0|AI224|1.6872867978729136E9|        1|    0|   0|
|road3|       130|    70900|  120|  0|AI224|1.6872867652387855E9|        1|    0|   0|
|road3|       130|    70900|  120|  0|AI224

In [297]:
# historical 2: per-road statistics (average speed, average time to cross,
# number of collisions)

# Average speed per road. road_size is averaged as well so it survives the
# aggregation (presumably it is constant per road -- verify against the data).
dfStats = dfCalcs.groupBy("road").agg(
    F.avg("speed").alias("avg_speed"),
    F.avg("road_size").alias("road_size"),
)

# Average time to cross = road size / average speed. The value is left null
# when the average speed is zero, rather than relying on how the engine
# treats division by zero (it raises an error when ANSI SQL mode is on).
dfStats = dfStats.withColumn(
    "avg_time_to_cross",
    F.when(F.col("avg_speed") > 0, F.col("road_size") / F.col("avg_speed")),
)
dfStats = dfStats.select("road", "avg_speed", "avg_time_to_cross")

# rows where speed == 0 and acc == 0 are treated as collisions
dfCollisions = dfCalcs.filter((F.col("speed") == 0) & (F.col("acc") == 0))

# number of collision rows per road
dfCollisions = dfCollisions.groupBy("road").count().withColumnRenamed("count", "total_collisions")

# Left join keeps every road; roads without any collision row get 0 instead
# of a null count.
dfStats = dfStats.join(dfCollisions, "road", "left").na.fill(0, subset=["total_collisions"])

In [298]:
# historical 3

# Two per-plate windows over time (newest-first and oldest-first) so that
# every observation gets a descending and an ascending row number.
windowDept = Window.partitionBy("plate").orderBy(F.col("time").desc())
windowDept2 = Window.partitionBy("plate").orderBy(F.col("time").asc())

# In the newest-first window a lag of -1 reads the following row, i.e. the
# observation just before the current one in time.
is_speeding = F.col("speed") > F.col("road_speed")
was_within_limit = (
    F.lag("speed", -1).over(windowDept) <= F.lag("road_speed", -1).over(windowDept)
)
is_first_observation = F.col("row2") == 1

dfSpeeds = (
    dfCalcs
    .withColumn("row", F.row_number().over(windowDept))
    .withColumn("row2", F.row_number().over(windowDept2))
    # new infraction: over the road's limit now, while the previous
    # observation was at or under its own road's limit
    .withColumn(
        "change_in_speed",
        F.when(is_speeding & was_within_limit, 1).otherwise(0),
    )
    # a plate whose first remaining observation is already over the limit
    # also counts as an infraction
    .withColumn(
        "change_in_speed",
        F.when(is_speeding & is_first_observation, 1).otherwise(F.col("change_in_speed")),
    )
)

# chosen T (change it after testing)
t = 2500000000

# Keep only rows whose age (current unix timestamp minus `time`) is below t.
dfSpeeds = (
    dfSpeeds
    .withColumn("past_time", F.unix_timestamp(F.current_timestamp()).cast("double"))
    .withColumn("diff_time", F.col("past_time") - F.col("time"))
    .filter(F.col("diff_time") < t)
)

# plates with at least 10 infractions in the retained rows
dfInfractions = (
    dfSpeeds
    .groupBy("plate")
    .agg(F.sum("change_in_speed").alias("total_infractions"))
    .filter(F.col("total_infractions") >= 10)
)

In [299]:
# historical 3 (single ascending window)

# Per-plate window, oldest observation first: lag(..., 1) reads the previous
# observation in time and row == 1 marks a plate's first remaining row.
windowDept = Window.partitionBy("plate").orderBy(col("time").asc())

# ascending row number per plate
dfSpeeds = dfCalcs.withColumn("row", row_number().over(windowDept))

# New infraction: the speed is above the road's limit now, while the previous
# observation was at or under the limit that applied to it. The previous
# speed is compared with the previous row's road_speed (not the current
# row's), so a plate that moves between roads with different limits is
# judged against the right limit.
dfSpeeds = dfSpeeds.withColumn(
    "change_in_speed",
    F.when(
        (F.col("speed") > F.col("road_speed"))
        & (F.lag("speed", 1).over(windowDept) <= F.lag("road_speed", 1).over(windowDept)),
        1,
    ).otherwise(0),
)

# A plate whose first remaining observation is already above the limit also
# counts as an infraction.
dfSpeeds = dfSpeeds.withColumn(
    "change_in_speed",
    F.when((F.col("speed") > F.col("road_speed")) & (F.col("row") == 1), 1)
    .otherwise(F.col("change_in_speed")),
)

# preview only; avoid dumping hundreds of rows into the notebook output
dfSpeeds.show()

# chosen T (change it after testing)
t = 2500000000

# Keep only rows whose age (current unix timestamp minus `time`) is below t.
dfSpeeds = dfSpeeds.withColumn("past_time", F.unix_timestamp(F.current_timestamp()).cast("double"))
dfSpeeds = dfSpeeds.withColumn("diff_time", F.col("past_time") - F.col("time"))
dfSpeeds = dfSpeeds.filter(F.col("diff_time") < t)

# plates with at least 10 infractions in the retained rows
dfInfractions = (
    dfSpeeds.groupBy("plate")
    .agg(F.sum("change_in_speed").alias("total_infractions"))
    .filter(F.col("total_infractions") >= 10)
)

+-----+----------+---------+-----+---+-----+--------------------+---------+-----+----+---+---------------+
| road|road_speed|road_size|    x|  y|plate|                time|direction|speed| acc|row|change_in_speed|
+-----+----------+---------+-----+---+-----+--------------------+---------+-----+----+---+---------------+
|road4|       170|    87097|86876| 11|AB443|1.6872867809014084E9|       -1|   75|   2|  1|              0|
|road6|       135|    86558|  305|  4|AH354|1.6872867851617193E9|        1|  101|  -1|  1|              0|
|road6|       135|    86558|  305|  3|AH354|1.6872867894516697E9|        1|    0|-101|  2|              0|
|road6|       135|    86558|  319|  0|AI125| 1.687286732575757E9|        1|  103|  -5|  1|              0|
|road6|       135|    86558|  424|  0|AI125|1.6872867582297566E9|        1|  105|   2|  2|              0|
|road6|       135|    86558|  530|  0|AI125|1.6872867889800618E9|        1|  106|   1|  3|              0|
|road3|       130|    70900|  120|  0

In [300]:
# alternative analysis
# NOTE(review): this cell overwrites dfCalcs (it ends up holding one row per
# plate), so dfCalcs must be recomputed before this cell is run again. It
# also reads `t`, which is not assigned in this cell.
windowDept = Window.partitionBy("plate").orderBy(F.col("time").desc())

dfCalcs = (
    dfCalcs
    .withColumn("row", F.row_number().over(windowDept))
    # changed_lane: 1 when y differs from the following row of the
    # newest-first window (the observation just before in time), else 0
    .withColumn(
        "changed_lane",
        F.when(F.col("y") != F.lag("y", -1).over(windowDept), 1).otherwise(0),
    )
    # over_road_speed: 1 when the speed is above the road's limit, else 0
    .withColumn(
        "over_road_speed",
        F.when(F.col("speed") > F.col("road_speed"), 1).otherwise(0),
    )
    # over_acc: 1 when acc is below -40 or above 40, else 0
    .withColumn(
        "over_acc",
        F.when((F.col("acc") < -40) | (F.col("acc") > 40), 1).otherwise(0),
    )
    # total: weighted sum of the three flags; a lane change weighs 0.25, so
    # four lane changes weigh as much as one speed/acceleration event
    # (NOTE(review): the original note said three -- confirm intended weight).
    # Rows with no flag set are discarded.
    .withColumn(
        "total",
        F.col("changed_lane") / 4 + F.col("over_road_speed") + F.col("over_acc"),
    )
    .filter(F.col("total") > 0)
    # running sum of `total` per plate over the newest-first window
    .withColumn(
        "cum_sum",
        F.sum("total").over(
            windowDept.rangeBetween(Window.unboundedPreceding, Window.currentRow)
        ),
    )
)

# for each cumsum higher than 10, check cumsum-10 to see if time is in desired interval

dfCalcs.show()

# Keep only rows whose age (current unix timestamp minus `time`) is below t
# (THIS ISN'T WHAT WE WANT, TEMPORARY)
dfCalcs = (
    dfCalcs
    .withColumn("past_time", F.unix_timestamp(F.current_timestamp()).cast("double"))
    .withColumn("diff_time", F.col("past_time") - F.col("time"))
    .filter(F.col("diff_time") < t)
)

# per plate, add up `total` and keep the plates whose sum is at least 4
dfCalcs = (
    dfCalcs
    .groupBy("plate")
    .agg(F.sum("total").alias("total_infractions"))
    .filter(F.col("total_infractions") >= 4)
)

#dfCalcs.collect()
dfCalcs.select("plate").show()

+-----+----------+---------+-----+---+-----+--------------------+---------+-----+----+---+------------+---------------+--------+-----+-------+
| road|road_speed|road_size|    x|  y|plate|                time|direction|speed| acc|row|changed_lane|over_road_speed|over_acc|total|cum_sum|
+-----+----------+---------+-----+---+-----+--------------------+---------+-----+----+---+------------+---------------+--------+-----+-------+
|road6|       135|    86558|  305|  3|AH354|1.6872867894516697E9|        1|    0|-101|  1|           1|              0|       1| 1.25|   1.25|
|road8|       149|    72331|  126|  3|AI433|1.6872867680793958E9|        1|    0| -63|  2|           0|              0|       1|  1.0|    1.0|
|road5|       152|    59840|59674| 14|AZ064|1.6872867795049994E9|       -1|   83|  83|  1|           0|              0|       1|  1.0|    1.0|
|road5|       152|    59840|59565| 15|BO582|1.6872867866547756E9|       -1|  139|  71|  1|           0|              0|       1|  1.0|    1.0|