                                    TODOS IMPORT NECESSÁRIOS PARA RODAR PYSPARK + DELTA

In [1]:
import findspark
findspark.init()

- Define variáveis de ambiente do Hadoop (necessário no Windows).
  O Spark precisa dessas variáveis para operar com arquivos e Delta Lake.

In [2]:
import os
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["HADOOP_COMMON_HOME"] = "C:\\hadoop"
os.environ["HADOOP_HDFS_HOME"] = "C:\\hadoop"
os.environ["HADOOP_TMP_DIR"] = "C:\\hadoop\\tmp"
os.environ["PATH"] = f"{os.environ['PATH']};C:\\hadoop\\bin"

In [3]:
if not os.path.exists("C:\\hadoop\\tmp"): #Cria a pasta temporária, se não existir, usada pelo Hadoop/Spark internamente
    os.makedirs("C:\\hadoop\\tmp")

# Configurações essenciais para Windows
os.environ["HADOOP_USER_NAME"] = os.getlogin()  # Define o usuário atual do sistema como dono dos processos Hadoop

In [4]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip #Importa as bibliotecas necessárias para iniciar o Spark com suporte ao Delta Lake.

In [5]:
#Garante que bibliotecas JAR do Hadoop estejam disponíveis no classpath do Spark.
from pyspark import SparkFiles
spark_jars = ":".join([
    "C:\\hadoop\\share\\hadoop\\common\\*.jar",
    "C:\\hadoop\\share\\hadoop\\common\\lib\\*.jar"
])
os.environ["SPARK_CLASSPATH"] = spark_jars

                                CRIAÇÃO DE SESSÃO COM CONFIGURAÇÃO NECESSÁRIO PARA O FUNCIONANDO DO DELTA

Cria a sessão Spark com configurações específicas:

Suporte ao Delta Lake.

Aumenta a memória do driver (4g) para evitar travamentos.

Define paths locais para logs, warehouse, HDFS etc.

In [6]:
builder = SparkSession.builder \
    .appName('DW_transformacao') \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("spark.sql.warehouse.dir", "file:///C:/temp/spark-warehouse") \
    .config("spark.network.timeout", "600s") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.HDFSLogStore")\
    .config("spark.driver.memory", "4g")

In [7]:
spark = configure_spark_with_delta_pip(builder).getOrCreate() #Inicializa o Spark com Delta Lake usando o builder acima.

                             CRIEI VÁRIOS DF USANDO OS ARQUIVOS DELTA QUE EU TINHA SALVADO NA ETAPA ANTERIOR

In [8]:
df_empresas = spark.read.format('delta').load(r'C:\Users\ResTIC16\Documents\IBGE_PROJETO\datawarehouse_ibge\data\RAW\empresas')
df_municipios = spark.read.format('delta').load(r'C:\Users\ResTIC16\Documents\IBGE_PROJETO\datawarehouse_ibge\data\RAW\municipios')
df_CNAE = spark.read.format('delta').load(r'C:\Users\ResTIC16\Documents\IBGE_PROJETO\datawarehouse_ibge\data\RAW\CNAE')
df_natureza_juridica = spark.read.format('delta').load(r'C:\Users\ResTIC16\Documents\IBGE_PROJETO\datawarehouse_ibge\data\RAW\naturezas')
df_simples_nacional = spark.read.format('delta').load(r'C:\Users\ResTIC16\Documents\IBGE_PROJETO\datawarehouse_ibge\data\RAW\simples')
df_estabelecimentos = spark.read.format('delta').load(r'C:\Users\ResTIC16\Documents\IBGE_PROJETO\datawarehouse_ibge\data\RAW\estabelecimentos')

In [9]:
from pyspark.sql.functions import *  #IMPORTEI TODAS AS FUNCÕES DO PYSPARK PARA SER MAIS PRÁTICO

                                                LIMPEZA DOS DADOS

- Filtra valores nulos ou inválidos nas principais colunas das empresas.
- Garante que os dados são minimamente válidos antes dos joins.

In [10]:
df_empresas = df_empresas.filter(col("razao_social").isNotNull() & (trim(col("razao_social")) != ""))
df_empresas = df_empresas.filter(length(col("cnpj_basico")) >= 8)

In [11]:
df_empresas = df_empresas.filter(
    col("natureza_juridica").isNotNull() & 
    col("qualificacao_responsavel").isNotNull() & 
    col("porte_empresa").isNotNull()
)

In [12]:
df_municipios = df_municipios.filter(
    col("codigo").isNotNull() & 
    (trim(col("nome")) != "")
)

In [13]:
df_CNAE = df_CNAE.filter(
    col("codigo").isNotNull() & 
    (trim(col("descricao")) != "")
)

In [14]:
df_natureza_juridica = df_natureza_juridica.filter(
    col("codigo").isNotNull() & 
    (trim(col("descricao")) != "")
)

In [15]:
df_simples_nacional = df_simples_nacional.filter(col("cnpj_basico").isNotNull())
df_simples_nacional = df_simples_nacional.filter(col("opcao_simples").isin("S", "N"))

                                                JOINS E ENRIQUECIMENTOS

- Tabela de municipio e CNAE não tinha ligação direta com empresas, por isso foi necessário usar estabelecimentos para o join.

In [16]:
df_simples_nacional = df_simples_nacional.withColumnRenamed("cnpj_basico", "cnpj_1")

In [17]:
municipios_2 = df_estabelecimentos.select(
    col("municipio").alias("municipio_id"),
    col("uf").alias("uf"),
    col('cnpj_basico').alias('cnpj_estabelecimento')
)

In [18]:
df_municipios = df_municipios.join(municipios_2, df_municipios["codigo"] == municipios_2["municipio_id"], "left")
df_municipios = df_municipios.withColumnRenamed("nome", "nome_municipio")
df_municipios = df_municipios.drop('codigo')

In [19]:
cnae_principal = df_estabelecimentos.select(
    col("cnae_fiscal_principal").alias("cnae_fiscal"),
    col('cnpj_basico').alias('cnpj_cnae')
)

In [20]:
df_CNAE = df_CNAE.join(cnae_principal, df_CNAE['codigo'] == cnae_principal['cnae_fiscal'], 'left')

df_CNAE = df_CNAE.withColumnRenamed("codigo", "codigo_cnae")

df_CNAE = df_CNAE.withColumnRenamed("descricao", "descricao_cnae")

df_CNAE = df_CNAE.drop('cnae_fiscal')

- Une todos os dados em um único dataframe.

- df_empresas é a tabela fato.

- As demais são dimensões que enriquecem os dados.

In [21]:
df_trusted = df_empresas \
    .join(df_natureza_juridica, df_empresas["natureza_juridica"] == df_natureza_juridica["codigo"], "left") \
    .join(df_simples_nacional, df_empresas["cnpj_basico"] == df_simples_nacional["cnpj_1"], "left")\
    .join(df_municipios, df_empresas['cnpj_basico'] == df_municipios['cnpj_estabelecimento'], "left")\
    .join(df_CNAE, df_empresas['cnpj_basico'] == df_CNAE['cnpj_cnae'], 'left')

df_trusted = df_trusted.drop('cnpj_1')
df_trusted = df_trusted.drop('cnpj_estabelecimento')
df_trusted = df_trusted.drop('cnpj_cnae')

In [22]:
df_trusted = df_trusted.drop('ente_federativo_responsavel') #REMOÇÃO DE COLUNA NULL

In [23]:
df_trusted.show()

+-----------+--------------------+-----------------+------------------------+--------------+-------------+------+--------------------+-------------+------------------+---------------------+---------+--------------+-----------------+--------------+------------+----+-----------+--------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte_empresa|codigo|           descricao|opcao_simples|data_opcao_simples|data_exclusao_simples|opcao_mei|data_opcao_mei|data_exclusao_mei|nome_municipio|municipio_id|  uf|codigo_cnae|      descricao_cnae|
+-----------+--------------------+-----------------+------------------------+--------------+-------------+------+--------------------+-------------+------------------+---------------------+---------+--------------+-----------------+--------------+------------+----+-----------+--------------------+
|   04631971|  2001 HANDLING LTDA|             2240|                      49|          0,00|           

                                                    SALVANDO O TRUSTED

In [24]:
df_trusted.limit(5000).write.format("delta") \
    .mode("overwrite") \
    .save(r"C:\Users\ResTIC16\Documents\IBGE_PROJETO\datawarehouse_ibge\data\TRS\empresas2")