# Caso práctico con  Spark Streaming

En este noteboo se utilizarán los datos de Kaggle de fraud detection

In [3]:
import findspark
findspark.init()

In [4]:
#%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [8]:
df = spark.read.csv("C:/Users/ilse-/fraud_detection.csv", 
                    header=True, 
                    inferSchema=True)

In [9]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [10]:
df = df.drop("isFraud", "isFlaggedFraud")

In [11]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 2 rows



Step maps a unit of time in the real world. In this case 1 step is 1 hour of time. So we can assume for this example that we have another job that runs every hour and gets all the transactions in that time frame.

In [12]:
df.groupBy("step").count().show(3)

+----+-----+
|step|count|
+----+-----+
| 148|   12|
| 463|   10|
| 471| 2620|
+----+-----+
only showing top 3 rows



We can therefore save the output of that job by filtering on each step and saving it to a separate file. 

In [16]:
%%time
"""steps = df.select("step").distinct().collect()
for step in steps[:]:
    _df = df.where(f"step = {step[0]}")
    #by adding coalesce(1) we save the dataframe to one file
    _df.coalesce(1).write.mode("append").option("header", "true").csv("C:/Users/ilse-/fraud_detection.csv")"""

Wall time: 0 ns


'steps = df.select("step").distinct().collect()\nfor step in steps[:]:\n    _df = df.where(f"step = {step[0]}")\n    #by adding coalesce(1) we save the dataframe to one file\n    _df.coalesce(1).write.mode("append").option("header", "true").csv("C:/Users/ilse-/fraud_detection.csv")'

In [17]:
!cd data/fraud 

El sistema no puede encontrar la ruta especificada.


In [19]:
part = spark.read.csv(
    "C:/Users/ilse-/fraud_detection.csv",
    header=True,
    inferSchema=True,
)

In [20]:
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
| 148|   12|
| 463|   10|
| 471| 2620|
| 496|  873|
| 243|    8|
| 392|   10|
| 540| 5476|
| 623|    6|
| 737|   10|
|  31|   12|
| 516|   14|
|  85|   14|
| 137|32559|
| 251|35269|
| 451| 3751|
| 580|   58|
|  65|   20|
| 458|  438|
|  53|   10|
| 255|28840|
+----+-----+
only showing top 20 rows



Let’s create a streaming version of this input, we'll read each file one by one as if it was a stream.

In [21]:
dataSchema = part.schema

In [22]:
dataSchema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(nameOrig,StringType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(nameDest,StringType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true),StructField(isFraud,IntegerType,true),StructField(isFlaggedFraud,IntegerType,true)))

*maxFilesPerTrigger* allows you to control how quickly Spark will read all of the files in the folder. 
In this example we're limiting the flow of the stream to one file per trigger.


In [25]:
streaming = (
    spark.readStream.schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .csv("C:/Users/ilse-/fraud")
)

Let's set up a transformation.

The nameDest column is the recipient ID of the transaction.

In [26]:
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))

Now that we have our transformation, we need to specify an output sink for the results. For this example, we're going to write to a memory sink which keeps the results in memory.

We also need to define how Spark will output that data. In this example, we'll use the complete output mode (rewriting all of the keys along with their counts after every trigger).

In this example we won't include activityQuery.awaitTermination() because it is required only to prevent the driver process from terminating when the stream is active.

So in order to be able to run this locally in a notebook we won't include it.

In [27]:

activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

# include this in production
# activityQuery.awaitTermination()

import time

for x in range(50):
    _df = spark.sql(
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
    )
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)

Py4JJavaError: An error occurred while calling o178.start.
: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\ilse-\AppData\Local\Temp\temporary-b0d9f95d-20f1-4ea1-86b3-a8e0ee8080bb\.metadata.3710c44b-5a3d-4788-89d2-8bd3fea5a72f.tmp
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.FileSystem.primitiveCreate(FileSystem.java:1017)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:99)
	at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:352)
	at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:399)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:584)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:125)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.apply(StreamExecution.scala:123)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:123)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:267)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


Check if stream is active

In [28]:
spark.streams.active[0].isActive

IndexError: list index out of range

In [None]:
activityQuery.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

If we  want to turn off the stream we'll run activityQuery.stop() to reset the query for testing purposes.

In [None]:
activityQuery.stop()