# DSCI 617 – Homework 08.
**Felix Asare**

In [0]:
# Importing libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.getOrCreate()

## Problem 1: Creating Streaming DataFrame

In [0]:
# Loading Data
paysim_static = (
    spark.read
    .option("delimiter", ",")
    .option("header", "true")
    .option("inferSchema", "true")
    .csv('/FileStore/tables/paysim/step_001.csv')
)
paysim_schema = paysim_static.schema

# Displaying first 5 rows
paysim_static.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|           0.0|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 5 rows



In [0]:
# Createing DataFrame for stream data
paysim_stream = (
    spark.readStream
    .option("delimiter", ",")
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .schema(paysim_schema)
    .csv('/FileStore/tables/paysim/')
)

display(paysim_stream.isStreaming)

True

## Problem 2: Apply Transformations

In [0]:
# Creating type_summary  DataFrame
type_summary  = (
      paysim_stream
      .groupBy('Type')
      .agg(
        expr('count(*) as n'),
        expr('round(avg(amount),2) as avg_amount'),
        expr('min(amount) as min_amount'),
        expr('max(amount) as max_amount')
        
      )
      .orderBy(col('n'), ascending=False)
)


In [0]:
# Creating destinations DataFrame


destinations = (
    paysim_stream
    .filter(col('Type') == 'TRANSFER')
    .groupBy('nameDest')
    .agg(
        expr('count(*) as n'),
        expr('sum(amount) as total_amount'),
        expr('round(avg(amount),2) as avg_amount')
    )
    .orderBy(col('n'), ascending=False)
)

## Problem 3: Define Output Sinks

In [0]:
# Creating in-memory sink for both DataFrame
type_summary_query = (
    type_summary
    .writeStream
    .format('memory')
    .queryName('type_summary')
    .outputMode('complete')
)

# Destination sink
destinations_query = (
    destinations
    .writeStream
    .format('memory')
    .queryName('destinations')
    .outputMode('complete')
)


## Problem 4: Start and Monitor the Streams

In [0]:
# Starting both queries
type_query = type_summary_query.start()
dest_query = destinations_query.start()

In [0]:
# Using Spark SQL to query the in-memory type_summary table
spark.sql("SELECT * FROM type_summary").show()


+----+---+----------+----------+----------+
|Type|  n|avg_amount|min_amount|max_amount|
+----+---+----------+----------+----------+
+----+---+----------+----------+----------+



In [0]:
# Using Spark SQL to query the in-memory  destination table
print(spark.sql('SELECT * from destinations').count())
spark.sql("SELECT * FROM destinations").show(16)

0
+--------+---+------------+----------+
|nameDest|  n|total_amount|avg_amount|
+--------+---+------------+----------+
+--------+---+------------+----------+



In [0]:
# Stop both quries
type_query.stop()


In [0]:
dest_query.stop()