### Structured Streaming

In [62]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, column, desc, col, date_format

In [44]:
spark = SparkSession.builder.appName('structuredStreaming').getOrCreate()

In [45]:
#lets reduce the number of shuffle partitions from default 200 to 5 to match the number of executors on this machine
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [46]:
# let's first create static dataFrame. (later we shall create a streaming dataFrame)
staticDataFrame = spark.read.format("csv")\
                        .option("header", "true")\
                        .option("inferSchema", "true")\
                        .load("/Users/ilyosnishanov/Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")

In [47]:
staticSchema=staticDataFrame.schema

In [48]:
staticDataFrame\
    .selectExpr(
      "CustomerId",
      "(UnitPrice * Quantity) as total_cost",
      "InvoiceDate")\
.groupBy(
      col("CustomerId"), window(col("InvoiceDate"), "1 day")\
        )\
    .sum("total_cost")\
    .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14075.0|{2011-12-05 05:00...|316.78000000000003|
|   18180.0|{2011-12-05 05:00...|            310.73|
|   15358.0|{2011-12-05 05:00...| 830.0600000000003|
|   15392.0|{2011-12-05 05:00...|304.40999999999997|
|   15290.0|{2011-12-05 05:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows



In [49]:
staticDataFrame.createOrReplaceTempView("retail_data")

In [50]:
spark.catalog.tableExists("retail_data")

True

In [51]:
# you can of course always query this dataFrame with SQL
asdf = spark.sql("""
            SELECT
                             CustomerId
                             ,sum(UnitPrice * Quantity) as total_cost
                             ,InvoiceDate
            FROM retail_data
                 GROUP BY CustomerId, InvoiceDate
                 LIMIT 5
""")
asdf.show()

+----------+------------------+-------------------+
|CustomerId|        total_cost|        InvoiceDate|
+----------+------------------+-------------------+
|   15156.0|391.46000000000004|2011-12-05 09:15:00|
|   12646.0| 615.2799999999999|2011-12-05 10:14:00|
|   17402.0|             286.8|2011-12-05 10:15:00|
|   14911.0|511.99999999999994|2011-12-05 10:18:00|
|   14194.0| 484.1000000000001|2011-12-05 10:38:00|
+----------+------------------+-------------------+



In [52]:
# now let's create streaming data frame. syntax is almost the same
streamingDataFrame=spark.readStream\
                            .schema(staticSchema)\
                            .option("maxFilesPerTrigger", 1)\
                            .format("csv")\
                            .option("header", "true")\
                            .load("/Users/ilyosnishanov/Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")


In [53]:
#check if this df is streaming
streamingDataFrame.isStreaming

True

In [54]:
# lets query (and perfom some manipulation along the way) this streaming data
purchaseByCustomerPerHour = streamingDataFrame\
                                .selectExpr(
                                    "CustomerId",
                                    "(UnitPrice*Quantity) as total_cost",
                                    "InvoiceDate"
                                )\
                                .groupBy(
                                    col("CustomerId"),
                                    window(col("InvoiceDate"),
                                           "1 day")
                                )\
                                .sum("total_cost")

In [56]:
purchaseByCustomerPerHour.writeStream\
                                .format("memory")\
                                .queryName("customer_purchases_2")\
                                .outputMode("complete")\
                                .start()

24/10/20 10:27:20 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/z5/yj06y7v952z04n49_xrf7tw00000gn/T/temporary-8d9876ec-35a4-42c4-a0ac-f1fe41e58ec5. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/10/20 10:27:20 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x108dd1d00>

In [57]:
spark.sql("""
    select * from customer_purchases_2 order by `sum(total_cost)` desc
""").show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   18102.0|{2010-12-07 05:00...|          25920.37|
|      NULL|{2010-12-10 05:00...|25399.560000000012|
|      NULL|{2010-12-17 05:00...|25371.769999999768|
|      NULL|{2010-12-06 05:00...|23395.099999999904|
|      NULL|{2010-12-03 05:00...| 23021.99999999999|
+----------+--------------------+------------------+
only showing top 5 rows



In [58]:
purchaseByCustomerPerHour.writeStream\
                                .format("console")\
                                .queryName("customer_purchases_3")\
                                .outputMode("complete")\
                                .start()

24/10/20 10:27:37 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/z5/yj06y7v952z04n49_xrf7tw00000gn/T/temporary-fee38ca6-e6ed-4fd9-841f-de2a1fca4cf4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/10/20 10:27:37 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x108dd1f40>

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   12921.0|{2010-12-01 05:00...|             322.4|
|   16583.0|{2010-12-01 05:00...|233.45000000000002|
|   17897.0|{2010-12-01 05:00...|            140.39|
|   12748.0|{2010-12-01 05:00...|              4.95|
|   15350.0|{2010-12-01 05:00...|            115.65|
|   17809.0|{2010-12-01 05:00...|              34.8|
|   13747.0|{2010-12-01 05:00...|              79.6|
|   16250.0|{2010-12-01 05:00...|            226.14|
|   15983.0|{2010-12-01 05:00...|            440.89|
|   17511.0|{2010-12-01 05:00...|           1825.74|
|   14001.0|{2010-12-01 05:00...|            301.24|
|   17460.0|{2010-12-01 05:00...|              19.9|
|   18074.0|{2010-12-01 05:00...|             489.6|
|   12868.0|{2010-12-01 05:00...|             203.3|
| 

In [59]:
# spark.stop()

-------------------------------------------
Batch: 16
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   17576.0|{2010-12-13 05:00...| 177.35000000000002|
|   15039.0|{2010-12-14 05:00...|  706.2500000000002|
|   16250.0|{2010-12-01 05:00...|             226.14|
|   14594.0|{2010-12-01 05:00...| 254.99999999999997|
|   15899.0|{2010-12-06 05:00...|              56.25|
|   14850.0|{2010-12-07 05:00...|              -47.6|
|   17220.0|{2010-12-10 05:00...| 317.50000000000006|
|   14865.0|{2010-12-02 05:00...|               37.2|
|   18223.0|{2010-12-16 05:00...|  501.6899999999999|
|   13329.0|{2010-12-20 05:00...|-35.400000000000006|
|   14800.0|{2010-12-05 05:00...|  555.8399999999999|
|   14256.0|{2010-12-10 05:00...|  523.8599999999999|
|   12434.0|{2010-12-14 05:00...|-27.749999999999996|
|   18041.0|{2010-12-02 05:00...|  428

### MLlib

In [60]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



-------------------------------------------
Batch: 148
-------------------------------------------
+----------+--------------------+-------------------+
|CustomerId|              window|    sum(total_cost)|
+----------+--------------------+-------------------+
|   15290.0|{2011-02-22 05:00...|              -1.65|
|   17416.0|{2011-05-20 05:00...| 1005.7000000000003|
|   12921.0|{2011-03-30 05:00...| -87.30000000000001|
|   17652.0|{2011-03-03 05:00...|              222.3|
|   14292.0|{2011-05-25 05:00...|              -20.8|
|   15899.0|{2010-12-06 05:00...|              56.25|
|   18223.0|{2010-12-16 05:00...|  501.6899999999999|
|   17961.0|{2011-03-11 05:00...| 3.0999999999999996|
|   17841.0|{2011-02-28 05:00...| 227.95000000000005|
|   18188.0|{2011-02-22 05:00...|              426.6|
|   15068.0|{2011-03-28 05:00...| 239.95999999999995|
|   17175.0|{2011-02-16 05:00...|             519.08|
|   16837.0|{2011-04-20 05:00...|              102.0|
|   13184.0|{2011-02-22 05:00...| 212

In [63]:
preppedDataFrame = staticDataFrame\
                    .na.fill(0)\
                    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
                    .coalesce(5)

In [64]:
#split data into training and test parts. for some advanced method you can use MLlib's transformation APIs. covered later
# in this case let's use a date for split
trainDataFrame = preppedDataFrame\
                    .where("InvoiceDate < '2011-07-01'")
testDataFrame= preppedDataFrame\
                    .where("InvoiceDate >= '2011-07-01'")

                                                                                

In [68]:
trainDataFrame.count()

Py4JJavaError: An error occurred while calling o306.count.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
py4j.ClientServerConnection.run(ClientServerConnection.java:106)
java.base/java.lang.Thread.run(Thread.java:1570)

The currently active SparkContext was created at:

(No active SparkContext.)
         
	at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:122)
	at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:2702)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.$anonfun$apply$1(CoalesceShufflePartitions.scala:61)
	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:58)
	at org.apache.spark.sql.execution.adaptive.CoalesceShufflePartitions.apply(CoalesceShufflePartitions.scala:34)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$optimizeQueryStage$2(AdaptiveSparkPlanExec.scala:169)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.optimizeQueryStage(AdaptiveSparkPlanExec.scala:168)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.newQueryStage(AdaptiveSparkPlanExec.scala:588)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:538)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:577)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:577)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:277)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:417)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3615)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:3615)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)


In [66]:
spark.stop()

24/10/20 10:44:01 ERROR MicroBatchExecution: Query customer_purchases [id = 564e889c-e4f3-4131-a512-a386a94c5bf0, runId = 9260fffa-6f67-4e9c-87a9-e0a38fc02217] terminated with error
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.wa

24/10/20 10:44:11 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:632)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:610)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:453)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.

In [67]:
spark

AttributeError: 'NoneType' object has no attribute 'sc'

<pyspark.sql.session.SparkSession at 0x108dc6030>