<a href="https://colab.research.google.com/github/DanieleFG/Ciencias_de_Dados/blob/master/spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# @title Texto de título padrão
from pyspark import SparkContext
import os
sc = SparkContext("local", "Analise de Vendas")

file_path = "vendas.csv"
# if not os.path.exists(file_path):
#     print(f"Erro: O arquivo '{file_path}' não foi encontrado.")
#     sc.stop()
#     exit()


vendas_rdd = sc.textFile(file_path)

vendas_tuplas_rdd = vendas_rdd.map(lambda linha: tuple(linha.split(",")))

produtos_rdd = vendas_tuplas_rdd.map(lambda venda: (venda[1], (int(venda[2]), float(venda[3]))))

total_vendas_por_produto = produtos_rdd.reduceByKey(lambda a,b: (a[0] + b[0], a[1] + b[1]))
print("Total de vendas por produto")

for produto in total_vendas_por_produto.collect():
    print(f"Produto id {produto[0]}: Quantidade Total = {produto[1][0]}, ValorTotal = {produto[1][1]}")

produto_mais_vendido = total_vendas_por_produto.reduce(lambda a,b: a if a[1][0] >b[1][0] else b)

print(f"Produto mais vendido: Produto ID {produto_mais_vendido[0]} com {produto_mais_vendido[1][0]}")

media_vendas_por_produto = total_vendas_por_produto.mapValues(lambda valor: valor[1] / valor[0])

print("Média de vendas por produto")

for produto in media_vendas_por_produto.collect():
    print(f"Produto id {produto[0]}: Média = {produto[1]:.2f}")

sc.stop()

Total de vendas por produto
Produto id 101: Quantidade Total = 5, ValorTotal = 500.0
Produto id 102: Quantidade Total = 19, ValorTotal = 1200.0
Produto id 103: Quantidade Total = 20, ValorTotal = 978.0
Produto id 104: Quantidade Total = 6, ValorTotal = 365.0
Produto id 105: Quantidade Total = 10, ValorTotal = 109.0
Produto id 106: Quantidade Total = 25, ValorTotal = 864.0
Produto mais vendido: Produto ID 106 com 25
Média de vendas por produto
Produto id 101: Média = 100.00
Produto id 102: Média = 63.16
Produto id 103: Média = 48.90
Produto id 104: Média = 60.83
Produto id 105: Média = 10.90
Produto id 106: Média = 34.56


# Nova seção

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg

spark = SparkSession.builder.appName("Analise de Vendas SQL").getOrCreate()
vendas_data = [
    {"data": "2025-01-01", "produto": "Produto A", "quantidade": 100, "preco_unitario": 10.5},
    {"data": "2025-01-02", "produto": "Produto B", "quantidade": 50, "preco_unitario": 8.2},
    {"data": "2025-01-03", "produto": "Produto A", "quantidade": 75, "preco_unitario": 11.0},
    {"data": "2025-01-04", "produto": "Produto C", "quantidade": 120, "preco_unitario": 9.8},
    {"data": "2025-01-05", "produto": "Produto B", "quantidade": 60, "preco_unitario": 7.5}
]

vendas_df = spark.createDataFrame(vendas_data)

produtos_data = [
    {"produto": "Produto A", "categoria": "Eletrônico"},
    {"produto": "Produto B", "categoria": "Roupa"},
    {"produto": "Produto C", "categoria": "Alimentos"}
]

produtos_df = spark.createDataFrame(produtos_data)

vendas_df.createOrReplaceTempView("vendas")
produtos_df.createOrReplaceTempView("produtos")

resultado = spark.sql("""
  SELECT
    v.data,
    v.produto,
    v.quantidade,
    v.preco_unitario,
    p.categoria
  FROM
    vendas v
  JOIN
    produtos p ON v.produto = p.produto
    ORDER BY v.data
""")

print("Resultado da consulta sql")

resultado.show()

total_por_categoria = spark.sql("""
      SELECT
        p.categoria,
        SUM(v.quantidade * v.preco_unitario) AS total_vendas
      FROM
        vendas v
      JOIN
        produtos p ON v.produto = p.produto
      GROUP BY
        p.categoria
      ORDER BY
        total_vendas DESC
""")
print("Total por categoria")

total_por_categoria.show()

media_quantidade = vendas_df.select(avg("quantidade").alias("media_quantidade"))

print("Média de quantidade")

media_quantidade.show()

vendas_altas = vendas_df.filter(col("quantidade") > 100)
print("Vendas acima de 100")

vendas_altas.show()



spark.stop()









Resultado da consulta sql
+----------+---------+----------+--------------+----------+
|      data|  produto|quantidade|preco_unitario| categoria|
+----------+---------+----------+--------------+----------+
|2025-01-01|Produto A|       100|          10.5|Eletrônico|
|2025-01-02|Produto B|        50|           8.2|     Roupa|
|2025-01-03|Produto A|        75|          11.0|Eletrônico|
|2025-01-04|Produto C|       120|           9.8| Alimentos|
|2025-01-05|Produto B|        60|           7.5|     Roupa|
+----------+---------+----------+--------------+----------+

Total por categoria
+----------+------------+
| categoria|total_vendas|
+----------+------------+
|Eletrônico|      1875.0|
| Alimentos|      1176.0|
|     Roupa|       860.0|
+----------+------------+

Média de quantidade
+----------------+
|media_quantidade|
+----------------+
|            81.0|
+----------------+

Vendas acima de 100
+----------+--------------+---------+----------+
|      data|preco_unitario|  produto|quantida

# Nova seção

In [None]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Processamento de dados empresa").getOrCreate()

df = spark.read.csv("vendas.csv", header=True, inferSchema=True)
clientes_df = spark.read.format("json").load("clientes.json")

df.createOrReplaceTempView("vendas")

df.show()

# Nova seção

In [None]:
from pyspark.sql.functions import sum , avg
vendas_agregadas = vendas_df.groupBy("produto_id").agg(
    sum("quantidade").alias("total_vendido"),
    sum("valor_unitario").alias("valor_total"),
    avg("valor_unitario").alias("preco_medio"))
vendas_agregadas.show()

# Nova seção

In [None]:
from pyspark.sql.functions import window

vendas_stream_df = spark.readStream.format("kafka").option("header", "true") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "vendas") \
  .load()

vendas_stream_df = vendas_stream_df.selectExpr("CAST(value AS STRING)").alias("value")
vendas_stream_df = vendas_stream_df.selectExpr("split(value, ',')[0] as produto_id",
                                               "split(value, ',')[1] as quantidade",
                                               "split(value, ',')[2] as valor")

vendas_agregadas_stream = vendas_stream_df.groupBy(
    window("timestamp", "1 minute"),
    "produto_id"
).agg(
    sum("quantidade").alias("total_vendido"),
    sum("valor").alias("valor_total")
)

query = vendas_agregadas_stream.writeStream \
  .outputMode("complete") \
  .start()

query.awaitTermination()