<a href="https://colab.research.google.com/github/Matheusups/Datasets/blob/main/Projeto_CNPJ_Open_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Instalando/atualizando as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
import pyspark

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

spark

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Criação de Diretórios para os dados particionados:
- Empresas
- Estabelecimentos
- Socios

In [6]:
import zipfile
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/empresas/empresas.zip','r').extractall('/content/drive/MyDrive/curso-spark/empresas')

In [7]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/estabelecimentos/estabelecimentos.zip','r').extractall('/content/drive/MyDrive/curso-spark/estabelecimentos')

In [8]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/socios/socios.zip','r').extractall('/content/drive/MyDrive/curso-spark/socios')

### Leitura dos dados particionados em dataframes

- Criação de listas para definir o nome das colunas de cada dataframe de acordo com o dicionario dos dados públicos no site do governo.
- Definição de option para delimitador e para definição de schema.
- Criação dos respectivos dataframes já configurados de acordo com as escpeficidades

In [41]:
colunas_socios = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']
df_socios = spark.read.format('csv')\
                  .option("delimiter",';')\
                  .option("inferSchema",True)\
                  .load('/content/drive/MyDrive/curso-spark/socios/socios').toDF(*colunas_socios)

colunas_estab = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
df_estabelecimentos = spark.read.format('csv')\
                  .option("delimiter",';')\
                  .option("inferSchema",True)\
                  .load('/content/drive/MyDrive/curso-spark/estabelecimentos/estabelecimentos').toDF(*colunas_estab)

col_empresas = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
df_empresas = spark.read.format('csv')\
                  .option("delimiter",';')\
                  .option("inferSchema",True)\
                  .load('/content/drive/MyDrive/curso-spark/empresas/empresas').toDF(*col_empresas)

## Explorando os dataframes

1. Verificar se os tipos de dados estão adequados para que possamos prosseguir com as operações.
2. Caso estejam fora dos padrões ou do esperado, iremos tratar esses campos (colunas).

In [42]:
from pyspark.sql.functions import *

In [43]:
#df_empresas.limit(5).toPandas()

# Transformação da coluna [capital_social_da_empresa] para float, o spark reconheceu como string pois o separador decimal dele é o .
df_empresas = df_empresas.withColumn("capital_social_da_empresa", regexp_replace(col('capital_social_da_empresa'),'[,]','.').cast('double'))
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [45]:
# Transformação das colunas de data
df_estabelecimentos = df_estabelecimentos\
      .withColumn('data_situacao_cadastral', to_date(col('data_situacao_cadastral').cast('string'),'yyyyMMdd'))\
      .withColumn('data_de_inicio_atividade', to_date(col('data_de_inicio_atividade').cast('string'),'yyyyMMdd'))\
      .withColumn('data_da_situacao_especial', to_date(col('data_da_situacao_especial').cast('string'),'yyyyMMdd'))

#df_estabelecimentos.printSchema()
df_estabelecimentos.show(5)

+-----------+----------+-------+---------------------------+-----------------+------------------+-----------------------+-------------------------+--------------------------+----+------------------------+---------------------+----------------------+------------------+--------------------+------+-----------+------------------+-------+---+---------+-----+----------+-----+----------+----------+----+------------------+-----------------+-------------------------+
|cnpj_basico|cnpj_ordem|cnpj_dv|identificador_matriz_filial|    nome_fantasia|situacao_cadastral|data_situacao_cadastral|motivo_situacao_cadastral|nome_da_cidade_no_exterior|pais|data_de_inicio_atividade|cnae_fiscal_principal|cnae_fiscal_secundaria|tipo_de_logradouro|          logradouro|numero|complemento|            bairro|    cep| uf|municipio|ddd_1|telefone_1|ddd_2|telefone_2|ddd_do_fax| fax|correio_eletronico|situacao_especial|data_da_situacao_especial|
+-----------+----------+-------+---------------------------+--------------

In [46]:
# Tratamento do campo de data
df_socios = df_socios\
      .withColumn('data_de_entrada_sociedade', to_date(col('data_de_entrada_sociedade').cast('string'),'yyyyMMdd'))
df_socios.show(5)

+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|cnpj_basico|identificador_de_socio|nome_do_socio_ou_razao_social|cnpj_ou_cpf_do_socio|qualificacao_do_socio|data_de_entrada_sociedade|pais|representante_legal|nome_do_representante|qualificacao_do_representante_legal|faixa_etaria|
+-----------+----------------------+-----------------------------+--------------------+---------------------+-------------------------+----+-------------------+---------------------+-----------------------------------+------------+
|        411|                     2|         LILIANA PATRICIA ...|         ***678188**|                   22|               1994-07-25|null|        ***000000**|                 null|                                  0|           7|
|        411|                     2|         CRISTINA HUNDERTMARK|      

### Criando dataframes para dados especificos

In [47]:
df_socios.select(
    'nome_do_socio_ou_razao_social',
    'faixa_etaria',
    year('data_de_entrada_sociedade').alias('ano_entrada'),
    (quarter('data_de_entrada_sociedade')).alias('trimestre_ano')
    ).show(5)

+-----------------------------+------------+-----------+-------------+
|nome_do_socio_ou_razao_social|faixa_etaria|ano_entrada|trimestre_ano|
+-----------------------------+------------+-----------+-------------+
|         LILIANA PATRICIA ...|           7|       1994|            3|
|         CRISTINA HUNDERTMARK|           7|       1994|            3|
|         CELSO EDUARDO DE ...|           8|       1994|            2|
|         EDUARDO BERRINGER...|           5|       1994|            2|
|          HANNE MAHFOUD FADEL|           8|       1994|            2|
+-----------------------------+------------+-----------+-------------+
only showing top 5 rows



## Consulta

Consulta para criar uma agregação de quantidade de empresas por ANO de incio das atividades e UF no ano de 2020 com situação ativa. De acordo com algumas pesquisa no site do governo o código 2 representa situação ativa para o cpnj.

In [60]:
df_rank = df_estabelecimentos.filter('situacao_cadastral = 2').select(
    'nome_fantasia',
    'uf',
    'municipio',
    'situacao_cadastral',
    year('data_de_inicio_atividade').alias('ano_inicio_atividade')
).groupby('ano_inicio_atividade','uf','situacao_cadastral').agg(count(col('nome_fantasia')).alias('contagem_de_empresas'))\
    .filter('ano_inicio_atividade=2020')

df_rank.orderBy(col("contagem_de_empresas").desc()).show(10)

+--------------------+---+------------------+--------------------+
|ano_inicio_atividade| uf|situacao_cadastral|contagem_de_empresas|
+--------------------+---+------------------+--------------------+
|                2020| SP|                 2|               66447|
|                2020| RJ|                 2|               26008|
|                2020| MG|                 2|               24847|
|                2020| PR|                 2|               15638|
|                2020| RS|                 2|               13999|
|                2020| BA|                 2|               12505|
|                2020| SC|                 2|               12028|
|                2020| GO|                 2|               10332|
|                2020| PE|                 2|                7796|
|                2020| CE|                 2|                7259|
+--------------------+---+------------------+--------------------+
only showing top 10 rows

