In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from pyspark.sql.functions import col, when, trim

spark = SparkSession.builder \
    .appName("segundaAula-hurum") \
    .getOrCreate()

In [2]:
csv_data = """id,data_venda,produto,valor,cliente_id
1,2026-01-01,Notebook,3500.00,100
2,2026-01-01,,150.50,101
3,2026-01-02,Mouse,20.00,
4,2026-01-02,Teclado,Erro,102
1,2026-01-01,Notebook,50000.00,100
5,2026-01-03,Monitor,1200.00,103"""

os.makedirs("dados", exist_ok=True)
with open("dados/vendas_sujas.csv", "w") as f:
    f.write(csv_data)

print("Arquivo 'vendas_sujas.csv' gerado!")

Arquivo 'vendas_sujas.csv' gerado!


In [3]:
schema_vendas = StructType([
    StructField("id", IntegerType(), True),
    StructField("data_venda", DateType(), True), # Tenta converter data
    StructField("produto", StringType(), True),
    StructField("valor", StringType(), True), # Lemos como String primeiro para tratar erros
    StructField("cliente_id", IntegerType(), True)
])

In [4]:
df_sujo = spark.read.csv(
    "dados/vendas_sujas.csv", 
    header=True, 
    schema=schema_vendas, # For√ßa nosso schema
    mode="PERMISSIVE" # Permite ler linhas com erro colocando NULL
)

In [5]:
df_sujo.show()

+---+----------+--------+--------+----------+
| id|data_venda| produto|   valor|cliente_id|
+---+----------+--------+--------+----------+
|  1|2026-01-01|Notebook| 3500.00|       100|
|  2|2026-01-01|    NULL|  150.50|       101|
|  3|2026-01-02|   Mouse|   20.00|      NULL|
|  4|2026-01-02| Teclado|    Erro|       102|
|  1|2026-01-01|Notebook|50000.00|       100|
|  5|2026-01-03| Monitor| 1200.00|       103|
+---+----------+--------+--------+----------+



In [18]:
df_drop_duplicates = df_sujo.dropDuplicates(["id"])

In [19]:
df_drop_duplicates.show()

+---+----------+--------+-------+----------+
| id|data_venda| produto|  valor|cliente_id|
+---+----------+--------+-------+----------+
|  1|2026-01-01|Notebook|3500.00|       100|
|  2|2026-01-01|    NULL| 150.50|       101|
|  3|2026-01-02|   Mouse|  20.00|      NULL|
|  4|2026-01-02| Teclado|   Erro|       102|
|  5|2026-01-03| Monitor|1200.00|       103|
+---+----------+--------+-------+----------+



In [6]:
df_na = df_sujo.na.fill({"produto": "Produto Desconhecido"})
df_na = df_na.na.drop(subset=["cliente_id"])

In [7]:
df_tipos = df_na.withColumn("valor_numerico", col("valor").cast(DoubleType()))

df_final = df_tipos.na.fill({"valor_numerico": 0.0}) \
                   .drop("valor")

In [8]:
df_final.show()

+---+----------+--------------------+----------+--------------+
| id|data_venda|             produto|cliente_id|valor_numerico|
+---+----------+--------------------+----------+--------------+
|  1|2026-01-01|            Notebook|       100|        3500.0|
|  2|2026-01-01|Produto Desconhecido|       101|         150.5|
|  4|2026-01-02|             Teclado|       102|           0.0|
|  1|2026-01-01|            Notebook|       100|       50000.0|
|  5|2026-01-03|             Monitor|       103|        1200.0|
+---+----------+--------------------+----------+--------------+



In [23]:
df_final.printSchema()

root
 |-- id: integer (nullable = true)
 |-- data_venda: date (nullable = true)
 |-- produto: string (nullable = false)
 |-- cliente_id: integer (nullable = true)
 |-- valor_numerico: double (nullable = false)



In [9]:
df_final.createOrReplaceTempView("tb_vendas")


In [10]:
df_relatorio = spark.sql("""
    SELECT 
        produto,
        COUNT(id) as total_pedidos,
        SUM(valor_numerico) as faturamento_total,
        AVG(valor_numerico) as ticket_medio
    FROM tb_vendas
    GROUP BY produto
    ORDER BY faturamento_total DESC
""")


In [11]:
df_relatorio.show()

+--------------------+-------------+-----------------+------------+
|             produto|total_pedidos|faturamento_total|ticket_medio|
+--------------------+-------------+-----------------+------------+
|            Notebook|            2|          53500.0|     26750.0|
|             Monitor|            1|           1200.0|      1200.0|
|Produto Desconhecido|            1|            150.5|       150.5|
|             Teclado|            1|              0.0|         0.0|
+--------------------+-------------+-----------------+------------+



In [12]:
caminho_parquet = "dados/relatorio_vendas.parquet"

df_relatorio.write \
    .mode("overwrite") \
    .parquet(caminho_parquet)

In [13]:
df_leitura = spark.read.parquet(caminho_parquet)

In [14]:
df_leitura.show()

+--------------------+-------------+-----------------+------------+
|             produto|total_pedidos|faturamento_total|ticket_medio|
+--------------------+-------------+-----------------+------------+
|            Notebook|            2|          53500.0|     26750.0|
|             Monitor|            1|           1200.0|      1200.0|
|Produto Desconhecido|            1|            150.5|       150.5|
|             Teclado|            1|              0.0|         0.0|
+--------------------+-------------+-----------------+------------+

