# Operations on Streaming DataFrames/Datasets Demo

### Demo

In [1]:
import os

import findspark

# Make `pyspark` importable by pointing findspark at a local Spark install.
# An explicit SPARK_HOME environment variable wins; otherwise fall back to the
# author's original install location so existing behaviour is unchanged there.
findspark.init(os.environ.get("SPARK_HOME", "/home/brunosouza/Documents/spark"))

In [2]:
from pyspark.sql.types import StructType

# Schema of the monitoring CSV files: userA, userB, timestamp, interaction.
# NOTE: a later cell redefines `userSchema` before it is used; the column
# types here mirror that definition (integer user ids, timestamp column) so
# both cells describe the same data instead of contradicting each other.
userSchema = (
    StructType()
    .add("userA", "integer")
    .add("userB", "integer")
    .add("timestamp", "timestamp")
    .add("interaction", "string")
)

In [3]:
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, current_timestamp
from pyspark.sql.functions import split
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, TimestampType

In [4]:
# Directory that is watched for incoming CSV files; the per-batch outputs
# are also written to subdirectories beneath it.
staging_dir = "monitoring_data"

In [5]:
# Obtain the session for this analyzer: getOrCreate() hands back an already
# running SparkSession if one exists, otherwise it builds a new one.
spark = (
    SparkSession.builder
    .appName("UserInteractionAnalyzer")
    .getOrCreate()
)

In [6]:
# Explicit schema for the CSV files written (atomically) into the staging
# directory. Column order matches the files:
#   userA, userB, timestamp, interaction
userSchema = (
    StructType()
    .add("userA", "integer")
    .add("userB", "integer")
    .add("timestamp", TimestampType())
    .add("interaction", "string")
)

In [7]:
# Streaming DataFrame over every *.csv file that lands directly in
# `staging_dir`, parsed comma-separated with `userSchema`
# (file sources require an explicit schema).
activity = (
    spark.readStream
    .schema(userSchema)
    .option("sep", ",")
    .csv(staging_dir + "/*.csv")
)

In [8]:
# Stream of `userB` values for every row whose interaction is "MT".
# Despite its name, nothing is counted here; the name is kept because the
# streaming queries below refer to it.
# Filter first, then project: the predicate references `interaction`, which
# the projection would otherwise drop. The literal uses single quotes, the
# standard SQL string-literal syntax. Double quotes are parsed as an
# identifier when `spark.sql.ansi.doubleQuotedIdentifiers` is enabled.
wordCounts = (
    activity
    .where("interaction = 'MT'")
    .select("userB")
)

In [9]:
def udf_batch_csv_save(df, epoch_id):
    """Append one streaming micro-batch to the CSV output directory.

    Meant to be used as a ``foreachBatch`` callback, which Spark calls with
    the micro-batch DataFrame and its epoch id.

    Args:
        df: The (static) DataFrame holding this micro-batch.
        epoch_id: Micro-batch id supplied by Spark; not used here.
    """
    # The writer's default save mode is "errorifexists": without "append",
    # every batch after the first fails because the directory already exists.
    # NOTE: foreachBatch is at-least-once, so a retried epoch may be appended
    # twice; use epoch_id for deduplication if that matters downstream.
    df.write.mode("append").format("csv").save(staging_dir + "/CSVout")

In [10]:
def udf_batch_parquet_save(df, epoch_id):
    """Append one streaming micro-batch to the Parquet output directory.

    Meant to be used as a ``foreachBatch`` callback, which Spark calls with
    the micro-batch DataFrame and its epoch id.

    Args:
        df: The (static) DataFrame holding this micro-batch.
        epoch_id: Micro-batch id supplied by Spark; not used here.
    """
    # Default save mode is "errorifexists"; "append" lets every batch after
    # the first add files to the existing directory instead of failing.
    # NOTE: foreachBatch is at-least-once, so a retried epoch may be appended
    # twice; use epoch_id for deduplication if that matters downstream.
    df.write.mode("append").parquet(staging_dir + "/PARQUETout")

In [None]:
def udf_batch_save_all(df, epoch_id):
    """Write one micro-batch to both the CSV and the Parquet outputs.

    ``DataStreamWriter.foreachBatch`` keeps only the last function passed to
    it, so chaining two ``foreachBatch`` calls silently drops the first sink.
    Both sinks are therefore driven from this single callback.

    Args:
        df: The (static) DataFrame holding this micro-batch.
        epoch_id: Micro-batch id supplied by Spark, forwarded to both writers.
    """
    # Cache the batch so the two writes don't each recompute it from source.
    df.persist()
    try:
        udf_batch_csv_save(df, epoch_id)
        udf_batch_parquet_save(df, epoch_id)
    finally:
        df.unpersist()


# File sinks: every 10 seconds, write the filtered stream to CSV and Parquet.
query1 = (
    wordCounts
    .writeStream
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", "applicationHistory")
    .foreachBatch(udf_batch_save_all)
    .start()
)

# Console sink for watching the same stream interactively.
query2 = (
    wordCounts
    .writeStream
    .trigger(processingTime='10 seconds')
    .format("console")
    .start()
)

# Block until either query stops, so a failure in either one is surfaced
# instead of only failures of query1.
spark.streams.awaitAnyTermination()