In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=e9bf6fba118cb68d04a08f3e125ebf8148f33ba8067bdc096d85d86eaf439872
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


Starting Pyspark session

In [2]:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [4]:
df = spark.read.csv('/content/Paysim.csv',header = True, inferSchema=True)
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [5]:
df = df.drop("isFraud",'isFlaggedFraud')
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|           0.0|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 5 rows



Stepmaps a unit in the real world. In this case 1 step is 1 hour of time. So we can assume for this example that we have another job that runs every hour and gets all the transactions in that time frame 

In [6]:
df.groupBy("step").count().show(3)

+----+-----+
|step|count|
+----+-----+
|   1| 2708|
|   6| 1660|
|   3|  552|
+----+-----+
only showing top 3 rows



We can therefore save the outpt of that job by filtering on each step and saving it to a separate file 


In [7]:
%%time
steps = df.select("step").distinct().collect()
for step in steps[:]:
  _df = df.where(f"step = {step[0]}")
  _df.coalesce(1).write.mode("append").option("header","true").csv("/content/Paysim")

CPU times: user 37 ms, sys: 4.55 ms, total: 41.6 ms
Wall time: 4.61 s


To get the storage location of all the file in the stream

In [8]:
!cd /content/Paysim && ls

part-00000-029b548e-612d-44ab-a340-ccbab3da86b2-c000.csv
part-00000-22eeaeed-4474-4b39-8091-20f23d763325-c000.csv
part-00000-4bb05cf6-9456-4dc6-bd65-41ba2d953612-c000.csv
part-00000-5b1c343e-6580-4d19-97bc-e391aa96a5a7-c000.csv
part-00000-5df9d322-fd61-4f9d-b086-91d93044a8ca-c000.csv
part-00000-6d2f4fe6-ea92-496f-afdc-829aabcfa9e5-c000.csv
part-00000-76ff296b-bc3b-406a-beae-74f92db8eb2a-c000.csv
part-00000-c1bf1861-3296-4b52-831d-121d92b2c139-c000.csv
part-00000-c229216a-fa59-430a-ba56-adf13ac3b960-c000.csv
part-00000-c85d62d0-d373-403c-9aee-be011ef3a6a4-c000.csv
_SUCCESS


In [11]:
part = spark.read.csv(
    "/content/Paysim/part-00000-029b548e-612d-44ab-a340-ccbab3da86b2-c000.csv",
    header = True,
    inferSchema = True,
)

In [12]:
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
|   7| 6837|
+----+-----+



Lets create a streaming version of this input, we we'll read each file one by one as if it was a stream

In [13]:
dataSchema = part.schema
dataSchema

StructType([StructField('step', IntegerType(), True), StructField('type', StringType(), True), StructField('amount', DoubleType(), True), StructField('nameOrig', StringType(), True), StructField('oldbalanceOrg', DoubleType(), True), StructField('newbalanceOrig', DoubleType(), True), StructField('nameDest', StringType(), True), StructField('oldbalanceDest', DoubleType(), True), StructField('newbalanceDest', DoubleType(), True)])

maxFilesPerTrigger allows you quickly Spark will read all of the files in the folder. In this example we're limiting the flow of the stream to one file per trigger


In [14]:
streaming = (
    spark.readStream.schema(dataSchema)
    .option('maxFilesPerTrigger',1)
    .csv("/content/Paysim")
)

Lets set up transformation. 

The nameDest column is the recipent ID of the transaction

In [15]:
dest = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))

After transformation, we need to specify an output sink for results. Eg. We are going to write to a memory sink which keeps the results in memory. 

We also need to define how Spark will output that data. We will use complete output mode (reqriting all of keys along with their counts after every trigger)

In this exampple e wont include activityQuery.await.Termintation() because it is required only to prevent the driver process from terminating when the stream is active

In [16]:
activityQuery = (
    dest.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [19]:
import time

for x in range(50):
  _df = spark.sql(
      "Select * from dest_counts Where nameDest != 'nameDest' AND count >=2"
  )

  if _df.count()>0:
    _df.show(10)
    time.sleep(0.5)

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   66|
|C1590550415|   54|
| C248609774|   53|
|C1286084959|   53|
|C2083562754|   49|
| C451111351|   48|
|C1360767589|   48|
| C977993101|   48|
|C1899073220|   47|
|C1782113663|   44|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   66|
|C1590550415|   54|
| C248609774|   53|
|C1286084959|   53|
|C2083562754|   49|
| C451111351|   48|
|C1360767589|   48|
| C977993101|   48|
|C1899073220|   47|
|C1782113663|   44|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   66|
|C1590550415|   54|
| C248609774|   53|
|C1286084959|   53|
|C2083562754|   49|
| C451111351|   48|
|C1360767589|   48|
| C977993101|   48|
|C1899073220|   47|
|C1782113663|   44|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   66|
|C

In [22]:

spark.streams.active[0].isActive

True

In [23]:
activityQuery.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [24]:
activityQuery.stop()