# Hands On: Spark Streaming

In [None]:
# Example weather station data
#
# 1419408015	0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H
# 1419408017	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M
# 1419408018	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M
# 1419408019	0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M

In [None]:
# Key for measurements:
#
# Sn      Wind speed minimum m/s, km/h, mph, knots #,M, K, S, N
# Sm      Wind speed average m/s, km/h, mph, knots #,M, K, S, N
# Sx      Wind speed maximum m/s, km/h, mph, knots #,M, K, S, N
# Dn      Wind direction minimum deg #, D
# Dm      Wind direction average deg #, D
# Dx      Wind direction maximum deg #, D
# Pa      Air pressure hPa, Pa, bar, mmHg, inHg #, H, P, B, M, I
# Ta      Air temperature °C, °F #, C, F
# Tp      Internal temperature °C, °F #, C, F
# Ua      Relative humidity %RH #, P
# Rc      Rain accumulation mm, in #, M, I
# Rd      Rain duration s #, S
# Ri      Rain intensity mm/h, in/h #, M, I
# Rp      Rain peak intensity mm/h, in/h #, M, I
# Hc      Hail accumulation hits/cm2, hits/in2, hits #, M, I, H
# Hd      Hail duration s #, S
# Hi      Hail intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Hp      Hail peak intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Th      Heating temperature °C, °F #, C, F
# Vh      Heating voltage V #, N, V, W, F2
# Vs      Supply voltage V V
# Vr      3.5 V ref. voltage V V

In [None]:
import re
def parse(line, key="Dm"):
    """Extract an integer measurement from one weather-station text record.

    Intended to be wrapped as a Spark UDF over the socket source's ``value``
    column. For example, on
    ``"1419408015\\t0R1,Dn=059D,Dm=066D,Dx=080D,..."`` it returns ``66``.

    Parameters
    ----------
    line : str or None
        Raw text of one record. Spark hands ``None`` to a Python UDF for a
        null column value, so that case is handled explicitly instead of
        letting ``re.search`` raise ``TypeError`` and fail the batch.
    key : str, default "Dm"
        Two-letter measurement code to look up, e.g. ``"Dn"``, ``"Dm"`` or
        ``"Dx"``. The default ``"Dm"`` keeps the original behaviour
        (average wind direction).

    Returns
    -------
    int or None
        The run of digits immediately after ``<key>=``, as an int. Returns
        ``None`` when ``line`` is null or does not contain that measurement
        (e.g. ``0R2`` records, which carry no wind direction). Only the
        leading integer digits are captured, so this suits integer-valued
        fields; for ``Sm=9.5M`` it would return ``9``.
    """
    if line is None:
        return None
    # \b stops a short key (e.g. "m") from matching inside a longer one ("Dm=");
    # re.escape keeps any caller-supplied key literal.
    pattern = r"\b" + re.escape(key) + r"=(\d+)"
    match = re.search(pattern, line)
    return int(match.group(1)) if match else None

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.window import Window

# Entry point for the DataFrame / streaming API. getOrCreate() returns the
# already-running session if there is one, so re-running this cell is safe.
spark = (
    SparkSession
    .builder
    .appName("StructuredStreaming")
    .getOrCreate()
)

In [None]:
# Remote text feed (presumably the HPWREN weather-station relay) that emits one
# record per line over plain TCP. Kept as named constants so the source can be
# pointed elsewhere (e.g. a local `nc -lk 9999` for offline testing) without
# touching the query itself.
STATION_HOST = "rtd.hpwren.ucsd.edu"
STATION_PORT = 12024

# The socket source produces a streaming DataFrame with a single string column,
# `value`, holding one line of text per row. Spark documents this source as
# intended for testing only: it is not fault-tolerant.
lines = (
    spark
    .readStream
    .format("socket")
    .option("host", STATION_HOST)
    .option("port", STATION_PORT)
    .load()
)
lines

In [None]:
# Register `parse` as an integer-returning UDF, then add its result as a new
# `parsed` column (null whenever `parse` returns None for a line).
parse_udf = udf(parse, IntegerType())
parsed_lines = lines.withColumn("parsed", parse_udf("value"))
parsed_lines

In [None]:
# Tumbling 10-second windows over *processing* time. Each row is stamped with
# current_timestamp(), so a window groups lines by when Spark processed them,
# not by the timestamp embedded at the start of each record.
#
# The watermark lets Spark discard aggregation state for windows that have
# fallen behind it. Without a watermark, an update-mode streaming aggregation
# keeps state for every past window forever, and memory grows without bound.
windowed_data = (
    parsed_lines
    .withColumn("time", current_timestamp())
    .withWatermark("time", "10 seconds")
    .groupBy(window("time", "10 seconds"))
    .agg(collect_list("parsed").alias("wind_direction"))
    # array_max / array_min skip NULL elements, so lines without a Dm value
    # never affect the extremes.
    .withColumn("max_val", array_max("wind_direction"))
    .withColumn("min_val", array_min("wind_direction"))
    # Keep the window bounds: in update mode a single batch can emit rows for
    # several windows, and without this column they cannot be told apart.
    .select("window", "wind_direction", "max_val", "min_val")
)
windowed_data

In [None]:
def show_batch(batch_df, epoch_id):
    """Display one micro-batch of updated window rows with untruncated columns.

    `epoch_id` is required by the foreachBatch callback signature but unused.
    `show()` prints at most its default of 20 rows.
    """
    batch_df.show(truncate=False)


# Every 10 seconds, pass the rows of windows updated since the previous
# trigger to show_batch.
query = (
    windowed_data.writeStream
    .foreachBatch(show_batch)
    .trigger(processingTime="10 seconds")
    .outputMode("update")
    .start()
)

In [None]:
# Stop the streaming query launched by `.start()` above. Until this runs, the
# query keeps firing its 10-second trigger in the background.
query.stop()