In [1]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import spark.implicits._

// Schema for the salary records read by the streaming source (nombre, puesto, salario).
// StructField defaults to nullable = true with empty metadata, matching StructType.add.
val schema = StructType(Seq(
  StructField("nombre", StringType),
  StructField("puesto", StringType),
  StructField("salario", LongType)
))

// Static (batch) DataFrame of people; no schema is supplied, so Spark infers it from the JSON files.
// NOTE(review): the inferred schema shown in the cell output contains _corrupt_record,
// which suggests some files matched by this glob are not valid JSON records — confirm the directory contents.
val staticPersonas = spark.read.json("/home/vmuser/personas/*")

Intitializing Scala interpreter ...

Spark Web UI available at http://10.0.2.15:4040
SparkContext available as 'sc' (version = 2.4.6, master = local[*], app id = local-1616272525874)
SparkSession available as 'spark'


import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(nombre,StringType,true), StructField(puesto,StringType,true), StructField(salario,LongType,true))
staticPersonas: org.apache.spark.sql.DataFrame = [_corrupt_record: string, batchTimestampMs: bigint ... 7 more fields]


In [2]:
// Streaming DataFrame: JSON salary files picked up from the watched directory, parsed with `schema`
val streamingSalarios = spark
  .readStream
  .schema(schema)
  .json("/home/vmuser/streamingSalarios/")

// Stream-static inner join on the shared "nombre" column
val both = streamingSalarios.join(staticPersonas, "nombre")

// Register the joined DataFrame as a temporary view so it can be queried with SQL
both.createOrReplaceTempView("personas_salario")

// Average salary per job title ("puesto")
val mediaSalario = spark.sql("select puesto, avg(salario) from personas_salario group by puesto")

// Build and launch the streaming query: the console sink prints the whole aggregate
// table on every micro-batch because the output mode is "complete"
val query = mediaSalario
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()

// The query is already running; block this cell for at most 90000 ms.
// Returns true only if the query terminated within that window.
query.awaitTermination(90000)

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------+------------+
|       puesto|avg(salario)|
+-------------+------------+
|Desarrollador|        11.0|
|     Analista|        15.0|
+-------------+------------+



streamingSalarios: org.apache.spark.sql.DataFrame = [nombre: string, puesto: string ... 1 more field]
both: org.apache.spark.sql.DataFrame = [nombre: string, puesto: string ... 9 more fields]
mediaSalario: org.apache.spark.sql.DataFrame = [puesto: string, avg(salario): double]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@446bd2b6
res0: Boolean = false


-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+------------+
|       puesto|avg(salario)|
+-------------+------------+
|Desarrollador|        12.0|
|     Analista|        17.0|
+-------------+------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------+------------+
|       puesto|avg(salario)|
+-------------+------------+
|Desarrollador|        12.4|
|     Analista|        17.0|
+-------------+------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------+------------------+
|       puesto|      avg(salario)|
+-------------+------------------+
| JefeProyecto|              28.0|
|Desarrollador|12.714285714285714|
|     Analista|              17.0|
+-------------+------------------+



In [4]:
// Show the query's unique identifier (a java.util.UUID)
query.id

res2: java.util.UUID = 010c3678-443a-4b9c-bd0f-c6050a775a6a


In [5]:
// Print the query's physical plan (here: the per-"puesto" HashAggregates wrapped by StateStoreRestore/StateStoreSave)
query.explain()

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@3e808b9d
+- *(5) HashAggregate(keys=[puesto#126], functions=[avg(salario#127L)])
   +- StateStoreSave [puesto#126], state info [ checkpoint = file:/tmp/temporary-db35d3ff-2f42-434b-adab-ee4ea95592ce/state, runId = d34766c6-bcbb-4d61-b6db-430d499174eb, opId = 0, ver = 3, numPartitions = 200], Complete, 0, 2
      +- *(4) HashAggregate(keys=[puesto#126], functions=[merge_avg(salario#127L)])
         +- StateStoreRestore [puesto#126], state info [ checkpoint = file:/tmp/temporary-db35d3ff-2f42-434b-adab-ee4ea95592ce/state, runId = d34766c6-bcbb-4d61-b6db-430d499174eb, opId = 0, ver = 3, numPartitions = 200], 2
            +- *(3) HashAggregate(keys=[puesto#126], functions=[merge_avg(salario#127L)])
               +- Exchange hashpartitioning(puesto#126, 200)
                  +- *(2) HashAggregate(keys=[puesto#126], functions=[partial_avg(salario#127L)])
                     +- *(2) P

In [6]:
// Show the query's name; it is null here because no queryName(...) was set when the query was built
query.name

res4: String = null


In [7]:
// Show the most recent progress report (a StreamingQueryProgress): batch id, input rows,
// durations, state-operator metrics and per-source offsets
query.lastProgress

res5: org.apache.spark.sql.streaming.StreamingQueryProgress =
{
  "id" : "010c3678-443a-4b9c-bd0f-c6050a775a6a",
  "runId" : "d34766c6-bcbb-4d61-b6db-430d499174eb",
  "name" : null,
  "timestamp" : "2021-03-20T20:44:55.118Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 4,
    "triggerExecution" : 4
  },
  "stateOperators" : [ {
    "numRowsTotal" : 3,
    "numRowsUpdated" : 0,
    "memoryUsedBytes" : 80615,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 1200,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 17991
    }
  } ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/home/vmuser/streamingSalarios]",
    "startOffset" : {
      "logOff...

In [10]:
// Return whether the query is currently active (running)
query.isActive

res8: Boolean = false


In [11]:
// Stop the streaming query
query.stop()