# Pyspark Streaming Example

- Dataset: https://www.kaggle.com/datasets/ealaxi/paysim1/
    - Synthetically created finane transactions to detect frauds

In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Streaming Process Files") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .master("local[*]") \
    .getOrCreate()

spark

23/12/19 11:40:39 WARN Utils: Your hostname, gowtham-Inspiron-5570 resolves to a loopback address: 127.0.1.1; using 192.168.29.189 instead (on interface wlp2s0)
23/12/19 11:40:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/19 11:40:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df_pyspark=spark.read.csv('data/finance_data.csv', header=True, inferSchema=True)
df_pyspark.show()

                                                                                

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|    181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1| PAYMENT| 11668.14|C2048537720|      41554.0|      29885.86|M123070170

In [4]:
df_pyspark.describe().show()

23/12/18 11:43:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 5:>                                                          (0 + 1) / 1]

+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|summary|              step|    type|            amount|   nameOrig|     oldbalanceOrg|   newbalanceOrig|   nameDest|    oldbalanceDest|    newbalanceDest|             isFraud|      isFlaggedFraud|
+-------+------------------+--------+------------------+-----------+------------------+-----------------+-----------+------------------+------------------+--------------------+--------------------+
|  count|           6362620| 6362620|           6362620|    6362620|           6362620|          6362620|    6362620|           6362620|           6362620|             6362620|             6362620|
|   mean|243.39724563151657|    NULL|179861.90354913048|       NULL| 833883.1040744851|855113.6685785787|       NULL|1100701.6665196505|1224996.3982019336|0.001290820448180152| 2.51468734577894E-6|
| stddev| 

                                                                                

- We don't need last two columns 'isFraud' and 'isFlaggedFraud' as for streaming systems

In [5]:
df_pyspark = df_pyspark.drop('isFraud', 'isFlaggedFraud')
df_pyspark.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|           0.0|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 5 rows



- Step column maps a unit of time in the real world. In this case let us assume that the value of '1' in step represents 1 hour of time.
- So we can assume for this example that we have another job that runs every hour and gets all the transactions in that time frame.

In [8]:
# Lets try to group it with respect to step and see the dataframe
from pyspark.sql.functions import sum, col, desc
df_pyspark.groupBy('step').count().sort(desc('count')).show()

[Stage 13:>                                                         (0 + 8) / 8]

+----+-----+
|step|count|
+----+-----+
|  19|51352|
|  18|49579|
| 187|49083|
| 235|47491|
| 307|46968|
| 163|46352|
| 139|46054|
| 403|45155|
|  43|45060|
| 355|44787|
|  15|44609|
| 186|43747|
| 306|43615|
|  17|43361|
| 259|43328|
|  16|42471|
| 379|41759|
|  14|41485|
|  42|41304|
| 354|40696|
+----+-----+
only showing top 20 rows



                                                                                

- We can clearly see the number of transactions(count) for each step
- We can now save the output by filtering on each step and saving it as a seperate file

In [12]:
# The below code steps are not mandate but this is just for creating seperate dataframes based on each step
# %%time
# steps = df_pyspark.select('step').distinct().collect()
# print(steps)
# for step in steps:
#     df_upt = df_pyspark.where(f'step = {step[0]}')
#     # By addig coalesce(1) we save the dataframe to one file
#     df_upt.coalesce(1).write.mode('append').option('header', 'true').csv('data/finance_data_upt')


In [13]:
# The below line of code displays the all the dataframes that were saved in the above step
!cd data/finance_data_upt/ && ls

part-00000-011f96ea-80ac-400d-9db0-955d2a0aea30-c000.csv
part-00000-012978cf-b20d-4756-9e0e-d3bf370d3313-c000.csv
part-00000-05777074-0890-455d-8c1c-aac06c9660a3-c000.csv
part-00000-059eb5c5-e2bd-404e-a0ab-47381658803e-c000.csv
part-00000-05e25c1c-5a9a-4fcc-9a28-188be78f74ad-c000.csv
part-00000-0836efd2-5e8f-4df4-bba9-e5099484d332-c000.csv
part-00000-092ca57f-ec0e-4e7f-ab54-215d12f4c200-c000.csv
part-00000-0c5cf012-b6d7-4eb5-bcaf-a5458b2cbfe1-c000.csv
part-00000-10eb07cb-a846-45cf-a2f3-c951ae37802a-c000.csv
part-00000-1264dacc-c225-461e-9054-0c7930c3b437-c000.csv
part-00000-13be92a5-58e6-45be-8b2b-7a5036f862ce-c000.csv
part-00000-1976a447-5384-48be-8b3d-8b20b1fd86c5-c000.csv
part-00000-1afe4d59-5646-4f4a-a8a5-030d1709af88-c000.csv
part-00000-1cab8aaf-ff48-49a4-9e75-ca3d7d2cd5bb-c000.csv
part-00000-21324b12-3f58-4a6b-a084-57b474b17943-c000.csv
part-00000-2240c379-2754-4242-8ca6-a8333aed925c-c000.csv
part-00000-301cf07e-511a-425d-be75-621fee5ca89d-c000.csv
part-00000-3687259a-1306-403d-8

In [3]:
df_partition = spark.read.csv(
            'data/finance_data_upt/part-00000-4c4a07a5-fdcd-4f6e-8d57-d0a9606d2874-c000.csv',
            header=True,
            inferSchema=True,
)
df_partition.show()

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|  30|TRANSFER|122235.11|C1279213624|    122235.11|           0.0|C2100791242|           0.0|           0.0|
|  30|CASH_OUT|122235.11| C356571967|    122235.11|           0.0|C1905066109|           0.0|     122235.11|
|  30|TRANSFER|236947.25|C1273047168|    236947.25|           0.0|C1636242484|           0.0|           0.0|
|  30|CASH_OUT|236947.25| C474899128|    236947.25|           0.0| C116505755|      70084.28|     307031.53|
|  30|TRANSFER| 19016.22| C182136548|     19016.22|           0.0| C717672329|           0.0|           0.0|
|  30|CASH_OUT| 19016.22|C1257554854|     19016.22|           0.0|C1044117371|    9347999.52|    9367015.74|
|  30|TRANSFER|1094

In [4]:
df_partition.groupBy('step').count().show()

+----+-----+
|step|count|
+----+-----+
|  30|    8|
+----+-----+



- Creating a streaming version of the input: we will read each file one by one as if it was a stream

In [5]:
# Lets check the schema of the partition
dataSchema = df_partition.schema
dataSchema

StructType([StructField('step', IntegerType(), True), StructField('type', StringType(), True), StructField('amount', DoubleType(), True), StructField('nameOrig', StringType(), True), StructField('oldbalanceOrg', DoubleType(), True), StructField('newbalanceOrig', DoubleType(), True), StructField('nameDest', StringType(), True), StructField('oldbalanceDest', DoubleType(), True), StructField('newbalanceDest', DoubleType(), True)])

In [6]:
# Here streaming data is created from the finance_data folder setting the trigger to '1'
# maxFilesPerTrigger - allows to control how quickly spark will reall the files in the folder
stream = (
        spark.readStream.schema(dataSchema) \
        .option('maxFilesPerTrigger',1) \
        .csv('data/finance_data_upt/')
)

- Let us perform some transformations on the streaming data(Dstreams)
- Here 'nameDest' column is the 'recipient ID' of the transaction

In [8]:
# Idea is to identify the fradulent transactions, 
# Considering a scenario - if multiple account holders transfer the amount to one account
# So grouping the 'nameDest' feature and sorting it
from pyspark.sql.functions import sum, col, desc
dest_count = stream.groupBy('nameDest').count().orderBy(desc('count'))

After the transformation 
- We need to specify the output sink for the results. For this example we are going to write it to a memory sink which keeps the results in the memory.
- We also need to denfine how spark will output that data. In this example we will use the complete output mode(rewriting all the keys along with their counts after every trigger)

In [9]:
query = (
        dest_count.writeStream.queryName('dest_counts') \
        .format('memory') \
        .outputMode('complete') \
        .start()
)
# query.awaitTermination()        # can be used in production environment
import time
for x in range(50):
    df_query = spark.sql(
            "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
    )
    if df_query.count() > 0:
        df_query.show(20)
    
    time.sleep(1)

23/12/18 11:59:06 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-fcb82d61-07c0-4b8c-8f24-9bb769732811. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/18 11:59:06 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C357028200|    5|
|C1308497108|    5|
| C553146000|    5|
| C479356446|    5|
| C480863671|    5|
|C2030023933|    5|
| C850125606|    4|
| C823524092|    4|
|C1010390776|    4|
| C347981017|    4|
| C685802025|    4|
|C1652671483|    4|
|C1353518278|    4|
|C1120829019|    4|
|C1954345491|    4|
|C1924761929|    4|
| C860128720|    4|
|C1668323738|    4|
|C1919331419|    4|
|C1007286597|    4|
+-----------+-----+
only showing top 20 rows



                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C357028200|    5|
|C1308497108|    5|
| C553146000|    5|
| C479356446|    5|
| C480863671|    5|
|C2030023933|    5|
| C850125606|    4|
| C823524092|    4|
|C1010390776|    4|
| C347981017|    4|
| C685802025|    4|
|C1652671483|    4|
|C1353518278|    4|
|C1120829019|    4|
|C1954345491|    4|
|C1924761929|    4|
| C860128720|    4|
|C1668323738|    4|
|C1919331419|    4|
|C1007286597|    4|
+-----------+-----+
only showing top 20 rows



                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1954345491|    5|
| C357028200|    5|
|C1308497108|    5|
| C553146000|    5|
| C479356446|    5|
| C480863671|    5|
|C2030023933|    5|
| C850125606|    4|
| C823524092|    4|
|C1010390776|    4|
| C347981017|    4|
|C1192879985|    4|
| C685802025|    4|
|C1652671483|    4|
|C1353518278|    4|
|C1120829019|    4|
|C1924761929|    4|
| C860128720|    4|
|C1668323738|    4|
| C794978521|    4|
+-----------+-----+
only showing top 20 rows



                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1954345491|    5|
| C357028200|    5|
|C1308497108|    5|
| C553146000|    5|
| C479356446|    5|
| C480863671|    5|
|C2030023933|    5|
| C850125606|    4|
| C823524092|    4|
|C1010390776|    4|
| C347981017|    4|
|C1192879985|    4|
| C685802025|    4|
|C1652671483|    4|
|C1353518278|    4|
|C1120829019|    4|
|C1924761929|    4|
| C860128720|    4|
|C1668323738|    4|
| C794978521|    4|
+-----------+-----+
only showing top 20 rows



                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C499714286|   10|
|C2131465140|    9|
|C1292435170|    8|
|C1437074823|    8|
| C240779865|    8|
|C1677262317|    8|
|C2067466390|    8|
| C873744218|    8|
|C1885272526|    8|
|C1730752455|    8|
| C141847315|    8|
|C1876731592|    7|
| C131837504|    7|
| C668247389|    7|
| C502295547|    7|
|C1979563377|    7|
|C1229206841|    7|
|C1673688244|    7|
|C2040491146|    7|
| C776749939|    7|
+-----------+-----+
only showing top 20 rows



                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C499714286|   10|
| C115095474|   10|
|C2131465140|    9|
|C1401721023|    9|
|C1876731592|    8|
|C1229206841|    8|
|C1292435170|    8|
|C1437074823|    8|
| C859300166|    8|
| C240779865|    8|
|C1239707538|    8|
|C1638556416|    8|
|C1139127799|    8|
|C1677262317|    8|
|C1499819931|    8|
|C2067466390|    8|
|C1774146551|    8|
| C873744218|    8|
|C1382816537|    8|
|C1885272526|    8|
+-----------+-----+
only showing top 20 rows



                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1590550415|   35|
| C985934102|   33|
| C564160838|   30|
|C2083562754|   26|
| C665576141|   25|
| C401424608|   23|
|C1286084959|   23|
|  C33524623|   23|
| C998351292|   23|
|C1023714065|   22|
| C451111351|   21|
|C1789550256|   21|
| C747464370|   20|
|C1068824137|   19|
| C932583850|   19|
| C453211571|   19|
|C1360767589|   19|
| C248609774|   18|
|C1740000325|   18|
|C1335050193|   18|
+-----------+-----+
only showing top 20 rows



                                                                                

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   36|
|C1590550415|   36|
| C564160838|   30|
|C2083562754|   28|
| C665576141|   26|
| C998351292|   26|
|C1789550256|   25|
| C401424608|   23|
| C451111351|   23|
|C1286084959|   23|
|  C33524623|   23|
|C1023714065|   22|
|C1360767589|   21|
|C1068824137|   20|
|C1170794006|   20|
| C453211571|   20|
|C1749186397|   20|
| C747464370|   20|
| C248609774|   19|
| C932583850|   19|
+-----------+-----+
only showing top 20 rows



ERROR:root:KeyboardInterrupt while sending command.              (39 + 8) / 200]
Traceback (most recent call last):
  File "/home/gowtham/Documents/Python/BigData/spark_demo/spark-streaming/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/gowtham/Documents/Python/BigData/spark_demo/spark-streaming/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt

KeyboardInterrupt: 



In [10]:
spark.streams.active[0].isActive

True



In [11]:
query.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

                                                                                

In [12]:
query.stop()

23/12/18 12:00:55 WARN Shell: Interrupted while joining on: Thread[Thread-48857,5,main]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1300)
	at java.base/java.lang.Thread.join(Thread.java:1375)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:212)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1102)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatus(RawLocalFi