In [0]:
# Importar as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Iniciar a SparkSession com configurações otimizadas
spark = SparkSession.builder \
    .appName("Load Data Bronze") \
    .config("spark.sql.shuffle.partitions", "200")  \
    .config("spark.sql.files.maxPartitionBytes", "128MB") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Define um número fixo de partições para shuffle, melhorando o paralelismo                 
# Define o tamanho máximo de partições para evitar muitos arquivos pequenos        
# Usa o codec Snappy para compressão rápida, otimizando tempo de leitura e escrita    
# Habilita otimizações adaptativas, ajustando o número de partições dinamicamente com base no tamanho dos dados

# Definir caminhos de armazenamento no Data Lake
lz_path_in = "/Volumes/workspace/lhdw/landingzone/vendas/processar"
lz_path_out = "/Volumes/workspace/lhdw/landingzone/vendas/processado"
bronze_path = "/Volumes/workspace/lhdw/bronze/vendas"





In [0]:
#from pyspark.sql.functions import regexp_extrac
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    DateType,
    StringType,
    DoubleType
)


# Definir o esquema dos dados brutos
schema_lz = StructType([
    StructField("IDProduto", IntegerType(), True),
    StructField("Data", DateType(), True),
    StructField("IDCliente", IntegerType(), True),
    StructField("IDCampanha", IntegerType(), True),
    StructField("Unidades", IntegerType(), True),
    StructField("Produto", StringType(), True),
    StructField("Categoria", StringType(), True),
    StructField("Segmento", StringType(), True),
    StructField("IDFabricante", IntegerType(), True),
    StructField("Fabricante", StringType(), True),
    StructField("CustoUnitario", DoubleType(), True),
    StructField("PrecoUnitario", DoubleType(), True),
    StructField("CodigoPostal", StringType(), True),
    StructField("EmailNome", StringType(), True),
    StructField("Cidade", StringType(), True),
    StructField("Estado", StringType(), True),
    StructField("Regiao", StringType(), True),
    StructField("Distrito", StringType(), True),
    StructField("Pais", StringType(), True)
])


# Exibindo o DataFrame para verificar a leitura correta dos dados

df_vendas = (
    spark.read
    .option("header", "true")
    .schema(schema_lz)
    .csv(lz_path_in)
    .withColumn(
        "filename",
        regexp_extract(
            "_metadata.file_path",
            "([^/]+)$",
            0
        )
    )
)

distinct_filenames = df_vendas.select("filename").distinct()

display(df_vendas)

IDProduto,Data,IDCliente,IDCampanha,Unidades,Produto,Categoria,Segmento,IDFabricante,Fabricante,CustoUnitario,PrecoUnitario,CodigoPostal,EmailNome,Cidade,Estado,Regiao,Distrito,Pais,filename


In [0]:
display(distinct_filenames)

filename


In [0]:
# Escrever a tabela no formato Parquet, particionando por DataVenda (ano e mês)
df_vendas.withColumn("Ano", year("Data")) \
             .withColumn("Mes", month("Data")) \
             .write.mode("overwrite").partitionBy("Ano", "Mes").parquet(bronze_path)

# Apresentando o DataFrame
#display(df_vendas)

In [0]:
from pyspark.sql import functions as F

# Mover os arquivos processados para o caminho lz_path_out
if distinct_filenames.select("filename").distinct().count() > 0:
    filenames = distinct_filenames.select("filename").distinct().collect()

    for row in filenames:
        src_path = row.filename
        dbutils.fs.mv(lz_path_in + "/" + src_path, lz_path_out)

In [0]:

%fs ls /Volumes/workspace/lhdw/landingzone/vendas/processar


[]

In [0]:
%fs ls /Volumes/workspace/lhdw/landingzone/vendas/processado/

path,name,size,modificationTime
dbfs:/Volumes/workspace/lhdw/landingzone/vendas/processado/dados_2011.csv,dados_2011.csv,21493733,1758580275000


In [0]:
%fs ls /Volumes/workspace/lhdw/bronze/vendas/Ano=2011/Mes=10

path,name,size,modificationTime
dbfs:/Volumes/workspace/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_459145594140797369,_committed_459145594140797369,824,1758580272000
dbfs:/Volumes/workspace/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_4917773363566813796,_committed_4917773363566813796,819,1758191376000
dbfs:/Volumes/workspace/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_6626776131120291231,_committed_6626776131120291231,824,1758580437000
dbfs:/Volumes/workspace/lhdw/bronze/vendas/Ano=2011/Mes=10/_committed_vacuum5965171572479591911,_committed_vacuum5965171572479591911,96,1758580273000
dbfs:/Volumes/workspace/lhdw/bronze/vendas/Ano=2011/Mes=10/_started_459145594140797369,_started_459145594140797369,0,1758580271000


In [0]:
#df_vendas.withColumn("Ano", year("Data")) \
 #        .withColumn("Mes", month("Data")) \
  #       .write.mode("append").partitionBy("Ano", "Mes").parquet(bronze_path)

In [0]:
del df_vendas
# O comando del remove o objeto da memória. Isso é útil quando você tem grandes DataFrames ou objetos Python que já não são necessários.

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
import gc
gc.collect()

#Esse comando força o coletor de lixo a executar imediatamente, liberando a memória de objetos Python que não estão mais em uso.

14240