In [1]:
# # Dependencia
# # Instala o findspark
# !pip install findspark

In [1]:
# Importa o findspark e inicializa
import findspark
findspark.init()

# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DateType, TimestampType, ArrayType
from pyspark.sql.functions import col, sum, from_json, unix_timestamp, window
import pyspark.sql.functions as F

In [2]:
# Conector
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 pyspark-shell'

In [3]:
# Cria a sessão Spark
spark = SparkSession.builder.appName("case-improving").getOrCreate()

24/06/04 21:46:43 WARN Utils: Your hostname, cj resolves to a loopback address: 127.0.1.1; using 192.168.15.34 instead (on interface enp2s0)
24/06/04 21:46:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/cj/.ivy2/cache
The jars for the packages stored in: /home/cj/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ef177f03-4c4c-41fe-be19-e6899b18ffe4;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 499ms :: artifacts dl 17ms
	:: modules

In [4]:
# Vamos criar uma subscrição no tópico que tem o streaming de dados que desejamos "puxar" os dados.
from_kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "vendas-deshboard-bronze") \
    .option("failOnDataLoss", "false") \
    .option("startingOffsets", "latest") \
    .load()
# latest
# earliest
from_kafka_df

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [5]:
from_kafka_schema = StructType([
    StructField("id_vendedor", IntegerType(), False),
    StructField("id_cliente", IntegerType(), False),
    StructField("id_produto", IntegerType(), False),
    StructField("id_venda", IntegerType(), False),
    StructField("quantidade", IntegerType(), False),
    StructField("valor_unitario", DoubleType(), False),
    StructField("valor_total", DoubleType(), False),
    StructField("desconto", DoubleType(), False),
    StructField("data", DateType(), False)
])

In [6]:
# Capturamos cada linha de dado (cada valor) como string
from_kafka_value_str = from_kafka_df.selectExpr("CAST(value AS STRING)")

# Parse do formato JSON em dataframe
from_kafka_bronze_df = from_kafka_value_str.withColumn("jsonData", from_json(col("value"), from_kafka_schema)).select("jsonData.*")
from_kafka_bronze_df.printSchema()

root
 |-- id_vendedor: integer (nullable = true)
 |-- id_cliente: integer (nullable = true)
 |-- id_produto: integer (nullable = true)
 |-- id_venda: integer (nullable = true)
 |-- quantidade: integer (nullable = true)
 |-- valor_unitario: double (nullable = true)
 |-- valor_total: double (nullable = true)
 |-- desconto: double (nullable = true)
 |-- data: date (nullable = true)



In [7]:
fat_desc_silver_df = (
    from_kafka_bronze_df
        .select('valor_total', 'desconto', (col('valor_total') - col('desconto')).alias('faturamento'))
        .agg(F.sum('valor_total').alias('valor_total'),
             F.sum('desconto').alias('desconto'),
             F.sum('faturamento').alias('faturamento')
        )
        .where(col('valor_total').isNotNull())
        .withColumn('grafico',F.lit('fat_desc'))
    )
fat_desc_silver_df.printSchema()
to_kafka_media_vendas_json = fat_desc_silver_df.select(F.to_json(F.struct(*fat_desc_silver_df.columns)).alias("json_data"))
to_kafka_media_vendas_json.printSchema()

root
 |-- valor_total: double (nullable = true)
 |-- desconto: double (nullable = true)
 |-- faturamento: double (nullable = true)
 |-- grafico: string (nullable = false)

root
 |-- json_data: string (nullable = true)



In [8]:
maiores_vendas_silver_df = (
    from_kafka_bronze_df
        .select('id_vendedor', (col('valor_total') - col('desconto')).alias('faturamento'))
        .groupby(col('id_vendedor').alias('id_vendedor')).agg(F.sum('faturamento').alias('faturamento'))
        .where(col('id_vendedor').isNotNull())
        .withColumn('grafico',F.lit('maiores_vendas'))
)
maiores_vendas_silver_df.printSchema()
to_kafka_maiores_vendas_json = maiores_vendas_silver_df.select(F.to_json(F.struct(*maiores_vendas_silver_df.columns)).alias("json_data"))
to_kafka_maiores_vendas_json.printSchema()

root
 |-- id_vendedor: integer (nullable = true)
 |-- faturamento: double (nullable = true)
 |-- grafico: string (nullable = false)

root
 |-- json_data: string (nullable = true)



In [9]:
mais_vendidos_silver_df = (
    from_kafka_bronze_df
        .select(col('id_produto'), col('quantidade'), col('valor_total'), col('desconto'),
            (col('valor_total') - col('desconto')).alias('faturamento'))
        .groupby(col('id_produto').alias('id_produto'))
        .agg(F.sum('quantidade').alias('quantidade'),
            F.sum('valor_total').alias('valor_total'),
            F.sum('desconto').alias('desconto'),
            F.sum('faturamento').alias('faturamento'))
        .where(col('faturamento').isNotNull())
        .withColumn('grafico',F.lit('mais_vendidos'))
)

mais_vendidos_silver_df.printSchema()
to_kafka_mais_vendidos_json = mais_vendidos_silver_df.select(F.to_json(F.struct(*mais_vendidos_silver_df.columns)).alias("json_data"))
to_kafka_mais_vendidos_json.printSchema()

root
 |-- id_produto: integer (nullable = true)
 |-- quantidade: long (nullable = true)
 |-- valor_total: double (nullable = true)
 |-- desconto: double (nullable = true)
 |-- faturamento: double (nullable = true)
 |-- grafico: string (nullable = false)

root
 |-- json_data: string (nullable = true)



In [11]:
result_union_df = to_kafka_media_vendas_json.union(to_kafka_maiores_vendas_json)
result_union_df.printSchema()

result_union_df2 = result_union_df.union(to_kafka_mais_vendidos_json)
result_union_df2.printSchema()

result_union_df_json = result_union_df2.select(F.to_json(F.struct(*result_union_df2.columns)).alias("value"))
result_union_df_json.printSchema()

root
 |-- json_data: string (nullable = true)

root
 |-- json_data: string (nullable = true)

root
 |-- value: string (nullable = true)



In [16]:
final_result = (sendersql
    .selectExpr("CAST(value AS STRING)")
    .writeStream
    .format("console")
    # .format("kafka")
    .outputMode("complete")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "vendas-deshboard-gold")
    .option("checkpointLocation", "./check.txt")
    .option("truncate", False)
    .start()
    # .awaitTermination()
)

AnalysisException: [WRITE_STREAM_NOT_ALLOWED] `writeStream` can be called only on streaming Dataset/DataFrame.

In [None]:
final_result.awaitTermination()

24/06/04 21:25:48 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
24/06/04 21:26:01 WARN HDFSBackedStateStoreProvider: The state for version 3 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/06/04 21:26:01 WARN HDFSBackedStateStoreProvider: The state for version 3 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/06/04 21:26:01 WARN HDFSBackedStateStoreProvider: The state for version 3 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
24/06/04 21:26:01 WARN HDFSBackedStateStoreProvider: The state for version 3 doesn't exist in loadedMaps. Reading snapshot file and delta files

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                     |
+------------------------------------------------------------------------------------------------------------------------------------------+
|{"json_data":"{\"valor_total\":8.0,\"desconto\":2.0,\"faturamento\":6.0,\"grafico\":\"fat_desc\"}"}                                       |
|{"json_data":"{\"id_vendedor\":1,\"faturamento\":6.0,\"grafico\":\"maiores_vendas\"}"}                                                    |
|{"json_data":"{\"id_produto\":1,\"quantidade\":2,\"valor_total\":8.0,\"desconto\":2.0,\"faturamento\":6.0,\"grafico\":\"mais_vendidos\"}"}|
+--------------------------------------------------------

                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                     |
+------------------------------------------------------------------------------------------------------------------------------------------+
|{"json_data":"{\"valor_total\":12.0,\"desconto\":3.0,\"faturamento\":9.0,\"grafico\":\"fat_desc\"}"}                                      |
|{"json_data":"{\"id_vendedor\":1,\"faturamento\":6.0,\"grafico\":\"maiores_vendas\"}"}                                                    |
|{"json_data":"{\"id_vendedor\":2,\"faturamento\":3.0,\"grafico\":\"maiores_vendas\"}"}                                                    |
|{"json_data":"{\"id_produto\":1,\"quantidade\":2,\"valor

In [None]:
query.stop()

In [None]:
query = fat_desc_from_kafka_df.writeStream.option("truncate", "false").outputMode("update").format("console").start()

In [None]:
spark.stop()

In [None]:
# Renomeamos as colunas para simplificar nossa análise
df_conversao_dados = (df_conversao
    .select(
        col("id_vendedor").alias("vendedor"),
        col("id_cliente").alias("cliente"),
        col("id_produto").alias("produto"),
        col("id_venda").alias("venda"),
        col("quantidade").alias("quantidade"),
        col("valor_unitario").alias("valor_unitario"),
        col("valor_total").alias("total"),
        col("desconto").alias("desconto"),
        col("data").alias("data")
    )
)