## Structured Streaming in Action
Let’s get to an applied example of how you might use Structured Streaming. For our examples,
we’re going to be working with the Heterogeneity Human Activity Recognition Dataset. The
data consists of smartphone and smartwatch sensor readings from a variety of devices—
specifically, the accelerometer and gyroscope, sampled at the highest possible frequency
supported by the devices. Readings from these sensors were recorded while users performed
activities like biking, sitting, standing, walking, and so on. There are several different
smartphones and smartwatches used, and nine total users.

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
# Read one known-good file statically so its inferred schema can be reused for the stream
PATH = "data/activity_data/streaming_test.json"
static = spark.read.json(PATH)
dataSchema = static.schema

In [4]:
static.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [5]:
static.show(3)

+-------------+-------------------+--------+-----+------+----+---------------+-----+-------------+------------+-------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|_corrupt_record|   gt|            x|           y|            z|
+-------------+-------------------+--------+-----+------+----+---------------+-----+-------------+------------+-------------+
|1424686735175|1424686733176178965|nexus4_1|   35|nexus4|   g|           null|stand| 0.0014038086|   5.0354E-4|-0.0124053955|
|1424686735378|1424686733382813486|nexus4_1|   76|nexus4|   g|           null|stand|-0.0039367676| 0.026138306|  -0.01133728|
|1424686735577|1424686733579072031|nexus4_1|  115|nexus4|   g|           null|stand|  0.003540039|-0.034744263| -0.019882202|
+-------------+-------------------+--------+-----+------+----+---------------+-----+-------------+------------+-------------+
only showing top 3 rows



* Most of the transformations that are available in the static Structured APIs also apply to streaming DataFrames.
* One small difference is that Structured Streaming does not perform schema inference unless you explicitly enable it by setting the configuration spark.sql.streaming.schemaInference to true.
* Instead of enabling inference, we read the schema from one file (that we know has a valid schema) and pass the dataSchema object from our static DataFrame to our streaming DataFrame.
* Avoid deriving the schema this way in a production scenario, where your data may (accidentally) change out from under you:

In [6]:
# maxFilesPerTrigger caps how many new files are read in each trigger
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
.json("data/activity_data/streaming_test.json")
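Where borrowing an inferred schema is too fragile, one option is to spell the schema out by hand. The sketch below builds a DDL-formatted schema string from the columns shown in the printSchema() output above. The names `ACTIVITY_COLUMNS`, `to_ddl`, and `open_with_explicit_schema` are illustrative and not part of the original notebook, and the helper assumes an existing SparkSession plus a directory of JSON files.

```python
# Column names and types copied from the printSchema() output above.
# Spark DDL spellings: long -> BIGINT, string -> STRING, double -> DOUBLE.
ACTIVITY_COLUMNS = [
    ("Arrival_Time", "BIGINT"),
    ("Creation_Time", "BIGINT"),
    ("Device", "STRING"),
    ("Index", "BIGINT"),
    ("Model", "STRING"),
    ("User", "STRING"),
    ("_corrupt_record", "STRING"),
    ("gt", "STRING"),
    ("x", "DOUBLE"),
    ("y", "DOUBLE"),
    ("z", "DOUBLE"),
]


def to_ddl(columns):
    """Join (name, type) pairs into a DDL schema string.

    Backticks quote each name so that words such as User or Index
    cannot be mistaken for keywords.
    """
    return ", ".join(f"`{name}` {dtype}" for name, dtype in columns)


activity_ddl = to_ddl(ACTIVITY_COLUMNS)


def open_with_explicit_schema(spark, directory):
    """Hypothetical helper: stream JSON files from `directory` using the
    hand-written schema instead of one inferred from a sample file."""
    return spark.readStream.schema(activity_ddl).json(directory)
```

Because the schema is now written down, a file with unexpected columns no longer changes what the stream expects.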

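Just like with other Spark APIs, streaming DataFrame creation and execution is lazy. In
particular, we can specify transformations on the streaming DataFrame before calling an
action to start the stream. Here we group the data by the gt column, which holds the
activity being performed, and count the rows in each group: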

In [7]:
activityCounts = streaming.groupBy("gt").count()
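Conceptually, a streaming grouped count keeps one running total per key and updates it as new data arrives. The plain-Python sketch below illustrates that idea only; it is not how Spark implements the aggregation, and the records, the `update_counts` helper, and the two batches are invented for illustration.

```python
from collections import Counter


def update_counts(running, batch):
    """Fold one batch of records into the running per-activity counts
    and return a snapshot of the full result so far."""
    running.update(record["gt"] for record in batch)
    return dict(running)


running = Counter()

# Two made-up batches standing in for data arriving over time.
batch_1 = [{"gt": "stand"}, {"gt": "stand"}, {"gt": "walk"}]
batch_2 = [{"gt": "walk"}, {"gt": "sit"}]

after_1 = update_counts(running, batch_1)
after_2 = update_counts(running, batch_2)
```

After the second batch the snapshot reflects both batches, which is the behaviour a grouped count over a stream is expected to have.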

Because this code is running in local mode on a small machine, we set the number of shuffle partitions to a small value to avoid creating too many of them:

In [8]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

* Now that we have set up our transformation, we only need to specify an action to start the query.
* We will specify an output destination, or output sink, for the result of this query.
* For this basic example, we are going to write to a memory sink, which keeps an in-memory table of the results.

In [9]:
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("memory").outputMode("complete")\
.start()

We are now writing out our stream! Notice that we set a unique query name to represent
this stream, in this case activity_counts. We specified the format as an in-memory table and
set the output mode to complete, which rewrites the full result table after every trigger.
When we run the preceding code, we also want to include the following line, which blocks
until the query terminates or fails:

In [11]:
activityQuery.awaitTermination()

StreamingQueryException: Option 'basePath' must be a directory
=== Streaming Query ===
Identifier: activity_counts [id = b3976ce6-3b84-4d17-bf36-232d0d983609, runId = 9db05c32-4947-4e76-b589-c3fe892badad]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/E:/AI-ITI/Second_3_Months/PySpark/Book/data/activity_data/streaming_test.json]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@5b33f458
+- Aggregate [gt#92], [gt#92, count(1) AS count#119L]
   +- StreamingExecutionRelation FileStreamSource[file:/E:/AI-ITI/Second_3_Months/PySpark/Book/data/activity_data/streaming_test.json], [Arrival_Time#85L, Creation_Time#86L, Device#87, Index#88L, Model#89, User#90, _corrupt_record#91, gt#92, x#93, y#94, z#95]
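
The exception above most likely arises because the streaming reader in In [6] was pointed at a single file (streaming_test.json), whereas the file-based streaming source expects a directory that it can watch for new files. A minimal sketch of one way around this follows, assuming a local filesystem path. The helpers `streaming_source_dir` and `open_activity_stream` are hypothetical names, not part of the original notebook. Note that every file in the chosen directory is then treated as input, so the directory should contain only data that matches the schema.

```python
from pathlib import Path


def streaming_source_dir(path):
    """Return a directory usable by a file-based stream: the path itself,
    or its parent directory when the path names a single existing file.
    Assumes a local filesystem path (not an hdfs:// or s3:// URI)."""
    p = Path(path)
    return str(p.parent) if p.is_file() else str(p)


def open_activity_stream(spark, schema, path):
    """Hypothetical helper: the same reader as In [6], but pointed at a
    directory. Assumes `spark` is an existing SparkSession and `schema`
    is the schema taken from the static DataFrame."""
    return (
        spark.readStream.schema(schema)
        # The option is spelled maxFilesPerTrigger (plural "Files").
        .option("maxFilesPerTrigger", 1)
        .json(streaming_source_dir(path))
    )
```

With these helpers, `open_activity_stream(spark, dataSchema, PATH)` would read from data/activity_data rather than from the file itself.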
