<hr />
Example for Spark Streaming
<hr />

<hr />
The following cell adds pyspark to sys.path at runtime, which is needed when pyspark is not on the system path by default. It also prints the path of the Spark installation.
<hr />

In [1]:
import findspark

print(findspark.find())
findspark.init()

/opt/spark
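
For readers curious about what happens behind findspark.init(), the sketch below approximates the idea: it puts Spark's Python sources and the bundled py4j archive on sys.path and sets SPARK_HOME. It assumes the usual Spark layout (a python directory with lib/py4j-*.zip inside it); add_pyspark_to_path is a hypothetical helper written for illustration, not part of findspark's API.

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Prepend Spark's Python sources and its bundled py4j archive to sys.path.

    Hypothetical helper for illustration; assumes the usual Spark directory layout.
    """
    spark_python = os.path.join(spark_home, "python")
    # Spark ships py4j as a zip under python/lib; the version in the file name varies.
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip")))
    candidates = [spark_python] + py4j_zips[:1]
    # Skip entries that are already present so repeated calls do not duplicate them.
    added = [path for path in candidates if path not in sys.path]
    sys.path[:0] = added
    os.environ["SPARK_HOME"] = spark_home
    return added
```

Calling add_pyspark_to_path("/opt/spark") would then return the entries it added, or an empty list if they were already on the path.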


<hr />
Create a Spark Session
<hr />

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

22/02/09 18:14:27 WARN Utils: Your hostname, CT-LP-234 resolves to a loopback address: 127.0.1.1; using 172.18.169.165 instead (on interface eth0)
22/02/09 18:14:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/02/09 18:14:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


<hr />
We use Netcat (a small utility found in most Unix-like systems) as a data server that provides sentences to the streaming application. <br>

Run the command below in a terminal to start the data server on port 9999. <br>
nc -lk 9999 <br>
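
Where Netcat is not available, a small Python server can stand in for it. The sketch below is a minimal example under stated assumptions: serve_lines is a hypothetical helper, it accepts a single client, sends a fixed list of newline-terminated UTF-8 lines, and then closes the connection, whereas nc -lk keeps listening and reads from the terminal.

```python
import socket
import threading


def serve_lines(lines, host="localhost", port=9999):
    """Accept one client, send each line newline-terminated, then close.

    Hypothetical helper for illustration. Returns the worker thread and the
    bound port, so port=0 (let the OS pick a free port) is usable too.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def handle():
        conn, _ = server.accept()  # blocks until the streaming job connects
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))
        server.close()

    thread = threading.Thread(target=handle, daemon=True)
    thread.start()
    return thread, bound_port
```

Calling serve_lines(["apache spark", "apache hadoop"]) before starting the query would deliver both sentences as soon as the socket source connects.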


<hr />
Import the functions required to split a sentence into words.
<hr />

In [4]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

<hr />
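The variable "lines" is a DataFrame that represents an unbounded table containing the streaming text data, with one row per input line in a single string column named "value". <br>
The "words" DataFrame splits each line on spaces and turns every word into its own row. <br>
The "wordCounts" DataFrame groups the rows by word and counts the occurrences of each unique word. <br>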
<hr />

In [5]:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

22/02/09 18:19:20 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
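
To see what the split, explode, and groupBy steps compute without a running stream, the plain-Python sketch below applies the same logic to a small, bounded list of lines. The function names are hypothetical and the sample sentences are assumptions chosen for illustration; this mimics the semantics only and does not use Spark.

```python
from collections import Counter


def split_and_explode(lines):
    """Mimic explode(split(value, " ")): one output row per word."""
    words = []
    for line in lines:
        # Splitting on a single space keeps empty strings for repeated spaces.
        words.extend(line.split(" "))
    return words


def count_words(words):
    """Mimic groupBy("word").count() for a single, bounded batch."""
    return Counter(words)


rows = split_and_explode(["apache spark", "apache hadoop"])
print(rows)               # ['apache', 'spark', 'apache', 'hadoop']
print(count_words(rows))  # Counter({'apache': 2, 'spark': 1, 'hadoop': 1})
```

Note that consecutive spaces in a line produce empty-string words in this sketch, so real input may need extra cleaning before counting.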


<hr />
The streaming computation is started in the background. <br>
The application starts receiving data and computing the counts. <br>
It also displays the complete set of counts on the console every time they are updated. <br>
The call to query.awaitTermination() then blocks the cell until the query stops or the kernel is interrupted. <br>

After running the cell below, type the input sentences in the data server terminal started earlier. <br>
<hr />

In [7]:
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

22/02/09 18:25:17 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-132a16b9-b21f-4bf5-ac38-c5ea7b09b17a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+



22/02/09 18:30:33 WARN TextSocketMicroBatchStream: Stream closed by localhost:9999


KeyboardInterrupt: 
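
The batches printed above illustrate the "complete" output mode: every trigger prints the whole result table, not only the rows that changed. The plain-Python sketch below reproduces that behaviour without Spark. The helper run_complete_mode is hypothetical, and the input sentences ("apache spark", then "apache hadoop") are an assumption inferred from the counts shown in the output.

```python
from collections import Counter


def run_complete_mode(batches):
    """Yield the full running word counts after each micro-batch.

    Hypothetical helper: `batches` is a list of lists of input lines.
    """
    totals = Counter()  # running state carried from one batch to the next
    for batch_id, lines in enumerate(batches):
        for line in lines:
            totals.update(line.split(" "))
        # "complete" mode emits every row of the result table on each trigger.
        yield batch_id, dict(totals)


# Batch 0 is empty, as in the console output above.
for batch_id, table in run_complete_mode([[], ["apache spark"], ["apache hadoop"]]):
    print(batch_id, table)
# 0 {}
# 1 {'apache': 1, 'spark': 1}
# 2 {'apache': 2, 'spark': 1, 'hadoop': 1}
```

Because the whole table is re-emitted each time, the count for "apache" grows from 1 to 2 while "spark" is still listed in the last batch even though it did not appear in the latest input.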

In [8]:
spark.stop()

22/02/09 18:53:20 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:430)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:416)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:332)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor