<br><br><br>
<span style="color:red;font-size:60px">Structured streaming</span>
<br><br>
<li>Scalable, fault tolerant, stream processing engine built on Spark SQL </li>
<li>Stream data is maintained in dataframe tables</li>
<li>Tables are <b>unbounded</b> and grow with each new batch arrival</li>
<li>Batches are similar to DStream batches but can be made "almost continuous" with continuous processing, which targets end-to-end latencies of about 1 millisecond (this is experimental as of now)</li>
<li>With the socket source used below, tables are structured as dataframes with two columns: <span style="color:blue">value</span>, which contains the data, and <span style="color:blue">timestamp</span>, which contains the arrival time of each line (present only when includeTimestamp is set)</li>


<br><br><br>
<span style="color:green;font-size:xx-large">the dataframe</span>
<br><br>
<li>the dataframe needs some context information to be set up</li>
<li>the stream source (we will use a <span style="color:blue">socket</span> source)</li>
<li>the url and port associated with the socket (<span style="color:blue">localhost, 4444</span>)</li>

In [None]:
val lines = spark
    .readStream //Creates a readable stream
    .format("socket") //We will read it from a socket
    .option("host", "localhost") //the host
    .option("port", 4444) //the port
    .option("includeTimestamp", "true") //true adds a timestamp column holding each line's arrival time
    .load() 

<li>from the dataframe, get the data</li>
<li>we will do a word count for all the words that arrive through the socket</li>
<li>then group them and count them</li>
<li>note that this will count all the words that ever arrived at the socket!</li>
<li>note also that we haven't yet started the stream, this is just what we want to do</li>
<li><span style="color:red">value</span>: Extract the value column from the dataframe</li>

In [None]:
val words = lines.select("value").as[String].flatMap(_.split(" "))
val counts = words.groupBy("value").count()

<br><br><br>
<span style="color:blue;font-size:large">Initiate the stream</span>
<li>Above, we've created a program that creates a dataframe and counts the words in that dataframe</li>
<li>This returns an <span style="color:blue">unbounded table</span> that is constantly updated as each batch arrives</li>
<li>We need to apply a query to this table</li>
<li>Each time a new batch is added to the table, the query will run</li>
<li>The query does something on the dataframe, sets the trigger interval, and starts listening on the stream</li>
<li>Note that the lines, words, counts tables are not retained from batch to batch</li>
<li>the query is processed, and the minimum data required to recreate it is retained</li>

<br><br><br>
<span style="color:blue;font-size:large">writeStream</span>
<li>the data stream writer - applies the program to the stream</li>
<li><span style="color:blue">outputMode</span>: What to output (entire table/new data/etc.)</li>
<ul>
    <li>use <span style="color:blue">complete</span> to see all results from all batches at each time point</li>
    <li>use <span style="color:blue">append</span> when using watermarks (see event time handling below)</li>
</ul>
<li><span style="color:red">Note</span>: Make sure the socket source has something to connect to (for example, a netcat server listening on port 4444) before running the query below!</li>

<span style="color:blue;font-size:large">Trigger</span>
<br>
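<li>The <span style="color:red">trigger</span> defines how often a micro-batch is started. In the example below, the trigger interval is set to 10 seconds</li>
<li>If the trigger is omitted, Spark stays in micro-batch mode and starts each new batch as soon as the previous one finishes. Record-at-a-time continuous processing is a separate, experimental mode that must be requested with <span style="color:blue">Trigger.Continuous</span></li>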
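<li>As a rough sketch of the trigger choices, the snippet below only builds trigger objects and does not start a query. It assumes Spark is on the classpath, and the interval values are arbitrary.</li>

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

// Micro-batch every 10 seconds, written two equivalent ways
val everyTenSeconds = Trigger.ProcessingTime("10 seconds")
val sameInterval = Trigger.ProcessingTime(10, TimeUnit.SECONDS)

// Experimental continuous processing; the argument is the checkpoint interval,
// not a batch size
val continuous = Trigger.Continuous("1 second")
```

<li>Continuous mode only supports map-like operations (projections and filters), so an aggregation such as the word count still has to run in micro-batch mode.</li>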

In [None]:
//For setting the trigger interval
import org.apache.spark.sql.streaming.Trigger

val query = counts
    .orderBy("count")
    .writeStream
    .outputMode("complete")
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds")) //start a new micro-batch every 10 seconds
    .start()


<br><br><br>
<span style="color:blue;font-size:large">Stop the stream</span>
<br><br>
<li>Note: Once the stream stops, you cannot access the data any more</li>

In [None]:
query.stop()

<br><br><br>
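<span style="color:blue;font-size:large">Near-continuous streaming (default trigger)</span>
<br><br>
<li>Without a trigger, Spark still runs micro-batches, starting each one as soon as the previous one finishes</li>

In [None]:

val query = counts.orderBy("count")
    .writeStream
    .outputMode("complete")
    .format("console")
    .start() //no trigger: micro-batches run back to back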

In [None]:
query.stop()

<br><br><br>
<span style="color:green;font-size:xx-large">Window operations</span>
<br><br>
<li>Window operations use the <span style="color:red">window</span> function</li>
<li>arguments:</li>
<ul>
    <li><span style="color:red">time column</span>: a dataframe column that contains a timestamp object</li>
    <li><span style="color:red">window duration</span>: the window length</li>
    <li><span style="color:red">slide length</span> (optional): if provided, then the window is a sliding window. If not, it is a fixed (tumbling) window</li>
</ul>
<li>Group the dataframe by window and word and generate word counts</li>
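<li>To make the window arguments concrete, here is a small plain-Scala sketch (no Spark involved) of which windows a timestamp falls into. It assumes windows are aligned to the epoch, which is Spark's default when no start offset is given. <span style="color:blue">windowsFor</span> is a hypothetical helper and all times are in seconds.</li>

```scala
// Returns the (start, end) of every window that contains tsSec; ends are exclusive
def windowsFor(tsSec: Long, durationSec: Long, slideSec: Long): Seq[(Long, Long)] = {
  require(slideSec > 0 && slideSec <= durationSec,
    "slide must be positive and no longer than the window")
  // last window start at or before tsSec
  val latestStart = tsSec - Math.floorMod(tsSec, slideSec)
  Iterator.iterate(latestStart)(_ - slideSec)
    .takeWhile(start => start + durationSec > tsSec) // keep windows that still cover tsSec
    .map(start => (start, start + durationSec))
    .toList
    .reverse
}

val tumbling = windowsFor(14L, 10L, 10L) // slide == duration: exactly one window
val sliding = windowsFor(14L, 10L, 5L)   // slide < duration: overlapping windows
println(tumbling)
println(sliding)
```

<li>With a sliding window, one input row contributes to several windows, which is why the same word can appear in more than one output row.</li>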

In [None]:
import java.sql.Timestamp

val lines = spark
    .readStream //Creates a readable stream
    .format("socket") //We will read it from a socket
    .option("host", "localhost") //the host
    .option("port", 4444) //the port
    .option("includeTimestamp", "true") //true if we want the time stamp
    .load() 

val words = lines
    .as[(String, Timestamp)] //Creates a Dataset of (value,timestamp)
    .flatMap(line => line._1.
             split(" ").
             map(word => (word, line._2))) //Creates (word,timestamp) pairs for each word
    .toDF("word", "timestamp") //back to a dataframe

val windowedCounts = words.groupBy(
  window($"timestamp", "10 seconds", "10 seconds"),
  $"word"
).count()

In [None]:
val query = windowedCounts.writeStream
 .outputMode("complete")  
 .format("console")
 .option("truncate", "false")
 .start()



In [None]:
query.stop()

<br><br><br>
<span style="color:green;font-size:xx-large">Output modes</span>
<br><br>
<li><span style="color:red">complete</span>: Send complete results to the sink. Complete, for word count, will return the total number of occurrences of each word from stream start to now</li>
<li><span style="color:red">update</span>: only report the data that has changed. For word count, the running count from stream start to now will be reported, but only for the words whose count has changed in the latest batch</li>
<li><span style="color:red">append</span>: only new rows are sent to the sink, and a row is never changed once it has been sent. Append only works with watermarks or with non-aggregation queries</li>
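<li>The difference between complete and update can be mimicked in plain Scala, with no Spark involved. The sketch below keeps a running word count over two made-up batches and returns, per batch, what each mode would send to the sink. <span style="color:blue">simulate</span> and the batch contents are hypothetical.</li>

```scala
// For each batch: (rows emitted in complete mode, rows emitted in update mode)
def simulate(batches: Seq[Seq[String]]): Seq[(Map[String, Int], Map[String, Int])] =
  batches.scanLeft((Map.empty[String, Int], Map.empty[String, Int])) {
    case ((totals, _), batch) =>
      // counts contributed by this batch alone
      val delta = batch.groupBy(identity).map { case (word, ws) => word -> ws.size }
      // running totals since the start of the stream
      val newTotals = delta.foldLeft(totals) { case (acc, (word, n)) =>
        acc.updated(word, acc.getOrElse(word, 0) + n)
      }
      // update mode: only the words whose total changed in this batch
      val changed = newTotals.filter { case (word, _) => delta.contains(word) }
      (newTotals, changed)
  }.tail // drop the empty initial state

val outputs = simulate(Seq(Seq("a", "b", "a"), Seq("b", "c")))
outputs.zipWithIndex.foreach { case ((complete, update), i) =>
  println(s"batch $i complete=$complete update=$update")
}
```

<li>Append is not modelled here because, for an aggregation, it needs a watermark to decide when a count is final.</li>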

In [None]:
val query = windowedCounts.writeStream
 .outputMode("update")  
 .format("console")
 .option("truncate", "false")
 .start()

In [None]:
query.stop()

In [None]:
import java.sql.Timestamp

val lines = spark
    .readStream //Creates a readable stream
    .format("socket") //We will read it from a socket
    .option("host", "localhost") //the host
    .option("port", 4444) //the port
    .option("includeTimestamp", "true") //true if we want the time stamp
    .load() 

val words = lines
    .as[(String, Timestamp)] //Creates a Dataset of (value,timestamp)
    .flatMap(line =>
                 line._1.split(" ").map(word => (word, line._2))) //Creates (word,timestamp) pairs for each word
    .toDF("word", "timestamp") //back to a dataframe

val query = words.writeStream
 .outputMode("append")  
 .format("console")
 .option("truncate", "false")
 .start()

In [None]:
query.stop()

<br><br><br>
<span style="color:green;font-size:xx-large">Example: Live streaming revenue calculation</span>
<br><br>
<li>Static products file contains prices and categories of products</li>
<li>Transactions are collected in batches from a stream (we'll use a file stream)</li>
<li>Report the accumulated revenue by product category as each batch arrives</li>


<span style="color:blue;font-size:large">Case classes and Streaming DF schemas</span>
<li>Recall that a streaming DF must include a schema when the input stream is defined</li>
<li>We can either create the schema the long way (StructField and StructType) or define a case class and use that to create the schema</li>
<li><span style="color:red">ScalaReflection</span> is an object in Spark SQL's internal Catalyst package that can map a case class to a dataframe schema</li>
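<li>A schema can also be derived from a case class through the public <span style="color:blue">Encoders</span> API, which avoids depending on Catalyst internals. This is only a sketch: it assumes Spark is on the classpath, and <span style="color:blue">TransactionRow</span> is a hypothetical stand-in for the transaction case class.</li>

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Hypothetical case class with one Int id field and one Int quantity field
case class TransactionRow(item_id: Int, trans_qty: Int)

// Encoders.product derives an encoder for a case class; its schema is a StructType
val transactionRowSchema: StructType = Encoders.product[TransactionRow].schema
println(transactionRowSchema.treeString)
```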

<span style="color:blue;font-size:large">Imports and case class definitions</span>



In [None]:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

//Create case class for each object - can be used to infer the schema
case class Product(item_num: Int,desc: String, price: Double,category: String)
case class Transaction(item_id: Int, trans_qty: Int)



<span style="color:blue;font-size:large">Read the static dataframe</span>



In [None]:

//Get products. This is a static dataframe
val products = spark.read
                    .option("inferSchema",true)
                    .option("header",true)
                    .csv("product_example/items.csv")
                    .as[Product] //Uses the case class to construct the schema




<span style="color:blue;font-size:large">Define the input stream</span>



In [None]:
//For the stream, we need to specify a schema
//ScalaReflection creates a schema from a case class by making each case class type into
//     a scala sql StructType
val transactionSchema = (ScalaReflection
                         .schemaFor[Transaction]
                         .dataType
                         .asInstanceOf[StructType])

//Source: a stream of transactions (each set of transactions is in a file)
val transactionStream = (spark.readStream
                        .schema(transactionSchema)
                        .option("header",false)
                        .option("maxFilesPerTrigger",1) //process one file per micro-batch
                        .csv("product_example"))

<span style="color:blue;font-size:large">Output Streaming DataFrame</span>
<li>Define the output streaming dataframe</li>
<li>The output dataframe must be a streaming dataframe</li>

In [None]:


val revenue_by_category = (transactionStream
                          .join(products,products("item_num")===transactionStream("item_id"))
                           .groupBy($"category")
                           .agg(sum($"price"*$"trans_qty") as "revenue"))

<span style="color:blue;font-size:large">Send the output to the sink</span>

In [None]:
val query = revenue_by_category.writeStream
    .outputMode("complete")
    .format("console")
    .start

In [None]:
query.stop()

<span style="color:green;font-size:xx-large">In-class problem</span>
<li>Modify the code above so that it returns the mean revenue from each product (e.g., eggs, widgets, etc.)</li>

<br><br><br>
<span style="color:green;font-size:xx-large">Revenue calculation from a socket stream</span>
<br><br>
<li>Socket data arrives in a column named "value"</li>
<li>If we want to impose a schema on it, we need to explicitly provide the schema</li>
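<li>As a plain-Scala analogue of what splitting the value column does to each line, the sketch below turns one comma-separated line into typed fields and rejects malformed input. <span style="color:blue">ParsedTransaction</span> and <span style="color:blue">parseLine</span> are hypothetical names, and Spark itself is not involved.</li>

```scala
import scala.util.Try

// Hypothetical typed view of one "item_id,trans_qty" line
case class ParsedTransaction(itemId: Int, transQty: Int)

def parseLine(line: String): Option[ParsedTransaction] =
  line.split(",").map(_.trim) match {
    // exactly two fields, both of which must parse as integers
    case Array(id, qty) => Try(ParsedTransaction(id.toInt, qty.toInt)).toOption
    case _ => None
  }

println(parseLine("3,2"))
println(parseLine("3,two"))
```

<li>In the streaming version, the same two steps are a split on the value column followed by a cast of each piece.</li>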


In [None]:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

//Create case class for each object - can be used to infer the schema
case class Product(item_num: Int,desc: String, price: Double,category: String)
case class Transaction(item_id: Int, trans_qty: Int)


//Get products. This is a static dataframe
val products = spark.read
                    .option("inferSchema",true)
                    .option("header",true)
                    .csv("product_example/items.csv")
                    .as[Product] //Uses the case class to construct the schema



In [None]:
//Read the stream from the socket
//Split the value column on ,
//Name each split value and cast it to the type used in the Transaction case class
val transactionStream = (spark
                         .readStream
                         .format("socket")
                         .option("host", "localhost")
                         .option("port", 4444)
                         .option("includeTimestamp", true)
                         .load()
                        .selectExpr("cast(split(value, ',')[0] as int) as item_id","cast(split(value, ',')[1] as int) as trans_qty"))


//The rest is the same as before
//Do the join with the static dataframe
//get revenue updates
val revenue_by_category = (transactionStream
                          .join(products,products("item_num")===transactionStream("item_id"))
                           .groupBy($"category")
                           .agg(sum($"price"*$"trans_qty") as "revenue"))

val query = revenue_by_category.writeStream
    .outputMode("complete")
    .format("console")
    .start
    
   

In [None]:
query.stop()

<br><br><br>
<span style="color:green;font-size:xx-large">Structured Streaming and Event Time</span>
<br><br>
<li>Spark uses an event time model: records are grouped by the timestamp carried in the data, not by the time they arrive</li>
<li>Let's look at an example to see how that works</li>
<li>Assume the following data flow:</li>
<pre>
2022-04-18 09:30:00,Bob
2022-04-18 09:30:03,Bob
2022-04-18 09:30:05,Bob
2022-04-18 09:30:06,Bill
2022-04-18 09:30:14,Henry
2022-04-18 09:30:16,Bob
2022-04-18 09:30:22,Mary
2022-04-18 09:30:04,Bob
2022-04-18 09:30:24,Jane
</pre>
<li>As we can see, one record is late, the 09:30:04 Bob record</li>
<li>Let's see how Spark will handle this</li>

In [None]:
val lineStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 4444)
                .load()
                .selectExpr("split(value, ',')[0] as timestamp","split(value, ',')[1] as word")
                 .withColumn("timestamp",to_timestamp(col("timestamp")))) //the window function needs a timestamp column, not a string

val windowedCounts = lineStream.groupBy(
  window($"timestamp", "10 seconds"),
  $"word"
).count()

val query = windowedCounts.writeStream
 .outputMode("complete")  
 .format("console")
 .option("truncate", "false")
 .start()

In [None]:
query.stop()

<span style="color:blue;font-size:large">Spark reports event time results</span>
<li>As the above example shows, Spark goes back and updates earlier windows to handle late arriving events</li>
<li>To do this, it needs to maintain the state of those earlier windows</li>
<li>How long should it maintain those states? It can't do so indefinitely because a long running stream will require a huge amount of resources</li>

<br><br><br>
<span style="color:green;font-size:xx-large">Watermarks</span>
<br><br>
<li>Watermarks are Spark's way of deciding how long to hold on to old state</li>
<li>A watermark tells Spark when to drop old aggregated data</li>
<li>The watermark is (latest event time seen so far - delay threshold); data for windows that end before the watermark can be dropped as too late</li>
<li>Watermarks only take effect in <span style="color:red">append</span> or <span style="color:red">update</span> modes. Complete mode will always use all the data</li>
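<li>The plain-Scala sketch below replays the arrival order from the event-time example above with a 10 second delay threshold and 10 second tumbling windows. It takes the watermark to be the largest event time seen so far minus the threshold, and it updates the watermark after every record; Spark only advances it between micro-batches, so this is a simplification, not Spark's exact behaviour.</li>

```scala
val delaySec = 10L  // watermark delay threshold
val windowSec = 10L // tumbling window length

// Event times in arrival order, as seconds after 09:30:00
val arrivals = Seq(0L, 3L, 5L, 6L, 14L, 16L, 22L, 4L, 24L)

// Carry the largest event time seen so far and the events that were kept
val (_, kept) = arrivals.foldLeft((Option.empty[Long], Vector.empty[Long])) {
  case ((maxSeen, acc), t) =>
    val windowEnd = (t / windowSec) * windowSec + windowSec
    // an event is too late when its window ended at or before the watermark
    val tooLate = maxSeen.exists(m => windowEnd <= m - delaySec)
    val newMax = Some(maxSeen.fold(t)(m => math.max(m, t)))
    (newMax, if (tooLate) acc else acc :+ t)
}
val dropped = arrivals.diff(kept)
println(s"kept=$kept dropped=$dropped")
```

<li>A larger delay threshold keeps more late records at the cost of holding window state for longer.</li>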

In [None]:
import java.sql.Timestamp

val lineStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 4444)
                .load()
                .selectExpr("split(value, ',')[0] as timestamp","split(value, ',')[1] as word")
                 .withColumn("timestamp",to_timestamp(col("timestamp")))) //necessary because Spark will do time calcs

val windowedCounts = lineStream
                        .withWatermark("timestamp", "10 seconds") //10 seconds is the delay threshold, timestamp the "event time column"
                        .groupBy(window($"timestamp", "10 seconds"), $"word") //group data by window and word
                        .count() //count the words


val query = windowedCounts.writeStream
 .outputMode("update")  
 .format("console")
 .option("truncate", "false")
 .start()

In [None]:
query.stop()

<span style="color:blue;font-size:large">Conditions for watermarking</span>
<br>
<li>Watermarking guarantees that any data arriving inside the watermark will be used</li>
<li>However, it may continue to use data outside the watermark (a watermark is not a window)</li>
<li><span style="color:red">withWatermark</span> must be called on the timestamp column</li>
<li><span style="color:red">withWatermark</span> must be called just before the aggregation transformation</li>
<li>The aggregation must be on the time column or on a window on the time column (e.g., the groupBy must be on a window on timestamp in the above example)</li>


<br><br><br>
<span style="color:green;font-size:xx-large">Joining data from two streams</span>
<br><br>
<li>In the revenue example, we joined a streaming df to a static df</li>
<li>This is reasonably straightforward because the join executes when the streaming df exists and the static df always exists</li>
<li>What would be the implication of reversing the join (i.e., join a static df to a streaming df?)</li>
<li>When joining two data streams, data for the join may not be available in both streams at the same time</li>
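<li>Since the program cannot wait forever for data, watermarks (together with a time range in the join condition) tell Spark how long to buffer each stream. They are required for outer joins and strongly recommended for inner joins when joining two streaming dfs</li>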

<span style="color:blue;font-size:large">Example: Home security monitoring</span>
<br>
<li>A home security system has two data feeds, a camera feed (with multiple cameras) and an alarm feed (with multiple alarms)</li>
<li>An alert occurs if the camera feed detects some activity and an alarm trips within one minute of the camera detecting the activity</li>
<li>Each stream (camera stream and alarm stream) is fed by multiple homes</li>
<li>Join the streams using the home id as the key and with the alert time window (1 minute) as a condition</li>
<li>Report all alerts</li>
<li>Sample data</li>
<pre>
2022-04-18 09:30:00,H1,C1
2022-04-18 09:33:00,H1,C2
2022-04-18 09:34:00,H2,C1
2022-04-18 09:45:00,H2,C1
</pre>
<pre>
2022-04-18 09:30:00,H1,A1
2022-04-18 09:32:00,H1,A2
2022-04-18 09:36:00,H2,A1
2022-04-18 09:51:00,H2,A2
</pre>
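<li>Before running this on streams, the alert rule can be checked on the sample rows with ordinary Scala collections. The sketch reads "within one minute" as: same house, and the alarm fires at or after the camera event and no more than 60 seconds later. That reading is an assumption, and no Spark is involved.</li>

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
def ts(s: String): LocalDateTime = LocalDateTime.parse(s, fmt)

// (event time, house, device) rows copied from the sample data
val cameras = Seq(
  (ts("2022-04-18 09:30:00"), "H1", "C1"),
  (ts("2022-04-18 09:33:00"), "H1", "C2"),
  (ts("2022-04-18 09:34:00"), "H2", "C1"),
  (ts("2022-04-18 09:45:00"), "H2", "C1"))
val alarms = Seq(
  (ts("2022-04-18 09:30:00"), "H1", "A1"),
  (ts("2022-04-18 09:32:00"), "H1", "A2"),
  (ts("2022-04-18 09:36:00"), "H2", "A1"),
  (ts("2022-04-18 09:51:00"), "H2", "A2"))

// Pair every camera event with every alarm event and keep the pairs that satisfy the rule
val alerts = for {
  (camTime, camHouse, camera) <- cameras
  (alarmTime, alarmHouse, alarm) <- alarms
  lagSec = Duration.between(camTime, alarmTime).getSeconds
  if camHouse == alarmHouse && lagSec >= 0 && lagSec <= 60
} yield (camHouse, camera, alarm)
println(alerts)
```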

In [None]:
val cameraStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 4444)
                .load()
                .selectExpr("split(value, ',')[0] as timestamp_1","split(value, ',')[1] as house_1","split(value, ',')[2] as camera")
                 .withColumn("timestamp_1",to_timestamp(col("timestamp_1")))) //necessary because Spark will do time calcs


val alarmStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load()
                .selectExpr("split(value, ',')[0] as timestamp_2","split(value, ',')[1] as house_2","split(value, ',')[2] as alarm")
                 .withColumn("timestamp_2",to_timestamp(col("timestamp_2")))) //necessary because Spark will do time calcs


//Must watermark the streaming windows before a join
val cameraStream_watermark = cameraStream.withWatermark("timestamp_1","10 minutes")
val alarmStream_watermark = alarmStream.withWatermark("timestamp_2","10 minutes")

//A stream-stream join needs an equality condition on the key
//The time range keeps only alarms that trip within one minute after the camera event
val query = cameraStream_watermark.join(alarmStream_watermark,expr("""
        house_1 = house_2 AND
        timestamp_2 >= timestamp_1 AND
        timestamp_2 <= timestamp_1 + interval 1 minute
        """ ))
    .writeStream
    .outputMode("append")
    .format("console")
    .start()


In [None]:
query.stop()

<span style="color:blue;font-size:large">Aggregating data after the join</span>
<br>
<li>Let's add an additional clause: there must be at least two instances of alerts as defined above for an investigation to be initiated</li>
<li>e.g., if "c1,a1" and "c2,a1" send alerts in the one minute window, the company will investigate</li>
<li>if only "c1,a1" fires then no investigation</li>
<li>We want this to be true in any 10 minute window so we'll create a 10 minute sliding window that slides every 1 minute and count instances of alerts</li>
<li>Note the following:</li>
<ul>
    <li>a join on two streaming dataframes should be watermarked so that old state can be dropped</li>
    <li>a join on two streaming dataframes must use "append" as the outputMode</li>
    <li>a streaming dataframe that uses append and an aggregation must use watermarks</li>
</ul>

In [None]:
val cameraStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 4444)
                .load()
                .selectExpr("split(value, ',')[0] as timestamp_1","split(value, ',')[1] as house_1","split(value, ',')[2] as camera")
                 .withColumn("timestamp_1",to_timestamp(col("timestamp_1")))) //necessary because Spark will do time calcs


val alarmStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load()
                .selectExpr("split(value, ',')[0] as timestamp_2","split(value, ',')[1] as house_2","split(value, ',')[2] as alarm")
                 .withColumn("timestamp_2",to_timestamp(col("timestamp_2")))) //necessary because Spark will do time calcs


val cameraStream_watermark = cameraStream.withWatermark("timestamp_1","10 minutes")
val alarmStream_watermark = alarmStream.withWatermark("timestamp_2","10 minutes")

val query = cameraStream_watermark.join(alarmStream_watermark,expr("""
        house_1 = house_2 AND
        timestamp_2 >= timestamp_1 AND
        timestamp_2 <= timestamp_1 + interval 1 minute
        """ )) //alarm must trip within one minute after the camera event
    .withWatermark("timestamp_1","10 minutes") //Since mode is append, we'll need to watermark this stream as well
    .groupBy(window($"timestamp_1","10 minutes","1 minute"),$"house_1") //sliding windows
    .count //aggregation
    .writeStream
    .outputMode("append")
    .format("console")
    .start()

In [None]:
query.stop()

<span style="color:green;font-size:xx-large">Try this</span>
<li>Along with each signal, the producer (camera or alarm) sends a numerical threat level indicator (see the data below)</li>
<li>As before, compute alerts as a (camera signal, alarm signal) where the alarm signal is generated inside a minute after the camera signal</li>
<li>then, assign a numerical value to every 10 minute window (sliding every minute) by adding the camera threat level and the alarm threat level</li>
<li>Report this total threat value for each signal</li>

<pre>
2022-04-18 09:30:00,H1,C1,5
2022-04-18 09:33:00,H1,C2,8
2022-04-18 09:34:00,H2,C1,9
2022-04-18 09:45:00,H2,C1,20
</pre>
<pre>
2022-04-18 09:30:00,H1,A1,12
2022-04-18 09:32:00,H1,A2,13
2022-04-18 09:36:00,H2,A1,2
2022-04-18 09:51:00,H2,A2,2
</pre>

<span style="color:blue;font-size:large">This is what you should get</span><br>
<pre>
+--------------------+-------+------+
|              window|house_1|threat|
+--------------------+-------+------+
|{2022-04-18 09:25...|     H1|  35.0|
|{2022-04-18 09:22...|     H1|  35.0|
|{2022-04-18 09:24...|     H1|  35.0|
|{2022-04-18 09:21...|     H1|  35.0|
|{2022-04-18 09:23...|     H1|  35.0|
|{2022-04-18 09:25...|     H2|  22.0|
+--------------------+-------+------+

</pre>

In [None]:
val cameraStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 4444)
                .load()
                    //WRITE THE selectExpr function call here
                 .withColumn("timestamp_1",to_timestamp(col("timestamp_1")))) //necessary because Spark will do time calcs


val alarmStream = (spark
                 .readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load()
                //WRITE THE selectExpr function call here                 
                .withColumn("timestamp_2",to_timestamp(col("timestamp_2")))) //necessary because Spark will do time calcs


val cameraStream_watermark = cameraStream.withWatermark("timestamp_1","10 minutes")
val alarmStream_watermark = alarmStream.withWatermark("timestamp_2","10 minutes")

val query = cameraStream_watermark.join(alarmStream_watermark,expr("""
        house_1 = house_2 AND
        timestamp_1 <= timestamp_2 + interval 10 seconds
        """ ))
    .withWatermark("timestamp_1","10 minutes") //Since mode is append, we'll need to watermark this stream as well
    //FILL IN THE REST OF THE CODE HERE


In [None]:
query.stop()

<br><br><br>
<span style="color:green;font-size:xx-large">Example: Model evaluation with streaming data</span>
<br><br>

<li>Given a pre-trained ML model, we might want to get predictions and model metrics from a real time stream</li>
<li>As we'll see, there are some limitations with how structured streaming can handle this</li>
<li>Let's start with training our model the usual way</li>
<li>And creating a pipeline (the streaming data will pass through the pipeline)</li>
<li>The data is California housing data</li>
<ul>
    <li>Each data item is housing data rolled up into blocks</li>
    <li>The dependent variable is the median home value in the block</li>
    </ul>
<li>We'll create an ML pipeline</li>
<ul>
    <li>Read the data into a df</li>
    <li>Do some feature engineering to get the data into the right format (prepareData function)</li>
    <li>Assemble the input features into a vector (required for ML models)</li>
    <li>Scale them to mean 0, std 1</li>
    <li>And run a linear regression model on the data</li>
    </ul>

In [None]:
import org.apache.spark.sql.DataFrame

//Read the data function
def readData(): (DataFrame,DataFrame) = {
    val df = spark.read.format("csv")
        .option("header","false")
        .option("inferSchema","true")
        .load("cal_housing.data")
        .toDF("Longitude","Latitude","MedianAge",
                     "TotalRooms","TotalBedrooms","Population","Households",
                     "MedianIncome","MedianHomeValue")
    val Array(train,test) = df.randomSplit(Array(0.8,0.2),seed=1234L)
    (train,test)
}

//Do basic feature engineering to get the right set of features
def prepareData(df: DataFrame): DataFrame = {
    df.withColumn("MedianHomeValue",$"MedianHomeValue"/100000)
        .withColumn("RoomsPerHouse", col("TotalRooms")/col("Households"))
        .withColumn("PeoplePerHouse", col("Population")/col("Households"))
        .withColumn("BedroomsPerHouse", col("TotalBedrooms")/col("Households"))
        .select("MedianHomeValue", 
                  "MedianAge", 
                  "Population", 
                  "Households", 
                  "MedianIncome", 
                  "RoomsPerHouse", 
                  "PeoplePerHouse", 
                  "BedroomsPerHouse",
                   "Latitude",
                   "Longitude")
        .withColumnRenamed("MedianHomeValue","label")
}

//Split the data into train and test
//We will use the training data to train the model but will ignore the testing data

val (train,test) = readData()

//Assemble features vector and scale the data
//Set up the regression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.regression.LinearRegression

val cols = Array("Longitude", "Latitude", "MedianAge", "RoomsPerHouse", "BedroomsPerHouse", "PeoplePerHouse", 
                 "Households", "MedianIncome")
val assembler = new VectorAssembler()
  .setInputCols(cols)
  .setOutputCol("features")

val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
      .setWithStd(true)
      .setWithMean(true)

val lr = new LinearRegression()
    .setMaxIter(10)
    .setRegParam(0.3) //Regularization parameter
    .setElasticNetParam(0.8) //elastic net regularization parameter (L1 + L2 penalties)
    .setFeaturesCol("scaledFeatures") //independent variables
    .setLabelCol("label") //dependent variable (we don't need to specify this since we've called our col label)


//Create a pipeline
//fit training data to the pipeline
import org.apache.spark.ml.{Pipeline, PipelineModel}

val pipeline = new Pipeline().setStages(Array(assembler,scaler,lr))
val model = pipeline.fit(prepareData(train))

<br><br>
<span style="color:green;font-size:xx-large">Set up the streaming data reader</span>
<li>New data will arrive in batches in a stream</li>
<li>Note that we need to explicitly provide a schema</li>

In [None]:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DoubleType

val schema = StructType(Array(
    StructField("Longitude",DoubleType),
    StructField("Latitude",DoubleType),
    StructField("MedianAge",DoubleType),
    StructField("TotalRooms",DoubleType),
    StructField("TotalBedrooms",DoubleType),
    StructField("Population",DoubleType),
    StructField("Households",DoubleType),
    StructField("MedianIncome",DoubleType),
    StructField("MedianHomeValue",DoubleType)
    ))

    
val streaming_data = spark
    .readStream 
    .option("header", "false") 
    .option("maxFilesPerTrigger", 1)
    .schema(schema)
    .csv("datafiledir")



<br><br>
<span style="color:green;font-size:xx-large">Write a function that converts the input stream DF to an output stream DF</span>
<br><br>
<li>We'll need to call model.transform to add the predictions column to the df</li>
<li>And then, if necessary, do some transformations on the df</li>

In [None]:
import org.apache.spark.sql.DataFrame

def prepare_sink_df(): DataFrame = {
    val result = model.transform(prepareData(streaming_data))
                    .select("label","prediction")
                    .withColumnRenamed("label","Actual")
                    .withColumnRenamed("prediction","Predicted")
    result
                        
}

<br><br>
<span style="color:green;font-size:xx-large">Write the output stream df to the sink</span>
<br><br>

In [None]:
//import java.util.concurrent.TimeUnit
//import org.apache.spark.sql.streaming.Trigger

val query = prepare_sink_df
            .writeStream
            .format("console")
            .outputMode("update")
            .start()



In [None]:
query.stop()

<br><br>
<span style="color:green;font-size:xx-large">Evaluation metrics</span>
<br><br>
<li>Unfortunately, Streaming DataFrames are limited to dataframe transformations; actions such as collect or count cannot be run on them directly</li>
<li>The ML evaluation metrics (rmse, r2) are computed with an action and returned as a Double, and we would need to create a dataframe of results from that value</li>
<li>Which is not allowed - we can only run transformations on the input stream df</li>
<li>So, we'll need to calculate the metrics ourselves, using dataframe operations</li>
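<li>For reference, the calculation itself is small. The plain-Scala sketch below computes RMSE from (label, prediction) pairs in the same three steps (squared difference, mean, square root). <span style="color:blue">rmse</span> is a hypothetical helper and the input values are made up.</li>

```scala
// Root mean squared error over (label, prediction) pairs
def rmse(pairs: Seq[(Double, Double)]): Double = {
  require(pairs.nonEmpty, "need at least one (label, prediction) pair")
  val squaredDiffs = pairs.map { case (label, prediction) =>
    (prediction - label) * (prediction - label)
  }
  val mse = squaredDiffs.sum / pairs.size // mean of the squared differences
  math.sqrt(mse)
}

val example = rmse(Seq((1.0, 1.0), (2.0, 4.0)))
println(example)
```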

In [None]:
import org.apache.spark.sql.DataFrame

def prepare_sink_df(): DataFrame = {
    val result = model.transform(prepareData(streaming_data))
                    .select("label","prediction")
    result
        .withColumn("sq_diff",(col("prediction")-col("label")) * (col("prediction")-col("label")))
        .agg(mean("sq_diff") as "mse")
        .withColumn("rmse",sqrt("mse"))
        .select("rmse")
                        
}

In [None]:
val query = prepare_sink_df
            .writeStream
            .format("console")
            .outputMode("complete")
            .start()



In [None]:
query.stop()