In [None]:
!pip install pyspark

In [None]:
!pip install nb_black

Useful packages

In [1]:
%reload_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

<IPython.core.display.Javascript object>

The data used here is available at https://www.kaggle.com/ntnu-testimon/paysim1

In [2]:
# Load the PaySim transactions; the first line holds the column names and the
# column types are inferred from the data.
df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("paysim.csv")
)

<IPython.core.display.Javascript object>

In [3]:
# The fraud label columns are not used in this streaming example.
label_columns = ("isFraud", "isFlaggedFraud")
df = df.drop(*label_columns)

<IPython.core.display.Javascript object>

In [4]:
# Peek at the first two transactions.
df.show(n=2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 2 rows



<IPython.core.display.Javascript object>

Step maps to a unit of time in the real world. In this case, 1 step is 1 hour of time. So we can assume for this example that we have another job that runs every hour and gets all the transactions in that time frame.

In [5]:
# Number of transactions recorded in each step (first three groups only).
(
    df.groupBy("step")
    .count()
    .show(3)
)

+----+-----+
|step|count|
+----+-----+
| 148|   12|
| 463|   10|
| 471| 2620|
+----+-----+
only showing top 3 rows



<IPython.core.display.Javascript object>

We can therefore simulate the output of that job by filtering on each step and saving the result to a separate file.

In [None]:
# %%time
# One-off job that splits the dataset into one CSV file per step. It is left
# commented out, presumably because it is slow and its output already exists
# (see the directory listing in the next cell) -- TODO confirm.
#
# What it does when enabled:
#   1. collects the distinct `step` values to the driver;
#   2. for each step, keeps only that step's rows, coalesces them into a single
#      partition and writes them as CSV (with a header row) into "paysim1".
#
# Caveats before re-enabling:
#   - the write mode is "append", so running it again adds a second copy of
#     every step file instead of replacing the first one;
#   - "paysim1" is a relative path, while later cells read from an absolute
#     directory -- make sure both point to the same place;
#   - `%%time` is a cell magic and must stay on the first line of the cell.
# steps = df.select("step").distinct().collect()
# for step in steps[:]:
#     _df = df.where(f"step = {step[0]}")
#     _df.coalesce(1).write.mode("append").option("header", "true").csv("paysim1")

In [6]:
!cd /home/andry/Documents/Big_data/hadoop_cours/paysim1/ && ls

part-00000-0023a947-df87-4003-920e-5c4e739557d8-c000.csv
part-00000-004dbfd3-07f8-44f5-9543-cd92329fbf8f-c000.csv
part-00000-00828cf3-37c3-4620-8438-2ee8cceb8a2c-c000.csv
part-00000-00a05824-4fc0-46a7-98db-1b19271a3d07-c000.csv
part-00000-011a6fbd-c0d1-4230-8033-501379275160-c000.csv
part-00000-012956cf-73dc-43ee-83f6-60d8f6e668ef-c000.csv
part-00000-0190dc9a-1b0a-4379-8b87-465678468bf5-c000.csv
part-00000-01b242b8-8b47-4e58-80ea-a5b4d07c94ac-c000.csv
part-00000-01e5d673-1198-41f6-811a-028748d63b4c-c000.csv
part-00000-0262741c-c1c1-48c6-a227-212dec06ff1f-c000.csv
part-00000-03cf198b-b06e-47f5-8153-49828a5ed7e7-c000.csv
part-00000-040e03e1-15d9-419f-9d3c-5794d800def0-c000.csv
part-00000-044770eb-abc6-4c66-ac85-cb4d8bbfad8c-c000.csv
part-00000-0493ec0d-923b-43ed-a98b-a2e722502ca6-c000.csv
part-00000-04ff5077-bc7c-4c28-a54f-32f410de6ee7-c000.csv
part-00000-05776960-8a63-4525-be77-66096f72cc6c-c000.csv
part-00000-05caad14-a99b-4388-af8f-ba63981fa870-c000.csv
part-00000-071

<IPython.core.display.Javascript object>

In [7]:
# Read a single step file in batch mode; it is used below to check its content
# and to obtain a schema for the streaming reader.
part = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(
        "/home/andry/Documents/Big_data/hadoop_cours/paysim1/part-00000-ff8dea42-b513-482f-9269-d676b88f53c0-c000.csv"
    )
)


<IPython.core.display.Javascript object>

In [8]:
# Row count per step in this one file.
(
    part.groupBy("step")
    .count()
    .show()
)

+----+-----+
|step|count|
+----+-----+
| 144| 3523|
+----+-----+



<IPython.core.display.Javascript object>

Let's create a streaming version of this input: we'll read the files one by one, as if they were a stream.

In [9]:
# Reuse the schema that was inferred when reading the single step file `part`;
# it is passed explicitly to the streaming reader below.
dataSchema = part.schema

<IPython.core.display.Javascript object>

In [10]:
# Display the schema (column names, types and nullability) for inspection.
dataSchema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(nameOrig,StringType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(nameDest,StringType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true)))

<IPython.core.display.Javascript object>

`maxFilesPerTrigger` allows you to control how quickly Spark will read all of the files in the folder. In this example we're limiting the flow of the stream to one file per trigger.

In [11]:
# Streaming reader over the directory of per-step CSV files.
# - schema: streaming file sources need an explicit schema, so we reuse the one
#   taken from a single step file.
# - header: every step file starts with a header row; without this option the
#   header line of each file is ingested as a data row (e.g. a recipient
#   literally called "nameDest").
# - maxFilesPerTrigger: process one file per micro-batch to mimic an hourly feed.
streaming = (
    spark.readStream.schema(dataSchema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv("/home/andry/Documents/Big_data/hadoop_cours/paysim1/")
)

<IPython.core.display.Javascript object>

Let's set up a transformation. The `nameDest` column is the recipient ID of the transaction.

In [12]:
# Running number of transactions received by each recipient, busiest first.
per_recipient = streaming.groupBy("nameDest").count()
dest_count = per_recipient.orderBy(F.col("count").desc())

<IPython.core.display.Javascript object>

Now that we have our transformation, we need to specify an output sink for the results. For this example, we're going to write to a memory sink, which keeps the results in memory.

We also need to define how Spark will output that data. In this example, we'll use the complete output mode (rewriting all of the keys along with their counts after every trigger).

In this example we won't call `activityQuery.awaitTermination()`, because it is only required to prevent the driver process from terminating while the stream is active.

So, in order to be able to run this locally in a notebook we won't include it.


In [14]:
import time

POLL_ROUNDS = 50  # how many times the in-memory result table is polled
POLL_INTERVAL_SECONDS = 0.5  # pause between two polls

# A query name must be unique among the active queries. Stop a leftover
# "dest_counts" query (e.g. from a previous run of this cell) so the cell can
# be re-executed without restarting the kernel.
for running_query in spark.streams.active:
    if running_query.name == "dest_counts":
        running_query.stop()

# Start the stream: the full aggregated result is rewritten after every trigger
# into an in-memory table named after the query ("dest_counts").
activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)
# include this in production
# activityQuery.awaitTermination()

# Poll the in-memory table while the stream runs in the background and print
# the recipients that have received at least two transactions so far.
# The `nameDest != 'nameDest'` predicate discards CSV header lines that were
# ingested as data rows.
for _poll_round in range(POLL_ROUNDS):
    _df = spark.sql(
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
    )
    # take(1) is enough to know whether there is anything to show; it avoids
    # counting every matching row on each poll.
    if _df.take(1):
        _df.show(10)
    time.sleep(POLL_INTERVAL_SECONDS)

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
| C763794011|    2|
| C488343370|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
| C763794011|    2|
| C488343370|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
| C763794011|    2|
| C488343370|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C587204551|    2|
|C1015743493|    2|
| C359227905|    2|
|C1850343194|    2|
| C379236140|    2|
| C319921943|    2|
|C1377194794|    2|
|C1455885936|    2|
| C803352127|    2|
| C325257804|    2|
+-----------+-----+
only showing top 10 rows

+---------

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C225852166|    6|
|C1898476716|    6|
| C705073130|    6|
|C1187819806|    6|
|  C74529249|    6|
|C1673419845|    5|
|C1499142229|    5|
|C1107146140|    5|
|C1093509042|    5|
|C1309311919|    5|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C225852166|    6|
|C1898476716|    6|
| C705073130|    6|
|C1187819806|    6|
|  C74529249|    6|
|C1673419845|    5|
|C1499142229|    5|
|C1107146140|    5|
|C1093509042|    5|
|C1309311919|    5|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C225852166|    6|
|C1898476716|    6|
| C705073130|    6|
|C1187819806|    6|
|  C74529249|    6|
|C1673419845|    5|
|C1499142229|    5|
|C1107146140|    5|
|C1093509042|    5|
|C1309311919|    5|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C225852166|    6|
|C

<IPython.core.display.Javascript object>

Check if the stream is active

In [15]:
# Ask the query handle directly: indexing `spark.streams.active[0]` raises an
# IndexError when no stream is running and is not guaranteed to be this query.
activityQuery.isActive

True

<IPython.core.display.Javascript object>

In [16]:
# Current status of the streaming query (status message, whether data is
# available and whether a trigger is active).
activityQuery.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

<IPython.core.display.Javascript object>

If we want to turn off the stream, we run `activityQuery.stop()`; this also lets us start the query again from scratch for testing purposes.

In [17]:
# Stop the streaming query that was started with writeStream.start().
activityQuery.stop()

<IPython.core.display.Javascript object>