<a href="https://colab.research.google.com/github/hjesse92/SparkStreamingPractice/blob/main/PySpark_Practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simulate Streaming with Spark

Simulating with Kaggle's PaySim dataset: https://www.kaggle.com/ealaxi/paysim1

In [None]:
!pip install pyspark

In [1]:
from google.colab import drive

# Mount Google Drive into the Colab VM; the notebook reads and writes its data
# under this mount point.
drive_mount_point = '/content/drive'
drive.mount(drive_mount_point)

Mounted at /content/drive


In [5]:
import time

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Obtain the notebook's SparkSession: getOrCreate() returns the session that is
# already running, or starts a new one with default settings if there is none.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [11]:
# Load the full PaySim CSV from Drive. The first line supplies the column names
# and the column types are inferred from the data.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/content/drive/MyDrive/PySpark Practice/paysim.csv')
)

In [12]:
# List the column names of the loaded dataset.
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [13]:
# Remove the two fraud-label columns; the rest of the notebook works only with
# the transaction fields.
label_columns = ('isFraud', 'isFlaggedFraud')
df = df.drop(*label_columns)

In [14]:
# Preview the first two rows to confirm the label columns are gone.
df.show(n=2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 2 rows



In [15]:
# Count how many transactions fall into each step and peek at three of them.
rows_per_step = df.groupBy('step').count()
rows_per_step.show(3)

+----+-----+
|step|count|
+----+-----+
| 148|   12|
|  31|   12|
|  85|   14|
+----+-----+
only showing top 3 rows



Each `step` represents one hour of simulated time, so every transaction with the same `step` value belongs to the same hour.

In [16]:
%%time
all_steps = df.select('step').distinct().collect()
5
for step in all_steps:
    _df = df.where(f'step = {step[0]}')
    _df.coalesce(1).write.mode('append').option('header','true').csv('/content/drive/MyDrive/PySpark Practice/data')

CPU times: user 4.42 s, sys: 434 ms, total: 4.85 s
Wall time: 16min 17s


In [21]:
# Read back a single part file produced by the split step above.
# NOTE: the file name contains a run-specific identifier, so this path has to be
# updated whenever the part files are regenerated.
part_file = "/content/drive/MyDrive/PySpark Practice/data/part-00000-002c27df-41b7-4e63-9bf7-c9d155a8e329-c000.csv"
part = spark.read.options(header=True, inferSchema=True).csv(part_file)

In [23]:
# Preview the first five rows of the part file.
part.show(n=5)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
|  23| PAYMENT| 27736.41| C965406344|      10258.0|           0.0| M433292571|           0.0|           0.0|
|  23| PAYMENT|   7740.2|C1990257786|      94936.0|       87195.8|M1839804600|           0.0|           0.0|
|  23|TRANSFER|310381.44|  C62483755|      11032.0|           0.0| C851565313|      41497.72|     351879.16|
|  23| PAYMENT|  2321.65| C113761050|      11277.0|       8955.35|M2012554372|           0.0|           0.0|
|  23| PAYMENT|  1886.45|C1409148466|      20286.0|      18399.55| M459610451|           0.0|           0.0|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 5 

In [24]:
# Check which step values the part file contains and how many rows each has.
part_rows_per_step = part.groupBy('step').count()
part_rows_per_step.show()

+----+-----+
|step|count|
+----+-----+
|  23| 6144|
+----+-----+



Time to stream the data

In [25]:
# Reuse the schema inferred from the single part file as the explicit schema
# passed to the streaming reader below.
dataSchema = part.schema

In [26]:
# Display the schema that will be handed to the streaming reader.
dataSchema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(nameOrig,StringType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(nameDest,StringType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true)))

In [28]:
# Treat the directory of per-step part files as a streaming source, using the
# schema captured from one part file.
streaming = (
    spark.readStream.schema(dataSchema)
    # The part files were written with a header line; without this option each
    # file's header is ingested as a data row (nameDest == 'nameDest').
    .option("header", "true")
    # Consume one file per micro-batch, i.e. one step at a time.
    .option("maxFilesPerTrigger", 1)
    .csv("/content/drive/MyDrive/PySpark Practice/data/")
)

In [35]:
# Running count of transactions per destination account, largest first.
dest_count = (
    streaming
    .groupBy("nameDest")
    .count()
    .orderBy(F.col("count").desc())
)

In [41]:
# Start the streaming aggregation. Results go to an in-memory table named
# "dest_counts", and "complete" mode rewrites the whole result on each trigger.
stream_writer = (
    dest_count.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("dest_counts")
)
activityQuery = stream_writer.start()

In [43]:
# Poll the in-memory result table a few times while the stream is running and
# print the busiest destination accounts seen so far.
POLL_ATTEMPTS = 5
POLL_INTERVAL_SECONDS = 0.5

for attempt in range(POLL_ATTEMPTS):
    # The nameDest != 'nameDest' predicate discards header lines that were
    # ingested as data rows; count >= 2 keeps only repeated destinations.
    frequent_dests = spark.sql(
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' and count >= 2"
    )
    # head(1) is enough to tell whether any row exists; no full count() needed.
    if frequent_dests.head(1):
        frequent_dests.show(10)
    time.sleep(POLL_INTERVAL_SECONDS)

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1470685392|    6|
| C947126193|    5|
|  C61797727|    5|
|C1152780087|    5|
| C225852166|    5|
|C2049145129|    5|
|C1104243021|    5|
|C1454189174|    5|
| C167875008|    5|
| C796568622|    5|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C680344850|    7|
|C2049145129|    7|
|C1797724563|    7|
| C240670965|    7|
|C1368674093|    7|
|C1386732390|    7|
| C947126193|    6|
|C1944598988|    6|
|C1152780087|    6|
| C581015888|    6|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C680344850|    7|
|C2049145129|    7|
|C1797724563|    7|
| C240670965|    7|
|C1368674093|    7|
|C1386732390|    7|
| C947126193|    6|
|C1944598988|    6|
|C1152780087|    6|
| C581015888|    6|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1386732390|    9|
| 

In [44]:
# Ask the query handle directly whether it is still running.
# `spark.streams.active` lists only running queries, so indexing it with [0]
# raises IndexError once the query has stopped instead of returning False.
activityQuery.isActive

True

In [45]:
# Show the current status of the streaming query.
activityQuery.status

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

In [None]:
# Stop the streaming query once the demonstration is finished.
activityQuery.stop()