In [0]:
# libs
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col, split
from pyspark.sql import functions as F

# Paths: bronze-layer location of the raw company extract files (glob over every *EMPRECSV file)
BRONZE_DIR = "dbfs:/FileStore/shared_uploads/default_user/bronze"
path_bronze_empre = f"{BRONZE_DIR}/*EMPRECSV"

In [0]:
# Raw read with Spark type inference, used only to profile the file structure.
# The files have no header row, so Spark names the columns _c0, _c1, ...
df_raw_emp = (
    spark.read
    .format("csv")
    .option("header", False)
    .option("inferSchema", True)
    .option("sep", ';')
    .load(path_bronze_empre)
)
df_raw_emp.printSchema()

In [0]:
# Total number of raw records read from the bronze glob
raw_row_count = df_raw_emp.count()
raw_row_count

In [0]:
# First 20 raw rows, for a visual check of the column layout
display(df_raw_emp.limit(20))

In [0]:
# Inspect record structure: distinct raw company names (razao social, column _c1).
# DataFrame.orderBy only understands the `ascending` keyword; the previous `how="asc"`
# was silently ignored (the result was ascending only because that is the default).
df_raw_emp.select("_c1").distinct().orderBy("_c1", ascending=True).limit(100).display()

In [0]:
# Inspect record structure: distinct raw share-capital values (capital social, column _c4).
# DataFrame.orderBy only understands the `ascending` keyword; the previous `how="asc"`
# was silently ignored (the result was ascending only because that is the default).
df_raw_emp.select("_c4").distinct().orderBy("_c4", ascending=True).limit(100).display()

In [0]:
# Inspect record structure: distinct raw legal-nature codes (natureza juridica, column _c2).
# DataFrame.orderBy only understands the `ascending` keyword; the previous `how="asc"`
# was silently ignored (the result was ascending only because that is the default).
df_raw_emp.select("_c2").distinct().orderBy("_c2", ascending=True).limit(100).display()

In [0]:
# Inspect record structure: distinct raw size codes (codigo porte, column _c5).
# DataFrame.orderBy only understands the `ascending` keyword; the previous `how="asc"`
# was silently ignored (the result was ascending only because that is the default).
df_raw_emp.select("_c5").distinct().orderBy("_c5", ascending=True).limit(100).display()

In [0]:
# Porte 1 companies with the largest share capital (capital social, raw column _c4).
# The raw capital values appear to use a comma decimal separator (e.g. "0,00"), which
# inferSchema leaves as text, so sorting _c4 directly is lexicographic ("9,00" sorts
# above "100,00"). Sort on the parsed numeric value instead; values that do not parse
# become null and desc() places nulls last.
capital_social_num = F.regexp_replace(col("_c4").cast("string"), ",", ".").cast("double")
df_raw_emp.filter(col("_c5") == 1).orderBy(capital_social_num.desc()).limit(50).display()

In [0]:
# Porte 3 companies with the largest share capital (capital social, raw column _c4).
# The raw capital values appear to use a comma decimal separator (e.g. "0,00"), which
# inferSchema leaves as text, so sorting _c4 directly is lexicographic ("9,00" sorts
# above "100,00"). Sort on the parsed numeric value instead; values that do not parse
# become null and desc() places nulls last.
capital_social_num = F.regexp_replace(col("_c4").cast("string"), ",", ".").cast("double")
df_raw_emp.filter(col("_c5") == 3).orderBy(capital_social_num.desc()).limit(50).display()

In [0]:
# Porte 5 companies with the largest share capital (capital social, raw column _c4).
# The raw capital values appear to use a comma decimal separator (e.g. "0,00"), which
# inferSchema leaves as text, so sorting _c4 directly is lexicographic ("9,00" sorts
# above "100,00"). Sort on the parsed numeric value instead; values that do not parse
# become null and desc() places nulls last.
capital_social_num = F.regexp_replace(col("_c4").cast("string"), ",", ".").cast("double")
df_raw_emp.filter(col("_c5") == 5).orderBy(capital_social_num.desc()).limit(50).display()

In [0]:
# Distinct raw values of the responsible-person qualification code (column _c3)
qualificacao_values = df_raw_emp.select("_c3").distinct()
display(qualificacao_values.orderBy(col("_c3").asc()).limit(100))

In [0]:
# Structure check for the qualification column (_c3): rows where it holds "0,00"
# (a capital-like value), presumably records whose fields shifted during parsing.
(
    df_raw_emp
    .select("_c1", "_c2", "_c3", "_c5")
    .filter(col("_c3") == "0,00")
    .limit(100)
    .display()
)

In [0]:
# Look up a single raw record whose name field (_c1) contains an embedded `";"` sequence
record_lookup = df_raw_emp.filter(col("_c1") == '"RONALD RIBEIRO CARDOSO";"2135"')
display(record_lookup)

In [0]:
# Raw names (_c1) that contain a double-quote character
names_with_quote = df_raw_emp.select("_c1").filter(col("_c1").like('%"%'))
display(names_with_quote.limit(1000))

In [0]:
# Raw names (_c1) that start with a space character
names_leading_space = df_raw_emp.select("_c1").filter(col("_c1").like(' %'))
display(names_leading_space.limit(1000))

In [0]:
# City and state (column _c6): first 100 distinct values in ascending order
display(df_raw_emp.select("_c6").distinct().orderBy(col("_c6").asc()).limit(100))

In [0]:
# Companies located in São Paulo: sample of 100 raw records whose _c6 is "SAO PAULO - SP"
empresas_sp = df_raw_emp.filter(col("_c6") == "SAO PAULO - SP")
display(empresas_sp.limit(100))

In [0]:
# Distinct raw values of _c0 (the identifier column), highest first
display(df_raw_emp.select("_c0").distinct().orderBy(col("_c0").desc()).limit(100))

In [0]:
# Quick look at the first 10 raw rows
display(df_raw_emp.limit(10))

In [0]:
# Schema definition for the company extract (the files have no header row).
# NOTE(review): `cod_qualificao_responsavel` looks like a typo of "qualificacao";
# kept unchanged because it is the column name used downstream.
schemaEmpresas = StructType([
    StructField("id", IntegerType(), True),
    StructField("razao_social", StringType(), True),
    StructField("cod_natureza_juridica", IntegerType(), True),
    StructField("cod_qualificao_responsavel", IntegerType(), True),
    StructField("capital_social", FloatType(), True),
    StructField("cod_porte", StringType(), True),
    StructField("localizacao", StringType(), True)
])

# The raw capital values use a comma decimal separator (e.g. "0,00"). Spark's CSV
# reader parses FloatType with a '.'-only number parser, so declaring the column as
# FloatType at read time turns those values into null. Read it as text instead and
# convert it below; the resulting DataFrame keeps the FloatType declared above.
schemaEmpresasLeitura = StructType([
    StructField(field.name, StringType(), field.nullable) if field.name == "capital_social" else field
    for field in schemaEmpresas.fields
])

# `inferSchema` is not passed: it has no effect when an explicit schema is supplied.
df_list_emp = (
    spark.read.options(header=False, sep=';')
    .format("csv")
    .schema(schemaEmpresasLeitura)
    .load(path_bronze_empre)
    # "1234,56" -> 1234.56. Assumes no thousands separator in the raw values — TODO confirm.
    # NOTE(review): FloatType keeps ~7 significant digits; consider DecimalType for large capitals.
    .withColumn("capital_social",
                F.regexp_replace(col("capital_social"), ",", ".").cast(FloatType()))
)

df_list_emp.printSchema()

In [0]:
df_list_emp.limit(20).display()

In [0]:
# Attach a human-readable description for the responsible-person qualification code.
# The code -> description reference list comes from the dataset documentation; any
# code not listed here is labelled "Qualificação Indefinida".
qualificacao_por_codigo = {
    5: "Administrador",
    10: "Diretor",
    16: "Presidente",
    17: "Procurador",
    19: "Síndico (Condomínio)",
    24: "Sócio Comanditado",
    31: "Sócio Ostensivo",
    32: "Tabelião",
    34: "Titular de Empresa Individual Imobiliária",
    39: "Diplomata",
    40: "Cônsul",
    41: "Representante de Organização Internacional",
    42: "Oficial de Registro",
    43: "Responsável",
    46: "Ministro de Estado das Relações Exteriores",
    49: "Sócio- Administrador",
    50: "Empresário",
    51: "Candidato a Cargo Político Eletivo",
    54: "Fundador",
    59: "Produtor Rural",
    60: "Cônsul Honorário",
    61: "Responsável Indígena",
    62: "Representante da Instituição Extraterritorial",
    65: "Titular Pessoa Física Residente ou Domiciliado no Brasil",
}

# Build the same CASE WHEN chain as a hand-written F.when(...).when(...) sequence,
# one branch per dictionary entry, in insertion order.
desc_qualificacao = None
for codigo, descricao in qualificacao_por_codigo.items():
    condicao = col("cod_qualificao_responsavel") == codigo
    if desc_qualificacao is None:
        desc_qualificacao = F.when(condicao, descricao)
    else:
        desc_qualificacao = desc_qualificacao.when(condicao, descricao)

df_emp_quali_resp = df_list_emp.withColumn(
    "desc_qualificacao_responsavel",
    desc_qualificacao.otherwise("Qualificação Indefinida"),
)

In [0]:
# Sample of 30 rows with the qualification description attached
display(df_emp_quali_resp.limit(30))

In [0]:
# Strip leading whitespace from the company name and fix the column order.
# withColumn on an existing name replaces that column in place, so the previous
# add (razao_social_format) / drop / rename round-trip is unnecessary.
df_list_emp = (
    df_list_emp
    .withColumn("razao_social", F.regexp_replace(col("razao_social"), "^\\s+", ""))
    .select(
        'id',
        'razao_social',
        'cod_natureza_juridica',
        'cod_qualificao_responsavel',
        'capital_social',
        'cod_porte',
        'localizacao',
    )
)

In [0]:
# First 20 rows after the name cleanup
display(df_list_emp.limit(20))

In [0]:
# Validation: names that still start with a space. Expected to return no rows once
# leading whitespace has been stripped. Filters on the current `razao_social` column;
# the intermediate `razao_social_format` column no longer exists in df_list_emp
# after the cleanup cell drops it.
df_list_emp.select("razao_social").filter(col("razao_social").like(' %')).display()

In [0]:
# Sample of 30 cleaned company names. Selects `razao_social`: the intermediate
# `razao_social_format` column was dropped by the cleanup cell, so selecting it
# raises an AnalysisException.
df_list_emp.select("razao_social").limit(30).display()

In [0]:
# Cleaned names that still contain the `";"` sequence, i.e. several fields packed into one
names_with_packed_fields = df_list_emp.select("razao_social").filter(col("razao_social").like('%";"%'))
display(names_with_packed_fields.limit(100))

In [0]:
# For rows whose name contains `";"`, split every column on ';' to see how the
# fields were packed together (each column becomes an array of pieces).
misparsed_rows = df_list_emp.filter(col("razao_social").like('%";"%'))
split_df = misparsed_rows.select(
    *[split(col(name), ';').alias(name) for name in df_list_emp.columns]
)

display(split_df)

In [0]:
# Same split as above, rendered as plain text: every column of the rows whose name
# contains `";"` is split on ';' and the resulting array is cast to a string with the
# surrounding square brackets removed.
# Uses `col`/`split` and `F` from the imports cell instead of a redundant `%python`
# magic and a mid-notebook re-import.
split_df = (
    df_list_emp
    .filter(col("razao_social").like('%";"%'))
    .select([split(col(c), ';').alias(c) for c in df_list_emp.columns])
)

# Strip '[' and ']' from the string form of each array column
cleaned_df = split_df.select(
    [F.regexp_replace(col(c).cast("string"), r'[\[\]]', '').alias(c) for c in split_df.columns]
)

display(cleaned_df)

In [0]:
# Final look at 20 rows before persisting the table
display(df_list_emp.limit(20))

In [0]:
# Create the target schema in the Hive metastore (no-op if it already exists)
spark.sql("CREATE SCHEMA IF NOT EXISTS hive_metastore.db_rfb")

# Persist as a Delta table. The source glob re-reads every bronze file on each run, so
# mode("append") re-inserted all previously loaded rows every time the notebook ran.
# Overwrite keeps the table equal to the current bronze snapshot (assumes bronze keeps
# the full set of files — confirm). overwriteSchema lets this cell replace a table that
# was previously written with a different column set.
# NOTE(review): the final pipeline cell writes a different column set to this same
# table; consider removing this exploratory write.
(
    df_list_emp.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable("hive_metastore.db_rfb.tbl_slv_empresas")
)

In [0]:
# Silver pipeline for the company extract: read the bronze CSVs with an explicit schema,
# normalise capital_social and razao_social, and keep the columns that get persisted.

# Path source
path_bronze_empre = "dbfs:/FileStore/shared_uploads/default_user/bronze/*EMPRECSV"

# Schema definition (the files have no header row).
# NOTE(review): `cnpj` as IntegerType drops leading zeros if any identifier starts with 0;
# consider StringType if the code must be preserved verbatim — TODO confirm.
schemaEmpresas = StructType([
    StructField("cnpj", IntegerType(), True),
    StructField("razao_social", StringType(), True),
    StructField("natureza_juridica", IntegerType(), True),
    StructField("qualificao_responsavel", IntegerType(), True),
    StructField("capital_social", FloatType(), True),
    StructField("cod_porte", StringType(), True),
    StructField("localizacao", StringType(), True)
])

# capital_social uses a comma decimal separator in the raw files (e.g. "0,00"). Spark's
# CSV FloatType parser only accepts '.', which would make the column null, so read it
# as text and convert it below. The output keeps the FloatType declared above.
schemaEmpresasLeitura = StructType([
    StructField(field.name, StringType(), field.nullable) if field.name == "capital_social" else field
    for field in schemaEmpresas.fields
])

# Raw read. `inferSchema` is omitted: it has no effect when an explicit schema is given.
df_list_emp = (
    spark.read.options(header=False, sep=';')
    .format("csv")
    .schema(schemaEmpresasLeitura)
    .load(path_bronze_empre)
)

# Normalisation:
#  - capital_social: "1234,56" -> 1234.56 (assumes no thousands separator — TODO confirm;
#    FloatType keeps ~7 significant digits, consider DecimalType for large values)
#  - razao_social: strip leading whitespace in place (no add/drop/rename round-trip)
# NOTE(review): `localizacao` is read but not selected, so it is not persisted;
# confirm this is intentional (the exploratory version of this step kept it).
df_list_emp = (
    df_list_emp
    .withColumn("capital_social",
                F.regexp_replace(col("capital_social"), ",", ".").cast(FloatType()))
    .withColumn("razao_social", F.regexp_replace(col("razao_social"), "^\\s+", ""))
    .select(
        'cnpj',
        'razao_social',
        'natureza_juridica',
        'qualificao_responsavel',
        'capital_social',
        'cod_porte',
    )
)

In [0]:
# Create the target schema in the Hive metastore (no-op if it already exists)
spark.sql("CREATE SCHEMA IF NOT EXISTS hive_metastore.db_rfb")

# Persist the silver table as Delta. The pipeline re-reads every bronze file via the glob,
# so mode("append") duplicated every previously loaded row on each run; overwrite keeps
# the table equal to the current bronze snapshot (assumes bronze keeps the full set of
# files — confirm). overwriteSchema is needed because an earlier cell in this notebook
# writes a different column set (id/cod_* names, plus localizacao) to the same table,
# which would otherwise fail Delta's schema enforcement.
(
    df_list_emp.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable("hive_metastore.db_rfb.tbl_slv_empresas")
)