In [1]:
# Entry points the notebook environment provides (later cells use them without creating them):
#Description	Variable Name
#Spark Context	sc
#SQL Context / Hive Context	sqlContext
#SparkSession (2.0 Only)	spark

In [2]:
# Echo the SparkSession to confirm it is available.
spark

In [3]:
# Echo the SQL / Hive context entry point.
sqlContext

In [4]:
%fs ls /FileStore/tables/

In [5]:
%fs head /FileStore/tables/matches_time_result.csv

In [6]:
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *

# Static (batch) input: a single CSV file with a header row.
inputPath = "/FileStore/tables/matches.csv"

# Schema for the three columns this notebook works with. `start_time` is a Unix
# epoch value (it is passed to from_unixtime in the next cell), so it is typed as
# a string, matching the streaming schema, instead of TimestampType, which would
# not match an epoch number under the CSV reader's timestamp parsing.
# Note: this schema is not applied to the static read below.
Schema = StructType([
    StructField("match_id", StringType()),
    StructField("start_time", StringType()),
    StructField("radiant_win", StringType()),
])

# No schema and no inferSchema: every column is read as a string; the header row
# supplies the column names.
df = spark.read.csv(inputPath, header=True)
df.show()

In [7]:
# Add a readable "date_time" string derived from the Unix epoch `start_time`.
# from_unixtime returns a string; the windowing below relies on Spark casting it
# to a timestamp.
df = df.withColumn("date_time", from_unixtime(df.start_time, 'yyyy-MM-dd HH:mm:ss'))
df.show()

# Overall number of matches per outcome.
dfCount = df.groupBy("radiant_win").count()

# Registered under its own name: the next cell registers the windowed counts as
# "static_counts", and sharing that name meant one view silently replaced the
# other depending on execution order.
dfCount.createOrReplaceTempView("win_counts")
dfCount.cache()
df.cache()


In [8]:
# Matches per outcome in 4-hour tumbling windows over `date_time`.
# NOTE(review): the streaming aggregation later in the notebook uses "1 hour"
# windows, so the two results are not directly comparable - confirm intent.
staticCountsDF = df.groupBy(df["radiant_win"], window(df["date_time"], "4 hour")).count()
staticCountsDF.cache()

# Make the windowed counts queryable from SQL as 'static_counts'.
staticCountsDF.createOrReplaceTempView("static_counts")

In [9]:
# Total matches per outcome, summed over all time windows in `static_counts`.
# Uses the SparkSession explicitly instead of a bare `sql` global, which this
# notebook never defines.
display(spark.sql("select radiant_win, sum(count) as total_count from static_counts group by radiant_win"))

In [10]:
%sql
-- Order chronologically by the window end timestamp; ordering by the formatted "MMM-dd HH:mm" string sorted month names alphabetically (e.g. "Dec" before "Nov").
select radiant_win, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by window.end, radiant_win

In [11]:
# Streaming input: every file in this directory becomes part of the stream.
# NOTE(review): the directory also holds other uploads (matches.csv and
# matches_time_result.csv at least, per the cells above). CSV columns are mapped
# to the schema by position, so any file whose first three columns are not
# match_id, start_time, radiant_win will be mis-parsed - TODO confirm or point
# this at a dedicated directory.
streamingInputPath = "/FileStore/tables/"

# Streaming file sources require an explicit schema; `start_time` is kept as the
# raw Unix epoch string.
streamingSchema = StructType([
    StructField("match_id", StringType()),
    StructField("start_time", StringType()),
    StructField("radiant_win", StringType()),
])

# header=True so each file's first line (the column names) is skipped rather than
# ingested as a data row, matching the static read.
streamingInputDF = spark.readStream.csv(streamingInputPath, schema=streamingSchema, header=True)
streamingInputDF = streamingInputDF.withColumn(
    "date_time", from_unixtime(streamingInputDF.start_time, 'yyyy-MM-dd HH:mm:ss')
)

# Same shape of aggregation as staticCountsDF, but with 1-hour windows.
streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.radiant_win,
      window(streamingInputDF.date_time, "1 hour"))
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

In [12]:
# Use 2 shuffle partitions for subsequent aggregations (Spark's default is 200).
# Set before the streaming query in the next cell is started so it applies to it.
spark.conf.set("spark.sql.shuffle.partitions", "2")

In [13]:
# Run the streaming aggregation into an in-memory table named "counts".
# "complete" output mode rewrites the whole aggregated result on every trigger.
query = (
  streamingCountsDF
    .writeStream
    .format("memory")
    .queryName("counts")
    .outputMode("complete")
    .start()
)

# Block until the data currently available in the input directory has been
# processed and committed, then show the in-memory result table. Displaying
# streamingCountsDF directly would start a second, independent streaming query
# over the same input.
query.processAllAvailable()
display(spark.table("counts"))