<a href="https://colab.research.google.com/github/honoratothi/honoratothi/blob/main/spark_dados_receita.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark --quiet


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark

In [3]:
# prompt: montar o google drive como unidade

from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.5.3-bin-hadoop3'

import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#      .config('spark.jars', 'https://github.com/GoogleCloudDataproc/spark-bigquery-connector/releases/download/0.41.0/spark-3.5-bigquery-0.41.0.jar')\

[Diretório arquivos cnpj receita federal](https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-09/)

[Portal de dados abertos receita federal do Brasil](https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj)

In [6]:
import pandas as pd

url = 'https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/2024-09/'

df = pd.read_html(url)
df = df[0][['Name', 'Last modified', 'Size']]
df.dropna(inplace=True)
df.reset_index(drop=True, inplace=True)
df.sort_values(by='Name', ascending=False, inplace=True)
df.reset_index(drop=True, inplace=True)

In [7]:
lista_arquivos = df['Name'].tolist()
download_dir = '/content/drive/MyDrive/dados_receita'

In [8]:
!mkdir /content/drive/MyDrive/dados_receita

mkdir: cannot create directory ‘/content/drive/MyDrive/dados_receita’: File exists


In [33]:
baixar_arquivos, extrair_arquivos = False, False

In [34]:
import urllib.request

if baixar_arquivos:

  for file_name in lista_arquivos:
    if file_name.endswith('.zip'):
      file_url = url + file_name
      file_dest = f'{download_dir}/{file_name}'
      try:
        urllib.request.urlretrieve(file_url, file_dest)
        print(f"Downloaded: {file_name}")
      except Exception as e:
        print(f"Error downloading {file_name}: {e}")


In [10]:
extract_dir = f'{download_dir}/extracao'

In [None]:
import zipfile

arquivos_locais = os.listdir(download_dir)

if extrair_arquivos:
  for arquivo in arquivos_locais:
      if arquivo.endswith('.zip'):
          caminho_completo = os.path.join(download_dir, arquivo)
          with zipfile.ZipFile(caminho_completo, 'r') as zip_ref:
              zip_ref.extractall(extract_dir)



[Dicionário de dados](https://www.gov.br/receitafederal/dados/cnpj-metadados.pdf)

In [11]:
# prompt: importar arquivo dicionario de dados .xlsx da pasta download_dir e listar todas as abas deste arquivo gerando um dicionário onde a chave será o nome da planilha e o valor será um dataframe spark com o conteúdo da planilha

import pandas as pd

dicionario_dfs = {}

for arquivo in os.listdir(download_dir):
  if arquivo.endswith('dados_receita_metadados.xlsx'):
    caminho_completo = os.path.join(download_dir, arquivo)
    xls = pd.ExcelFile(caminho_completo)
    for sheet_name in xls.sheet_names:
      df_pandas = pd.read_excel(caminho_completo, sheet_name=sheet_name)
      df_spark = spark.createDataFrame(df_pandas)
      dicionario_dfs[sheet_name] = df_spark

In [12]:
mapa_dicionario = {'CNAES': 'cnae',
 'EMPRESAS': 'empre',
 'ESTABELECIMENTOS': 'estab',
 'MOTIVOS':'moti',
 'MUNICIPIOS': 'munic',
 'NATUREZA_JURIDICA': 'natju',
 'PAISES': 'pais',
 'SIMPLES': 'simpl',
 'SOCIOS': 'socio',
 'SOCIOS_QUALIFICACAO': 'qual'}

In [13]:
# prompt: criar uma nova coluna em arquivos_sem_tar_gz, o valor desta coluna será a chave em mapa_dicionário quando o valor for encontrado na coluna extensao do dataframe

def get_key_from_value(value, dictionary):
  value= value.lower()
  for key, val in dictionary.items():
    if value.find(val) > 0:
      return key
  return None

In [14]:
# prompt: vamos listar os arquivos da pasta download_dir que terminem não terminem com  .tar.gz usando gloob, o código deve encontrar arquivos tanto usando miúsculas como minúsculas

import glob

arquivos_sem_tar_gz = glob.glob(extract_dir + '/*', recursive=True)
arquivos_sem_tar_gz = [arquivo for arquivo in arquivos_sem_tar_gz if not arquivo.lower().endswith('.zip')]

arquivos_sem_tar_gz = pd.DataFrame(arquivos_sem_tar_gz, columns=['arquivo'])
arquivos_sem_tar_gz['extensao'] = arquivos_sem_tar_gz['arquivo'].str.split('/').str[-1]
arquivos_sem_tar_gz['tipo_arquivo'] = arquivos_sem_tar_gz['extensao'].apply(lambda x: get_key_from_value(x, mapa_dicionario))
arquivos_sem_tar_gz.sample(10)


Unnamed: 0,arquivo,extensao,tipo_arquivo
31,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y4.D40914.EMPRECSV,EMPRESAS
33,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y2.D40914.EMPRECSV,EMPRESAS
27,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y8.D40914.EMPRECSV,EMPRESAS
7,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y2.D40914.SOCIOCSV,SOCIOS
2,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y7.D40914.SOCIOCSV,SOCIOS
29,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y6.D40914.EMPRECSV,EMPRESAS
19,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y6.D40914.ESTABELE,ESTABELECIMENTOS
9,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y0.D40914.SOCIOCSV,SOCIOS
16,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y9.D40914.ESTABELE,ESTABELECIMENTOS
8,/content/drive/MyDrive/dados_receita/extracao/...,K3241.K03200Y1.D40914.SOCIOCSV,SOCIOS


[arquivos drive](https://drive.google.com/drive/folders/1ZOzxKhRb4sT531NJYCip_jttvTF_EEoj?usp=drive_link)

In [23]:
from datetime import datetime

In [31]:
df_spark = pd.DataFrame()
inicio = datetime.now()

for item, valor in mapa_dicionario.items():
  print(item, valor)
  df_dados = spark.read.option('header', 'false').option('delimiter', ';').csv(f'{extract_dir}/*{valor.upper()}*')
  df_dados = df_dados.withColumnRenamed('_c0', 'cnpj')
  df_parcial = pd.DataFrame({'arquivo': [item], 'quantidade': [df_dados.count()]})
  df_spark = pd.concat([df_spark, df_parcial], ignore_index=True)
  print(df_dados.count())

duracao = datetime.now() - inicio
print(duracao)
print(df_spark)

CNAES cnae
1359
EMPRESAS empre
59616973
ESTABELECIMENTOS estab
62634863
MOTIVOS moti
61
MUNICIPIOS munic
5571
NATUREZA_JURIDICA natju
90
PAISES pais
255
SIMPLES simpl
40459179
SOCIOS socio
24683401
SOCIOS_QUALIFICACAO qual
68
0:09:13.143204
               arquivo  quantidade
0                CNAES        1359
1             EMPRESAS    59616973
2     ESTABELECIMENTOS    62634863
3              MOTIVOS          61
4           MUNICIPIOS        5571
5    NATUREZA_JURIDICA          90
6               PAISES         255
7              SIMPLES    40459179
8               SOCIOS    24683401
9  SOCIOS_QUALIFICACAO          68


In [32]:
inicio = datetime.now()

lista_estabelecimentos = arquivos_sem_tar_gz['arquivo'].to_list()
df_saida = pd.DataFrame(columns=['arquivo', 'quantidade'])

for item in lista_estabelecimentos:
  with pd.read_csv(item, sep=';', dtype=str, encoding='ISO-8859-1', header=None,
                   chunksize=5000000) as reader:
    for df in reader:
      df = pd.DataFrame({'arquivo': [item], 'quantidade': [df.shape[0]]})
      df_saida = pd.concat([df_saida, df], ignore_index=True)
      print(df)

duracao = datetime.now() - inicio

df_saida['quantidade'].sum()
print(duracao)

                                             arquivo  quantidade
0  /content/drive/MyDrive/dados_receita/extracao/...     2019150
                                             arquivo  quantidade
0  /content/drive/MyDrive/dados_receita/extracao/...     2019150
                                             arquivo  quantidade
0  /content/drive/MyDrive/dados_receita/extracao/...     2019150
                                             arquivo  quantidade
0  /content/drive/MyDrive/dados_receita/extracao/...     2019150
                                             arquivo  quantidade
0  /content/drive/MyDrive/dados_receita/extracao/...     2019150
                                             arquivo  quantidade
0  /content/drive/MyDrive/dados_receita/extracao/...     2019150
                                             arquivo  quantidade
0  /content/drive/MyDrive/dados_receita/extracao/...     2019150
                                             arquivo  quantidade
0  /content/drive/MyDrive