# Truck Data Wrangler - Streaming part

In this notebook we will develop a solution to stream the trucks data using Spark Structured Streaming.

First of all, lets get a Spark Session to work on

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Truck Data Wrangler").getOrCreate()
spark

## Schema

After getting the spark session, we'll define the schema of this Structured Streaming process:

| Field | Type | Description |
| ----- | ---- | ----------- |
| `c0` | `integer` | The index key, used just as reference of the order |
| `event_type` | `string` | The event type accordingly to the categorization of the data |
| `label` | `string` | The label for data segmentation |
| `accel_x` | `double` | The X-axis accelerometer value |
| `accel_y` | `double` | The Y-axis accelerometer value |
| `accel_z` | `double` | The Z-axis accelerometer value |
| `gyro_roll` | `double` | The Roll-axis accelerometer value |
| `gyro_pitch` | `double` | The Pitch-axis accelerometer value |
| `gyro_yaw` | `double` | The Yaw-axis accelerometer value |

## Schema on Apache Spark

In [2]:
from pyspark.sql.types import *

csvSchema = StructType([
    StructField("c0", StringType(), True),
    StructField("event_type", StringType(), False),
    StructField("label", StringType(), False),
    StructField("timestamp", LongType(), False),
    StructField("accel_x", DoubleType(), False),
    StructField("accel_y", DoubleType(), False),
    StructField("accel_z", DoubleType(), False),
    StructField("gyro_roll", DoubleType(), False),
    StructField("gyro_pitch", DoubleType(), False),
    StructField("gyro_yaw", DoubleType(), False)
])

## Schema on TimescaleDB

For the database to serve as a read to visualize and query our truck data, we'll go with TimescaleDB.

In [3]:
!pip install psycopg2



In [4]:
import psycopg2
from config import parse_config
from sql_queries import create_jerked_truck_events_table, drop_jerked_truck_events_table

configs = parse_config()

print(configs['timescaledb']['host'])

# connect to recreate the database
conn = psycopg2.connect("host={} port={} user={} password={}".format( \
    configs['timescaledb']['host'], \
    configs['timescaledb']['port'], \
    configs['timescaledb']['user'], \
    configs['timescaledb']['password'], \
))
conn.set_session(autocommit=True)
cur = conn.cursor()

database_name = configs['timescaledb']['db']

#cur.execute("DROP DATABASE IF EXISTS {}".format(database_name))
#cur.execute("CREATE DATABASE {} WITH ENCODING 'utf8' TEMPLATE template0".format(database_name))

cur.close()
conn.close()

172.17.0.2


In [5]:
# connect to create the tables in the database
conn = psycopg2.connect("host={} port={} dbname={} user={} password={}".format( \
    configs['timescaledb']['host'], \
    configs['timescaledb']['port'], \
    database_name, \
    configs['timescaledb']['user'], \
    configs['timescaledb']['password'] \
))
conn.set_session(autocommit=True)
cur = conn.cursor()

# create sparkify database with UTF8 encoding
cur.execute(drop_jerked_truck_events_table)
cur.execute(create_jerked_truck_events_table)


cur.close()
conn.close()

In [6]:
conn = psycopg2.connect("host={} port={} dbname={} user={} password={}".format( \
    configs['timescaledb']['host'], \
    configs['timescaledb']['port'], \
    database_name, \
    configs['timescaledb']['user'], \
    configs['timescaledb']['password'] \
))
conn.set_session(autocommit=True)
cur = conn.cursor()

results = cur.execute("SELECT COUNT(*) FROM jerked_truck_events")

print(results)

cur.close()
conn.close()

None


## Loading the data

We will test load the data just to see if the schema is compatible with the stream file source.

**The code below is commented because it was made to be a playground around the data**.

In [7]:
# from pyspark.sql import functions as F
# from pyspark.sql.functions import col

# truck_events_df = spark.read.schema(csvSchema).csv('data/unified.csv', header=True)
# truck_events_df = truck_events_df.withColumn(
#     "date_timestamp",
#     F.to_timestamp(F.from_unixtime(((col("timestamp") / 1000) / 1000), 'yyyy-MM-dd HH:mm:ss.SSS'))
# )
# truck_events_df = (    
#     truck_events_df
#         .groupBy(col("date_timestamp"), col("event_type"), col("label"), F.window(col("date_timestamp"), "1 second"))
#         .agg(
#             F.mean('accel_x'),
#             F.mean('accel_y'),
#             F.mean('accel_z'),
#             F.mean('gyro_roll'),
#             F.mean('gyro_pitch'),
#             F.mean('gyro_yaw')
#         )
#         .withColumnRenamed('avg(accel_x)', 'accel_x')
#         .withColumnRenamed('avg(accel_y)', 'accel_y')
#         .withColumnRenamed('avg(accel_z)', 'accel_z')
#         .withColumnRenamed('avg(gyro_roll)', 'gyro_roll')
#         .withColumnRenamed('avg(gyro_pitch)', 'gyro_pitch')
#         .withColumnRenamed('avg(gyro_yaw)', 'gyro_yaw')
# )

# event_types = [
#     'aggressive_longitudinal_acceleration',
#     'agressive_bump',
#     'normal_turn',
#     'aggressive_turn',
#     'normal_mixed',
#     'normal_longitudinal_acceleration'
# ]

# # truck_events_df.describe("gyro_roll", "gyro_pitch", "gyro_yaw").show()
# from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

# df_stats = truck_events_df.select(
#     _stddev(col('gyro_yaw')).alias('std')
# ).collect()


# print(df_stats[0]['std'])

# for event_type in event_types:
#     print("Gonna show for {}".format(event_type))
#     truck_events_df.where(truck_events_df.event_type == event_type).describe("gyro_roll", "gyro_pitch", "gyro_yaw").show()
    
# truck_events_df.limit(10).toPandas()

## Stream Processing

Now that we tested the schema by loading our default `data/unified.csv`, we have to set the stream processing options and actions.

In [8]:
inputPath = '/home/jovyan/workspace/Projects/truck-data-wrangler/data/'

rawRecords = (
    spark
        .readStream
        .schema(csvSchema)
        .option("maxFilesPerTrigger", 1)
        .csv(inputPath, header=True)
)

### Generating jerk data as stream flow in

Essentially, we need to be calculating the jerk values and the flags (is_accelerating, is_breaking, is_turning_right and is_turning_left), however the streaming data frame don't support partitioning/ordering windows with non-time based column types. For that reason we will have to explode that columns in another table using the `forEachBatch` callback.

Moreover, we wil group our data from second to second, and all sensors values will be averaged. This way we can reduce a little bit the noise from them and have a nice graphic to plot

In [9]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.window import Window

jerk_truck_events_df = rawRecords

jerk_truck_events_df = jerk_truck_events_df.withColumn(
    "date_timestamp",
    F.to_timestamp(F.from_unixtime(((col("timestamp") / 1000) / 1000), 'yyyy-MM-dd HH:mm:ss.SSS'))
)

#column_list = ["timestamp", "event_type","label"]

windowedStreaming = (
    jerk_truck_events_df
        .groupBy(col("date_timestamp"), col("event_type"), col("label"), F.window(col("date_timestamp"), "1 second"))
        .agg(
            F.mean('accel_x'),
            F.mean('accel_y'),
            F.mean('accel_z'),
            F.mean('gyro_roll'),
            F.mean('gyro_pitch'),
            F.mean('gyro_yaw')
        )
        .withColumnRenamed('avg(accel_x)', 'accel_x')
        .withColumnRenamed('avg(accel_y)', 'accel_y')
        .withColumnRenamed('avg(accel_z)', 'accel_z')
        .withColumnRenamed('avg(gyro_roll)', 'gyro_roll')
        .withColumnRenamed('avg(gyro_pitch)', 'gyro_pitch')
        .withColumnRenamed('avg(gyro_yaw)', 'gyro_yaw')
)

windowedStreaming.printSchema()

jerk_truck_events_df.printSchema()

root
 |-- date_timestamp: timestamp (nullable = true)
 |-- event_type: string (nullable = false)
 |-- label: string (nullable = false)
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- accel_x: double (nullable = true)
 |-- accel_y: double (nullable = true)
 |-- accel_z: double (nullable = true)
 |-- gyro_roll: double (nullable = true)
 |-- gyro_pitch: double (nullable = true)
 |-- gyro_yaw: double (nullable = true)

root
 |-- c0: string (nullable = true)
 |-- event_type: string (nullable = false)
 |-- label: string (nullable = false)
 |-- timestamp: long (nullable = false)
 |-- accel_x: double (nullable = false)
 |-- accel_y: double (nullable = false)
 |-- accel_z: double (nullable = false)
 |-- gyro_roll: double (nullable = false)
 |-- gyro_pitch: double (nullable = false)
 |-- gyro_yaw: double (nullable = false)
 |-- date_timestamp: timestamp (nullable = true)



Now we define the `forEachBatch` function that we mentioned earlier in this section. Inside this callback you can see that we group over a composed partition. This is because depending upon the `event_type` or `label` we have to treat the sequence differently.

In [None]:
from time import sleep
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

def explodeJerkColumns(df, epochId):
    global configs
    jerk_truck_events_df = df
    
    df_stats = jerk_truck_events_df.select(
        _stddev(col('gyro_yaw')).alias('std')
    ).collect()
    
    turn_threshold_min = df_stats[0]['std'] * -1
    turn_threshold_max = df_stats[0]['std']
    
    column_list = ["event_type","label"]
    
    jerk_truck_events_df = jerk_truck_events_df.withColumn("timestamp", F.unix_timestamp(col("date_timestamp")))
    
    win_spec = Window.partitionBy([col(x) for x in column_list]).orderBy("timestamp")

    columns_that_needs_latest_values = ['accel_x', 'accel_y', 'accel_z']

    for column_name in columns_that_needs_latest_values:
        jerk_truck_events_df = jerk_truck_events_df.withColumn("last_" + column_name, F.lag(col(column_name)).over(win_spec))
    
    # last timestamp before this registry
#     jerk_truck_events_df = jerk_truck_events_df.withColumn("latest_timestamp", F.lag(col("timestamp")).over(win_spec))
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "last_timestamp", 
        F.when(F.isnull(F.lag(col("date_timestamp")).over(win_spec)), 0)
         .otherwise(F.unix_timestamp(F.lag(col("date_timestamp")).over(win_spec)))
    )
#     jerk_truck_events_df = jerk_truck_events_df.withColumn("date_timestamp", col("timestamp"))

    # x axis
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "jerk_x", 
        F.when(F.isnull(col("last_accel_x")), 0)
         .when(F.isnull(col("last_timestamp")), 0)
         .otherwise((col("accel_x") - col("last_accel_x")) / (col("timestamp") - col("last_timestamp")))
    )

    # y axis
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "jerk_y", 
        F.when(F.isnull(col("last_accel_y")), 0)
         .when(F.isnull(col("last_timestamp")), 0)
         .otherwise((col("accel_y") - col("last_accel_y")) / (col("timestamp") - col("last_timestamp")))
    )

    # z axis
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "jerk_z", 
        F.when(F.isnull(col("last_accel_z")), 0)
         .when(F.isnull(col("last_timestamp")), 0)
         .otherwise((col("accel_z") - col("last_accel_z")) / (col("timestamp") - col("last_timestamp")))
    )

    # adding the is_accelerating flag
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "is_accelerating",
        F.when(F.isnull(col("jerk_x")), 0)
         .when(col("jerk_x") > 0, 1)
         .otherwise(0)
    )

    # adding the is_breaking flag
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "is_breaking",
        F.when(F.isnull(col("jerk_x")), 0)
         .when(col("jerk_x") < 0, 1)
         .otherwise(0)
    )
    
    # adding the is_turning_left flag
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "is_turning_left",
        F.when(col("gyro_yaw") <= turn_threshold_min, 1)
         .otherwise(0)
    )
    
    # adding the is_turning_right flag
    jerk_truck_events_df = jerk_truck_events_df.withColumn(
        "is_turning_right",
        F.when(col("gyro_yaw") >= turn_threshold_max, 1)
         .otherwise(0)
    )
    
    dbhost = configs['timescaledb']['host']
    dbport = configs['timescaledb']['port']
    dbname = configs['timescaledb']['db']
    dbuser = configs['timescaledb']['user']
    dbpass = configs['timescaledb']['password']
    url = "jdbc:postgresql://"+dbhost+":"+dbport+"/"+dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }
    
    jerk_truck_events_df = jerk_truck_events_df[[
        'date_timestamp',
        'event_type',
        'label',
        'accel_x',
        'accel_y',
        'accel_z',
        'gyro_roll',
        'gyro_pitch',
        'gyro_yaw',
        'last_timestamp',
        'last_accel_x',
        'last_accel_y',
        'last_accel_z',
        'jerk_x',
        'jerk_y',
        'jerk_z',
        'is_accelerating',
        'is_breaking',
        'is_turning_left',
        'is_turning_right'
    ]]

    jerk_truck_events_df.write.jdbc(url=url, table="jerked_truck_events", mode="append",
                          properties=properties)

streamingIn = windowedStreaming \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .outputMode("complete") \
    .option("checkpointLocation", ".spark-stream-checkpoint/") \
    .foreachBatch(explodeJerkColumns) \
    .start(path=inputPath) \
    .awaitTermination()

