In [81]:
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import pandas_udf, broadcast, round
import pandas as pd

In [82]:
# Build (or fetch the already-running) SparkSession for this notebook:
# local master using the `local[*]` setting, application name 'test'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

## Measurements

In [83]:
# Explicit schema used when reading the measurements CSV.
# Each entry is (column name, Spark type, nullable flag), in column order.
# NOTE(review): Spark's file-based readers may treat every column as nullable
# regardless of the flag given here — confirm before relying on non-null guarantees.
measurements_schema = types.StructType([
    types.StructField(name, dtype, nullable=nullable)
    for name, dtype, nullable in (
        ('day', types.DateType(), False),
        ('interval', types.IntegerType(), False),
        ('detid', types.StringType(), False),
        ('flow', types.IntegerType(), True),
        ('occ', types.FloatType(), True),
        ('error', types.IntegerType(), True),
        ('city', types.StringType(), False),
        ('speed', types.FloatType(), True),
    )
])

In [84]:
@pandas_udf(types.IntegerType())
def get_hour(intervals: pd.Series) -> pd.Series:
  """Floor-divide every value in ``intervals`` by 3600.

  Registered as a pandas UDF returning a Spark integer column.
  Presumably ``intervals`` holds seconds since midnight, which would make
  the result the hour of day — TODO confirm against the data source.

  Args:
    intervals: batch of values from the ``interval`` column.

  Returns:
    A Series of the same length holding ``intervals // 3600``.
  """
  seconds_per_hour = 60 * 60
  return intervals // seconds_per_hour

In [85]:
# Read the measurements CSV (first row is a header) using the explicit
# `measurements_schema` instead of letting Spark infer column types.
measurements_df = (
    spark.read
    .option("header", True)
    .schema(measurements_schema)
    .csv('./data/measurements_test.csv')
)

In [86]:
# Derive the per-row features used by the hourly aggregation:
#   * repartition into 20 partitions keyed by (day, city)
#   * hour    = get_hour(interval)
#   * density = round(occ * 100); `round` here is pyspark.sql.functions.round
#     (imported at the top of the notebook), not the Python builtin
#   * the raw `interval` and `occ` columns are dropped afterwards
# NOTE: this cell overwrites its own input, so it cannot be re-run on its own —
# re-execute the CSV read first, otherwise `interval`/`occ` no longer exist.
measurements_df = (
    measurements_df
    .repartition(20, 'day', 'city')
    .withColumn('hour', get_hour('interval'))
    .withColumn('density', round(measurements_df['occ'] * 100))
    .drop('interval', 'occ')
)

In [87]:
# Collapse the measurements to one row per (city, detector, day, hour).
# The dict form of `agg` lets Spark auto-name the result columns
# (e.g. `avg(flow)`); the following cell renames them to plain names.
hrly_measurements_df = (
    measurements_df
    .groupBy('city', 'detid', 'day', 'hour')
    .agg({
        'flow': 'mean',
        'density': 'mean',
        'speed': 'mean',
        'error': 'array_agg',
    })
)

In [88]:
# Replace Spark's auto-generated aggregate column names with plain ones.
hrly_measurements_df = hrly_measurements_df.withColumnsRenamed({
    'collect_list(error)': 'errors',
    'avg(flow)': 'flow',
    'avg(density)': 'density',
    'avg(speed)': 'speed',
})

In [89]:
# Preview up to 20 rows of the hourly aggregate.
hrly_measurements_df.show(n=20)



+--------+--------+----------+----+-------+--------------------+-----+-----+
|    city|   detid|       day|hour|density|              errors| flow|speed|
+--------+--------+----------+----+-------+--------------------+-----+-----+
|augsburg|06.X-2li|2017-05-06|   4|    0.0|[1, 1, 1, 1, 1, 1...|  5.0| NULL|
|augsburg|06.X-2li|2017-05-06|   5|    2.5|[1, 1, 1, 1, 1, 1...| 14.0| NULL|
|augsburg|06.X-2li|2017-05-06|   6|   1.25|[1, 1, 1, 1, 1, 1...| 13.0| NULL|
|augsburg|06.X-2li|2017-05-06|  11|  35.75|[1, 1, 1, 1, 1, 1...|100.0| NULL|
|augsburg|06.X-2li|2017-05-06|  17|   36.5|[1, 1, 1, 1, 1, 1...|113.0| NULL|
|augsburg|06.X-2li|2017-05-06|  21|   0.25|[1, 1, 1, 1, 1, 1...| 35.0| NULL|
|augsburg|06.X-2li|2017-05-06|  22|   0.25|[1, 1, 1, 1, 1, 1...| 20.0| NULL|
|augsburg|06.X-2li|2017-05-07|   0|    0.0|[1, 1, 1, 1, 1, 1...|  9.0| NULL|
|augsburg|06.X-2li|2017-05-07|   3|    0.0|[1, 1, 1, 1, 1, 1...|  5.0| NULL|
|augsburg|06.X-2li|2017-05-07|   4|    0.0|[1, 1, 1, 1, 1, 1...|  3.0| NULL|

                                                                                

## Detectors

In [90]:
# Explicit schema used when reading the detectors CSV.
# Each entry is (column name, Spark type), in column order; every field keeps
# StructField's default nullability.
detectors_schema = types.StructType([
    types.StructField(name, dtype)
    for name, dtype in (
        ('detid', types.StringType()),
        ('length', types.DoubleType()),
        ('pos', types.DoubleType()),
        ('fclass', types.StringType()),
        ('road', types.StringType()),
        ('limit', types.IntegerType()),
        ('citycode', types.StringType()),
        ('lanes', types.IntegerType()),
        ('linkid', types.IntegerType()),
        ('long', types.DoubleType()),
        ('lat', types.DoubleType()),
    )
])

In [91]:
# Read the detectors CSV (first row is a header) using the explicit
# `detectors_schema`.
detectors_df = (
    spark.read
    .option("header", True)
    .schema(detectors_schema)
    .csv('./data/detectors_public.csv')
)

In [93]:
# Drop the columns that are not used further on and rename `citycode` to
# `city` so the name matches the measurements frame's `city` column.
detectors_df = (
    detectors_df
    .drop('linkid', 'pos', 'length')
    .withColumnRenamed('citycode', 'city')
)

In [94]:
# Preview the first 5 detector rows.
detectors_df.show(n=5)

+------+---------+----------------+-----+--------+-----+----------+----------+
| detid|   fclass|            road|limit|    city|lanes|      long|       lat|
+------+---------+----------------+-----+--------+-----+----------+----------+
|U1-52G|secondary|Gögginger Straße|   50|augsburg|    1|10.8895527| 48.359957|
|U1-51G|secondary|Gögginger Straße|   50|augsburg|    1| 10.889601|48.3599454|
|U1-52L|secondary|Gögginger Straße|   50|augsburg|    1|10.8893555|48.3598759|
|U1-51L|secondary|Gögginger Straße|   50|augsburg|    1|10.8893958|48.3598617|
| U1-62|secondary|   Rosenaustraße|   50|augsburg|    1|10.8893609|48.3605781|
+------+---------+----------------+-----+--------+-----+----------+----------+
only showing top 5 rows



## Merge

In [None]:
# Attach detector metadata to each hourly row, matching on (detid, city).
# The detectors frame is wrapped in `broadcast(...)` as a hint for the join;
# no `how` is given, so Spark's default join type applies.
hrly_df = hrly_measurements_df.join(
    broadcast(detectors_df),
    on=['detid', 'city'],
)

In [None]:
# Persisting the result is currently disabled (the line below is commented out).
# NOTE(review): `df` is not defined anywhere in the visible notebook — the joined
# frame is named `hrly_df`. The target path also ends in `.csv` although the call
# writes Parquet. Confirm both before re-enabling.
# df.write.parquet('./data/pq/measurements.csv', mode='overwrite')

25/04/09 11:17:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/09 11:17:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/09 11:17:42 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/09 11:17:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [80]:
# Shut down the SparkSession created at the top of the notebook.
spark.stop()