# Case study with Spark Streaming

This notebook uses the Kaggle fraud detection dataset.

In [1]:
import findspark
findspark.init()

In [2]:
#%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv("data/fraud_detection.csv", 
                    header=True, 
                    inferSchema=True)

In [4]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

### <font color = #ff4fa4> <i> <b> As seen in earlier notebooks, .drop() removes the listed columns; here it drops 2 columns from the dataset </b> </i> </font>:

In [5]:
df = df.drop("isFraud", "isFlaggedFraud")

In [6]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 2 rows



The step column maps to a unit of real-world time: here 1 step is 1 hour. For this example we can therefore assume that another job runs every hour and collects all the transactions from that time frame.
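To make the step-to-time mapping concrete, here is a minimal sketch that turns a step number into the one-hour window it covers. The start date is made up (the dataset carries no calendar date), and treating step 1 as the first hour is an assumption.

```python
from datetime import datetime, timedelta

# Hypothetical start of the simulation; the dataset itself gives no calendar date.
ASSUMED_START = datetime(2024, 1, 1, 0, 0)


def step_to_window(step, start=ASSUMED_START):
    """Return (start, end) of the one-hour window for a step, assuming step 1 is the first hour."""
    window_start = start + timedelta(hours=step - 1)
    return window_start, window_start + timedelta(hours=1)


window_34 = step_to_window(34)
print(window_34[0].isoformat(), "->", window_34[1].isoformat())
# prints: 2024-01-02T09:00:00 -> 2024-01-02T10:00:00
```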

In [12]:
df.groupBy("step").count().show(3)

+----+-----+
|step|count|
+----+-----+
| 148|   12|
|  31|   12|
|  85|   14|
+----+-----+
only showing top 3 rows



We can therefore save the output of that job by filtering on each step and saving it to a separate file. 

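### <font color = #ff4fa4> <i> <b> %%time is a Jupyter cell magic provided by the IPython kernel. It measures the execution time of a single cell </b> </i> </font>:

The loop in the next cell is wrapped in a triple-quoted string, which disables it: the cell only evaluates a string literal, so the reported times are 0 ns and the output simply echoes the string.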

In [8]:
%%time
"""steps = df.select("step").distinct().collect()
for step in steps[:]:
    _df = df.where(f"step = {step[0]}")
    #by adding coalesce(1) we save the dataframe to one file
    _df.coalesce(1).write.mode("append").option("header", "true").csv("data/fraud")"""

CPU times: total: 0 ns
Wall time: 0 ns


'steps = df.select("step").distinct().collect()\nfor step in steps[:]:\n    _df = df.where(f"step = {step[0]}")\n    #by adding coalesce(1) we save the dataframe to one file\n    _df.coalesce(1).write.mode("append").option("header", "true").csv("data/fraud")'
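The disabled loop above writes one file per step. The same idea can be sketched without Spark, using only the standard library. The function name split_by_step, the file naming pattern and the three sample rows are all made up for illustration; they are not taken from the Kaggle file or from Spark's own part-file naming.

```python
import csv
import tempfile
from collections import defaultdict
from pathlib import Path


def split_by_step(rows, out_dir):
    """Write one CSV file per distinct step value and return the paths written."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["step"]].append(row)

    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for step, group in sorted(groups.items()):
        path = out_dir / f"step-{step:05d}.csv"
        with path.open("w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(group[0].keys()))
            writer.writeheader()  # mirrors option("header", "true")
            writer.writerows(group)
        paths.append(path)
    return paths


# Tiny made-up sample, not real transactions.
sample_rows = [
    {"step": 1, "type": "PAYMENT", "amount": 10.0},
    {"step": 1, "type": "TRANSFER", "amount": 25.5},
    {"step": 2, "type": "CASH_OUT", "amount": 7.25},
]
split_paths = split_by_step(sample_rows, tempfile.mkdtemp())
print([p.name for p in split_paths])
# prints: ['step-00001.csv', 'step-00002.csv']
```

Each output file carries its own header row, just like the files produced by the Spark loop.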

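### <font color = #ff4fa4> <i> <b> The ! prefix executes a shell command straight from a Jupyter cell </b> </i> </font>

### <font color = #ff4fa4> <i> <b> Note: each ! command runs in its own temporary subshell, so !cd data/fraud does not change the notebook's working directory. Use the %cd magic for a change that persists. The paths in the following cells therefore stay relative to the original directory. </b> </i> </font>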

In [9]:
!cd data/fraud 
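The difference between a cd issued in a child shell and a directory change made by the process itself can be checked outside Jupyter. The sketch below is only an approximation of what the ! prefix and the %cd magic do, using subprocess and os.chdir on a throwaway directory.

```python
import os
import subprocess
import tempfile

before = os.getcwd()
target = tempfile.mkdtemp()

# Roughly what "!cd <dir>" does: the cd happens in a child shell that exits right away.
subprocess.run(f'cd "{target}"', shell=True, check=True)
after_shell_cd = os.getcwd()

# Roughly what "%cd <dir>" does: the current process itself changes directory.
os.chdir(target)
after_chdir = os.getcwd()
os.chdir(before)  # restore the original directory

print(after_shell_cd == before)  # True: the child shell's cd did not affect this process
print(os.path.realpath(after_chdir) == os.path.realpath(target))  # True
```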

In [13]:
part = spark.read.csv(
    "data/fraud/part-00000-897a9dd3-832b-4e43-bcdc-c0009cfec4f0-c000.csv",
    header=True,
    inferSchema=True,
)
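The part file name above contains a generated identifier that differs on every run, so hard-coding it is fragile. A minimal sketch of looking the files up instead, assuming they follow the part-*.csv pattern seen above; find_part_files is a hypothetical helper and the demo file names are made up.

```python
import glob
import os
import tempfile


def find_part_files(folder):
    """Return the CSV part files found in folder, sorted by name."""
    return sorted(glob.glob(os.path.join(folder, "part-*.csv")))


# Demo on a throwaway folder with made-up file names.
demo_dir = tempfile.mkdtemp()
for name in ["part-00000-bbb-c000.csv", "part-00000-aaa-c000.csv", "_SUCCESS"]:
    open(os.path.join(demo_dir, name), "w").close()

part_files = find_part_files(demo_dir)
print([os.path.basename(p) for p in part_files])
# prints: ['part-00000-aaa-c000.csv', 'part-00000-bbb-c000.csv']
```

In this notebook the first element of find_part_files("data/fraud") could then be passed to spark.read.csv in place of the literal path.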

In [14]:
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
|  34|30904|
+----+-----+



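Let's create a streaming version of this input by reading the files one at a time, as if they arrived as a stream. A streaming file source needs its schema up front (schema inference is disabled by default for streaming reads), so we reuse the schema of the file we just loaded.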

In [16]:
dataSchema = part.schema

In [17]:
dataSchema

StructType([StructField('step', IntegerType(), True), StructField('type', StringType(), True), StructField('amount', DoubleType(), True), StructField('nameOrig', StringType(), True), StructField('oldbalanceOrg', DoubleType(), True), StructField('newbalanceOrig', DoubleType(), True), StructField('nameDest', StringType(), True), StructField('oldbalanceDest', DoubleType(), True), StructField('newbalanceDest', DoubleType(), True)])

*maxFilesPerTrigger* sets the maximum number of new files Spark picks up in each trigger, which controls how quickly it works through the folder.
In this example we limit the stream to one file per trigger.
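The effect of the per-trigger limit can be illustrated without Spark. The generator below is a simplified model, not Spark's implementation: micro_batches is a hypothetical name, and processing files in name order is an assumption made to keep the example deterministic.

```python
def micro_batches(files, max_files_per_trigger=1):
    """Yield successive batches holding at most max_files_per_trigger files each."""
    pending = sorted(files)  # simplification: process in name order
    for start in range(0, len(pending), max_files_per_trigger):
        yield pending[start:start + max_files_per_trigger]


demo_files = ["part-c.csv", "part-a.csv", "part-b.csv"]
batches_of_one = list(micro_batches(demo_files, 1))
batches_of_two = list(micro_batches(demo_files, 2))

print(batches_of_one)  # [['part-a.csv'], ['part-b.csv'], ['part-c.csv']]
print(batches_of_two)  # [['part-a.csv', 'part-b.csv'], ['part-c.csv']]
```

With a limit of 1, three files take three triggers; raising the limit drains the folder in fewer, larger batches.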


In [22]:
import os
print(os.listdir("data/"))

['fraud', 'fraud_detection.csv', 'heart.csv']


In [19]:
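streaming = (
    spark.readStream.schema(dataSchema)
    # the part files were written with a header row, so tell the reader to skip it
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv("data/fraud/")
)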

Py4JJavaError: An error occurred while calling o80.csv.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:180)
	at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
	at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:162)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:133)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:96)
	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:68)
	at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:539)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$sourceSchema$2(DataSource.scala:265)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:162)
	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$1(DataSource.scala:162)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:167)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:259)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:198)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:212)
	at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:260)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
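An UnsatisfiedLinkError on NativeIO$Windows.access0 is commonly reported when Spark runs on Windows without Hadoop's native helper files (winutils.exe and hadoop.dll), or with versions that do not match the Hadoop build Spark uses. The sketch below only checks whether those two files are present; it assumes they are expected under the bin folder of the directory named by the HADOOP_HOME environment variable, and missing_hadoop_native_files is a hypothetical helper, not part of Spark or Hadoop.

```python
import os


def missing_hadoop_native_files(hadoop_home):
    """Return the helper files that are not found under <hadoop_home>/bin."""
    expected = ["winutils.exe", "hadoop.dll"]
    if not hadoop_home:
        return expected  # HADOOP_HOME is not set at all
    bin_dir = os.path.join(hadoop_home, "bin")
    return [name for name in expected
            if not os.path.isfile(os.path.join(bin_dir, name))]


# An empty list means both files were found; the result depends on the machine.
print(missing_hadoop_native_files(os.environ.get("HADOOP_HOME")))
```

If files are reported missing, a usual remedy is to install the matching binaries, point HADOOP_HOME at their parent folder, and restart the kernel before creating the Spark session again.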


Let's set up a transformation.

The nameDest column is the recipient ID of the transaction.

In [33]:
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))

Now that we have our transformation, we need to specify an output sink for the results. For this example we write to a memory sink, which keeps the results in an in-memory table named after the query (dest_counts) that we can then query with SQL.

We also need to define how Spark outputs that data. Here we use the complete output mode, which rewrites all of the keys along with their counts after every trigger.

We don't call activityQuery.awaitTermination() in this example. That call blocks until the query stops, and its purpose is to keep the driver process from terminating while the stream is active.

In a notebook the kernel already keeps the driver alive, and blocking would prevent us from running the cells that follow, so we leave it out to run this locally.
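What complete output mode means for a grouped count can be shown with a small, Spark-free model: after every micro-batch the whole table of running counts is emitted again, sorted by count in descending order. The function name and the recipient IDs are made up for illustration.

```python
from collections import Counter


def complete_mode_outputs(batches):
    """Yield the full table of running counts after each micro-batch."""
    totals = Counter()
    for batch in batches:
        totals.update(batch)
        # Complete mode: emit every key seen so far, not only the ones that changed.
        yield sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))


demo_batches = [["C1", "M7"], ["C1", "C2"], ["C2", "C1"]]
complete_tables = list(complete_mode_outputs(demo_batches))
for table in complete_tables:
    print(table)
# [('C1', 1), ('M7', 1)]
# [('C1', 2), ('C2', 1), ('M7', 1)]
# [('C1', 3), ('C2', 2), ('M7', 1)]
```

Note that M7 keeps appearing in every output even though it never changes after the first batch; that is the cost of complete mode when the number of keys grows.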

In [34]:
activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

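# In a standalone application, call this to keep the driver alive:
# activityQuery.awaitTermination()

import time

# Poll the in-memory table 50 times, 0.5 s apart.
for _ in range(50):
    _df = spark.sql(
        # the first predicate guards against CSV header rows being read as data
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
    )
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)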

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+
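The loop above prints the same table on every poll until the counts change. A minimal sketch of a polling helper that reports a result only when it differs from the previous one; poll_changes is a hypothetical helper and the fake data source stands in for the SQL query.

```python
import time


def poll_changes(fetch, attempts=50, delay=0.5):
    """Call fetch() repeatedly and yield a result only when it is non-empty and new."""
    last = None
    for _ in range(attempts):
        snapshot = fetch()
        if snapshot and snapshot != last:
            yield snapshot
            last = snapshot
        time.sleep(delay)


# Fake results standing in for successive states of the dest_counts table.
fake_results = iter([[], [("C1", 2)], [("C1", 2)], [("C1", 2), ("C2", 2)]])
seen_snapshots = list(poll_changes(lambda: next(fake_results, []), attempts=4, delay=0))
print(seen_snapshots)
# prints: [[('C1', 2)], [('C1', 2), ('C2', 2)]]
```

In the notebook, fetch could be a function that runs the SQL query and returns the collected rows as a list of tuples.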

Check whether the stream is active:

In [35]:
spark.streams.active[0].isActive

True

In [36]:
activityQuery.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

To turn off the stream, we run activityQuery.stop(). This also resets the query, which is useful when testing.

In [37]:
activityQuery.stop()