In [None]:
#Teste Fumio"
#Validando commit, push, branch e codspace"
#Este comentário está sendo inscrito no VSCODE do Codspace.
    #Instalei o Pyspark no Codspace.
#A sincronização entre CODSPACE e VSCODE foi realizada com sucesso
    #Criando novos comentários para documentação

# Importa o Spark e outras bibliotecas necessárias
import pyspark # O módulo principal
from pyspark.sql import SparkSession
from pyspark.sql import functions as F # O alias 'F' MAIÚSCULO é usado para todas as funções (F.col, F.when, etc.)
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Cria ou obtém uma SparkSession
# Master: 'local[*]' indica que o Spark deve usar todos os núcleos disponíveis na máquina
# appName: Nome da aplicação (importante para monitoramento)
spark = (SparkSession.builder
    .master("local[*]")
    .appName("AulaPraticaPySpark_DataQuality")
    .config("spark.executor.memory", "4g") # Opcional: Configurações de memória
    .config("spark.driver.memory", "4g")  # Opcional: Configurações de memória
    .getOrCreate()
)

# Imprime o status da sessão para confirmação
print("SparkSession inicializada com sucesso!")
print(f"Versão do Spark: {spark.version}")
print(f"Aplicação: {spark.sparkContext.appName}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/02 16:06:13 WARN Utils: Your hostname, codespaces-844813, resolves to a loopback address: 127.0.0.1; using 10.0.10.206 instead (on interface eth0)
25/11/02 16:06:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/02 16:06:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession inicializada com sucesso!
Versão do Spark: 4.0.1
Aplicação: AulaPraticaPySpark_DataQuality


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 45196)
Traceback (most recent call last):
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/python/3.12.1/lib/python3.12/socketserver.py", line 761, in __init__
    self.handle()
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/accumulators.py", line 297, in handle
    poll(authenticate_and_accum_updates)
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/accumulators.py", line 271, in poll
    if self.rfile in r and func():
   

In [2]:
schema_vendas = StructType([
    StructField("ID_VENDA", IntegerType(), nullable=False),
    StructField("DATA_REGISTRO_RAW", StringType(), nullable=True), # O tipo original é String, pois o formato é inconsistente ('2024-03-20' vs '2024/03/20')
    StructField("VALOR_BRUTO_RAW", StringType(), nullable=True),  # O tipo original é String, pois pode vir com vírgula (250,99) ou nulo
    StructField("PRODUTO", StringType(), nullable=True),
    StructField("STATUS_VENDA", StringType(), nullable=True)
])

In [3]:
caminho_arquivo = "vendas_brutas.csv" 
df_vendas_raw = (
    spark.read
    .csv(
        caminho_arquivo,
        header=True,
        schema=schema_vendas,
        sep=",",
        # O PySpark lê a primeira linha como cabeçalho
        # e garante que os dados sigam o schema definido (Data Quality!)
    )
)

In [4]:
print("\n--- Schema Original (Raw) ---")
df_vendas_raw.printSchema()

print("\n--- Primeiros Registros ---")
df_vendas_raw.show(truncate=False)


--- Schema Original (Raw) ---
root
 |-- ID_VENDA: integer (nullable = true)
 |-- DATA_REGISTRO_RAW: string (nullable = true)
 |-- VALOR_BRUTO_RAW: string (nullable = true)
 |-- PRODUTO: string (nullable = true)
 |-- STATUS_VENDA: string (nullable = true)


--- Primeiros Registros ---


25/11/02 16:49:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ID_VENDA, DATA_REGISTRO, VALOR_BRUTO, PRODUTO, STATUS
 Schema: ID_VENDA, DATA_REGISTRO_RAW, VALOR_BRUTO_RAW, PRODUTO, STATUS_VENDA
Expected: DATA_REGISTRO_RAW but found: DATA_REGISTRO
CSV file: file:///workspaces/Mackenzie_Testando_Spark_Codspace/vendas_brutas.csv


+--------+-----------------+---------------+-------+------------+
|ID_VENDA|DATA_REGISTRO_RAW|VALOR_BRUTO_RAW|PRODUTO|STATUS_VENDA|
+--------+-----------------+---------------+-------+------------+
|1001    |2024-01-15       |150.50         |Laptop |Concluido   |
|1002    |2024-02-10       |99.00          |Mouse  |NULL        |
|1003    |2024/03/20       |250            |99     |Teclado     |
|1004    |2024-04-01       |NULL           |Monitor|Concluido   |
|1004    |2024-04-01       |NULL           |Monitor|Concluido   |
+--------+-----------------+---------------+-------+------------+



In [5]:
# Ação 1: Limpar/Preencher o campo STATUS_VENDA
# Se o status for nulo, vamos preencher com 'EM_PROCESSAMENTO' (regra de negócio)
df_limpeza = df_vendas_raw.withColumn(
    "STATUS_VENDA",
    F.coalesce(F.col("STATUS_VENDA"), F.lit("EM_PROCESSAMENTO")) # Coalesce pega o primeiro valor não nulo
)

# Ação 2: Tratamento de nulos em VALOR_BRUTO_RAW
# Se o valor for nulo, vamos preencher com 0 (regra de negócio para vendas não registradas)
df_limpeza = df_limpeza.na.fill(value="0.0", subset=['VALOR_BRUTO_RAW'])

print("\n--- 1. Limpeza (Cleaning): Nulos Tratados ---")
df_limpeza.show(truncate=False)


--- 1. Limpeza (Cleaning): Nulos Tratados ---
+--------+-----------------+---------------+-------+----------------+
|ID_VENDA|DATA_REGISTRO_RAW|VALOR_BRUTO_RAW|PRODUTO|STATUS_VENDA    |
+--------+-----------------+---------------+-------+----------------+
|1001    |2024-01-15       |150.50         |Laptop |Concluido       |
|1002    |2024-02-10       |99.00          |Mouse  |EM_PROCESSAMENTO|
|1003    |2024/03/20       |250            |99     |Teclado         |
|1004    |2024-04-01       |0.0            |Monitor|Concluido       |
|1004    |2024-04-01       |0.0            |Monitor|Concluido       |
+--------+-----------------+---------------+-------+----------------+



25/11/02 16:50:18 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ID_VENDA, DATA_REGISTRO, VALOR_BRUTO, PRODUTO, STATUS
 Schema: ID_VENDA, DATA_REGISTRO_RAW, VALOR_BRUTO_RAW, PRODUTO, STATUS_VENDA
Expected: DATA_REGISTRO_RAW but found: DATA_REGISTRO
CSV file: file:///workspaces/Mackenzie_Testando_Spark_Codspace/vendas_brutas.csv


In [8]:
df_transformacao = (
    df_limpeza
    # Transformação 1: Padronizar STATUS_VENDA para caixa alta (CONSISTÊNCIA)
    .withColumn("STATUS_VENDA", F.upper(F.col("STATUS_VENDA")))

    # Transformação 2: Limpar e Converter VALOR_BRUTO_RAW para Decimal (TIPAGEM CORRETA)
    # 1. Substitui vírgulas (',') por ponto ('.')
    # 2. Converte o resultado para tipo Double
    .withColumn(
        "VALOR_VENDA",
        F.regexp_replace(F.col("VALOR_BRUTO_RAW"), ",", ".").cast(DoubleType())
    )

    # Transformação 3: Converter DATA_REGISTRO_RAW para Tipo Date (FORMATO CORRETO)
    # Tenta inferir o formato de data (yyyy-MM-dd, yyyy/MM/dd, etc.) - PySpark é flexível
    .withColumn(
        "DATA_VENDA",
        F.to_date(F.col("DATA_REGISTRO_RAW"), "yyyy-MM-dd") # Tenta formato padrão, o PySpark pode inferir variações simples
    )
    .withColumn("DATA_VENDA", F.coalesce(F.col("DATA_VENDA"), F.to_date(F.col("DATA_REGISTRO_RAW"), "yyyy/MM/dd"))) # Tenta o formato com barra, caso o anterior falhe
    .drop("DATA_REGISTRO_RAW", "VALOR_BRUTO_RAW") # Remove as colunas RAW
)

print("\n--- 2. Transformação: Padronização e Casting ---")
df_transformacao.printSchema()
#df_transformacao.show(truncate=False)
df_transformacao.show


--- 2. Transformação: Padronização e Casting ---
root
 |-- ID_VENDA: integer (nullable = true)
 |-- PRODUTO: string (nullable = true)
 |-- STATUS_VENDA: string (nullable = false)
 |-- VALOR_VENDA: double (nullable = true)
 |-- DATA_VENDA: date (nullable = true)



<bound method DataFrame.show of DataFrame[ID_VENDA: int, PRODUTO: string, STATUS_VENDA: string, VALOR_VENDA: double, DATA_VENDA: date]>

In [10]:
# Ação: Criar uma nova coluna indicando se a venda é considerada de "Alto Valor" (> 200)
df_enriquecimento = df_transformacao.withColumn(
    "FLAG_ALTO_VALOR",
    F.when(F.col("VALOR_VENDA") >= 200, F.lit(True)).otherwise(F.lit(False))
)

print("\n--- 3. Enriquecimento: Nova Feature Adicionada ---")
#df_enriquecimento.show(truncate=False)
df_enriquecimento.show


--- 3. Enriquecimento: Nova Feature Adicionada ---


<bound method DataFrame.show of DataFrame[ID_VENDA: int, PRODUTO: string, STATUS_VENDA: string, VALOR_VENDA: double, DATA_VENDA: date, FLAG_ALTO_VALOR: boolean]>

In [11]:
# Ação: Padronizar o nome do produto (ex: 'LAPTOP' pode vir como 'NoteBook', 'Laptop')
# Vamos usar o PRODUTO para garantir que todos sejam capitalizados
df_normalizacao = df_enriquecimento.withColumn(
    "PRODUTO_PADRAO",
    F.when(F.upper(F.col("PRODUTO")).like("%LAPTOP%"), F.lit("NOTEBOOK/LAPTOP"))
    .when(F.upper(F.col("PRODUTO")).like("%MOUSE%"), F.lit("PERIFERICO_SIMPLES"))
    .otherwise(F.upper(F.col("PRODUTO")))
).drop("PRODUTO").withColumnRenamed("PRODUTO_PADRAO", "PRODUTO") # Substitui a coluna original

print("\n--- 4. Normalização: Categorias Padronizadas ---")
#df_normalizacao.show(truncate=False)
df_normalizacao.show


--- 4. Normalização: Categorias Padronizadas ---


<bound method DataFrame.show of DataFrame[ID_VENDA: int, STATUS_VENDA: string, VALOR_VENDA: double, DATA_VENDA: date, FLAG_ALTO_VALOR: boolean, PRODUTO: string]>

In [13]:
# Usamos apenas as colunas que definem a unicidade do registro (chave primária natural)
colunas_chave = ["ID_VENDA"]

# Ação: Remover duplicatas estritas
df_curated = df_normalizacao.dropDuplicates(subset=colunas_chave)

# Demonstração de Desduplicação Avançada (Teoria: Manter o Mais Recente)
# Isso é útil quando ID_VENDA pudesse se repetir com versões diferentes.
# w = Window.partitionBy("ID_VENDA").orderBy(F.col("DATA_VENDA").desc())
# df_curated_avancado = (df_normalizacao
#     .withColumn("row_number", F.row_number().over(w))
#     .filter(F.col("row_number") == 1)
#     .drop("row_number")
# )

print(f"\n--- 5. Desduplicação: Registros Únicos ({df_normalizacao.count()} vs {df_curated.count()}) ---")
#df_curated.show(truncate=False)
df_curated.show
df_curated.printSchema()



--- 5. Desduplicação: Registros Únicos (5 vs 4) ---
root
 |-- ID_VENDA: integer (nullable = true)
 |-- STATUS_VENDA: string (nullable = false)
 |-- VALOR_VENDA: double (nullable = true)
 |-- DATA_VENDA: date (nullable = true)
 |-- FLAG_ALTO_VALOR: boolean (nullable = false)
 |-- PRODUTO: string (nullable = true)



In [15]:
# Definindo o caminho de saída (simulação de um Data Lake)
caminho_parquet = "data/curated/vendas_parquet"

# Ação: Salvar o DataFrame tratado (df_curated) em Parquet
(
    df_curated.write
    .mode("overwrite") # Sobrescreve se o diretório já existir
    .partitionBy("STATUS_VENDA") # Particionamento físico: otimiza queries que filtram por status
    .option("compression", "snappy") # Snappy é o padrão (bom trade-off)
    .parquet(caminho_parquet)
)

print(f"\n--- 1. Escrita em Parquet concluída ---")
print(f"Dados salvos e particionados em: {caminho_parquet}")

25/11/02 16:55:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ID_VENDA, DATA_REGISTRO, VALOR_BRUTO, PRODUTO, STATUS
 Schema: ID_VENDA, DATA_REGISTRO_RAW, VALOR_BRUTO_RAW, PRODUTO, STATUS_VENDA
Expected: DATA_REGISTRO_RAW but found: DATA_REGISTRO
CSV file: file:///workspaces/Mackenzie_Testando_Spark_Codspace/vendas_brutas.csv
25/11/02 16:55:34 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 17)
org.apache.spark.SparkDateTimeException: [CANNOT_PARSE_TIMESTAMP] Text '2024/03/20' could not be parsed at index 4. Use `try_to_timestamp` to tolerate invalid input string and return NULL instead. SQLSTATE: 22007
	at org.apache.spark.sql.errors.QueryExecutionErrors$.ansiDateTimeParseError(QueryExecutionErrors.scala:279)
	at org.apache.spark.sql.errors.QueryExecutionErrors.ansiDateTimeParseError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at or

DateTimeException: [CANNOT_PARSE_TIMESTAMP] Text '2024/03/20' could not be parsed at index 4. Use `try_to_timestamp` to tolerate invalid input string and return NULL instead. SQLSTATE: 22007

In [16]:
# Definindo o caminho de saída
caminho_orc = "data/curated/vendas_orc"

# Ação: Salvar o mesmo DataFrame em ORC
(
    df_curated.write
    .mode("overwrite")
    .partitionBy("STATUS_VENDA")
    .orc(caminho_orc)
)

print(f"\n--- 2. Escrita em ORC concluída ---")
print(f"Dados salvos e particionados em: {caminho_orc}")

25/11/02 16:58:55 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: ID_VENDA, DATA_REGISTRO, VALOR_BRUTO, PRODUTO, STATUS
 Schema: ID_VENDA, DATA_REGISTRO_RAW, VALOR_BRUTO_RAW, PRODUTO, STATUS_VENDA
Expected: DATA_REGISTRO_RAW but found: DATA_REGISTRO
CSV file: file:///workspaces/Mackenzie_Testando_Spark_Codspace/vendas_brutas.csv
25/11/02 16:58:55 ERROR Executor: Exception in task 0.0 in stage 26.0 (TID 18)
org.apache.spark.SparkDateTimeException: [CANNOT_PARSE_TIMESTAMP] Text '2024/03/20' could not be parsed at index 4. Use `try_to_timestamp` to tolerate invalid input string and return NULL instead. SQLSTATE: 22007
	at org.apache.spark.sql.errors.QueryExecutionErrors$.ansiDateTimeParseError(QueryExecutionErrors.scala:279)
	at org.apache.spark.sql.errors.QueryExecutionErrors.ansiDateTimeParseError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at or

DateTimeException: [CANNOT_PARSE_TIMESTAMP] Text '2024/03/20' could not be parsed at index 4. Use `try_to_timestamp` to tolerate invalid input string and return NULL instead. SQLSTATE: 22007

In [17]:
# Leitura Otimizada - Filtro na partição
# Apenas os diretórios (partições) que contêm 'CANCELADO' serão lidos
df_leitura_filtrada = spark.read.parquet(caminho_parquet).filter(F.col("STATUS_VENDA") == "CANCELADO")

print("\n--- 3.1. Predicate Pushdown: Filtro no Disco ---")
print(f"Total de Registros lidos: {df_leitura_filtrada.count()}")
df_leitura_filtrada.show()

AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually. SQLSTATE: 42KD9

In [18]:
# Leitura Otimizada - Seleção de Colunas (Projeção)
# O Spark carrega apenas os bytes da coluna 'ID_VENDA' e 'VALOR_VENDA'
df_projecao = spark.read.parquet(caminho_parquet).select("ID_VENDA", "VALOR_VENDA")

print("\n--- 3.2. SerDe: Deserialização Seletiva (Projeção) ---")
print(f"Colunas carregadas: {df_projecao.columns}")
df_projecao.printSchema()

# Ponto de Discussão:
# Comparação 1: Se este fosse um arquivo CSV, o SerDe leria a linha inteira para depois descartar as colunas não pedidas.
# Comparação 2: No Parquet, o Spark/SerDe acessa apenas a 'faixa' vertical da coluna no arquivo físico.

AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually. SQLSTATE: 42KD9