# Structured Streaming Intro

In [1]:
import os
import sqlite3

import pandas as pd
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
import seaborn as sns
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the Spark session. The PostgreSQL JDBC driver jar is
# registered here because later cells write to Postgres through JDBC.
spark = (
    SparkSession.builder
    .appName('Spark Test App')
    .config('spark.jars', 'jars/postgresql-42.2.14.jar')
    .getOrCreate()
)
sc = spark.sparkContext
spark

### 1. Streaming from Socket

Count words in sentences sent by a socket stream

Streaming data is simulated with netcat in the console. Data is sent to localhost:9999.

In [89]:
# Endpoint of the netcat listener that feeds the socket stream
# (the port is kept as a string because it is passed as a reader option).
host, port = "localhost", "9999"

In [90]:
# Open a streaming DataFrame on the text socket at host:port
lines = (
    spark.readStream
    .format('socket')
    .option('host', host)
    .option('port', port)
    .load()
)

In [91]:
# Split every incoming line on single spaces, emit one row per word,
# then keep a running count per distinct word
counts = (
    lines
    .select(F.explode(F.split("value", " ")).alias("word"))
    .groupby("word")
    .count()
)

In [12]:
# Start the streaming query: once per second the complete word-count result
# is written to the in-memory table "words". `count` holds the query handle.
count = (
    counts.writeStream
    .queryName('words')
    .trigger(processingTime='1 second')
    .outputMode('complete')
    .format("memory")
    .start()
)

NameError: name 'counts' is not defined

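Send some text with netcat from the console: `nc -l 9999` (the listener has to be running before the stream is started, because the socket source connects to it).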

In [97]:
# Display the current contents of the in-memory table "words" written by the streaming query
spark.table("words").show()

+-----+-----+
| word|count|
+-----+-----+
|World|    1|
|  Man|    2|
|Super|    1|
|Hello|    4|
+-----+-----+



In [99]:
# Names of all streaming queries that are currently running
[stream.name for stream in spark.streams.active]

[]

In [98]:
# Stop every streaming query that is still running
for stream in spark.streams.active:
    stream.stop()

### 2. Streaming from files

Stream wage data from CSV files as they are added to the folder "streaming_drop".

In [112]:
# Prepare source files: keep the six columns of interest from Wage.csv and
# write up to the first 500 rows as ten 50-row CSV chunks.
# NOTE(review): the chunks are written to "streaming_source" while the streams
# below read from "streaming_drop" - presumably the files are copied over by
# hand to simulate arriving data; confirm.
df = pd.read_csv("Wage.csv")
df = df[["year", "age", "sex", "education", "health", "wage"]]
# to_csv does not create missing directories, so make sure the target exists
os.makedirs("streaming_source", exist_ok=True)
for k in range(10):
    df_part = df.iloc[k * 50:(k + 1) * 50, :]
    df_part.to_csv(f"streaming_source/wage{k}.csv", index=False)

In [4]:
# Explicit schema for the wage CSV files (a streaming file source is given
# its schema up front instead of inferring it)
schema = T.StructType([
    T.StructField("year", T.IntegerType()),
    T.StructField("age", T.IntegerType()),
    T.StructField("sex", T.StringType()),
    T.StructField("education", T.StringType()),
    T.StructField("health", T.StringType()),
    T.StructField("wage", T.FloatType()),
])

In [9]:
# Streaming DataFrame over the CSV files in the drop folder "streaming_drop"
dfStreamFiles = (
    spark.readStream
    .schema(schema)
    .option("header", True)
    .csv("streaming_drop")
)

In [19]:
# Sanity check: isStreaming is True for a DataFrame created through readStream
dfStreamFiles.isStreaming

True

In [12]:
# Append the rows of newly arriving files to the in-memory table "wage_data"
query = (
    dfStreamFiles.writeStream
    .queryName('wage_data')
    .outputMode('append')
    .format("memory")
    .start()
)

In [15]:
# Show the ten highest-wage rows received so far
dfWage = spark.table("wage_data")
dfWage.orderBy(F.col("wage").desc()).show(10)

+----+---+-------+------------------+--------------+---------+
|year|age|    sex|         education|        health|     wage|
+----+---+-------+------------------+--------------+---------+
|2005| 49|1. Male|5. Advanced Degree|2. >=Very Good| 277.6014|
|2004| 43|1. Male|   4. College Grad|2. >=Very Good|272.29477|
|2003| 60|1. Male|   4. College Grad|2. >=Very Good| 268.2663|
|2009| 35|1. Male|   4. College Grad|2. >=Very Good| 267.9011|
|2006| 50|1. Male|5. Advanced Degree|2. >=Very Good|212.84235|
|2006| 59|1. Male|5. Advanced Degree|     1. <=Good|200.54326|
|2005| 57|1. Male|5. Advanced Degree|2. >=Very Good|200.54326|
|2003| 38|1. Male|   4. College Grad|2. >=Very Good|200.54326|
|2006| 45|1. Male|   4. College Grad|2. >=Very Good|200.54326|
|2003| 37|1. Male|   4. College Grad|2. >=Very Good|200.54326|
+----+---+-------+------------------+--------------+---------+
only showing top 10 rows



In [46]:
# Names of all streaming queries that are currently running
[active_query.name for active_query in spark.streams.active]

[]

In [17]:
# Stop the streaming query that feeds the in-memory table "wage_data"
query.stop()

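If aggregations are used without a watermark, the "append" output mode is not allowed. Here the output mode is set to "complete", so the in-memory table always holds the full, up-to-date result.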

In [21]:
# Running average wage per year, kept in the in-memory table "wage_by_age".
# NOTE(review): the query is named "wage_by_age" although it groups by "year";
# the name is left unchanged because a later cell reads the table by this name.
queryAgg = (
    dfStreamFiles
    .groupby("year")
    .agg(F.avg("wage").alias("wage"))
    .writeStream
    .queryName('wage_by_age')
    .outputMode('complete')
    .format("memory")
    .start()
)

In [22]:
# Handle to the in-memory table "wage_by_age" written by the aggregation query
dfWageAgg = spark.table("wage_by_age")

In [44]:
# Display the average wage per year computed so far
dfWageAgg.show()

+----+------------------+
|year|              wage|
+----+------------------+
|2003|112.57746773797112|
|2007|116.98974895477295|
|2006| 115.4145488194057|
|2004|103.77844826834543|
|2009|123.48056060791015|
|2005|107.01629535968488|
|2008|118.36346608942205|
+----+------------------+



In [45]:
# Stop the aggregation query that feeds "wage_by_age"
queryAgg.stop()

### 3. Streaming into database

Streaming wage data csv files added to folder "streaming_drop" into local postgres db 

In [54]:
# Explicit schema for the wage CSV files read in this section
schema = T.StructType([
    T.StructField("year", T.IntegerType()),
    T.StructField("age", T.IntegerType()),
    T.StructField("sex", T.StringType()),
    T.StructField("education", T.StringType()),
    T.StructField("health", T.StringType()),
    T.StructField("wage", T.FloatType()),
])

In [55]:
# Streaming DataFrame over the CSV files in the drop folder "streaming_drop"
dfStreamFiles = (
    spark.readStream
    .schema(schema)
    .option("header", True)
    .csv("streaming_drop")
)

In [56]:
# Sanity check: isStreaming is True for a DataFrame created through readStream
dfStreamFiles.isStreaming

True

In [57]:
# Postgres connection. Credentials are read from the environment (PGUSER /
# PGPASSWORD) and passed as JDBC properties so that no password is stored in
# the notebook or embedded in the connection URL.
dbURL = "jdbc:postgresql://localhost/andreas"
jdbcConf = {
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("PGUSER", "andreas"),
    # An empty password is sent when PGPASSWORD is unset; export it before running
    "password": os.environ.get("PGPASSWORD", ""),
}

In [58]:
# Get average wage by sex and education and stream it into the database in
# micro-batches with "foreachBatch".
def _overwrite_avg_wage_table(batch_df, batch_id):
    """Replace the Postgres table "test" with the rows of one micro-batch.

    batch_df is the micro-batch output DataFrame and batch_id its sequence
    number (unused); both are supplied by foreachBatch.
    """
    batch_df.write.jdbc(url=dbURL, table="test", properties=jdbcConf, mode="overwrite")


queryAgg = (
    dfStreamFiles
    .groupby("sex", "education")
    .avg("wage")
    .withColumnRenamed("avg(wage)", "wage")
    .writeStream
    # "complete" hands every batch the full aggregate. With "update" only the
    # groups changed in that batch are emitted, and because the table is
    # overwritten each time, all unchanged groups would be lost.
    .outputMode('complete')
    .foreachBatch(_overwrite_avg_wage_table)
    .start()
)

In [62]:
# Names of all running streaming queries (None for a query started without queryName)
[running.name for running in spark.streams.active]

[None]

In [63]:
# Stop the query that streams the aggregated wages into the database
queryAgg.stop()

### 4. Streaming with Window Aggregation

Simple example: count the number of records with wage > 100 (the wage values appear to be given in thousands, i.e. above 100,000) within a fixed time window.

In [81]:
# Explicit schema for the wage CSV files read in this section
schema = T.StructType([
    T.StructField("year", T.IntegerType()),
    T.StructField("age", T.IntegerType()),
    T.StructField("sex", T.StringType()),
    T.StructField("education", T.StringType()),
    T.StructField("health", T.StringType()),
    T.StructField("wage", T.FloatType()),
])

In [107]:
# Streaming DataFrame over the CSV files in the drop folder "streaming_drop"
dfStream = (
    spark.readStream
    .schema(schema)
    .option("header", True)
    .csv("streaming_drop")
)

In [108]:
# Sanity check: isStreaming is True for a DataFrame created through readStream
dfStream.isStreaming

True

In [109]:
# The schema has no time column, so every row is stamped with the current
# processing time. Only rows with wage > 100 are kept and a 20-second
# watermark is declared on the stamp.
# NOTE: this rebinds dfStream, so re-running the cell applies the steps again.
dfStream = (
    dfStream
    .withColumn("timestamp", F.current_timestamp())
    .filter("wage>100")
    .withWatermark("timestamp", "20 seconds")
)

In [110]:
# Count rows per 10-second window on the "timestamp" column.
# The window struct is cast to a string - presumably because the JDBC sink
# used later cannot store a struct column; confirm.
windowedCounts = (
    dfStream
    .groupBy(F.window("timestamp", "10 seconds"))
    .count()
    .withColumn("window", F.col("window").cast("string"))
)

In [111]:
# Postgres connection. Credentials are read from the environment (PGUSER /
# PGPASSWORD) and passed as JDBC properties so that no password is stored in
# the notebook or embedded in the connection URL.
dbURL = "jdbc:postgresql://localhost/andreas"
jdbcConf = {
    "driver": "org.postgresql.Driver",
    "user": os.environ.get("PGUSER", "andreas"),
    # An empty password is sent when PGPASSWORD is unset; export it before running
    "password": os.environ.get("PGPASSWORD", ""),
}

In [116]:
# Stream the windowed counts into the database
def _append_window_counts(batch_df, batch_id):
    """Append the rows of one micro-batch to the Postgres table "test2".

    batch_df is the micro-batch output DataFrame and batch_id its sequence
    number (unused); both are supplied by foreachBatch.
    """
    batch_df.write.jdbc(url=dbURL, table="test2", properties=jdbcConf, mode="append")


queryAgg = (
    windowedCounts
    .writeStream
    .queryName('counts')
    .option("checkpointLocation", "checkpoint")
    # NOTE(review): in "update" mode a window is emitted again whenever its
    # count changes, so appending can leave several rows per window in "test2".
    .outputMode('update')
    .foreachBatch(_append_window_counts)
    .start()
)

  

In [119]:
# Names of all streaming queries that are currently running
[live_query.name for live_query in spark.streams.active]

[]

In [118]:
# Stop every streaming query that is still running
for live_query in spark.streams.active:
    live_query.stop()