# Caso práctico con  Spark Streaming

En este notebook se utilizarán los datos de Kaggle de fraud detection. Queremos contar el número de transacciones que le llegan a un destinatario (nameDest) en streaming

#### Importación de datos

In [1]:
import findspark
findspark.init()

In [2]:
#%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv("data/fraud_detection.csv", 
                    header=True, 
                    inferSchema=True)

In [4]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [5]:
df = df.drop("isFraud", "isFlaggedFraud")

In [6]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 2 rows



#### Obtención secuencial de los datos

Step mapea una unidad de tiempo, en este caso, 1 step es 1 hora de tiempo. En un caso real tendriamos un proceso que se ejecuta cada hora y obtendria todas las transacciones cada hora.

In [7]:
df.groupBy("step").count().show(3)

+----+-----+
|step|count|
+----+-----+
|  12|36153|
|   1| 2708|
|  13|37515|
+----+-----+
only showing top 3 rows



Por lo tanto, podemos guardar el resultado de ese trabajo filtrando en cada step y guardándolo en un archivo separado.

In [8]:
%%time
steps = df.select("step").distinct().collect()

for step in steps[:]:
    _df = df.where(f"step = {step[0]}")
    #by adding coalesce(1) we save the dataframe to one file
    _df.coalesce(1).write.mode("append").option("header", "true").csv("data/fraud")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\spark\python\lib\py4j-0.10.9.5-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\spark\python\lib\py4j-0.10.9.5-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\jaime\AppData\Local\Programs\Python\Python38\lib\socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [9]:
!cd data/fraud 

In [10]:
part = spark.read.csv(
    "data/fraud/part-00000-897a9dd3-832b-4e43-bcdc-c0009cfec4f0-c000.csv",
    header=True,
    inferSchema=True,
)

In [11]:
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
|  34|30904|
+----+-----+



#### Procesamiento en Streaming

Creemos una versión de transmisión de esta entrada, leeremos cada archivo uno por uno como si fuera un proceso en streaming.

In [12]:
dataSchema = part.schema

In [13]:
dataSchema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(nameOrig,StringType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(nameDest,StringType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true)))

**maxFilesPerTrigger**  permite controlar la rapidez con la que Spark leerá todos los archivos de la carpeta.
En este ejemplo, estamos limitando el flujo de la transmisión a un archivo por disparador.

In [14]:
streaming = (
    spark.readStream.schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .csv("data/fraud/")
)

Establezcamos una transformación.La columna nameDest es el ID del destinatario de la transacción.

In [15]:
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))

Ahora que tenemos nuestra transformación, necesitamos especificar un output de salida para los resultados. Para este ejemplo, vamos a escribir los resultados en memoria.

También necesitamos definir cómo Spark generará esos datos. En este ejemplo, usaremos el modo de salida completo (reescribiendo todos los datos junto con sus recuentos después de cada disparo) con **.outputMode("complete")**.

En este ejemplo, no incluiremos **activityQuery.awaitTermination()** porque es necesario solo para evitar que el proceso del controlador finalice cuando la transmisión está activa. Entonces, para poder ejecutar esto localmente en un portátil, no lo incluiremos.

In [16]:
activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

# include this in production
# activityQuery.awaitTermination()

import time

for x in range(50):
    _df = spark.sql(
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
    )
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
| C763794011|    2|
| C488343370|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333| 

Comprueba si la transmisión está activa

In [17]:
spark.streams.active[0].isActive

True

In [18]:
activityQuery.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

Si queremos desactivar la transmisión, ejecutaremos activityQuery.stop () para restablecer la consulta con fines de prueba.

In [19]:
activityQuery.stop()