In [91]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
from pyspark.sql import SparkSession

#### **Functions**

- `select()` - seleciona colunas
- `filter()` - filtra linhas

In [92]:
spark = SparkSession.builder \
    .master("local") \
    .appName("RoadTracker") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

In [93]:
# load the data
df = spark.read.csv("all_roads.csv", header=True, inferSchema=True)

In [94]:
# analise 1
n_roads = df.select("road").distinct().count()
print("Number of roads: {}".format(n_roads))

Number of roads: 5


In [95]:
# analise 2
n_cars = df.select("plate").distinct().count()
print("Number of cars: {}".format(n_cars))

Number of cars: 228


In [96]:
# CALCULATE SPEED AND ACCELERATION

windowDept = Window.partitionBy("plate").orderBy(col("time").desc())

df = df.withColumn("row",row_number().over(windowDept)) \
        .filter(col("row") <= 3)

# calculo da velocidade
df = df.withColumn("speed", F.col("x") - F.lag("x", 1).over(windowDept))

# make all values positive
df = df.withColumn("speed", F.abs(F.col("speed")))

# calculo da aceleracao
df = df.withColumn("acc", -F.col("speed") + F.lag("speed", 1).over(windowDept))

# drop null values
df = df.na.drop()

# drop row column
df = df.drop("row")

df.show()

+-----+----------+---------+---+---+-----+--------------------+---------+-----+---+
| road|road_speed|road_size|  x|  y|plate|                time|direction|speed|acc|
+-----+----------+---------+---+---+-----+--------------------+---------+-----+---+
|road0|       120|     1000|211|  1|AB574| 1.686406213437755E9|        1|    0|  0|
|road1|       120|     1000|240|  3|AC921|1.6864062615287898E9|        1|   38|-38|
|road2|       120|     1000|870|  5|AD953|1.6864062396293578E9|       -1|   66| -5|
|road1|       120|     1000|548|  9|AG129|1.6864062242716305E9|       -1|   93| -2|
|road4|       120|     1000|225|  1|AG604| 1.686406237883799E9|        1|    0|  0|
|road4|       120|     1000|487|  3|AQ481|1.6864062349618049E9|        1|   86|  1|
|road4|       120|     1000|232|  0|AQ887|1.6864062389233131E9|        1|   73| -3|
|road2|       120|     1000|608|  6|AS492|1.6864062400063767E9|       -1|    0|  0|
|road4|       120|     1000|858|  8|AS757|  1.68640622600074E9|       -1|   

In [97]:
# analise 3

# add a column for the cars over the speed limit
df = df.withColumn("over_speed_limit", F.when(F.col("speed") > F.col("road_speed"), 1).otherwise(0))

cars_over_speed_limit = df.filter(F.col("over_speed_limit") == 1) \
    .select("plate").distinct().count()

print("Number of cars over the speed limit: {}".format(cars_over_speed_limit))

Number of cars over the speed limit: 4


In [98]:
# DF DE RISCO DE COLISÃO
windowDept = Window.partitionBy("road", "y").orderBy("x")

# calcula o risco de colisao fazendo posicao + (velocidade * direcao)

# essa versao funciona, mas adiciona o risco de colisao em carros diferentes dependendo de forward ou backward
#df = df.withColumn("collision_risk",
                #F.when((F.col("x") + (F.col("speed") * F.col("direction"))) > (F.lag("x", -1).over(windowDept) + (F.lag("speed", -1).over(windowDept) * F.lag("direction", -1).over(windowDept))), 1).otherwise(0))

# essa versao padroniza           
df = df.withColumn("collision_risk",
                   F.when(F.col("direction") == 1,
                          F.when((F.col("x") + F.col("speed") + F.col("acc")) > (F.lag("x", -1).over(windowDept) + F.lag("speed", -1).over(windowDept) + F.lag("acc",-1).over(windowDept)), 1).otherwise(0)) \
                   .otherwise(F.when((F.col("x") - F.col("speed") - F.col("acc")) < (F.lag("x", 1).over(windowDept) - F.lag("speed", 1).over(windowDept) - F.lag("acc", 1).over(windowDept)), 1).otherwise(0)))

df.show()

+-----+----------+---------+---+---+-----+--------------------+---------+-----+---+----------------+--------------+
| road|road_speed|road_size|  x|  y|plate|                time|direction|speed|acc|over_speed_limit|collision_risk|
+-----+----------+---------+---+---+-----+--------------------+---------+-----+---+----------------+--------------+
|road0|       120|     1000|193|  0|AW987| 1.686406249730599E9|        1|    0|  0|               0|             0|
|road0|       120|     1000|193|  0|FB448|1.6864062490415573E9|        1|    0|  0|               0|             0|
|road0|       120|     1000|193|  0|ME801|1.6864062275076733E9|        1|    0|  0|               0|             0|
|road0|       120|     1000|193|  0|NZ856|1.6864062310521214E9|        1|    0|  0|               0|             0|
|road0|       120|     1000|193|  0|RY127|1.6864062278370519E9|        1|    0|  0|               0|             0|
|road0|       120|     1000|211|  1|AB574| 1.686406213437755E9|        1

In [99]:
# analise 4

cars_collision_risk = df.filter(F.col("collision_risk") == 1) \
    .select("plate").distinct().count()

print("Number of cars with collision risk: {}".format(cars_collision_risk))

Number of cars with collision risk: 4


In [100]:
# analise 5
CollectionOverSpeedLimit = df.filter(F.col("over_speed_limit") == 1) \
                        .select("plate", "speed", "collision_risk") \
                        .collect()

CollectionOverSpeedLimit

[Row(plate='EV158', speed=205, collision_risk=0),
 Row(plate='TH675', speed=183, collision_risk=0),
 Row(plate='MT869', speed=122, collision_risk=0),
 Row(plate='IR559', speed=126, collision_risk=0)]

In [101]:
# analise 6
CollectionCollisionRisk = df.filter(F.col("collision_risk") == 1) \
                .select("plate", "speed") \
                .collect()
                
CollectionCollisionRisk

[Row(plate='EI774', speed=85),
 Row(plate='BW901', speed=99),
 Row(plate='SW786', speed=64),
 Row(plate='KL664', speed=74)]