# DSCI 617 – Homework 08
**Jeffery Boczkaja**

In [0]:
from pyspark.sql.functions import col, expr
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## Problem 1: Creating Streaming DataFrame

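We will start by loading a single file into a static DataFrame and then reuse its inferred schema for the streaming DataFrame, since file-based streaming sources require the schema to be supplied up front by default.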

In [0]:
df = spark.read.csv("/FileStore/tables/paysim/step_001.csv", header=True, inferSchema=True)
schema = df.schema
print("Schema of the DataFrame:")
print(schema)

print("First 5 rows of the DataFrame:")
df.show(5)

Schema of the DataFrame:
StructType([StructField('step', IntegerType(), True), StructField('type', StringType(), True), StructField('amount', DoubleType(), True), StructField('nameOrig', StringType(), True), StructField('oldbalanceOrg', DoubleType(), True), StructField('newbalanceOrig', DoubleType(), True), StructField('nameDest', StringType(), True), StructField('oldbalanceDest', DoubleType(), True), StructField('newbalanceDest', DoubleType(), True)])
First 5 rows of the DataFrame:
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|   

We will now create a streaming DataFrame that reads the PaySim directory one file per trigger, using the schema captured above.

In [0]:
paysim_stream = spark.readStream \
    .option("maxFilesPerTrigger", 1) \
    .schema(schema) \
    .csv("/FileStore/tables/paysim/", header=True)
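To make the effect of `maxFilesPerTrigger` concrete, here is a plain-Python model (not Spark's implementation) of how a directory of files could be split into micro-batches. It assumes files are processed in name order; Spark's file source by default orders new files by modification time. The function name `plan_micro_batches` and the file names in the usage are hypothetical.

```python
# Toy model of a file source with maxFilesPerTrigger: each trigger (micro-batch)
# picks up at most `max_files_per_trigger` new files.
# Assumption: files are ordered by name; Spark orders by modification time.

def plan_micro_batches(files, max_files_per_trigger):
    """Group files into batches of at most max_files_per_trigger files."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    ordered = sorted(files)
    return [
        ordered[i:i + max_files_per_trigger]
        for i in range(0, len(ordered), max_files_per_trigger)
    ]


# Hypothetical file names following the step_001.csv pattern used above.
files = ["step_003.csv", "step_001.csv", "step_002.csv"]
for batch_id, batch in enumerate(plan_micro_batches(files, 1)):
    print(batch_id, batch)
# 0 ['step_001.csv']
# 1 ['step_002.csv']
# 2 ['step_003.csv']
```

With a value of 1, each trigger processes exactly one PaySim step file, which is what lets us watch the aggregates grow gradually.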

We will verify that the DataFrame is streaming.

In [0]:
print("Is the DataFrame streaming?", paysim_stream.isStreaming)

Is the DataFrame streaming? True


## Problem 2: Apply Transformations

We will apply two different sets of transformations to the PaySim stream. We will start by grouping the transactions by type and computing the count, average, minimum, and maximum amount for each type.

In [0]:
type_summary = paysim_stream.groupBy("type") \
    .agg(
        expr("count(*) as n"),
        expr("round(avg(amount), 2) as avg_amount"),
        expr("min(amount) as min_amount"),
        expr("max(amount) as max_amount")
    ) \
    .orderBy(col("n").desc())
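A streaming aggregation does not re-read earlier files; it keeps a small amount of state per group and folds each micro-batch into it. The sketch below models that idea for `type_summary` in plain Python, assuming a per-type running count, sum, minimum, and maximum, with the average derived from the sum and count when results are emitted. The helper names and the input batches are made up for illustration and are not PaySim data.

```python
# Conceptual model of the per-group state behind groupBy("type").agg(...).

def update_state(state, rows):
    """Fold one micro-batch of (type, amount) rows into the running state."""
    for tx_type, amount in rows:
        s = state.setdefault(tx_type, {"n": 0, "sum": 0.0, "min": amount, "max": amount})
        s["n"] += 1
        s["sum"] += amount
        s["min"] = min(s["min"], amount)
        s["max"] = max(s["max"], amount)
    return state


def summarize(state):
    """Rows shaped like type_summary: (type, n, avg_amount, min_amount, max_amount)."""
    # Note: Python's round() uses banker's rounding, while Spark SQL's round()
    # rounds half up, so exact ties could differ in the last digit.
    rows = [
        (t, s["n"], round(s["sum"] / s["n"], 2), s["min"], s["max"])
        for t, s in state.items()
    ]
    return sorted(rows, key=lambda r: r[1], reverse=True)  # n descending


state = {}
update_state(state, [("PAYMENT", 10.0), ("CASH_IN", 100.0), ("PAYMENT", 20.0)])
print(summarize(state))
# [('PAYMENT', 2, 15.0, 10.0, 20.0), ('CASH_IN', 1, 100.0, 100.0, 100.0)]
update_state(state, [("PAYMENT", 30.0), ("CASH_IN", 50.0)])
print(summarize(state))
# [('PAYMENT', 3, 20.0, 10.0, 30.0), ('CASH_IN', 2, 75.0, 50.0, 100.0)]
```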

We will verify that the DataFrame is streaming.

In [0]:
print("Is the DataFrame streaming?", type_summary.isStreaming)

Is the DataFrame streaming? True


Next, we will keep only the TRANSFER transactions and group them by destination account.

In [0]:
destinations = paysim_stream.filter(col("type") == "TRANSFER") \
    .groupBy("nameDest") \
    .agg(
        expr("count(*) as n"),
        expr("sum(amount) as total_amount"),
        expr("round(avg(amount), 2) as avg_amount")
    ) \
    .orderBy(col("n").desc())
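In the destinations output shown later, many accounts share the same `n`, and ordering on `n` alone leaves the order of tied rows unspecified. The sketch below models the `destinations` logic for a single batch in plain Python and sorts by `n` descending with `nameDest` as a tiebreaker; in Spark the analogous change would be `.orderBy(col("n").desc(), col("nameDest"))`. The function name and account IDs are hypothetical.

```python
from collections import defaultdict


def summarize_transfers(rows):
    """Model of `destinations` for one batch of (type, nameDest, amount) rows.

    Returns (nameDest, n, total_amount, avg_amount) tuples sorted by n
    descending, with nameDest ascending as a deterministic tiebreaker.
    """
    totals = defaultdict(lambda: [0, 0.0])  # nameDest -> [count, sum]
    for tx_type, dest, amount in rows:
        if tx_type != "TRANSFER":
            continue  # same role as filter(col("type") == "TRANSFER")
        totals[dest][0] += 1
        totals[dest][1] += amount
    result = [(dest, n, total, round(total / n, 2)) for dest, (n, total) in totals.items()]
    return sorted(result, key=lambda r: (-r[1], r[0]))


# Hypothetical rows; C1 and C2 tie on n, so the tiebreaker decides their order.
rows = [
    ("TRANSFER", "C9", 100.0),
    ("PAYMENT", "M1", 5.0),
    ("TRANSFER", "C9", 50.0),
    ("TRANSFER", "C2", 10.0),
    ("TRANSFER", "C1", 20.0),
]
print(summarize_transfers(rows))
# [('C9', 2, 150.0, 75.0), ('C1', 1, 20.0, 20.0), ('C2', 1, 10.0, 10.0)]
```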

We will verify the DataFrame is streaming.

In [0]:
print("Is the DataFrame streaming?", destinations.isStreaming)

Is the DataFrame streaming? True


## Problem 3: Define Output Sinks

We will now define in-memory output sinks for both of the transformed DataFrames we created in Problem 2.
We will do this by creating a DataStreamWriter object for each of these DataFrames.

In [0]:
# Configure (but do not start) a memory sink for each aggregated stream.
# "complete" mode is required because both results are sorted aggregations.
type_summary_writer = type_summary.writeStream \
    .outputMode("complete") \
    .queryName("type_summary") \
    .format("memory")

destinations_writer = destinations.writeStream \
    .outputMode("complete") \
    .queryName("destinations") \
    .format("memory")
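Both writers use the "complete" output mode. Sorting a streaming DataFrame is only supported after an aggregation in complete mode, which is why the `orderBy` in Problem 2 requires it. The plain-Python sketch below contrasts what a sink receives per trigger in complete mode versus update mode; it is a toy model, the helper name `rows_written` is hypothetical, and the Structured Streaming guide lists only append and complete as supported modes for the memory sink, so the update branch is shown purely for comparison.

```python
# Toy model (not Spark's implementation) of what a sink receives after one
# trigger. Result tables are dicts mapping a group key to its aggregate row.

def rows_written(previous, current, mode):
    """Return the rows emitted for this trigger under the given output mode."""
    if mode == "complete":
        # The entire result table is rewritten on every trigger.
        return dict(current)
    if mode == "update":
        # Only groups that are new or whose aggregate changed are emitted.
        return {key: row for key, row in current.items() if previous.get(key) != row}
    raise ValueError(f"unsupported output mode: {mode!r}")


# Hypothetical per-type counts before and after a micro-batch.
before = {"PAYMENT": 2, "CASH_IN": 1}
after = {"PAYMENT": 3, "CASH_IN": 1, "DEBIT": 1}

print(rows_written(before, after, "complete"))  # {'PAYMENT': 3, 'CASH_IN': 1, 'DEBIT': 1}
print(rows_written(before, after, "update"))    # {'PAYMENT': 3, 'DEBIT': 1}
```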

## Problem 4: Start and Monitor the Streams

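We will now start the two streaming queries, writing to the in-memory tables type_summary_new and destinations_new, and check that both are active.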

In [0]:
type_summary_writer_new = type_summary.writeStream \
    .outputMode("complete") \
    .queryName("type_summary_new") \
    .format("memory")

type_query = type_summary_writer_new.start()

destinations_writer_new = destinations.writeStream \
    .outputMode("complete") \
    .queryName("destinations_new") \
    .format("memory")

dest_query = destinations_writer_new.start()

print("Is type_query active?", type_query.isActive)
print("Is dest_query active?", dest_query.isActive)

Is type_query active? True
Is dest_query active? True


We will now view the contents of the type_summary_new table.

In [0]:
static_type_summary = spark.sql("SELECT * FROM type_summary_new")
record_count = static_type_summary.count()
print(f"Number of records: {record_count}")

static_type_summary.show()

Number of records: 5
+--------+------+----------+----------+----------+
|    type|     n|avg_amount|min_amount|max_amount|
+--------+------+----------+----------+----------+
|CASH_OUT|283602| 187343.61|      0.37|     1.0E7|
| PAYMENT|262254|  11618.97|       0.1| 115264.68|
| CASH_IN|172327| 172399.61|      5.44|1781905.26|
|TRANSFER| 65121|  665790.8|       2.6|     1.0E7|
|   DEBIT|  5365|   5846.53|      0.87| 218148.28|
+--------+------+----------+----------+----------+
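Because the streams are still running when we query them, `count()` and `show()` above are two separate actions, and each may read the memory table at a different moment. The plain-Python sketch below models that behavior under the assumption that a complete-mode memory sink replaces its whole table on every trigger; the class `MemorySinkModel` is hypothetical. In the notebook, collecting the rows once (for example with `static_type_summary.collect()`) and deriving both the count and the display from that list would be one way to keep them consistent.

```python
# Toy model of an in-memory sink in complete mode: each trigger replaces the
# whole table, and every query reads the table as it is at that moment.

class MemorySinkModel:
    def __init__(self):
        self._rows = []

    def on_trigger(self, result_rows):
        # Complete mode: discard the old contents and store the new result.
        self._rows = list(result_rows)

    def read(self):
        # Return a copy, like a query result detached from later updates.
        return list(self._rows)


sink = MemorySinkModel()
sink.on_trigger([("PAYMENT", 2)])
count_at_first_read = len(sink.read())  # plays the role of .count()
sink.on_trigger([("PAYMENT", 3), ("CASH_IN", 1)])
rows_at_second_read = sink.read()       # plays the role of a later .show()
print(count_at_first_read, len(rows_at_second_read))  # 1 2

# Reading once and reusing the result keeps both numbers consistent.
snapshot = sink.read()
print(len(snapshot), snapshot)  # 2 [('PAYMENT', 3), ('CASH_IN', 1)]
```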



We will count the unique destinations of transfers.

In [0]:
unique_dest_count_query = """
SELECT COUNT(DISTINCT nameDest) as unique_dest_count
FROM destinations_new
"""

unique_dest_count_df = spark.sql(unique_dest_count_query)

unique_dest_count_df.show()

+-----------------+
|unique_dest_count|
+-----------------+
|            25752|
+-----------------+



Next, we will view the contents of the destinations_new table.

In [0]:
static_destinations = spark.sql("SELECT * FROM destinations_new")
record_count_dest = static_destinations.count()
print(f"Number of records: {record_count_dest}")

static_destinations.show(16)

Number of records: 184
+-----------+---+--------------------+----------+
|   nameDest|  n|        total_amount|avg_amount|
+-----------+---+--------------------+----------+
|C1590550415| 12|1.7533855849999998E7|1461154.65|
| C306206744| 10|   7134909.749999999| 713490.97|
| C453211571|  8|   4764787.380000001| 595598.42|
| C564160838|  8|          3070298.59| 383787.32|
| C248609774|  8|          6160533.15| 770066.64|
| C985934102|  7|  3718139.3599999994| 531162.77|
|C1916720513|  7|           4312182.8| 616026.11|
| C451111351|  7|           5617336.9|  802476.7|
|C1286084959|  7|          2244985.66| 320712.24|
| C392292416|  7|           4587850.6| 655407.23|
|C1899073220|  7|          4089296.77| 584185.25|
|C1782113663|  7|          4161329.67| 594475.67|
| C665576141|  6|   7626856.100000001|1271142.68|
|C1789550256|  6|   4334877.899999999| 722479.65|
| C998351292|  6|  1880762.2400000002| 313460.37|
|C1170794006|  5|  3501781.2300000004| 700356.25|
+-----------+---+--------------------+----------+

We will stop both streaming queries.

In [0]:
type_query.stop()
dest_query.stop()
print("Is type_query active?", type_query.isActive)
print("Is dest_query active?", dest_query.isActive)

Is type_query active? False
Is dest_query active? False
