# Importación de librerias

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Ejercicio 1

In [0]:
#Creamos el directorio stream, si no lo estuviera.
dbutils.fs.mkdirs("dbfs:/FileStore/tables/data_stream")

# Cada vez que queramos probarlo, realizamos:

# Limpiamos el directorio stream
dbutils.fs.rm("dbfs:/FileStore/tables/data_stream/transactions_part1.csv")
dbutils.fs.rm("dbfs:/FileStore/tables/data_stream/transactions_part2.csv")
dbutils.fs.rm("dbfs:/FileStore/tables/data_stream/transactions_part3.csv")
dbutils.fs.rm("dbfs:/FileStore/tables/data_stream/transactions_part4.csv")

#UNA VEZ LANZADO, vamos copiando uno a uno mientras observamos la salida en la consola en drivers_logs de Databrick
# Primera parte del dataset
dbutils.fs.cp("dbfs:/FileStore/tables/data/transactions_part1.csv","dbfs:/FileStore/tables/data_stream/transactions_part1.csv")

# Segunda parte del dataset
dbutils.fs.cp("dbfs:/FileStore/tables/data/transactions_part2.csv","dbfs:/FileStore/tables/data_stream/transactions_part2.csv")

# Tercera parte del dataset
dbutils.fs.cp("dbfs:/FileStore/tables/data/transactions_part3.csv","dbfs:/FileStore/tables/data_stream/transactions_part3.csv")

# Cuarta parte del dataset
dbutils.fs.cp("dbfs:/FileStore/tables/data/transactions_part4.csv","dbfs:/FileStore/tables/data_stream/transactions_part4.csv")

True

In [0]:
spark = SparkSession \
    .builder \
    .appName("StructuredNetwork_Transactions_BDA") \
    .getOrCreate()

# Configurar el esquema
schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("amount", IntegerType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("transaction_type", StringType(), True),
    StructField("currency", StringType(), True)
])

# Leer el flujo de datos desde archivos CSV
df_init = spark.readStream \
    .format("csv") \
    .option("header", True) \
    .schema(schema) \
    .option("path", "dbfs:/FileStore/tables/data_stream") \
    .load()

# Aplicar aggregated para manejar los eventos
aggregated_df = df_init.groupBy(
        col("transaction_type"),
        col("currency")
    ) \
    .agg(
        sum("amount").alias("total_amount"),
        count("amount").alias("transaction_count"),
        avg("amount").alias("avg_amount")
    )

# Mostrar los resultados en la consola
query = aggregated_df.writeStream \
    .outputMode("complete") \
    .option("truncate", False) \
    .option("numRows", 10) \
    .format("console") \
    .start()

query.awaitTermination()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

### Apartado 4

El modo de salida utilizado ha sido complete, porque en este modo todos los resultados de la agregación se vuelven a calcular y se escriben completamente en cada batch de streaming.
En nuestro caso, dado que no estamos utilizando watermark o ventanas, los resultados de todas las transacciones deben ser recalculados y mostrados nuevamente cada vez que se agregan nuevas transacciones. El modo complete asegura que se muestren los resultados más actualizados.

# Ejercicio 2

In [0]:
spark = SparkSession \
    .builder \
    .appName("StructuredNetwork_Transactions_BDA") \
    .getOrCreate()


# Set Spark logging level to ERROR to avoid various other logs on console.
spark.sparkContext.setLogLevel("ERROR")


schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("amount", IntegerType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("transaction_type", StringType(), True),
    StructField("currency", StringType(), True)
])

df_init = spark \
    .readStream \
    .format("csv") \
    .option("maxFilesPerTrigger",2) \
    .option("header", True) \
    .option("path", "dbfs:/FileStore/tables/data_stream") \
    .schema(schema) \
    .load()

df_init.printSchema()

# Aplicar Watermark para manejar eventos tardíos
aggregated_df = df_init.withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),  # Ventana de 5 minutos
        col("transaction_type"),
        col("currency")
    ) \
    .agg(
        sum("amount").alias("total_amount"),
        count("amount").alias("transaction_count")
    )

# Escribir los resultados en la consola
query = aggregated_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .option("numRows", 10) \
    .start()

query.awaitTermination()

root
 |-- transaction_id: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- currency: string (nullable = true)



com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

Se utiliza el modo de salida update al igual que el primer apartado, en este caso debido a las siguientes razones:

1. Agregaciones en tiempo real:

Como estamos calculando agregaciones (suma y número de transacciones) dentro de ventanas de 5 minutos, el modo update es ideal porque solo actualiza las filas afectadas cuando llega un nuevo conjunto de datos (transacciones).

2. Optimización del rendimiento:

El modo update es más eficiente que complete porque no reimprime toda la tabla de resultados cada vez, solo las filas que han cambiado. 

3. Manejo de eventos tardíos:

Dado que se está utilizando un watermark de 10 minutos, algunos eventos pueden llegar con retraso, pero solo aquellos eventos dentro de este rango serán considerados. El modo update asegura que, cuando se recibe un evento tardío, solo se actualicen las agregaciones de esa ventana y las filas.