In [1]:
import os
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json, translate
import re
from IPython.display import display, clear_output
from time import sleep

In [2]:
SERVER = 'broker:29092'

In [3]:
TOPIC = 'streaming_cartao'

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 pyspark-shell'

In [5]:
spark = SparkSession.builder.appName("teste").getOrCreate()

In [6]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", SERVER) \
  .option("subscribe", TOPIC) \
  .option("startingOffsets", "earliest") \
  .load()

In [7]:
esquema_dados = StructType([                 
                 StructField('name', StringType(), True),
                 StructField('bandeira', StringType(), True),
                 StructField('credit_card', StringType(), True),
                 StructField('value', StringType(), True),
                 StructField('dia_compra', StringType(), True),
                 StructField('horario_compra', StringType(), True),
                 StructField('formato_captura', StringType(), True)
])

In [8]:
df = df.selectExpr("CAST(value AS STRING)")

In [9]:
df_dois = df.withColumn("jsonData", from_json(col("value"), esquema_dados)).select("jsonData.*")

In [10]:
df_dois.printSchema()

root
 |-- name: string (nullable = true)
 |-- bandeira: string (nullable = true)
 |-- credit_card: string (nullable = true)
 |-- value: string (nullable = true)
 |-- dia_compra: string (nullable = true)
 |-- horario_compra: string (nullable = true)
 |-- formato_captura: string (nullable = true)



In [11]:
df_media_captura = df_dois.select(col('formato_captura').alias('captura'),
                                  translate(col("value"), "$,", "").cast('float').alias('valor'))

In [12]:
df_media_captura = df_media_captura.groupby('captura').mean('valor')

In [13]:
query = df_media_captura.writeStream \
        .outputMode("complete") \
        .format("memory") \
        .queryName('media_valor_captura') \
        .start()

while True:
    clear_output(wait = True)
    display(query.status)
    display(spark.sql('SELECT * FROM media_valor_captura').show())
    time.sleep(1)

In [14]:
while True:    
    spark.sql('SELECT * FROM media_valor_captura').show()    
    sleep(3)

+-------+----------+
|captura|avg(valor)|
+-------+----------+
+-------+----------+

+-------+----------+
|captura|avg(valor)|
+-------+----------+
+-------+----------+

+-------+------------------+
|captura|        avg(valor)|
+-------+------------------+
|    TEF| 11361.85643365036|
|Gateway|10537.767753891945|
|    POS|14558.005987310478|
+-------+------------------+

+-------+------------------+
|captura|        avg(valor)|
+-------+------------------+
|    TEF| 11361.85643365036|
|Gateway|10537.767753891945|
|    POS|14558.005987310478|
+-------+------------------+

+-------+------------------+
|captura|        avg(valor)|
+-------+------------------+
|    TEF| 11361.85643365036|
|Gateway|10537.767753891945|
|    POS|14558.005987310478|
+-------+------------------+



KeyboardInterrupt: 