<a href="https://colab.research.google.com/github/ctshiz/WORKSPACE_SPARK/blob/main/Spark_Stream_with_Pyspark_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
# install Java 8 and Spark 3.2.2, and pin pyspark to the same Spark version
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz
!tar xf spark-3.2.2-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install -q "pyspark[sql]==3.2.2"

Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:6 http://archive.ubuntu.com

In [41]:
import os
# point to the JDK and the Spark distribution installed above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop2.7"

# findspark.init() must run before pyspark is imported so that SPARK_HOME is used
import findspark
findspark.init()

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext


In [27]:
# read the source CSV and drop the unnamed first column (_c0) and the two fraud-label columns
from pyspark.sql.functions import to_timestamp, col, lit
df =  spark.read.csv('stream1.csv', header=True)
df = df.drop("_c0","isFraud", "isFlaggedFraud")
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|           0.0|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 5 rows



The **step** column maps to a unit of time in the real world. Let's assume that 1 step is 1 hour.
For this example, we can then imagine another job that runs every hour and collects all the transactions from that time window.
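To make that assumption concrete, here is a small plain-Python sketch that turns a step number into the hour it would cover. The start time `SIMULATION_START` is hypothetical (the data does not say when step 1 begins), and the sketch also assumes that step 1 is the first hour after that start time.

```python
from datetime import datetime, timedelta
from typing import Tuple

# Hypothetical reference time: the dataset does not say when step 1 starts.
SIMULATION_START = datetime(2022, 1, 1, 0, 0)

def step_window(step: int, start: datetime = SIMULATION_START) -> Tuple[datetime, datetime]:
    """Return the [begin, end) hour covered by a step, assuming 1 step = 1 hour
    and that step 1 is the first hour after `start`."""
    begin = start + timedelta(hours=step - 1)
    return begin, begin + timedelta(hours=1)

print(step_window(1))
print(step_window(19))
```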

In [28]:
#count the transactions per step
df.groupBy("step").count().orderBy("count", ascending=False).show(5)

+----+-----+
|step|count|
+----+-----+
|  19|51352|
|  18|49579|
|  15|44609|
|  17|43361|
|  16|42471|
+----+-----+
only showing top 5 rows



We can therefore simulate the output of that job by filtering on each step and saving each step's rows to a separate file.

In [29]:
%%time
# write one CSV file (with a header) per step, all into the same directory
steps = df.select("step").distinct().collect()
for row in steps:
  step_df = df.where(F.col("step") == row["step"])
  step_df.coalesce(1).write.mode("append").option("header", "true").csv("data2/paysim")

CPU times: user 325 ms, sys: 45.2 ms, total: 371 ms
Wall time: 38.1 s
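For comparison, here is the same idea as a plain-Python sketch: group rows by **step** and write each group to its own CSV file with a header. The function `split_by_step`, the `step_<n>.csv` naming scheme and the sample rows are hypothetical; the rows are trimmed copies of the output shown earlier, with the last one moved to step 2 so that there are two groups. In the Spark loop, each `write` call instead produces a part file with a generated name.

```python
import csv
import os
import tempfile
from collections import defaultdict

def split_by_step(rows, out_dir):
    """Write one CSV (with header) per distinct `step` value; return the paths."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["step"]].append(row)
    paths = []
    # steps are strings here (as in the CSV), so sort them numerically
    for step, group in sorted(groups.items(), key=lambda kv: int(kv[0])):
        path = os.path.join(out_dir, f"step_{step}.csv")  # hypothetical naming scheme
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(group[0].keys()))
            writer.writeheader()
            writer.writerows(group)
        paths.append(path)
    return paths

# hypothetical sample with a subset of the PaySim columns
sample = [
    {"step": "1", "type": "PAYMENT", "amount": "9839.64", "nameDest": "M1979787155"},
    {"step": "1", "type": "TRANSFER", "amount": "181.0", "nameDest": "C553264065"},
    {"step": "2", "type": "CASH_OUT", "amount": "181.0", "nameDest": "C38997010"},
]
print(split_by_step(sample, tempfile.mkdtemp()))
```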


**STREAMING**

Let's create a streaming version of this input: we'll read the files one by one, as if they were arriving as a stream.
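In this notebook all the part files already exist when the query starts, so Spark simply works through that backlog one file per trigger. If you want files to genuinely arrive while the query runs, one common approach is to copy them into a separate landing directory with a pause in between. Below is a minimal sketch of such a helper; `replay_files` and the landing directory are hypothetical and not part of the original notebook. Because Spark's file source expects files to appear in the watched directory atomically, each file is first copied under a hidden name (Spark skips file names starting with `.`) and then renamed into place.

```python
import os
import shutil
import time

def replay_files(src_dir, landing_dir, delay_s=5.0):
    """Copy the CSV part files from src_dir into landing_dir one at a time
    (oldest first), sleeping delay_s seconds after each copy."""
    os.makedirs(landing_dir, exist_ok=True)
    names = sorted(
        (n for n in os.listdir(src_dir) if n.endswith(".csv")),  # skips _SUCCESS and .crc files
        key=lambda n: os.path.getmtime(os.path.join(src_dir, n)),
    )
    for name in names:
        # copy under a hidden temporary name, then rename so the file appears atomically
        tmp_path = os.path.join(landing_dir, f".{name}.tmp")
        shutil.copyfile(os.path.join(src_dir, name), tmp_path)
        os.replace(tmp_path, os.path.join(landing_dir, name))
        time.sleep(delay_s)
    return names
```

To use it, point the streaming reader at the landing directory instead of `data2/paysim/` and run `replay_files("data2/paysim", landing_dir)` in a background thread, so the notebook stays free to poll the results.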

In [31]:
part = spark.read.csv("data2/paysim/part-00000-0be070f4-5a37-4093-9398-8de6341af1fe-c000.csv",
                      header=True,
                      inferSchema=True)

In [34]:
part.show(5)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|  11|CASH_OUT|361489.71|C1721094820|      19111.0|           0.0|C1131138067|           0.0|      13277.21|
|  11|CASH_OUT| 490908.5| C303404123|      14688.0|           0.0| C483370168|      46331.87|     537240.36|
|  11| CASH_IN|308605.41|  C47112015|      53074.0|     361679.41|C1794607688|    1980718.09|    4580208.79|
|  11| PAYMENT| 19598.84| C146233901|   1192108.08|    1172509.23|M1090496877|           0.0|           0.0|
|  11|TRANSFER|514117.49|C2130074213|   1172509.23|     658391.74|C1996526533|     784590.77|     812630.85|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 5 rows

In [33]:
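# file-based streaming sources need an explicit schema by default,
# so reuse the one inferred from the static read above
dataSchema = part.schema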

**maxFilesPerTrigger** sets the maximum number of new files Spark reads in each trigger (micro-batch), which controls how quickly it works through all of the files in the folder. In this example we limit the stream to one file per trigger.

In [39]:
streaming = (spark.readStream.schema(dataSchema)
                  .option("header", "true")  # the part files were written with a header row
                  .option("maxFilesPerTrigger", 1)
                  .csv("data2/paysim/"))
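The effect of **maxFilesPerTrigger** can be pictured with a toy model in plain Python: the pending files, oldest first, are cut into micro-batches of at most N files, and each trigger processes one batch. `plan_triggers` is a hypothetical illustration, not a Spark API, and it assumes every file is already present when the query starts, as in this notebook; in Spark, files that arrive later are simply picked up by later triggers.

```python
def plan_triggers(new_files, max_files_per_trigger=None):
    """Toy model: split pending files (oldest first) into micro-batches of at
    most max_files_per_trigger files; None means all files in one batch."""
    new_files = list(new_files)
    if max_files_per_trigger is None:
        return [new_files] if new_files else []
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        new_files[i:i + max_files_per_trigger]
        for i in range(0, len(new_files), max_files_per_trigger)
    ]

files = ["step_1.csv", "step_2.csv", "step_3.csv"]  # hypothetical file names
print(plan_triggers(files, 1))  # three triggers, one file each
print(plan_triggers(files, 2))  # two triggers
print(plan_triggers(files))     # no limit: a single trigger
```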

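Next, let's set up a transformation.

The **nameDest** column holds the recipient ID of each transaction, so counting rows per **nameDest** tells us how many transactions each recipient has received.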

In [42]:
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))

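Now that we have our transformation, we need to specify an output sink for the results. For this example, we will write to the memory sink, which stores the results as an in-memory table (named after **queryName**) that we can query with Spark SQL. The memory sink is meant for debugging and testing, since the whole result is kept in the driver's memory.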

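We also need to define how Spark will output that data. In this example, we use the complete output mode, which rewrites every key along with its count after each trigger. Complete mode is also what makes the **orderBy** in our transformation valid: sorting a streaming DataFrame is only supported after an aggregation and in complete output mode.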
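As a rough illustration of complete mode, the toy model below keeps running counts across triggers, the way Spark keeps aggregation state, and emits the whole sorted table after every trigger. In update mode, only the rows whose counts changed in that trigger would be emitted. `run_complete_mode` and the recipient IDs are hypothetical; this is plain Python, not Spark.

```python
from collections import Counter

def run_complete_mode(batches):
    """Toy model of a streaming groupBy("nameDest").count() in complete mode:
    counts persist across triggers, and the full table is emitted each time,
    sorted by count (descending), then by name for ties."""
    state = Counter()
    outputs = []
    for batch in batches:      # one batch = the nameDest values seen in one trigger
        state.update(batch)    # update the aggregation state
        table = sorted(state.items(), key=lambda kv: (-kv[1], kv[0]))
        outputs.append(table)  # complete mode: every key, not just the changed ones
    return outputs

batches = [["C1", "C2", "C1"], ["C2", "C3"]]  # hypothetical recipient IDs
for table in run_complete_mode(batches):
    print(table)
```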

In this example we won't call **activityQuery.awaitTermination()**. It blocks until the query stops, which is what keeps the driver process alive while the stream is active in a standalone script.

In a notebook it would block the cell, so we leave it out in order to keep running cells locally while the stream is active.
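The role of **awaitTermination()** is loosely analogous to joining a background thread: the streaming query runs in the background of the driver, and if the main program reaches its end, the process exits and takes the query with it. The sketch below uses a plain Python daemon thread as a stand-in for the query (`fake_query` is hypothetical, and this is only an analogy, not Spark code); `join(timeout=...)` plays the part of **awaitTermination(timeout)**, which in PySpark also accepts a timeout in seconds.

```python
import threading
import time

def fake_query(results, n_batches=3, interval_s=0.05):
    """Hypothetical stand-in for a streaming query: one 'batch' per interval."""
    for i in range(n_batches):
        time.sleep(interval_s)
        results.append(f"batch {i}")

results = []
# daemon=True: as with the streaming query, this thread is killed when the main program exits
query_thread = threading.Thread(target=fake_query, args=(results,), daemon=True)
query_thread.start()

# analogue of activityQuery.awaitTermination(5): wait up to 5 s for it to finish
query_thread.join(timeout=5.0)
print("still running:", query_thread.is_alive())
print(results)
```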

In [44]:
activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
              .format("memory")
              .outputMode("complete")
              .start()
)

# in a production script, block here until the query stops:
# activityQuery.awaitTermination()

In [46]:
import time

In [48]:
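# poll the in-memory table 50 times, half a second apart; the nameDest filter drops
# a CSV header row read as data (if any), and count >= 2 keeps repeat recipients
for _ in range(50):
  _df = spark.sql(
      "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
  )
  if _df.count() > 0:
    _df.show(10)
  time.sleep(0.5)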

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1674899618|   39|
|C1286084959|   39|
| C909295153|   38|
| C766681183|   37|
| C667346055|   37|
|C1360767589|   37|
|C1303868418|   37|
| C803116137|   37|
|  C97730845|   36|
|C1060041730|   36|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C803116137|   46|
|C1674899618|   44|
|C1286084959|   44|
| C665576141|   43|
| C909295153|   42|
|C2006081398|   42|
| C766681183|   41|
| C667346055|   41|
|C1360767589|   41|
| C248609774|   40|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   67|
|C1590550415|   66|
|C1286084959|   65|
| C665576141|   61|
|C2083562754|   58|
| C248609774|   57|
|C1360767589|   56|
| C451111351|   55|
|  C97730845|   51|
| C667346055|   50|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1590550415|   70|
| 

In [51]:
#check if stream is active
spark.streams.active[0].isActive

True

In [52]:
activityQuery.status

{'isDataAvailable': False,
 'isTriggerActive': False,
 'message': 'Waiting for data to arrive'}

In [53]:
# stop the query, e.g. to reset it while testing
activityQuery.stop()