In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import (
    concat,
    count,
    col,
    from_csv,
    lit,
    count,
    max,
    first,
    struct,
    first,
    last,
    unix_timestamp,
    avg
)

spark = SparkSession.builder.appName("EsempioStatico").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/04 17:48:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
options = {"sep": ","}
schema = "targa INT, varco INT, corsia DOUBLE, timestamp TIMESTAMP, nazione STRING"

df = spark.read.option("header",True) \
    .schema(schema) \
    .csv("../data/feb2016/01.02.2016_cleaned.csv")

df.printSchema()
df.show()

root
 |-- targa: integer (nullable = true)
 |-- varco: integer (nullable = true)
 |-- corsia: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- nazione: string (nullable = true)

+-------+-----+------+-------------------+-------+
|  targa|varco|corsia|          timestamp|nazione|
+-------+-----+------+-------------------+-------+
|  32540|   27|   1.0|2022-12-04 00:00:04|      ?|
|1773531|    8|   1.0|2022-12-04 00:00:05|      I|
|2699969|   21|   1.0|2022-12-04 00:00:05|      I|
|  76105|   27|   1.0|2022-12-04 00:00:06|      I|
| 237933|   12|   1.0|2022-12-04 00:00:08|    SLO|
| 440737|   21|   2.0|2022-12-04 00:00:08|      I|
|9466795|    9|   2.0|2022-12-04 00:00:08|      I|
|1345761|    5|   1.0|2022-12-04 00:00:08|      I|
|5822211|    8|   1.0|2022-12-04 00:00:10|      ?|
|7150472|    4|   1.0|2022-12-04 00:00:11|      ?|
| 340490|    8|   1.0|2022-12-04 00:00:12|      I|
|7456605|   19|   1.0|2022-12-04 00:00:14|      ?|
| 215748|    5|   1.0|2022-12-04

                                                                                

In [4]:
# Immette i tratti autostradali
tratti = [
    (27, 9, 8.48),
    (9, 26, 17.42),
    (26, 10, 6.0),
    (10, 18, 12.3),
    (18, 23, 14.0),
    (23, 15, 17.6),
    (15, 5, 7.7),
    (5, 8, 10.9),
    (8, 3, 6.9),
    (3, 13, 9.8),
    (22, 1, 10.6),
    (1, 12, 10.9),
    (12, 25, 7.7),
    (25, 20, 17.7),
    (20, 2, 13.8),
    (2, 16, 14.1),
    (16, 4, 14.0),
    (4, 21, 25.7),
]

tratti_schema = StructType(
    [
        StructField("ingresso", IntegerType()),
        StructField("uscita", IntegerType()),
        StructField("lunghezza", DoubleType())
    ]
)

df_tratti = spark.createDataFrame(data=tratti, schema=tratti_schema).cache()
df_tratti.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------+------+---------+
|ingresso|uscita|lunghezza|
+--------+------+---------+
|      27|     9|     8.48|
|       9|    26|    17.42|
|      26|    10|      6.0|
|      10|    18|     12.3|
|      18|    23|     14.0|
|      23|    15|     17.6|
|      15|     5|      7.7|
|       5|     8|     10.9|
|       8|     3|      6.9|
|       3|    13|      9.8|
|      22|     1|     10.6|
|       1|    12|     10.9|
|      12|    25|      7.7|
|      25|    20|     17.7|
|      20|     2|     13.8|
|       2|    16|     14.1|
|      16|     4|     14.0|
|       4|    21|     25.7|
+--------+------+---------+



                                                                                

In [5]:
df_left = df.join(df_tratti, (df.varco == df_tratti.ingresso ) | (df.varco == df_tratti.uscita), 'left')
df_left.show()

+-------+-----+------+-------------------+-------+--------+------+---------+
|  targa|varco|corsia|          timestamp|nazione|ingresso|uscita|lunghezza|
+-------+-----+------+-------------------+-------+--------+------+---------+
|  32540|   27|   1.0|2022-12-04 00:00:04|      ?|      27|     9|     8.48|
|1773531|    8|   1.0|2022-12-04 00:00:05|      I|       5|     8|     10.9|
|1773531|    8|   1.0|2022-12-04 00:00:05|      I|       8|     3|      6.9|
|2699969|   21|   1.0|2022-12-04 00:00:05|      I|       4|    21|     25.7|
|  76105|   27|   1.0|2022-12-04 00:00:06|      I|      27|     9|     8.48|
| 237933|   12|   1.0|2022-12-04 00:00:08|    SLO|       1|    12|     10.9|
| 237933|   12|   1.0|2022-12-04 00:00:08|    SLO|      12|    25|      7.7|
| 440737|   21|   2.0|2022-12-04 00:00:08|      I|       4|    21|     25.7|
|9466795|    9|   2.0|2022-12-04 00:00:08|      I|      27|     9|     8.48|
|9466795|    9|   2.0|2022-12-04 00:00:08|      I|       9|    26|    17.42|

In [6]:
df_aggregated = df_left.groupBy(['targa', 'ingresso', 'uscita']) \
    .agg(
        first('timestamp').alias('partenza'), 
        last('timestamp').alias('arrivo'),
        count('timestamp').alias('avvistamenti')
    )
df_aggregated.show()



+-------+--------+------+-------------------+-------------------+------------+
|  targa|ingresso|uscita|           partenza|             arrivo|avvistamenti|
+-------+--------+------+-------------------+-------------------+------------+
| 719308|       1|    12|2022-12-04 00:06:02|2022-12-04 00:06:02|           1|
|3968376|      27|     9|2022-12-04 00:14:12|2022-12-04 00:14:12|           1|
| 220738|       8|     3|2022-12-04 00:20:03|2022-12-04 00:23:37|           2|
|9462689|      27|     9|2022-12-04 00:33:49|2022-12-04 00:33:49|           1|
|6718473|      20|     2|2022-12-04 00:41:07|2022-12-04 00:41:07|           1|
|1592738|      22|     1|2022-12-04 00:55:14|2022-12-04 00:55:14|           1|
| 498262|      15|     5|2022-12-04 01:03:40|2022-12-04 01:08:56|           2|
|9475144|      16|     4|2022-12-04 01:19:44|2022-12-04 01:19:44|           1|
| 142913|      26|    10|2022-12-04 01:20:44|2022-12-04 06:31:16|           3|
|6127187|      26|    10|2022-12-04 01:22:49|2022-12

                                                                                

Dopo questa aggregazione si possono ramificare processi diversi, ad esempio per ottenere il numero di targhe per tratto si può raggruppare per tratto (ingresso, uscita) ed eseguire il ``count()``. 

Invece, per ottenere la velocità della targa, su un tratto si rifà il join con la tabella tratti per avere anche la distanza, e si crea una nuova colonna che calcola: ``lunghezza / (arrivo - partenza)``. Da qui, se si vuole l'ultimo tratto di una targa: 
1. raggruppa per targa
2. ordina per timestamp
3. aggrega per ultima riga, ``last()``

Oppure si calcola la velocità media della targa:
1. raggruppa per targa
2. aggrega con ``avg()``

In tutti i casi, bisogna prima filtrare via le righe con ``avvistamenti = 2``. Questo perché le righe con ``avvistamenti = 1`` rappresentano tutte quelle targhe che sono state avvistate solo ad un varco e non al successivo, perché magari sono uscite o il sensore non le ha rilevate correttamente.

 ## Velocità ultimo avvistamento per targa

In [7]:
df_speed = df_aggregated \
     .join(df_tratti, (df_aggregated.ingresso == df_tratti.ingresso ), 'left') \
     .filter(col('avvistamenti') == 2) \
     .orderBy('partenza') \
     .groupBy(df_aggregated.targa) \
     .agg(
          last('arrivo').alias('arrivo'),
          last('partenza').alias('partenza'),
          last('lunghezza').alias('lunghezza'),
          last(df_aggregated.ingresso).alias('ingresso'),
          last(df_aggregated.uscita).alias('uscita')
     ) \
     .withColumn('velocità', \
     ( (col('lunghezza') * 1000) / (unix_timestamp(col('arrivo')) - unix_timestamp(col('partenza')))) * 3.6) \
     .select('targa', col('arrivo').alias('timestamp'), 'ingresso', 'uscita', 'velocità')

df_speed.show()



+-------+-------------------+--------+------+------------------+
|  targa|          timestamp|ingresso|uscita|          velocità|
+-------+-------------------+--------+------+------------------+
| 141533|2022-12-04 00:49:27|      16|     4| 85.13513513513514|
|1947770|2022-12-04 00:30:49|      15|     5| 93.96610169491527|
|6705015|2022-12-04 01:05:00|       3|    13| 84.40191387559808|
| 102524|2022-12-04 12:32:06|       5|     8| 81.24223602484471|
| 398172|2022-12-04 02:49:48|      16|     4| 91.97080291970804|
|1470487|2022-12-04 04:43:27|      20|     2| 87.15789473684211|
| 143032|2022-12-04 05:00:03|      16|     4|51.533742331288344|
| 124798|2022-12-04 11:39:52|      12|    25| 82.74626865671642|
| 454087|2022-12-04 06:47:59|      16|     4| 83.72093023255813|
|  74281|2022-12-04 05:41:21|      16|     4|121.44578313253012|
| 324221|2022-12-04 14:15:32|    null|  null|              null|
|9351109|2022-12-04 06:37:33|      16|     4|132.63157894736844|
|  62680|2022-12-04 20:01

                                                                                

## Velocità media per targa

In [8]:
df_average = df_aggregated \
    .join(df_tratti, (df_aggregated.ingresso == df_tratti.ingresso ), 'left') \
    .filter(col('avvistamenti') == 2) \
    .orderBy('partenza') \
    .withColumn('velocità', \
    ( (col('lunghezza') * 1000) / (unix_timestamp(col('arrivo')) - unix_timestamp(col('partenza')))) * 3.6) \
    .groupBy('targa') \
    .agg(
        last(df_aggregated.ingresso).alias('ingresso'),
        last(df_aggregated.uscita).alias('uscita'),
        avg('velocità').alias('velocità_media')
    ) \
    .select('targa', 'ingresso', 'uscita', 'velocità_media')

df_average.show()

                                                                                

+-------+--------+------+------------------+
|  targa|ingresso|uscita|    velocità_media|
+-------+--------+------+------------------+
| 141533|      16|     4| 84.58990955498268|
|1947770|      15|     5| 93.96610169491527|
|6705015|       3|    13| 84.40191387559808|
| 102524|       5|     8| 88.16365596057877|
| 398172|      16|     4| 89.65646316425875|
|1470487|      20|     2| 87.15789473684211|
| 143032|      16|     4|51.533742331288344|
| 124798|      12|    25| 18.18212215174512|
| 454087|      16|     4| 74.31246317992183|
|  74281|      16|     4|126.30428691510228|
| 324221|    null|  null|              null|
|9351109|      16|     4|127.85105619065044|
|  62680|      20|     2|114.22453704228623|
|  74166|       5|     8| 69.99540932311388|
|  53565|    null|  null|              null|
|  46521|    null|  null|              null|
|5553026|      16|     4| 94.03417246067781|
|  78478|      27|     9| 128.2689075630252|
|6968171|       3|    13|119.48556545325174|
|  70863| 

## Conteggio targhe per tratto

Ha bisogno di altre due aggregazioni: una per ottenere solo una targa (come in velocità puntutale) e l'altra raggruppando per tratto.

In [9]:
df_single = df_aggregated \
     .join(df_tratti, (df_aggregated.ingresso == df_tratti.ingresso ), 'left') \
     .filter(col('avvistamenti') == 2) \
     .orderBy('partenza') \
     .groupBy(df_aggregated.targa) \
     .agg(
          last(df_aggregated.ingresso).alias('ingresso'),
          last(df_aggregated.uscita).alias('uscita')
     ) 
    

df_count = df_single \
     .groupBy('ingresso', 'uscita') \
     .agg(count('targa').alias('conteggio')) 

df_count.show()

                                                                                

+--------+------+---------+
|ingresso|uscita|conteggio|
+--------+------+---------+
|      27|     9|     2061|
|       8|     3|     2030|
|       3|    13|     5068|
|      20|     2|     1860|
|      10|    18|     1785|
|      23|    15|      490|
|      15|     5|     3691|
|    null|  null|     8331|
|      16|     4|     7055|
|       9|    26|      680|
|      25|    20|     1256|
|       4|    21|     3342|
|      12|    25|      842|
|       5|     8|     1568|
|      26|    10|      959|
|       1|    12|     2962|
|      18|    23|      441|
|      22|     1|      518|
|       2|    16|     1067|
+--------+------+---------+



Il conteggio deve essere fatto su una finestra temporale.