# Análise de Sócios e Empresas usando Apache Spark

Este notebook documenta o desenvolvimento de um pipeline de processamento de dados utilizando Apache Spark. O objetivo principal é analisar grandes volumes de dados da Receita Federal do Brasil (RFB) para identificar padrões e anomalias entre sócios e suas qualificações. A análise foca em identificar sócios que participam de múltiplas empresas, sócios com diversas qualificações e a distribuição etária dos sócios.

O projeto faz parte de um estudo acadêmico no contexto do mestrado em Ciências de Dados do Programa de Pós-Graduação em Ciência da Computação da Universidade de Brasília (PPCA-UNB). Através deste pipeline, buscamos explorar técnicas de processamento paralelo e big data para lidar com desafios comuns em cenários de dados massivos.

### Objetivos Específicos:
1. **Importação e Configuração**: Configurar o ambiente Spark e importar os dados da RFB armazenados no Google Cloud Storage.
2. **Transformações Iniciais**: Limpar e filtrar os dados para obter um conjunto de dados relevante para a análise.
3. **Análises Específicas**: Identificar sócios com múltiplas empresas, sócios com diversas qualificações e a distribuição etária dos sócios.
4. **Persistência dos Resultados**: Salvar os resultados processados de volta no Google Cloud Storage para análises posteriores.

### Estrutura do Notebook:
1. **Configuração do Ambiente**: Importação das bibliotecas e configuração do Spark.
2. **Leitura dos Dados**: Leitura dos dados brutos do Google Cloud Storage.
3. **Pré-processamento**: Limpeza e filtração dos dados.
4. **Análise de Dados**: Identificação de sócios com múltiplas empresas e diversas qualificações, análise da faixa etária dos sócios.
5. **Salvamento dos Resultados**: Persistência dos dados processados em formato CSV.

Este pipeline demonstra a eficácia do uso de Apache Spark para manipulação e análise de grandes volumes de dados, oferecendo insights valiosos sobre as atividades dos sócios e empresas no Brasil. Esperamos que este estudo contribua para uma melhor compreensão e gestão dos dados empresariais no país.

In [72]:
from google.cloud import storage
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType
from pyspark.sql.functions import col, when, explode, split, concat_ws, year, current_date

# Definir o caminho para o arquivo JSON de credenciais
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/dataproc/semiotic-cove-430613-t7-c9df69e29a70.json"

## Configuração do ambiente

### Abertura de Sessão

In [2]:
spark = SparkSession.builder \
    .appName("Leitura Arquivos CSV's do GCS") \
    .getOrCreate()

24/08/02 19:24:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [12]:
# Obter o contexto Spark
sc = spark.sparkContext

# Verificar o número de núcleos totais no cluster
total_cores = sc._jsc.sc().getExecutorMemoryStatus().size()
print(f"Total number of cores in the cluster: {total_cores}")

# Verificar o número de núcleos disponíveis por executor
conf = sc.getConf()
executor_cores = conf.get("spark.executor.cores")
print(f"Number of cores per executor: {executor_cores}")

Total number of cores in the cluster: 9
Number of cores per executor: 1


Os arquivos já foram baixados, decompactados e feito `upload` no Google Cloud Storage. Permite uma interação mais transparente com os clusters do Google Cloud DataProc.

*i.* Lista os arquivos no bucket GCS

In [3]:
def list_gcs_files(bucket_name, prefix):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=prefix)
    return [blob.name for blob in blobs]

*ii.* Agrupa os arquivos pelo 'prefixo'do nome do arquivo

In [4]:
def list_and_group_files(files):
    grouped_files = {
        "EMPRECSV": [f for f in files if "EMPRE" in f],
        "ESTABELE": [f for f in files if "ESTABELE" in f],
        "SOCIOCSV": [f for f in files if "SOCIO" in f],
        "SIMPLES": [f for f in files if "SIMPLES" in f],
        "CNAE": [f for f in files if "CNAE" in f],
        "MOTI": [f for f in files if "MOTI" in f],
        "MUNIC": [f for f in files if "MUNIC" in f],
        "NATJU": [f for f in files if "NATJU" in f],
        "PAIS": [f for f in files if "PAIS" in f],
        "QUALS": [f for f in files if "QUALS" in f]
    }
    return grouped_files

## Coleta dos dados

Leitura e agrupamentos dos dados

In [5]:
bucket_name = "dados-massivos"
prefix = "extracted_files/"
try:
    file_list = list_gcs_files(bucket_name, prefix)
    print("Arquivos no bucket GCS:")
    for file in file_list:
        print(file)

    grouped_files = list_and_group_files(file_list)
    print("Arquivos agrupados:")
    for key, files in grouped_files.items():
        print(f"{key}: {files}")
except Exception as e:
    print(f"Erro ao listar os arquivos: {e}")

Arquivos no bucket GCS:
extracted_files/F.K03200$W.SIMPLES.CSV.D40608.csv
extracted_files/F.K03200$Z.D40608.CNAECSV.csv
extracted_files/F.K03200$Z.D40608.MOTICSV.csv
extracted_files/F.K03200$Z.D40608.MUNICCSV.csv
extracted_files/F.K03200$Z.D40608.NATJUCSV.csv
extracted_files/F.K03200$Z.D40608.PAISCSV.csv
extracted_files/F.K03200$Z.D40608.QUALSCSV.csv
extracted_files/K3241.K03200Y0.D40608.EMPRECSV.csv
extracted_files/K3241.K03200Y0.D40608.ESTABELE.csv
extracted_files/K3241.K03200Y0.D40608.SOCIOCSV.csv
extracted_files/K3241.K03200Y1.D40608.EMPRECSV.csv
extracted_files/K3241.K03200Y1.D40608.ESTABELE.csv
extracted_files/K3241.K03200Y1.D40608.SOCIOCSV.csv
extracted_files/K3241.K03200Y2.D40608.EMPRECSV.csv
extracted_files/K3241.K03200Y2.D40608.ESTABELE.csv
extracted_files/K3241.K03200Y2.D40608.SOCIOCSV.csv
extracted_files/K3241.K03200Y3.D40608.EMPRECSV.csv
extracted_files/K3241.K03200Y3.D40608.ESTABELE.csv
extracted_files/K3241.K03200Y3.D40608.SOCIOCSV.csv
extracted_files/K3241.K03200Y4.D406

### Definição dos *schemas* DataFrames Spark

In [6]:
schemas = {
    "EMPRECSV": StructType([
        StructField("cnpj_basico", StringType(), True),
        StructField("razao_social", StringType(), True),
        StructField("natureza_juridica", StringType(), True),
        StructField("qualificacao_responsavel", StringType(), True),
        StructField("capital_social", FloatType(), True),
        StructField("porte_empresa", StringType(), True),
        StructField("ente_federativo_responsavel", StringType(), True)
    ]),
    "ESTABELE": StructType([
        StructField("cnpj_basico", StringType(), True),
        StructField("cnpj_ordem", StringType(), True),
        StructField("cnpj_dv", StringType(), True),
        StructField("identificador_matriz_filial", StringType(), True),
        StructField("nome_fantasia", StringType(), True),
        StructField("situacao_cadastral", StringType(), True),
        StructField("data_situacao_cadastral", StringType(), True),
        StructField("motivo_situacao_cadastral", StringType(), True),
        StructField("nome_cidade_exterior", StringType(), True),
        StructField("pais", StringType(), True),
        StructField("data_inicio_atividade", StringType(), True),
        StructField("cnae_fiscal_principal", StringType(), True),
        StructField("cnae_fiscal_secundaria", StringType(), True),
        StructField("tipo_logradouro", StringType(), True),
        StructField("logradouro", StringType(), True),
        StructField("numero", StringType(), True),
        StructField("complemento", StringType(), True),
        StructField("bairro", StringType(), True),
        StructField("cep", StringType(), True),
        StructField("uf", StringType(), True),
        StructField("municipio", StringType(), True),
        StructField("ddd_1", StringType(), True),
        StructField("telefone_1", StringType(), True),
        StructField("ddd_2", StringType(), True),
        StructField("telefone_2", StringType(), True),
        StructField("ddd_fax", StringType(), True),
        StructField("fax", StringType(), True),
        StructField("correio_eletronico", StringType(), True),
        StructField("situacao_especial", StringType(), True),
        StructField("data_situacao_especial", StringType(), True)
    ]),
    "SOCIOCSV": StructType([
        StructField("cnpj_basico", StringType(), True),
        StructField("identificador_socio", StringType(), True),
        StructField("nome_socio_razao_social", StringType(), True),
        StructField("cpf_cnpj_socio", StringType(), True),
        StructField("qualificacao_socio", StringType(), True),
        StructField("data_entrada_sociedade", StringType(), True),
        StructField("pais", StringType(), True),
        StructField("representante_legal", StringType(), True),
        StructField("nome_do_representante", StringType(), True),
        StructField("qualificacao_representante_legal", StringType(), True),
        StructField("faixa_etaria", StringType(), True)
    ]),
    "SIMPLES": StructType([
        StructField("cnpj_basico", StringType(), True),
        StructField("opcao_pelo_simples", StringType(), True),
        StructField("data_opcao_simples", StringType(), True),
        StructField("data_exclusao_simples", StringType(), True),
        StructField("opcao_mei", StringType(), True),
        StructField("data_opcao_mei", StringType(), True),
        StructField("data_exclusao_mei", StringType(), True)
    ]),
    "CNAE": StructType([
        StructField("codigo", StringType(), True),
        StructField("descricao", StringType(), True)
    ]),
    "MOTI": StructType([
        StructField("codigo", StringType(), True),
        StructField("descricao", StringType(), True)
    ]),
    "MUNIC": StructType([
        StructField("codigo", StringType(), True),
        StructField("descricao", StringType(), True)
    ]),
    "NATJU": StructType([
        StructField("codigo", StringType(), True),
        StructField("descricao", StringType(), True)
    ]),
    "PAIS": StructType([
        StructField("codigo", StringType(), True),
        StructField("descricao", StringType(), True)
    ]),
    "QUALS": StructType([
        StructField("codigo", StringType(), True),
        StructField("descricao", StringType(), True)
    ])
}

### Leitura dos CSV

In [7]:
dataframes = {}
for key, schema in schemas.items():
    if key in grouped_files and grouped_files[key]:
        csv_paths = ["gs://{}/{}".format(bucket_name, file) for file in grouped_files[key]]
        df = spark.read.format("csv") \
            .option("header", "false") \
            .option("sep", ";") \
            .option("encoding", "ISO-8859-1") \
            .schema(schema) \
            .load(csv_paths)
        dataframes[key] = df
        print(f"{key} DataFrame:")
        df.show(3)

EMPRECSV DataFrame:


                                                                                

+-----------+--------------------+-----------------+------------------------+--------------+-------------+---------------------------+
|cnpj_basico|        razao_social|natureza_juridica|qualificacao_responsavel|capital_social|porte_empresa|ente_federativo_responsavel|
+-----------+--------------------+-----------------+------------------------+--------------+-------------+---------------------------+
|   41273594|OZINETE DELFINO C...|             2135|                      50|          NULL|           01|                       NULL|
|   41273595|GILVAN PEREIRA XA...|             2135|                      50|          NULL|           01|                       NULL|
|   41273596|RODRIGO JOSE FERR...|             2135|                      50|          NULL|           01|                       NULL|
+-----------+--------------------+-----------------+------------------------+--------------+-------------+---------------------------+
only showing top 3 rows

ESTABELE DataFrame:


24/08/02 19:24:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+-------------------------+--------------------+----+---------------------+---------------------+----------------------+---------------+-----------+------+-----------+-----------+--------+---+---------+-----+----------+-----+----------+-------+----+--------------------+-----------------+----------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|       nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_cidade_exterior|pais|data_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_logradouro| logradouro|numero|complemento|     bairro|     cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_fax| fax|  correio_eletronico|situacao_especial|data_situacao_especial|
+-----------+----------+-------+---------------------------+--------------------+------------------+-----------------------+------

In [8]:
type(dataframes)

dict

### Exploração dos dados

In [9]:
for key, df in dataframes.items():
    print(f"Schema for {key}:")
    df.printSchema()

Schema for EMPRECSV:
root
 |-- cnpj_basico: string (nullable = true)
 |-- razao_social: string (nullable = true)
 |-- natureza_juridica: string (nullable = true)
 |-- qualificacao_responsavel: string (nullable = true)
 |-- capital_social: float (nullable = true)
 |-- porte_empresa: string (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)

Schema for ESTABELE:
root
 |-- cnpj_basico: string (nullable = true)
 |-- cnpj_ordem: string (nullable = true)
 |-- cnpj_dv: string (nullable = true)
 |-- identificador_matriz_filial: string (nullable = true)
 |-- nome_fantasia: string (nullable = true)
 |-- situacao_cadastral: string (nullable = true)
 |-- data_situacao_cadastral: string (nullable = true)
 |-- motivo_situacao_cadastral: string (nullable = true)
 |-- nome_cidade_exterior: string (nullable = true)
 |-- pais: string (nullable = true)
 |-- data_inicio_atividade: string (nullable = true)
 |-- cnae_fiscal_principal: string (nullable = true)
 |-- cnae_fiscal_secun

In [10]:
for key, df in dataframes.items():
    row_count = df.count()
    print(f"Number of rows in {key}: {row_count}")

                                                                                

Number of rows in EMPRECSV: 57963081


                                                                                

Number of rows in ESTABELE: 60944826


                                                                                

Number of rows in SOCIOCSV: 24251230


                                                                                

Number of rows in SIMPLES: 39373015
Number of rows in CNAE: 1359
Number of rows in MOTI: 61
Number of rows in MUNIC: 5571
Number of rows in NATJU: 90
Number of rows in PAIS: 255
Number of rows in QUALS: 68


In [11]:
dataframes['EMPRECSV'].count()

                                                                                

57963081

### Persistências do dados brutos

In [None]:
bucket_output_path = "gs://dados-massivos/dados-agrupados"

for key, df in dataframes.items():
    output_path = f"{bucket_output_path}/{key}.csv"
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
    print(f"DataFrame for {key} saved to {output_path}")

## Preparação dos dados

### Limpeza e feature engineering dos df Spark

#### DataFrames Spark

In [13]:
list(dataframes.keys())[0]

'EMPRECSV'

In [14]:
dataframes.keys()

dict_keys(['EMPRECSV', 'ESTABELE', 'SOCIOCSV', 'SIMPLES', 'CNAE', 'MOTI', 'MUNIC', 'NATJU', 'PAIS', 'QUALS'])

In [15]:
empresa_df = dataframes['EMPRECSV']
estabelecimento_df = dataframes['ESTABELE']
socio_df = dataframes['SOCIOCSV']
simples_df = dataframes['SIMPLES']
cnae_df = dataframes['CNAE']
moti_df = dataframes['MOTI']
munic_df = dataframes['MUNIC']
natju_df = dataframes['NATJU']
pais_df = dataframes['PAIS']
quals_df = dataframes['QUALS']

Verificação da quantidade de registros para manupulações

In [16]:
empresa_df.count()

                                                                                

57963081

In [17]:
type(empresa_df)

pyspark.sql.dataframe.DataFrame

In [18]:
natju_df.show()

+------+--------------------+
|codigo|           descricao|
+------+--------------------+
|  0000|Natureza Jurídica...|
|  3271|Órgão de Direção ...|
|  3280|Comitê Financeiro...|
|  3298|Frente Plebiscitá...|
|  3301|Organização Socia...|
|  3999|  Associação Privada|
|  4014|Empresa Individua...|
|  4090|Candidato a Cargo...|
|  4120|Produtor Rural (P...|
|  5010|Organização Inter...|
|  5029|Representação Dip...|
|  1015|Órgão Público do ...|
|  1023|Órgão Público do ...|
|  1031|Órgão Público do ...|
|  1040|Órgão Público do ...|
|  1058|Órgão Público do ...|
|  1066|Órgão Público do ...|
|  1074|Órgão Público do ...|
|  1082|Órgão Público do ...|
|  1104|   Autarquia Federal|
+------+--------------------+
only showing top 20 rows



## Dados de Empresa

1. Os dados do arquivo "Empresa" se relaciona com as informações de "Natureza Jurídica" e "Qualificação Sócios"

In [19]:
# Join com natju_df para obter a descrição da natureza jurídica
empresa_df = empresa_df.join(natju_df, empresa_df.natureza_juridica == natju_df.codigo, 'left') \
    .withColumnRenamed('descricao', 'descricao_natureza_juridica')

# Join com quals_df para obter a descrição da qualificação do responsável
empresa_df = empresa_df.join(quals_df, empresa_df.qualificacao_responsavel == quals_df.codigo, 'left') \
    .withColumnRenamed('descricao', 'descricao_qualificacao_responsavel')

# Dataste final-> apenas as colunas necessárias
empresa_final_df = empresa_df.select(
    col('cnpj_basico'),
    col('razao_social'),
    col('descricao_natureza_juridica'),
    col('descricao_qualificacao_responsavel'),
    col('natureza_juridica')
)

In [20]:
type(empresa_final_df)

pyspark.sql.dataframe.DataFrame

In [21]:
empresa_final_df.show(5)

                                                                                

+-----------+--------------------+---------------------------+----------------------------------+-----------------+
|cnpj_basico|        razao_social|descricao_natureza_juridica|descricao_qualificacao_responsavel|natureza_juridica|
+-----------+--------------------+---------------------------+----------------------------------+-----------------+
|   41273594|OZINETE DELFINO C...|       Empresário (Indiv...|                        Empresário|             2135|
|   41273595|GILVAN PEREIRA XA...|       Empresário (Indiv...|                        Empresário|             2135|
|   41273596|RODRIGO JOSE FERR...|       Empresário (Indiv...|                        Empresário|             2135|
|   41273597|PACHARRUS QUEIROZ...|       Empresário (Indiv...|                        Empresário|             2135|
|   41273598|GLORIA VIANA DIAS...|       Empresário (Indiv...|                        Empresário|             2135|
+-----------+--------------------+---------------------------+----------

Persistência

In [22]:
empresa_final_df.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/empresa_final")

                                                                                

### Distribuição da Natureza Jurídica
Contar e visualizar a frequência de cada tipo de natureza jurídica.

In [23]:
natureza_juridica_counts = empresa_df.groupBy(['descricao_natureza_juridica', 'natureza_juridica']).count().orderBy('count', ascending=False)

# Mostrar o resultado
natureza_juridica_counts.show(5)



+---------------------------+-----------------+--------+
|descricao_natureza_juridica|natureza_juridica|   count|
+---------------------------+-----------------+--------+
|       Empresário (Indiv...|             2135|37042209|
|       Sociedade Empresá...|             2062|13798874|
|       Candidato a Cargo...|             4090| 2473325|
|         Associação Privada|             3999| 1382737|
|       Sociedade Simples...|             2240|  810248|
+---------------------------+-----------------+--------+
only showing top 5 rows



                                                                                

Persistência

In [24]:
natureza_juridica_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/natureza_juridica_counts")

                                                                                

### Distribuição da Qualificação dos Responsáveis
Contar e visualizar a frequência de cada tipo de qualificação dos responsáveis.

In [25]:
qualificacao_responsavel_counts = empresa_df.groupBy('descricao_qualificacao_responsavel').count().orderBy('count', ascending=False)

# Mostrar o resultado
qualificacao_responsavel_counts.show(5)



+----------------------------------+--------+
|descricao_qualificacao_responsavel|   count|
+----------------------------------+--------+
|                        Empresário|37027671|
|               Sócio-Administrador|14198204|
|              Candidato a cargo...| 2473326|
|                        Presidente| 1781067|
|                    Produtor Rural|  600120|
+----------------------------------+--------+
only showing top 5 rows



                                                                                

Persistência

In [26]:
qualificacao_responsavel_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/qualificacao_responsavel_counts")

                                                                                

### Empresas por Porte
Contar e visualizar a frequência das empresas por porte.

In [27]:
porte_empresa_counts = empresa_df.groupBy('porte_empresa').count().orderBy('count', ascending=False)

# Mostrar o resultado
porte_empresa_counts.show(5)



+-------------+--------+
|porte_empresa|   count|
+-------------+--------+
|           01|42388757|
|           05|13869829|
|           03| 1634685|
|         NULL|   69810|
+-------------+--------+



                                                                                

Persistência

In [28]:
porte_empresa_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/porte_empresa_counts")

                                                                                

### Relação entre Natureza Jurídica e Qualificação dos Responsáveis
Explorar a relação entre natureza jurídica e a qualificação dos responsáveis.

In [29]:
relacao_natureza_qualificacao = empresa_df.groupBy('descricao_natureza_juridica', 'descricao_qualificacao_responsavel').count().orderBy('count', ascending=False)

# Mostrar o resultado
relacao_natureza_qualificacao.show()



+---------------------------+----------------------------------+--------+
|descricao_natureza_juridica|descricao_qualificacao_responsavel|   count|
+---------------------------+----------------------------------+--------+
|       Empresário (Indiv...|                        Empresário|37027559|
|       Sociedade Empresá...|               Sócio-Administrador|13509437|
|       Candidato a Cargo...|              Candidato a cargo...| 2473325|
|         Associação Privada|                        Presidente| 1344894|
|       Produtor Rural (P...|                    Produtor Rural|  600120|
|       Sociedade Simples...|               Sócio-Administrador|  575755|
|        Condomínio Edilício|              Síndico (Condomínio)|  293284|
|       Sociedade Empresá...|                     Administrador|  273042|
|       Sociedade Simples...|                     Sócio-Gerente|  228381|
|       Empresa Individua...|              Titular de Empres...|  215409|
|       Empresa Individua...|         

                                                                                

Persistencia

In [30]:
relacao_natureza_qualificacao.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/relacao_natureza_qualificacao")

                                                                                

## Dados Cadastrais das Empresas

1. Os dados do arquivo "Estabelecimento" se relaciona com as informações de "CNAE"

In [31]:
# Selecionar as colunas necessárias
estabelecimento_df = estabelecimento_df.select(
    'cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'situacao_cadastral',
    'data_situacao_cadastral', 'data_inicio_atividade', 'cnae_fiscal_principal',
    'cnae_fiscal_secundaria', 'uf', 'municipio', 'identificador_matriz_filial'
)

# Mapear os valores das colunas identificador_matriz_filial e situacao_cadastral
estabelecimento_df = estabelecimento_df.withColumn(
    'identificador_matriz_filial',
    when(col('identificador_matriz_filial') == 1, 'MATRIZ').otherwise('FILIAL')
)

estabelecimento_df = estabelecimento_df.withColumn(
    'situacao_cadastral',
    when(col('situacao_cadastral') == 1, 'NULA')
    .when(col('situacao_cadastral') == 2, 'ATIVA')
    .when(col('situacao_cadastral') == 3, 'SUSPENSA')
    .when(col('situacao_cadastral') == 4, 'INAPTA')
    .when(col('situacao_cadastral') == 8, 'BAIXADA')
)

# Fazer o explode da coluna cnae_fiscal_secundaria
estabelecimento_df = estabelecimento_df.withColumn(
    'cnae_fiscal_secundaria', split(col('cnae_fiscal_secundaria'), ',')
)

estabelecimento_df = estabelecimento_df.withColumn(
    'cnae_fiscal_secundaria', explode(col('cnae_fiscal_secundaria'))
)

# Realizar os joins com o cnae_df para obter as descrições dos CNAEs
estabelecimento_df = estabelecimento_df.join(
    cnae_df.withColumnRenamed('descricao', 'descricao_cnae_fiscal_principal'),
    estabelecimento_df.cnae_fiscal_principal == cnae_df.codigo,
    'left'
).drop(cnae_df.codigo)

estabelecimento_df = estabelecimento_df.join(
    cnae_df.withColumnRenamed('descricao', 'descricao_cnae_fiscal_secundaria'),
    estabelecimento_df.cnae_fiscal_secundaria == cnae_df.codigo,
    'left'
).drop(cnae_df.codigo)

# Selecionar e renomear as colunas necessárias no DataFrame final
estabelecimento_final_df = estabelecimento_df.select(
    col('cnpj_basico'),
    col('cnpj_ordem'),
    col('cnpj_dv'),
    col('situacao_cadastral'),
    col('data_situacao_cadastral'),
    col('data_inicio_atividade'),
    col('descricao_cnae_fiscal_principal').alias('cnae_fiscal_principal'),
    col('descricao_cnae_fiscal_secundaria').alias('cnae_fiscal_secundaria'),
    col('uf'),
    col('municipio'),
    col('identificador_matriz_filial')
)

In [32]:
estabelecimento_final_df.show(5)

+-----------+----------+-------+------------------+-----------------------+---------------------+---------------------+----------------------+---+---------+---------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|situacao_cadastral|data_situacao_cadastral|data_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria| uf|municipio|identificador_matriz_filial|
+-----------+----------+-------+------------------+-----------------------+---------------------+---------------------+----------------------+---+---------+---------------------------+
|   00336285|      0001|     36|           BAIXADA|               20180612|             19941206| Comércio varejist...|  Comércio varejist...| ES|     5625|                     MATRIZ|
|   00336285|      0001|     36|           BAIXADA|               20180612|             19941206| Comércio varejist...|  Comércio varejist...| ES|     5625|                     MATRIZ|
|   00336285|      0001|     36|           BAIXADA|               20180612|

### Contagem por Situação Cadastral
Contar a frequência de cada situação cadastral

In [33]:
estabelecimento_final_df_filtrada = estabelecimento_final_df

In [34]:
situacao_cadastral_counts = estabelecimento_final_df_filtrada.groupBy('situacao_cadastral').count().orderBy('count', ascending=False)

# Mostrar o resultado
situacao_cadastral_counts.show(5)



+------------------+--------+
|situacao_cadastral|   count|
+------------------+--------+
|             ATIVA|56971947|
|           BAIXADA|27074146|
|            INAPTA|11867496|
|          SUSPENSA|  260950|
|              NULA|   68757|
+------------------+--------+
only showing top 5 rows



                                                                                

Persistência

In [35]:
situacao_cadastral_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/situacao_cadastral_counts")

                                                                                

### Contagem por Estado (UF)
Contar a frequência de empresas que não estão baixadas por estado (UF).

In [36]:
estabelecimento_final_df_filtrada = estabelecimento_final_df.filter(estabelecimento_final_df.situacao_cadastral != 'BAIXADA')

In [37]:
uf_counts = estabelecimento_final_df_filtrada.groupBy('uf').count().orderBy('count', ascending=False)

# Mostrar o resultado
uf_counts.show(5)



+---+--------+
| uf|   count|
+---+--------+
| SP|16680450|
| MG| 6243105|
| RJ| 5618747|
| PR| 4453362|
| RS| 4369506|
+---+--------+
only showing top 5 rows



                                                                                

Persistências

In [38]:
uf_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/uf_counts")

                                                                                

### Contagem por Município
Contar a frequência de empresas ativas por município.

In [39]:
municipio_counts = estabelecimento_final_df_filtrada.groupBy('municipio').count().orderBy('count', ascending=False)

# Mostrar o resultado
municipio_counts.show(5)



+---------+-------+
|municipio|  count|
+---------+-------+
|     7107|5496331|
|     6001|2331080|
|     9701|1395717|
|     0255|1249418|
|     4123|1104842|
+---------+-------+
only showing top 5 rows



                                                                                

Persistências

In [40]:
municipio_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/municipio_counts")

                                                                                

### Distribuição por Data de Início de Atividade
Verificar a distribuição das empresas pela data de início de atividade.

In [42]:
# df_filtered = estabelecimento_final_df_filtrada.withColumn('ano_inicio_atividade', year('data_inicio_atividade'))
# ano_inicio_counts = estabelecimento_final_df_filtrada.groupBy('ano_inicio_atividade').count().orderBy('ano_inicio_atividade')
# ano_inicio_counts.show(5)

Persistência

In [43]:
# ano_inicio_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/ano_inicio_counts")

### Principais CNAE Fiscal Principal
Listar os principais códigos de CNAE fiscal principal.

In [44]:
correlacao_uf_cnae = estabelecimento_final_df_filtrada.groupBy('uf', 'cnae_fiscal_principal').count().orderBy('count', ascending=False)

# Mostrar o resultado
correlacao_uf_cnae.show()



+---+---------------------+------+
| uf|cnae_fiscal_principal| count|
+---+---------------------+------+
| SP| Comércio varejist...|653629|
| SP|   Obras de alvenaria|496528|
| SP|   Promoção de vendas|298451|
| SP| Instalação e manu...|293318|
| MG| Comércio varejist...|287458|
| SP| Construção de edi...|268017|
| SP| Preparação de doc...|246039|
| RS| Comércio varejist...|237605|
| SP| Comércio varejist...|219563|
| SP| Treinamento em de...|218724|
| SP| Outras atividades...|217727|
| SP| Restaurantes e si...|207157|
| SP| Transporte rodovi...|203983|
| SP| Comércio varejist...|203450|
| RJ|   Obras de alvenaria|201910|
| SP| Serviços combinad...|201268|
| SP| Serviços de organ...|199998|
| RS|   Obras de alvenaria|198997|
| RJ| Comércio varejist...|194488|
| SP| Transporte rodovi...|191000|
+---+---------------------+------+
only showing top 20 rows



                                                                                

Persistencia

In [45]:
correlacao_uf_cnae.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/correlacao_uf_cnae")

                                                                                

### Análise de Atividades por Região
Analisar a distribuição de atividades econômicas por região

In [47]:
regiao_atividade_counts = estabelecimento_final_df_filtrada.groupBy('municipio', 'cnae_fiscal_principal').count().orderBy('count', ascending=False)

# Mostrar o resultado
regiao_atividade_counts.show()



+---------+---------------------+------+
|municipio|cnae_fiscal_principal| count|
+---------+---------------------+------+
|     7107| Comércio varejist...|216362|
|     7107|   Obras de alvenaria|114826|
|     7107| Serviços de organ...| 98714|
|     7107| Outras atividades...| 98188|
|     7107|   Promoção de vendas| 97652|
|     7107| Serviços combinad...| 97031|
|     7107| Suporte técnico, ...| 95671|
|     7107| Preparação de doc...| 94217|
|     7107| Construção de edi...| 79986|
|     7107| Treinamento em de...| 79956|
|     9701| Construção de edi...| 77841|
|     7107| Instalação e manu...| 77695|
|     7107| Comércio varejist...| 75505|
|     7107| Atividades de con...| 74437|
|     6001| Comércio varejist...| 68209|
|     7107| Restaurantes e si...| 67633|
|     7107| Comercio varejist...| 63321|
|     6001|   Obras de alvenaria| 63163|
|     7107| Consultoria em te...| 60626|
|     7107| Transporte rodovi...| 60432|
+---------+---------------------+------+
only showing top

                                                                                

Persistencia

In [48]:
regiao_atividade_counts.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/regiao_atividade_counts")

                                                                                

## Dados dos Sócios e Representantes

1. Os dados do arquivo "Estabelecimento" se relaciona com as informações de "CNAE"

In [49]:
# Mapear os valores das colunas identificador_socio e faixa_etaria
socio_df = socio_df.withColumn(
    'identificador_socio',
    when(col('identificador_socio') == 1, 'PESSOA JURIDICA')
    .when(col('identificador_socio') == 2, 'PESSOA FISICA')
    .when(col('identificador_socio') == 3, 'ESTRANGEIRO')
)

socio_df = socio_df.withColumn(
    'faixa_etaria',
    when(col('faixa_etaria') == 0, 'não se aplica')
    .when(col('faixa_etaria') == 1, '0 a 12 anos')
    .when(col('faixa_etaria') == 2, '13 a 20 anos')
    .when(col('faixa_etaria') == 3, '21 a 30 anos')
    .when(col('faixa_etaria') == 4, '31 a 40 anos')
    .when(col('faixa_etaria') == 5, '41 a 50 anos')
    .when(col('faixa_etaria') == 6, '51 a 60 anos')
    .when(col('faixa_etaria') == 7, '61 a 70 anos')
    .when(col('faixa_etaria') == 8, '71 a 80 anos')
    .when(col('faixa_etaria') == 9, 'maiores de 80 anos')
)

# Realizar os joins com quals_df para obter as descrições
socio_df = socio_df.join(
    quals_df.withColumnRenamed('descricao', 'descricao_qualificacao_socio'),
    socio_df.qualificacao_socio == quals_df.codigo,
    'left'
).drop(quals_df.codigo)

socio_df = socio_df.join(
    quals_df.withColumnRenamed('descricao', 'descricao_qualificacao_representante_legal'),
    socio_df.qualificacao_representante_legal == quals_df.codigo,
    'left'
).drop(quals_df.codigo)

# Criar as novas colunas id_socio e id_representante
socio_df = socio_df.withColumn(
    'id_socio', concat_ws('-', col('nome_socio_razao_social'), col('cpf_cnpj_socio'))
)

socio_df = socio_df.withColumn(
    'id_representante', concat_ws('-', col('nome_do_representante'), col('representante_legal'))
)

# Selecionar e renomear as colunas necessárias no DataFrame final
socio_final_df = socio_df.select(
    col('cnpj_basico'),
    col('nome_socio_razao_social'),
    col('descricao_qualificacao_socio').alias('qualificacao_socio'),
    col('identificador_socio'),
    col('faixa_etaria'),
    col('data_entrada_sociedade'),
    col('id_socio'),
    col('nome_do_representante'),
    col('descricao_qualificacao_representante_legal').alias('qualificacao_representante_legal'),
    col('id_representante')
)

In [50]:
socio_final_df.show(5)

+-----------+-----------------------+-------------------+-------------------+------------+----------------------+--------------------+---------------------+--------------------------------+----------------+
|cnpj_basico|nome_socio_razao_social| qualificacao_socio|identificador_socio|faixa_etaria|data_entrada_sociedade|            id_socio|nome_do_representante|qualificacao_representante_legal|id_representante|
+-----------+-----------------------+-------------------+-------------------+------------+----------------------+--------------------+---------------------+--------------------------------+----------------+
|   13107428|   MARIA HELENICE DO...|Sócio-Administrador|      PESSOA FISICA|51 a 60 anos|              20161116|MARIA HELENICE DO...|                 NULL|                   Não informada|     ***000000**|
|   13155962|   NAIANA MAIARA DE ...|Sócio-Administrador|      PESSOA FISICA|31 a 40 anos|              20110118|NAIANA MAIARA DE ...|                 NULL|                

Persistência

In [51]:
socio_final_df.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/socio_final_df")

                                                                                

### Agrupar por id_socio e Contar cnpj_basico
Agrupar pelo id_socio e contar a quantidade de cnpj_basico que cada sócio está associado, levando em conta sua qualificacao_socio. 

In [52]:
# Agrupar por id_socio e qualificacao_socio e contar quantos cnpj_basico estão associados
socio_anomalias = socio_final_df.groupBy('id_socio', 'qualificacao_socio') \
    .agg(F.countDistinct('cnpj_basico').alias('numero_empresas')) \
    .orderBy(F.desc('numero_empresas'))

# Exibir os resultados
socio_anomalias.show()



+--------------------+-------------------+---------------+
|            id_socio| qualificacao_socio|numero_empresas|
+--------------------+-------------------+---------------+
|KATIA MARIA BEZER...|      Administrador|           1949|
|RICARDO ELIAS RES...|      Administrador|           1508|
|MEDICALMAIS SERVI...|    Sócio Ostensivo|           1478|
|EBES SISTEMAS DE ...|              Sócio|           1245|
|SURYA GUEDES MEND...|      Administrador|           1243|
|FINCO ASSESSORIA ...|              Sócio|           1236|
|OSVALDO ANTUNES C...|      Administrador|           1184|
|ROGERIO MARCHINI ...|      Administrador|           1121|
|DAVID JHONATAS DO...|      Administrador|           1117|
|AVDV ESTETICA LTD...|    Sócio Ostensivo|           1101|
|         ***000000**|         Presidente|           1088|
|4ID MEDICOS ASSOC...|    Sócio Ostensivo|           1003|
|RODOLFO MOLINARI ...|      Administrador|            985|
|FERNANDO ANTONIO ...|      Administrador|            96

                                                                                

Persistências

In [53]:
socio_anomalias.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/socio_anomalias")

                                                                                

### Sócios com Múltiplas Empresas (Multi-empresa)
Descrição: Sócios associados a um número elevado de empresas.
Relevância: Pode indicar práticas fraudulentas, como criação de empresas de fachada.
Como identificar: Agrupar por id_socio e contar a quantidade de cnpj_basico associados.

In [54]:
socio_multi_empresa = socio_final_df.groupBy('id_socio') \
    .agg(F.countDistinct('cnpj_basico').alias('numero_empresas')) \
    .filter(F.col('numero_empresas') > 1000) \
    .orderBy(F.desc('numero_empresas'))
socio_multi_empresa.show()



+--------------------+---------------+
|            id_socio|numero_empresas|
+--------------------+---------------+
|KATIA MARIA BEZER...|           1954|
|RICARDO ELIAS RES...|           1511|
|MEDICALMAIS SERVI...|           1478|
|SURYA GUEDES MEND...|           1288|
|EBES SISTEMAS DE ...|           1272|
|FINCO ASSESSORIA ...|           1261|
|OSVALDO ANTUNES C...|           1184|
|ROGERIO MARCHINI ...|           1160|
|DAVID JHONATAS DO...|           1125|
|AVDV ESTETICA LTD...|           1101|
|         ***000000**|           1089|
|ROSSI RESIDENCIAL...|           1045|
|RODOLFO MOLINARI ...|           1033|
|4ID MEDICOS ASSOC...|           1005|
+--------------------+---------------+



                                                                                

Persistência

In [55]:
socio_multi_empresa.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/socio_multi_empresa")

                                                                                

### Representantes Legais com Múltiplas Empresas
Descrição: Representantes legais associados a um número elevado de empresas.
Relevância: Pode indicar uso de representantes para mascarar os verdadeiros proprietários.
Como identificar: Agrupar por id_representante e contar a quantidade de cnpj_basico associados.

In [56]:
representante_multi_empresa = socio_final_df.groupBy('id_representante') \
    .agg(F.countDistinct('cnpj_basico').alias('numero_empresas')) \
    .filter(F.col('numero_empresas') > 50) \
    .orderBy(F.desc('numero_empresas'))
representante_multi_empresa.show()



+--------------------+---------------+
|    id_representante|numero_empresas|
+--------------------+---------------+
|         ***000000**|       13735370|
|SURYA GUEDES MEND...|           1116|
|ROGERIO MARCHINI ...|            703|
|ROBERTO GIARELLI-...|            515|
|DANIEL FERREIRA M...|            512|
|ANTONIO GABRIEL T...|            458|
|MIGUEL MAIA MICKE...|            376|
|FLAVIO RICARDO LI...|            323|
|RICARDO RIBEIRO V...|            317|
|JUNIA MARIA DE SO...|            297|
|RAFAELLA NOGUEIRA...|            295|
|CARLOS BIANCONI-*...|            277|
|SAMANTHA VIANNA D...|            269|
|ANTONIO ALBERTO S...|            268|
|ALAN ALVES DOS SA...|            237|
|ALEXANDRE MACHADO...|            228|
|RITA DE CASSIA BA...|            221|
|RICARDO PAIXAO PI...|            220|
|CRISTIANE LOCATEL...|            218|
|BENTO ODILON MORE...|            209|
+--------------------+---------------+
only showing top 20 rows



                                                                                

Persistencia

In [57]:
representante_multi_empresa.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/representante_multi_empresa")

                                                                                

### Sócios com Qualificações Diversas
Descrição: Sócios que possuem múltiplas qualificações em diferentes empresas.
Relevância: Pode indicar uso de diferentes papéis em diversas empresas para ocultar atividades.
Como identificar: Agrupar por id_socio e listar as diferentes qualificacao_socio.

In [58]:
socio_diversas_qualificacoes = socio_final_df.groupBy('id_socio') \
    .agg(F.collect_set('qualificacao_socio').alias('qualificacoes')) \
    .filter(F.size(F.col('qualificacoes')) > 1)
socio_diversas_qualificacoes.show()

[Stage 250:>                                                        (0 + 1) / 1]

+--------------------+--------------------+
|            id_socio|       qualificacoes|
+--------------------+--------------------+
|3 INC DESENVOLVIM...|[Sócio Ostensivo,...|
|3I-INCORPORACOES ...|[Sócio Ostensivo,...|
|8EM7 INTELIGENCIA...|[Sócio Ostensivo,...|
|A55 CONSULTORIA E...|[Sócio Ostensivo,...|
|AARAO BOECHAT MAR...|[Sócio-Administra...|
|AARON FABRICIO DA...|[Titular Pessoa F...|
|ABADIA DE FATIMA ...|[Sócio-Administra...|
|ABADIA DIVINA DE ...|[Sócio-Administra...|
|ABADIA FONSECA PE...|[Administrador, S...|
|ABADIA MARIA DE S...|[Sócio-Administra...|
|ABADIA MOREIRA DE...|[Sócio-Administra...|
|ABADIA SILVIA MEL...|[Sócio-Administra...|
|ABADIO BAIRD-***0...|[Presidente, Titu...|
|ABARACART PINTO-*...|[Presidente, Sóci...|
|ABBAS AHMAD JEZZI...|[Sócio-Administra...|
|ABBAS GUILHERME B...|[Sócio-Administra...|
|ABBUD AISSUM-***8...|[Titular Pessoa F...|
|ABDALA ABRAO-***5...|[Sócio-Administra...|
|ABDALA SUFIAN KIL...|[Sócio-Administra...|
|ABDALLA HANNA ATT...|[Sócio-Adm

                                                                                

In [73]:
socio_diversas_qualificacoes = socio_diversas_qualificacoes.withColumn(
    "qualificacoes", F.split(F.col("qualificacoes"), ","))

In [74]:
socio_diversas_qualificacoes = socio_diversas_qualificacoes.withColumn(
    "tamanho_qualificacoes", F.size("qualificacoes"))

In [75]:
# Ordenar o DataFrame pelo tamanho da lista de qualificações em ordem decrescente
socio_diversas_qualificacoes = socio_diversas_qualificacoes.orderBy(
    F.desc("tamanho_qualificacoes"))

socio_diversas_qualificacoes.show()



+--------------------+--------------------+---------------------+
|            id_socio|       qualificacoes|tamanho_qualificacoes|
+--------------------+--------------------+---------------------+
|RICARDO ULISSES M...|[Produtor Rural, ...|                    8|
|WALDEMAR VERDI JU...|[Produtor Rural, ...|                    8|
|VERONICA VALENTE ...|[Fundador, Consel...|                    8|
|MARCOS DE CARVALH...|[Produtor Rural, ...|                    8|
|HENRI ARMAND SZLE...|[Fundador, Consel...|                    7|
|ANDRE DOMINGOS PI...|[Produtor Rural, ...|                    7|
|JOSE ONERIO DA SI...|[Produtor Rural, ...|                    7|
|ANDRE CAPISTRANO ...|[Conselheiro de A...|                    7|
|ODAIR GARCIA SENR...|[Produtor Rural, ...|                    7|
|ANDRE VICTOR NEUD...|[Conselheiro de A...|                    7|
|ROBERTO MALZONI F...|[Produtor Rural, ...|                    7|
|CELIA REGINA DORN...|[Conselheiro de A...|                    7|
|FRANCISCO

                                                                                

In [76]:
# Converte a coluna ARRAY<STRING> para STRING usando concat_ws
socio_diversas_qualificacoes = socio_diversas_qualificacoes.withColumn(
    'qualificacoes', 
    F.concat_ws(',', 'qualificacoes')
)

socio_diversas_qualificacoes.show()



+--------------------+--------------------+---------------------+
|            id_socio|       qualificacoes|tamanho_qualificacoes|
+--------------------+--------------------+---------------------+
|MARCOS DE CARVALH...|Produtor Rural,Só...|                    8|
|WALDEMAR VERDI JU...|Produtor Rural,Co...|                    8|
|VERONICA VALENTE ...|Fundador,Conselhe...|                    8|
|RICARDO ULISSES M...|Produtor Rural,Só...|                    8|
|ANTONIO CAVALCANT...|Produtor Rural,Pr...|                    7|
|ANDRE DOMINGOS PI...|Produtor Rural,Co...|                    7|
|FERNANDO BRUNO DE...|Produtor Rural,Co...|                    7|
|HENRI ARMAND SZLE...|Fundador,Conselhe...|                    7|
|OTAVIO PILON FILH...|Produtor Rural,Co...|                    7|
|ANDRE VICTOR NEUD...|Conselheiro de Ad...|                    7|
|ANTONIO CARLOS RO...|Conselheiro de Ad...|                    7|
|FRANCISCO MESQUIT...|Produtor Rural,Co...|                    7|
|JOSE ONER

                                                                                

Persistência

In [77]:
socio_diversas_qualificacoes.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/socio_diversas_qualificacoes")

                                                                                

### Empresas com Data de Entrada de Sócio Recente
Descrição: Empresas que adicionaram sócios recentemente.
Relevância: Mudanças recentes podem indicar tentativas de ocultar propriedade ou transferir ativos.
Como identificar: Filtrar por data_entrada_sociedade recente.

In [67]:
socio_final_df = socio_final_df.withColumn("data_entrada_sociedade", F.to_date(F.col("data_entrada_sociedade"), "yyyyMMdd"))

In [68]:
data_limite = current_date() - F.expr("INTERVAL 1 YEAR")
empresa_socio_recente = socio_final_df.filter(F.col('data_entrada_sociedade') > data_limite)
empresa_socio_recente.show()

+-----------+-----------------------+--------------------+-------------------+-------------+----------------------+--------------------+---------------------+--------------------------------+--------------------+
|cnpj_basico|nome_socio_razao_social|  qualificacao_socio|identificador_socio| faixa_etaria|data_entrada_sociedade|            id_socio|nome_do_representante|qualificacao_representante_legal|    id_representante|
+-----------+-----------------------+--------------------+-------------------+-------------+----------------------+--------------------+---------------------+--------------------------------+--------------------+
|   13160907|   FABIANO ACAFRAO A...| Sócio-Administrador|      PESSOA FISICA| 41 a 50 anos|            2023-11-28|FABIANO ACAFRAO A...|                 NULL|                   Não informada|         ***000000**|
|   13161095|   FELIPE RODRIGUES ...|               Sócio|      PESSOA FISICA| 41 a 50 anos|            2024-02-20|FELIPE RODRIGUES ...|            

                                                                                

Persistencia

In [69]:
empresa_socio_recente.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/empresa_socio_recente")

                                                                                

### Empresas com Sócios de Faixa Etária
Descrição: Empresas com sócios muito jovens ou muito velhos em comparação ao padrão.
Relevância: Pode indicar que sócios são usados como laranjas.
Como identificar: Agrupar por faixa_etaria e comparar a distribuição com o padrão esperado.

In [62]:
socio_faixa_etaria = socio_final_df.groupBy('faixa_etaria') \
    .count() \
    .orderBy(F.desc('count'))
socio_faixa_etaria.show()

# Verificar faixas etárias muito jovens ou muito velhas
socio_faixa_etaria_anomalia = socio_final_df.filter((F.col('faixa_etaria') == 'Menor de 18 anos') | (F.col('faixa_etaria') == 'Maior de 70 anos'))
socio_faixa_etaria_anomalia.show()

[Stage 262:>                                                        (0 + 1) / 1]

+------------------+-------+
|      faixa_etaria|  count|
+------------------+-------+
|      41 a 50 anos|5989988|
|      51 a 60 anos|5030314|
|      31 a 40 anos|4373294|
|      61 a 70 anos|3738613|
|      71 a 80 anos|1871320|
|      21 a 30 anos|1589147|
|maiores de 80 anos| 933194|
|     não se aplica| 578725|
|      13 a 20 anos| 121915|
|       0 a 12 anos|  24720|
+------------------+-------+

+-----------+-----------------------+------------------+-------------------+------------+----------------------+--------+---------------------+--------------------------------+----------------+
|cnpj_basico|nome_socio_razao_social|qualificacao_socio|identificador_socio|faixa_etaria|data_entrada_sociedade|id_socio|nome_do_representante|qualificacao_representante_legal|id_representante|
+-----------+-----------------------+------------------+-------------------+------------+----------------------+--------+---------------------+--------------------------------+----------------+
+----------

                                                                                

Persistencia

In [70]:
socio_faixa_etaria.coalesce(1).write.mode("overwrite").option("header", "true").csv("gs://dados-massivos/dados-finais/socio_faixa_etaria")

                                                                                

_____

## Anexos

### Leitura individual de arquivo CSV

In [None]:
gcs_uri = "gs://dados-massivos/extracted_files/K3241.K03200Y0.D40608.EMPRECSV.csv"
df = spark.read.csv(gcs_uri, sep=';', header=True, inferSchema=True, encoding='ISO-8859-1')

# Mostrar algumas linhas do DataFrame
df.show()

# Contar o número de linhas no DataFrame
num_rows = df.count()
print(f"Número de linhas: {num_rows}")