Alumnos: Jose Gomez Baco / Leticia Yepez Chavez

# Stream Finance With Window
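Program that reads finance records in streaming mode from a directory, with the goal of finding those that are higher than a given threshold. Note: the cells below count the records of each name over sliding time windows; no threshold filter is applied.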

It is assumed that an external entity is writing files in that directory.
Each file must be in CSV format with the schema: ["name":string, "money":string, "time":timestamp]

:param directory: streaming directory

In [2]:
# Load libraries
import sys
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, window
from pyspark.sql.types import StructType

In [None]:
# Path of the streaming directory (fill in before running the cells below)
directory = ""

In [3]:
# Get the existing Spark session or create a new one, using a local master
# ("local[2]") and the application name "StreamingFinanceData"
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("StreamingFinanceData")
    .getOrCreate()
)

In [None]:
# Set the log level of the Spark context to ERROR
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Schema of the incoming CSV files: name and money as strings, time as timestamp
fileSchema = (
    StructType()
    .add('name', 'string')
    .add('money', 'string')
    .add('time', 'timestamp')
)

In [None]:
# Create a streaming DataFrame over the CSV files found in `directory`.
# Files are read with ',' as separator and a header row, and their columns are
# typed according to fileSchema.
# NOTE: the 'includeTimestamp' option was removed. It is an option of the
# socket source and has no effect on the file-based CSV source.
lines = (
    spark.readStream
    .format("CSV")
    .option('sep', ',')
    .option("header", "true")
    .schema(fileSchema)
    .load(directory)
)

lines.printSchema()

In [None]:
# Project the name, money and time columns of the input stream
words = lines.select('name', 'money', 'time')
words.printSchema()

In [None]:
# Count the records of each name per sliding time window.
# Window length and slide interval are expressed in seconds.
windowSize = '%d seconds' % 3
slideSize = '%d seconds' % 2
windowedCounts = (
    words
    .groupBy(window(words['time'], windowSize, slideSize), words['name'])
    .count()
    .orderBy('window')
)

windowedCounts.printSchema()

In [None]:
# Start the streaming query: results are written to the console sink in
# "complete" output mode, with column truncation disabled
query = (
    windowedCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option('truncate', 'false')
    .start()
)

In [None]:
# Block this cell until the streaming query terminates
query.awaitTermination()