In [None]:
from pyspark.sql import SparkSession 
import pyspark.sql.functions as F 
import pyspark.sql.types as T 
spark = SparkSession.builder.getOrCreate() 

In [8]:
#data from https://www.kaggle.com/datasets/ealaxi/paysim1
df = spark.read.csv("PS_20174392719_1491204439457_log.csv",header=True,inferSchema=True) 

                                                                                

In [9]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [10]:
df = df.drop("isFraud","isFlaggedFraud")

In [16]:
df.show(10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|           0.0|
|   1| PAYMENT| 7817.71|  C90045638|      53860.0|      46042.29| M573487274|           0.0|           0.0|
|   1| PAYMENT| 7107.77| C15

In [17]:
#Each step is a unit of time mapped to the real world 
df.groupBy("step").count().show(10)

[Stage 5:>                                                          (0 + 8) / 8]

+----+-----+
|step|count|
+----+-----+
|  31|   12|
|  34|30904|
|  28|    4|
|  26|  440|
|  27|   41|
|  12|36153|
|  22|12635|
|   1| 2708|
|  13|37515|
|   6| 1660|
+----+-----+
only showing top 10 rows



                                                                                

In [18]:
!mkdir data 
!mkdir data/chunks

In [None]:
#saving the output by filtering on each step and saving it to a separate file
steps = df.select("step").distinct().collect() 
for step in steps[:]: 
    _df = df.where(f"step = {step[0]}") 
    _df.coalesce(1).write.mode("append").option("header","true").csv("data/chunks")

                                                                                

In [None]:
#viewing the files 
!cd data/chunks/ && ls 

In [23]:
#one part of the streaming data 
subset = spark.read.csv(
    "data/chunks/part-00000-00cc0439-322d-4a05-acae-90429fa2b352-c000.csv",
    header = True, 
    inferSchema = True
)


In [24]:
subset.groupBy("step").count().show() 

+----+-----+
|step|count|
+----+-----+
|  39|23391|
+----+-----+



In [25]:
#streaming version of this subset 
schema = subset.schema
schema

StructType([StructField('step', IntegerType(), True), StructField('type', StringType(), True), StructField('amount', DoubleType(), True), StructField('nameOrig', StringType(), True), StructField('oldbalanceOrg', DoubleType(), True), StructField('newbalanceOrig', DoubleType(), True), StructField('nameDest', StringType(), True), StructField('oldbalanceDest', DoubleType(), True), StructField('newbalanceDest', DoubleType(), True)])

In [26]:
#I'm going to adjust this to view only one file per trigger 
streaming = (
    spark.readStream.schema(schema).option("maxFilesPerTrigger",1).csv("data/chunks/")
)

In [27]:
#Here I will setup the transformation 
# nameDest column is the recipient ID of the transaction 
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count")) 

In [None]:
activityQuery = (
    dest_count.writeStream.queryName("dest_counts").format("memory").outputMode("complete").start()
)

import time 
for x in range(50): 
    _df = spark.sql("SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count>= 2")
    if _df.count() > 0: 
        _df.show(10) 
    time.sleep(0.5)



In [30]:
spark.streams.active[0].isActive

True



In [31]:
activityQuery.status



{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}





In [32]:
activityQuery.stop() 

                                                                                