<a href="https://colab.research.google.com/github/Matheusups/Pyspark-OpenData/blob/main/Projeto_CNPJ_Open_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Nesse projeto estamos consumindo uma base do governo no qual o dicionario de dados é:
https://apicenter.estaleiro.serpro.gov.br/documentacao/consulta-cnpj/pt/tipos_socio/

In [1]:
# Instalando/atualizando as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
import pyspark

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

spark

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Criação de Diretórios para os dados particionados:
- Empresas
- Estabelecimentos
- Socios

In [6]:
import zipfile
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/empresas/empresas.zip','r').extractall('/content/drive/MyDrive/curso-spark/empresas')

In [7]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/estabelecimentos/estabelecimentos.zip','r').extractall('/content/drive/MyDrive/curso-spark/estabelecimentos')

In [8]:
zipfile.ZipFile('/content/drive/MyDrive/curso-spark/socios/socios.zip','r').extractall('/content/drive/MyDrive/curso-spark/socios')

### Leitura dos dados particionados em dataframes

- Criação de listas para definir o nome das colunas de cada dataframe de acordo com o dicionario dos dados públicos no site do governo.
- Definição de option para delimitador e para definição de schema.
- Criação dos respectivos dataframes já configurados de acordo com as escpeficidades

In [62]:
colunas_socios = ['cnpj_basico', 'identificador_de_socio', 'nome_do_socio_ou_razao_social', 'cnpj_ou_cpf_do_socio', 'qualificacao_do_socio', 'data_de_entrada_sociedade', 'pais', 'representante_legal', 'nome_do_representante', 'qualificacao_do_representante_legal', 'faixa_etaria']
df_socios = spark.read.format('csv')\
                  .option("delimiter",';')\
                  .option("inferSchema",True)\
                  .load('/content/drive/MyDrive/curso-spark/socios/socios').toDF(*colunas_socios)

colunas_estab = ['cnpj_basico', 'cnpj_ordem', 'cnpj_dv', 'identificador_matriz_filial', 'nome_fantasia', 'situacao_cadastral', 'data_situacao_cadastral', 'motivo_situacao_cadastral', 'nome_da_cidade_no_exterior', 'pais', 'data_de_inicio_atividade', 'cnae_fiscal_principal', 'cnae_fiscal_secundaria', 'tipo_de_logradouro', 'logradouro', 'numero', 'complemento', 'bairro', 'cep', 'uf', 'municipio', 'ddd_1', 'telefone_1', 'ddd_2', 'telefone_2', 'ddd_do_fax', 'fax', 'correio_eletronico', 'situacao_especial', 'data_da_situacao_especial']
df_estabelecimentos = spark.read.format('csv')\
                  .option("delimiter",';')\
                  .option("inferSchema",True)\
                  .load('/content/drive/MyDrive/curso-spark/estabelecimentos/estabelecimentos').toDF(*colunas_estab)

col_empresas = ['cnpj_basico', 'razao_social_nome_empresarial', 'natureza_juridica', 'qualificacao_do_responsavel', 'capital_social_da_empresa', 'porte_da_empresa', 'ente_federativo_responsavel']
df_empresas = spark.read.format('csv')\
                  .option("delimiter",';')\
                  .option("inferSchema",True)\
                  .load('/content/drive/MyDrive/curso-spark/empresas/empresas').toDF(*col_empresas)

## Explorando os dataframes

1. Verificar se os tipos de dados estão adequados para que possamos prosseguir com as operações.
2. Caso estejam fora dos padrões ou do esperado, iremos tratar esses campos (colunas).

In [63]:
from pyspark.sql.functions import *

In [64]:
#df_empresas.limit(5).toPandas()

# Transformação da coluna [capital_social_da_empresa] para float, o spark reconheceu como string pois o separador decimal dele é o .
df_empresas = df_empresas.withColumn("capital_social_da_empresa", regexp_replace(col('capital_social_da_empresa'),'[,]','.').cast('double'))
df_empresas.printSchema()

root
 |-- cnpj_basico: integer (nullable = true)
 |-- razao_social_nome_empresarial: string (nullable = true)
 |-- natureza_juridica: integer (nullable = true)
 |-- qualificacao_do_responsavel: integer (nullable = true)
 |-- capital_social_da_empresa: double (nullable = true)
 |-- porte_da_empresa: integer (nullable = true)
 |-- ente_federativo_responsavel: string (nullable = true)



In [65]:
# Transformação das colunas de data
df_estabelecimentos = df_estabelecimentos\
      .withColumn('data_situacao_cadastral', to_date(col('data_situacao_cadastral').cast('string'),'yyyyMMdd'))\
      .withColumn('data_de_inicio_atividade', to_date(col('data_de_inicio_atividade').cast('string'),'yyyyMMdd'))\
      .withColumn('data_da_situacao_especial', to_date(col('data_da_situacao_especial').cast('string'),'yyyyMMdd'))

#df_estabelecimentos.printSchema()
df_estabelecimentos.limit(5).toPandas()

Unnamed: 0,cnpj_basico,cnpj_ordem,cnpj_dv,identificador_matriz_filial,nome_fantasia,situacao_cadastral,data_situacao_cadastral,motivo_situacao_cadastral,nome_da_cidade_no_exterior,pais,...,municipio,ddd_1,telefone_1,ddd_2,telefone_2,ddd_do_fax,fax,correio_eletronico,situacao_especial,data_da_situacao_especial
0,1879,1,96,1,PIRAMIDE M. C.,8,2001-10-29,1,,,...,7107,,,,,,,,,
1,2818,1,43,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
2,3110,1,7,1,,8,1997-12-31,1,,,...,7107,,,,,,,,,
3,3733,1,80,1,,8,2008-12-31,71,,,...,7107,,,,,,,,,
4,4628,3,27,2,EMBROIDERY & GIFT,8,1998-04-29,1,,,...,7075,,,,,,,,,


In [66]:
# Tratamento do campo de data
df_socios = df_socios\
      .withColumn('data_de_entrada_sociedade', to_date(col('data_de_entrada_sociedade').cast('string'),'yyyyMMdd'))
df_socios.limit(5).toPandas()

Unnamed: 0,cnpj_basico,identificador_de_socio,nome_do_socio_ou_razao_social,cnpj_ou_cpf_do_socio,qualificacao_do_socio,data_de_entrada_sociedade,pais,representante_legal,nome_do_representante,qualificacao_do_representante_legal,faixa_etaria
0,411,2,LILIANA PATRICIA GUASTAVINO,***678188**,22,1994-07-25,,***000000**,,0,7
1,411,2,CRISTINA HUNDERTMARK,***637848**,28,1994-07-25,,***000000**,,0,7
2,5813,2,CELSO EDUARDO DE CASTRO STEPHAN,***786068**,49,1994-05-16,,***000000**,,0,8
3,5813,2,EDUARDO BERRINGER STEPHAN,***442348**,49,1994-05-16,,***000000**,,0,5
4,14798,2,HANNE MAHFOUD FADEL,***760388**,49,1994-06-09,,***000000**,,0,8


### Criando dataframes para dados especificos

Cenário onde temos os nomes, ultimos nomes, a faixa etaria do sócio, ano e trimestre que entrou na sociedade/cnpj. O filtro que é executado no inicio serve para trazermos somente os sócios que são Diretores.

In [84]:
df_socios.filter('qualificacao_do_socio = 10').select(
    substring_index('nome_do_socio_ou_razao_social',' ',1).alias('primeiro_nome'),
    substring_index('nome_do_socio_ou_razao_social',' ',-1).alias('ultimo_nome'),
    'faixa_etaria',
    year('data_de_entrada_sociedade').alias('ano_entrada'),
    (quarter('data_de_entrada_sociedade')).alias('trimestre_ano')
    ).show(10)

+-------------+-----------+------------+-----------+-------------+
|primeiro_nome|ultimo_nome|faixa_etaria|ano_entrada|trimestre_ano|
+-------------+-----------+------------+-----------+-------------+
|        TIAGO|    MELLONI|           4|       2020|            1|
|    GUILHERME|    RIBEIRO|           9|       1996|            3|
|       ROSANA|      COSTA|           6|       2005|            3|
|       ADRIAN|      SIBIN|           5|       2015|            2|
|      JOSIANE|   OLIVEIRA|           4|       2019|            3|
|      RICARDO|     JUNIOR|           5|       2018|            4|
|        PEDRO|   MARLETTI|           5|       2018|            4|
|        LUCAS|  FERNANDES|           3|       2018|            1|
|         JOSE|   TEIXEIRA|           7|       2014|            3|
|      JESMINO|  RODRIGUES|           7|       2014|            3|
+-------------+-----------+------------+-----------+-------------+
only showing top 10 rows



## Consulta

Consulta para criar uma agregação de quantidade de empresas por ANO de incio das atividades e UF no ano de 2020 com situação ativa. De acordo com algumas pesquisa no site do governo o código 2 representa situação ativa para o cpnj.

In [68]:
df_rank = df_estabelecimentos.filter('situacao_cadastral = 2').select(
    'nome_fantasia',
    'uf',
    'municipio',
    'situacao_cadastral',
    year('data_de_inicio_atividade').alias('ano_inicio_atividade')
).groupby('ano_inicio_atividade','uf','situacao_cadastral').agg(count(col('nome_fantasia')).alias('contagem_de_empresas'))\
    .filter('ano_inicio_atividade=2020')

df_rank.orderBy(col("contagem_de_empresas").desc()).show(10)

+--------------------+---+------------------+--------------------+
|ano_inicio_atividade| uf|situacao_cadastral|contagem_de_empresas|
+--------------------+---+------------------+--------------------+
|                2020| SP|                 2|               66447|
|                2020| RJ|                 2|               26008|
|                2020| MG|                 2|               24847|
|                2020| PR|                 2|               15638|
|                2020| RS|                 2|               13999|
|                2020| BA|                 2|               12505|
|                2020| SC|                 2|               12028|
|                2020| GO|                 2|               10332|
|                2020| PE|                 2|                7796|
|                2020| CE|                 2|                7259|
+--------------------+---+------------------+--------------------+
only showing top 10 rows



In [85]:
# Criação de De Para do PORTE DA EMPRESA, olhando para o dicionario de dados
df_empresas = df_empresas.withColumn('porte_da_empresa', when(col('porte_da_empresa')==1,'Microempresa - ME')\
                                           .when(col('porte_da_empresa')==3,'Empresa de pequeno porte - EPP')
                                           .when(col('porte_da_empresa')==5,'Demais empresas').otherwise(col('porte_da_empresa')))

In [87]:
df_empresas.limit(10).toPandas()

Unnamed: 0,cnpj_basico,razao_social_nome_empresarial,natureza_juridica,qualificacao_do_responsavel,capital_social_da_empresa,porte_da_empresa,ente_federativo_responsavel
0,306,FRANCAMAR REFRIGERACAO TECNICA S/C LTDA,2240,49,0.0,Microempresa - ME,
1,1355,BRASILEIRO & OLIVEIRA LTDA,2062,49,0.0,Demais empresas,
2,4820,"REGISTRO DE IMOVEIS, TABELIONATO 1 DE NOTAS E ...",3034,32,0.0,Demais empresas,
3,5347,ROSELY APARECIDA MONTEIRO CALTABIANO FREITAS,2135,50,0.0,Demais empresas,
4,6846,BADU E FILHOS TECIDOS LTDA,2062,49,4000.0,Microempresa - ME,
5,8416,ELETRICA RUBI LTDA,2062,49,0.0,Demais empresas,
6,8992,SHIROMA VEICULOS LTDA.,2062,49,0.0,Demais empresas,
7,9091,CONTATOS BAR E LANCHONETE LTDA,2062,49,0.0,Demais empresas,
8,9614,ANTONIA APARECIDA DE SOUZA ULIANA,2135,50,0.0,Demais empresas,
9,9896,DORACY CORAT DA COSTA,2135,50,0.0,Demais empresas,


# Relacionamento das Tabelas


In [93]:
df_empresas_estab = df_estabelecimentos.join(df_empresas, 'cnpj_basico', how='inner')


# Exibição da quantidade de estabelecimentos pelo nome_fantasia agrupando por porte da empresa

df_empresas_estab.groupby('razao_social_nome_empresarial','porte_da_empresa')\
                  .agg(count(col('nome_fantasia')).alias('qtd_estabelecimentos'),
                       sum(col('capital_social_da_empresa')).alias('vlr_capital_total')).show(50)

+-----------------------------+--------------------+--------------------+-----------------+
|razao_social_nome_empresarial|    porte_da_empresa|qtd_estabelecimentos|vlr_capital_total|
+-----------------------------+--------------------+--------------------+-----------------+
|         MARGEM TRANSPORTE...|     Demais empresas|                   0|              0.0|
|         M A DOMINGOS AUTO...|     Demais empresas|                   0|              0.0|
|         RALF PRODUCOES E ...|   Microempresa - ME|                   0|              0.0|
|         REAL BINGO JATIUC...|     Demais empresas|                   0|              0.0|
|         GERALDO AFRANIO S...|   Microempresa - ME|                   1|              0.0|
|         MONTADORA EDUFRAN...|     Demais empresas|                   0|              0.0|
|                  PAULO JONER|     Demais empresas|                   0|              0.0|
|         CONSELHO ESCOLAR ...|     Demais empresas|                   0|       