# Structured Streaming Demo

### Demo

In [14]:
import findspark
# TODO: change this path to point at the Spark installation on your own machine or cluster.
findspark.init('/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2')

In [15]:
import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

In [16]:
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

In a separate terminal, run `nc -lk 5555` to start a netcat server listening on port 5555, then type any text you like. Each line you enter is sent to the stream.

In [17]:
# Create DataFrame representing the stream of input lines from connection to localhost:5555
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 5555).load()

* `select()`: picks out certain columns or expressions of a DataFrame for whatever operations follow
* `split()`: uses a regular expression to divide a string into an array of substrings
* `explode()`: turns each element of an array (or each entry of a map) into its own row
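As a rough illustration of what `split()` and `explode()` do together, the plain-Python sketch below mimics them on a small list of strings. It does not use Spark at all; `split_line`, `explode_rows`, and the sample lines are hypothetical names invented for this example.

```python
import re

# Hypothetical input: each string stands for one row of the "value" column.
sample_lines = ["hello world", "hello  spark "]


def split_line(line):
    """Mimic split(value, " "): divide a string on a regex, keeping empty pieces."""
    return re.split(" ", line)


def explode_rows(arrays):
    """Mimic explode(): emit one output row per array element."""
    return [item for array in arrays for item in array]


arrays = [split_line(line) for line in sample_lines]
words = explode_rows(arrays)

print(arrays)  # one list of pieces per input line
print(words)   # one entry per piece, across all lines
```

Note that consecutive or trailing spaces produce empty strings, which is one plausible reason for an empty `word` row to show up in a word count.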

In [18]:
# Split the lines into words
words = lines.select(explode(split(lines.value, " ")).alias("word"))

In [19]:
# Generate running word count
wordCounts = words.groupBy("word").count()

Some of the operations available on the `StreamingQuery` handle returned by `start()`. In PySpark, everything below except `stop()` is an attribute and is accessed without parentheses:

| Operator               | Purpose                                                                                            |
|------------------------|----------------------------------------------------------------------------------------------------|
| query.name             | get the auto-generated or user-specified name of the query                                         |
| query.id               | get the unique identifier of the running query that persists across restarts from checkpoint data  |
| query.runId            | get the unique id of this run of the query, which will be generated at every start/restart         |
| query.recentProgress   | a list of the most recent progress updates for this query                                          |
| query.lastProgress     | the most recent progress update of this streaming query                                            |
| spark.streams.active   | get the list of currently active streaming queries                                                 |
| query.stop()           | stop the query                                                                                     |

In [20]:
# Start running the query that prints the running counts to the console
query = wordCounts.writeStream.outputMode("complete").format("console").start()
query.awaitTermination()

KeyboardInterrupt: 
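With `outputMode("complete")`, the whole table of counts is written out again after every batch, not just the rows that changed. The plain-Python sketch below imitates that behavior under simplifying assumptions; it is not how Spark is implemented, and `run_complete_mode` and the sample batches are hypothetical.

```python
from collections import Counter


def run_complete_mode(batches):
    """Keep a running word count and snapshot the full table after each batch."""
    counts = Counter()
    snapshots = []
    for batch in batches:
        for line in batch:
            counts.update(line.split(" "))
        # "complete" mode: emit every row of the result, not only the changes
        snapshots.append(dict(counts))
    return snapshots


# Hypothetical input: two micro-batches, each holding the lines that arrived.
batches = [["a b"], ["b c"]]
snapshots = run_complete_mode(batches)

for batch_number, table in enumerate(snapshots):
    print(batch_number, sorted(table.items()))
```

The second snapshot still contains `a`, even though `a` did not appear in the second batch, which is the defining trait of complete mode.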

#### Result:

* In the terminal:  
[root@hadoop-83 ~]# nc -lk 5555  
PO  
L  
OIIII  
oi, tudo bem você está bem hoje como foi seu dia bem




* Spark:  
+-----+-----+  
| word|count|  
+-----+-----+  
|  seu|    1|  
|  oi,|    1|  
|  dia|    1|  
| você|    1|  
| bem?|    1|  
|  bem|    2|  
|    L|    1|  
|  foi|    1|  
|   PO|    1|  
| está|    1|  
| hoje|    1|  
| como|    1|  
|     |    1|  
|OIIII|    1|  
| tudo|    1|  
+-----+-----+  
    
