In [1]:
# Show the active SparkSession. `spark` is never created in this notebook, so it is
# assumed to be injected by the notebook runtime (Databricks) — TODO confirm if run elsewhere.
spark

In [2]:
%fs ls dbfs:/FileStore/tables/


In [3]:
%fs head dbfs:/FileStore/tables/1.json

In [4]:
from pyspark.sql.types import *

# Directory holding the JSON input files; every file in it is read.
inputPath = "dbfs:/FileStore/tables/"

# The record layout is known in advance, so declare it up front instead of having
# Spark scan the files to infer it. All eight fields are nullable strings.
jsonSchema = StructType([
    StructField(fieldName, StringType(), True)
    for fieldName in (
        "filename", "instrument_source_str", "instrument",
        "S1", "S2", "S3", "S4", "S5",
    )
])

# Batch (non-streaming) DataFrame over all JSON files under inputPath.
staticInputDF = spark.read.schema(jsonSchema).json(inputPath)

display(staticInputDF)

# Expose the batch DataFrame to SQL under the name `static_counts`.
staticInputDF.createOrReplaceTempView("static_counts")

In [5]:
%sql
-- Per-source totals of S1..S5 plus the number of rows, over the batch view.
-- S1..S5 are declared as strings in jsonSchema, so cast them explicitly instead of
-- relying on implicit string-to-number coercion (whose rules depend on ANSI mode).
-- count(*) counts every row in the group, matching the streaming .count() aggregation;
-- count(instrument_source_str) would report 0 for the NULL instrument_source_str group.
SELECT instrument_source_str,
       sum(CAST(S1 AS DOUBLE)) AS sum_S1,
       sum(CAST(S2 AS DOUBLE)) AS sum_S2,
       sum(CAST(S3 AS DOUBLE)) AS sum_S3,
       sum(CAST(S4 AS DOUBLE)) AS sum_S4,
       sum(CAST(S5 AS DOUBLE)) AS sum_S5,
       count(*) AS count
FROM static_counts
GROUP BY instrument_source_str
 

In [6]:
from pyspark.sql.functions import *

# Streaming counterpart of staticInputDF: same schema and input directory, but built
# with `readStream`, so the files are consumed incrementally in micro-batches.
streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)                # explicit schema: file streams do not infer one by default
    .option("maxFilesPerTrigger", 10)  # each micro-batch picks up at most 10 new files
    .json(inputPath)
)

# Running row count per instrument_source_str. Unlike the static SQL aggregate,
# this only counts rows; it does not sum S1..S5.
streamingCountsDF = (
  streamingInputDF
    .groupBy(streamingInputDF.instrument_source_str)
    .count()
)

# Sanity check: a DataFrame derived from readStream should report True here.
streamingCountsDF.isStreaming

In [7]:
# Only a couple of shuffle partitions: the grouped count is tiny, so many partitions
# would just mean many near-empty tasks per micro-batch.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Publish the running counts as an in-memory table named "counts" (memory sink:
# intended for testing / small results). "complete" mode rewrites the whole table of
# counts on every trigger. The returned StreamingQuery handle is kept in `query`;
# call query.stop() once the table is no longer needed.
query = (
  streamingCountsDF.writeStream
    .queryName("counts")
    .outputMode("complete")
    .format("memory")
    .start()
)

In [8]:
%sql
-- Current per-source row counts from the in-memory `counts` table, sources in descending order.
SELECT instrument_source_str, `count`
FROM counts
ORDER BY instrument_source_str DESC



In [9]:
%sql
-- Everything currently in the in-memory `counts` table.
SELECT * FROM counts