# Vehicles

## Loading the data

In [1]:
# Load every CSV file matching the input glob into a single DataFrame.
# The options ask Spark to treat the first line as a header and to infer
# column types from the data.
data = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv('/user/wolffartu/input/*.csv')
)

In [2]:
# Debug aid (currently disabled): uncomment the line below to print the schema
# of the loaded `data` DataFrame.
# print(data.schema)

## Processing loaded data

In [3]:
from pyspark.sql import functions as F
# Keep only the rows where `ile_sek_do_odjazdu` is <= 0, which drops vehicles
# that haven't started their trip yet.
withoutStopped = data.filter(data['ile_sek_do_odjazdu'] <= 0)

### Adding new columns

In [4]:
# Add a `route` column. When `numer_lini` is NULL or an empty string (the
# vehicle hasn't initialized its Route Number display), fall back to
# `nast_num_lini`; otherwise use `numer_lini` as is.
withRouteCol = withoutStopped.withColumn(
    'route',
    F.when(
        data['numer_lini'].isNull() | (F.length(data['numer_lini']) == 0),
        data['nast_num_lini'],
    ).otherwise(data['numer_lini']),
)

In [5]:
# `day`: the first 10 characters of `ts` (the date part, e.g. 2019-05-01 in the
# results shown later in this notebook).
withDayCol = withRouteCol.withColumn(
    'day',
    data['ts'].substr(1, 10),
)

# `abs_deviation`: absolute value of `odchylenie`, so the sign of the deviation
# does not matter when averaging.
withAbsCol = withDayCol.withColumn(
    'abs_deviation',
    F.abs(data['odchylenie']),
)

### Tuple count (debug)

In [6]:
# Cache the fully prepared frame for reuse by the analytics cells and print
# its row count as a quick sanity check (debug only).
cached = withAbsCol.cache()
print(cached.count())

4285206


## Analytics

### Grouping by route and day

In [7]:
# One row per (route, day) pair with the mean of `abs_deviation`; the result
# column is named 'avg(abs_deviation)'.
grouped = (
    cached
    .groupBy('route', 'day')
    .agg(F.avg('abs_deviation'))
)

### Ranking routes

In [8]:
from pyspark.sql.window import Window
# Rank routes within each day by ascending average absolute deviation, so the
# route with the smallest average gets rank 1. dense_rank gives tied routes the
# same rank without leaving gaps.
ranked = grouped.withColumn(
    'rank',
    F.dense_rank().over(
        Window
        .partitionBy('day')
        .orderBy(F.asc('avg(abs_deviation)'))
    ),
)

## Results

In [9]:
# Show the routes ranked 1-5 for each day, ordered by day and then by average
# deviation, without truncating cell contents.
# NOTE(review): the row limit 7 * 5 + 1 presumably means 7 days x 5 routes
# plus one spare row -- confirm.
(
    ranked
    .filter(ranked['rank'] < 6)
    .sort(F.asc('day'), F.asc('avg(abs_deviation)'))
    .show(7 * 5 + 1, truncate=False)
)

+-----+----------+------------------+----+
|route|day       |avg(abs_deviation)|rank|
+-----+----------+------------------+----+
|141  |2019-05-01|25.559649122807016|1   |
|103  |2019-05-01|30.30132770597028 |2   |
|205  |2019-05-01|31.705436635726464|3   |
|105  |2019-05-01|32.054443927408094|4   |
|121  |2019-05-01|38.09932268988873 |5   |
|201  |2019-05-02|13.180875576036867|1   |
|202  |2019-05-02|29.293522267206477|2   |
|103  |2019-05-02|34.80671023008072 |3   |
|N01  |2019-05-02|35.78456014362657 |4   |
|141  |2019-05-02|49.853782581055306|5   |
|141  |2019-05-03|21.107173725151252|1   |
|131  |2019-05-03|30.50436227669298 |2   |
|205  |2019-05-03|31.256736672051698|3   |
|127  |2019-05-03|36.191797060218114|4   |
|117  |2019-05-03|36.72287660766586 |5   |
|141  |2019-05-04|22.17748917748918 |1   |
|103  |2019-05-04|28.24853450572875 |2   |
|110  |2019-05-04|37.74891842995359 |3   |
|112  |2019-05-04|43.88504326328801 |4   |
|109  |2019-05-04|44.14953855779759 |5   |
|141  |2019

In [10]:
# End of the notebook: stop the Spark session referenced by `spark`.
spark.stop()