In [1]:
# The ratings file is plain text whose fields are delimited by '::', so it is read
# as raw lines (one Row with a `value` string per line) and parsed by hand below.
ratingsPath = '/FileStore/tables/sample_movielens_ratings.txt'
lines = spark.read.text(ratingsPath).rdd

In [2]:
# Peek at the first 5 raw lines. A bare last expression (instead of the Python 2-only
# `print lines.take(5)` statement) renders the result on both Python 2 and Python 3.
lines.take(5)

In [3]:
def _split_fields(textRow):
    """Split one raw ratings line (the Row's `value` string) on the '::' delimiter."""
    return textRow.value.split('::')

data = lines.map(_split_fields)

In [4]:
from pyspark.sql import Row

def _to_rating_row(fields):
    """Convert one '::'-split ratings line into a Row.

    fields: list of strings produced by splitting a raw line on '::'. The first
    four entries are parsed as userId, movieId, rating and timestamp; any extra
    entries are ignored.

    All four are parsed with int(). The original used long() for the timestamp,
    which does not exist on Python 3. On Python 2, int() already promotes large
    values to long, and PySpark infers LongType for both, so the schema is unchanged.
    """
    return Row(userId=int(fields[0]),
               movieId=int(fields[1]),
               rating=int(fields[2]),
               timestamp=int(fields[3]))

rdd = data.map(_to_rating_row)

In [5]:
# No explicit schema: column names and types are inferred from the Row objects in `rdd`.
df = spark.createDataFrame(data=rdd)

In [6]:
# Print the first 10 parsed ratings as a text table.
df.show(n=10)

In [7]:
# List the JSON event files used below (Python equivalent of `%fs ls <dir>`).
display(dbutils.fs.ls("/databricks-datasets/structured-streaming/events/"))

In [8]:
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Each event record carries a `time` timestamp and an `action` string. Declaring the
# schema up front spares Spark from scanning the files to infer it.
jsonSchema = (
  StructType()
    .add("time", TimestampType(), True)
    .add("action", StringType(), True)
)

# Batch (static) DataFrame over the JSON files in inputPath, read with the schema above.
staticInputDF = spark.read.json(inputPath, schema=jsonSchema)

display(staticInputDF)

In [9]:
from pyspark.sql.functions import *      # provides window()

# Count events per action inside 1-hour windows (no slide duration given, so the
# windows do not overlap).
hourlyGrouped = staticInputDF.groupBy(
  staticInputDF["action"],
  window(staticInputDF["time"], "1 hour"),
)
staticCountsDF = hourlyGrouped.count()
staticCountsDF.cache()

# Make the aggregate queryable from SQL as the temp view 'static_counts'.
staticCountsDF.createOrReplaceTempView("static_counts")

In [10]:
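# Optional alternative query (disabled): total events per action across all windows.
# To run it, place the line below in its own cell without the leading '#':
#%sql select action, sum(count) as total_count from static_counts group by action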

In [11]:
%sql
-- Hourly event counts per action, labelled by the end of each 1-hour window.
-- Order by the real timestamp (window.end), not by the formatted `time` label:
-- the "MMM-dd HH:mm" string sorts alphabetically by month name (e.g. "Aug-01"
-- would come before "Jul-31").
SELECT
  action,
  date_format(window.end, "MMM-dd HH:mm") AS time,
  count
FROM static_counts
ORDER BY action, window.end

In [12]:
from pyspark.sql.functions import *

# Streaming counterpart of staticInputDF: same path and schema, read via `readStream`.
# maxFilesPerTrigger=1 makes each trigger pick up a single file, so the existing
# files are replayed as a stream.
streamingInputDF = (
  spark
    .readStream
    .option("maxFilesPerTrigger", 1)
    .schema(jsonSchema)
    .json(inputPath)
)

# Same 1-hour windowed count per action as staticCountsDF, now over the stream.
streamingCountsDF = streamingInputDF.groupBy(
  streamingInputDF["action"],
  window(streamingInputDF["time"], "1 hour"),
).count()

# Sanity check: should display True for a streaming DataFrame.
streamingCountsDF.isStreaming

In [13]:
# Use only 2 shuffle partitions to keep the aggregation's shuffles small.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Start the streaming query:
#   format="memory"      -> results go to an in-memory table (intended for testing)
#   queryName="counts"   -> name of that in-memory table
#   outputMode="complete" -> the table always holds all counts, not just new ones
query = streamingCountsDF.writeStream.start(
  format="memory",
  queryName="counts",
  outputMode="complete",
)