### Caso Spark Streaming
Se utilizan los datos de Kaggle de fraud detection. Queremos contar el número de transacciones que le llegan a un destinatario (nameDest) en streaming

In [1]:
# Importación de los datos
import findspark
findspark.init()

In [2]:
#%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [4]:
df = spark.read.csv('../data/fraud_detection.csv',
                   header= True,
                   inferSchema=True)

In [5]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [6]:
df = df.drop('isFraud', 'isFlaggedFraud')

In [7]:
df.show(3)

+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 3 rows



#### Obtención secuencial de los datos

Step mapea una unidad de tiempo, en este caso, 1 step es 1 hora de tiempo. En un caso real tendríamos un proceso que se ejecuta cada hora y obtendría todas las transacciones cada hora

In [8]:
df.groupBy('step').count().show(3)

+----+-----+
|step|count|
+----+-----+
| 148|   12|
| 463|   10|
| 471| 2620|
+----+-----+
only showing top 3 rows



Por lo tanto, podemos guardar el resultado de ese trabajo filtrando en cada step y guardandolo en un archivo separado

In [None]:
%%time
steps = df.select('step').distinct().collect()

for step in steps[:]:
    _df = df.where(f"step = {step[0]}")
    #by adding coalesce(1) we save the datframe to one file
    _df.coalesce(1).write.mode('append').option('header', 'true').csv('data/fraud')

In [10]:
!cd data/fraud

In [None]:
part = spark.read.csv(
    'data/fraud/part-00000-00ac3d12-8057-4aae-bbbb-5fe71c2813e2-c000.csv.crc',
    header=True,
    inferSchema=True
)

In [None]:
part.groupBy('step').count().show()

#### Procesamiento en Streaming
Creemos una versión de transmisión de esta entrada, leeremos cada archivo uno por uno como si fuera un proceso en streaming

In [None]:
dataSchema = part.schema

In [None]:
dataSchema

**maxFilesTriggre** permite controlar la rapidez con la que Spark leerá todos los archivos de la carpeta. En este ejemplo, estamos limitando el flujo de la transmisión a un archivo por disparador

In [None]:
streaming = (
    spark.readStream.schema(dataSchema)
    .option('maxFilesPerTrigger', 1)
    .csv('data/fraud/')
)

Establecemos una transformación. La columna nameDest es el ID del destinatario de la transacción.

In [None]:
dest_count = streaming.groupBy('nameDest').count().orderBy(F.desc('count'))

Ahora que tenemos nuestra transformación, necesitamos especificar un output de salida para los resultados.  Para este ejemplo, vamos a escribir los resultados en memoria.

También necesitamos definir como Spark generará esos datos. En este ejemplo, usaremos el modo de salida completo (reescribiendo todos los datos junto con sus recuentos después de cada disparo) con .**outputMode('complete')**.

En este ejemplo, no incluiremos **activityQuery.awaitTermination()** porque es necesario solo para evitar que el proceso del controlador finalice cuando la transmisión está activa. Entonces, para poder ejecutar esto localmente en un portátil, no lo incluiremos.

In [None]:
activityQuery = (
    dest_count.writeStream.queryName('dest_counts')
    .format('memory')
    .outputMode('complete')
    .start()
)

# Include this in production
# activityQuery.awaitTermination()

import time

for x in range(50):
    _df = spark.sql(
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2'
    )
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)

Comprobar si la transmisión esta activa

In [None]:
spark.streams.active[0].isActive

In [None]:
activityQuery.status

Si queremos desactivar la transmisión, ejecutaremos activityQuery.stop() para reestablecer la consulta con fines de prueba

In [None]:
activityQuery.stop()