In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as function
import pyspark.sql.types as types

In [2]:
spark = (
    SparkSession.
    builder.
    appName('spark-paysim').
    getOrCreate()
)

In [3]:
filename = 'data/paysim.csv'

In [4]:
df = (
    spark.
    read.
    format('csv').
    option('header', 'true').
    option('inferSchema', 'true').
    load(filename)
)

In [5]:
df.show(n=10)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|      

In [6]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [7]:
df = df.drop('isFraud', 'isFlaggedFraud')

In [8]:
(
    df.
    groupBy('step').
    count().
    show(n=3)
)

+----+-----+
|step|count|
+----+-----+
|  12|36153|
|   1| 2708|
|  13|37515|
+----+-----+
only showing top 3 rows



In [11]:
# steps = df.select('step').distinct().collect()
# for step in steps[:]:
#      _df = df.where(f'step = {step[0]}')
#     # _df.coalesce(1).write.mode('append').option('header', 'true').csv('../data/paysim')
#     _df.coalesce(1).write.mode('append').format('csv').option('header','true').csv('data/paysim')

In [10]:
!cd data/paysim/ && ls

part-00000-0248b774-4c6c-40da-8601-23058ce7bad0-c000.csv
part-00000-03585af6-cf76-4b7e-96ab-2d3323f5e8ac-c000.csv
part-00000-0484266d-29b0-46b4-9680-c74153161ae0-c000.csv
part-00000-04af313c-57cb-4d00-8b9b-ab0c16896672-c000.csv
part-00000-0affab26-cd30-41b6-ab4d-b924826327e2-c000.csv
part-00000-0c331c10-47fa-4c49-b2b1-7d67581856d7-c000.csv
part-00000-0f800789-81a1-4fe5-92bd-faecf29a9390-c000.csv
part-00000-118c6005-6395-4baa-bdf9-9e210544517d-c000.csv
part-00000-146ece07-fb1d-4b5c-9d34-0a69c1003fc8-c000.csv
part-00000-14716195-9912-403a-8db7-f2a36c35311e-c000.csv
part-00000-14eba50c-ed70-4b3b-9341-c9800165fc87-c000.csv
part-00000-16b8188f-86fd-444a-a60f-f6bae0a5f01f-c000.csv
part-00000-18fd1995-bb6c-4df4-b669-e18861b1ed46-c000.csv
part-00000-1ccbf5a3-c873-4065-be63-ecca5fcb3fa4-c000.csv
part-00000-24287748-3033-4023-b586-21cec7225b44-c000.csv
part-00000-25a115dd-365f-4c0d-b9c6-273767722e35-c000.csv
part-00000-2821e853-b7dc-4c75-a6fe-5c2d59237600-c000.csv
part-00000-2af19103-b1cc-43ae-a

In [13]:
part_filename = 'data/paysim/part-00000-6da2b181-4ee6-4859-b68c-236c50901c4e-c000.csv'

part = (
    spark.
    read.
    format('csv').
    option('header', 'true').
    option('inferSchema', 'true').
    load(part_filename)
)

In [14]:
(
    part.
    groupBy('step').
    count().
    show()
)

+----+-----+
|step|count|
+----+-----+
|  17|43361|
+----+-----+



Let's create a streaming version of this input, we'll read each file one by one as if it was a stream

In [15]:
dataSchema = part.schema

In [16]:
dataSchema

StructType([StructField('step', IntegerType(), True), StructField('type', StringType(), True), StructField('amount', DoubleType(), True), StructField('nameOrig', StringType(), True), StructField('oldbalanceOrg', DoubleType(), True), StructField('newbalanceOrig', DoubleType(), True), StructField('nameDest', StringType(), True), StructField('oldbalanceDest', DoubleType(), True), StructField('newbalanceDest', DoubleType(), True)])

**maxFilesPerTrigger** allows you to control how quickly Spark will read all of the files in the folder. In this example we're limiting the flow of the stream to one file per trigger.

In [16]:
streaming = (
    spark.readStream.schema(dataSchema).
    option("maxFilesPerTrigger", 1).
    csv("data/paysim/")   
)

Let's set up a transformation.

The nameDest column is the recipient ID of the transaction.

In [17]:
dest_count = (
    streaming.
    groupBy('nameDest').
    count().
    orderBy(function.desc('count'))
)

Now that we have our transformations, we need to specify an output sink for the results. For this example, we're going to write to a memory sink which keeps the results in memory.

We would also need to define how Spark will output the data. In this example, we'll compute output mode (rewriting all of the keys along with their counts after every trigger).

We won't include activityQuery.await.Terminator() because it is required only to prevent the driver process from terminating when the stream is active.

So in order to be able to run this locally in a notebook we won't include it.

In [19]:
activityQuery = (
    dest_count.
        writeStream.
        queryName('dest_counts').
    format("memory").
    outputMode("complete").
    start()
)


import time

for x in range(50):
    _df = spark.sql(
        """
        SELECT
            *
        FROM
            dest_counts
        WHERE
            nameDest != 'nameDest'
        AND
            count >= 2
        """
    )
    
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C2131465140|    9|
| C499714286|    9|
|C1760966565|    7|
|C1239707538|    7|
|C1968101532|    7|
|C1139127799|    7|
| C709613653|    7|
|C1193495878|    7|
|C1907159141|    7|
|C1390358265|    7|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C2131465140|    9|
| C499714286|    9|
|C1760966565|    7|
|C1239707538|    7|
|C1968101532|    7|
|C1139127799|    7|
| C709613653|    7|
|C1193495878|    7|
|C1907159141|    7|
|C1390358265|    7|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1590550415|   34|
| C985934102|   33|
| C564160838|   30|
|C2083562754|   25|
| C665576141|   23|
|C1286084959|   23|
| C401424608|   22|
|  C33524623|   22|
| C998351292|   22|
|C1023714065|   22|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1590550415|   34|
| 

Check if stream is active

In [20]:
spark.streams.active[0].isActive

True

In [21]:
activityQuery.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

if we want to turn off the stream, we'll run activityQuery.stop() to reset the query for testing purpose.

In [22]:
activityQuery.stop()