# DSCI 417 - Homework 08
**Malcolm Nichols**

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Reuse the active Spark session if one exists, otherwise create a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

## Problem 1: Create a Streaming DataFrame

In [0]:
# Read one PaySim file as a static (batch) DataFrame and let Spark infer the
# column types; the resulting schema is handed to the streaming reader below.
static_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/FileStore/tables/paysim/step_001.csv')
)

paysim_schema = static_df.schema

# Preview the first five rows of the sample file.
static_df.show(5)

In [0]:
# Streaming DataFrame over every CSV in the PaySim directory. The schema
# inferred from the static sample is supplied explicitly, and
# maxFilesPerTrigger=1 limits each micro-batch to a single new file.
paysim_stream = (
    spark.readStream
    .schema(paysim_schema)
    .options(header=True, maxFilesPerTrigger=1)
    .csv('/FileStore/tables/paysim/')
)

# Confirm this is a streaming (not batch) DataFrame.
print(paysim_stream.isStreaming)

## Problem 2: Apply Transformations

In [0]:
# Running summary per transaction type: row count, rounded mean amount and
# min/max amount, ordered from the most to the least frequent type.
type_summary = (
    paysim_stream
    .groupBy('type')
    .agg(
        expr('COUNT(*) AS n'),
        expr('ROUND(MEAN(amount),2) AS avg_amount'),
        expr('MIN(amount) AS min_amount'),
        expr('MAX(amount) AS max_amount'),
    )
    .orderBy(col('n').desc())
)

In [0]:
# Running summary of TRANSFER transactions per destination account: number of
# transfers, rounded total and rounded mean amount, most frequent first.
destinations = (
    paysim_stream
    # Compare via a Column rather than the SQL text 'type == "TRANSFER"':
    # with ANSI mode and spark.sql.ansi.doubleQuotedIdentifiers enabled, a
    # double-quoted token is parsed as an identifier, not a string literal.
    .filter(col('type') == 'TRANSFER')
    .groupBy('nameDest')
    .agg(
        expr('COUNT(*) AS n'),
        expr('ROUND(SUM(amount),2) AS total_amount'),
        expr('ROUND(MEAN(amount),2) AS avg_amount'),
    )
    .sort('n', ascending=False)
)

## Problem 3: Define Output Sinks

In [0]:
# Memory sinks: each query keeps its complete aggregated result in an
# in-memory table named by queryName, which is queried with spark.sql later.
type_writer = (
    type_summary.writeStream
    .queryName('type_summary')
    .outputMode('complete')
    .format('memory')
)

dest_writer = (
    destinations.writeStream
    .queryName('destinations')
    .outputMode('complete')
    .format('memory')
)

## Problem 4: Start and Monitor the Streams

In [0]:
# Launch both streaming queries (type summary first); the returned handles are
# kept so the queries can be stopped later.
type_query, dest_query = type_writer.start(), dest_writer.start()

In [0]:
# Display the current contents of the in-memory type summary table
# (20 rows, the DataFrame.show default).
(
    spark.sql('SELECT * FROM type_summary')
    .show(20)
)

In [0]:
# Take a single eager snapshot of the live 'destinations' memory table so the
# printed row count and the rows displayed describe the same state. Without
# it, count() and show() are two separate jobs against a table that the
# running stream keeps replacing, so the two outputs can disagree.
dest_snapshot = spark.sql('SELECT * FROM destinations').localCheckpoint()
print(dest_snapshot.count())
dest_snapshot.show(16)

In [0]:
# Shut down both streaming queries, type summary first.
for query in (type_query, dest_query):
    query.stop()