# Cadastro Nacional da Pessoa Jurídica. Abril de 2025.
## Desafío de Procesamiento de Datos (ETL)

# Introducción

La Receita Federal do Brasil,la autoridad fiscal de Brasil publica la información del Cadastro Nacional da Pessoa Jurídica
(CNP J - Registro Nacional de Personas Jurídicas) desde junio de 2023. Esta información es de gran utilidad para las empresas, ya que les ayuda a evaluar de manera imparcial su contexto, a sus socios, cuestiones legales y fiscales, posibles impactos de caso de alguna asociación, etc. Este alojamiento de información ha implicado no solo grandes retos por parte del gobierno Brasileño, si no también a aquellos que quieren analizar su infomación ya que esta, al día de hoy abarca aproximadamente 60 millones de empresas y mes con mes se agrega más información. Debido a esto, las nuevas tecnologías y herramientas son primordiales para el almacenamiento y consulta de información, ya que medios tradicionales computaciones no consiguen ser lo bastante potentes para explotar el conocimiento que se aloja en los servidores de La Receita Federal do Brasil,la autoridad fiscal de Brasil.

Debido a esta problematica, en este trabajo se muestra el uso de la metodología ETL, Extract, Transform and Loan, la cual es de vital importancia en el matenimiento y manejo de información en sistemas importantes de datos. Llevar esta metodología implica retos computaciones, ya que se deben tener diferentes habilidades y experiencia para crear e implementar pipelines robustos que se ajusten a los requerimientos de los datos. En este caso, solo se toman los datos del mes de abril del año 2025, los cuales abarcan más de 20 GB de información.

La fuente de datos de este proyecto es el siguiente [enlace](https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/) y en este sitio se pueden encontrar direcciones de descarga a cada uno de los conjuntos de datos, comprimidos, que se requieren. Estos contienen información, segregada, de CNJP en la siguiente estructura:

- Empresas (Entidad jurídica)
- Establecimientos (Lugar físico de la empresa)
- Registro de entidades en el régimen Simples
- Catálogo CNAES (Códigos de la actividad económica)
- Socios de la empresa
- Catálogo de motivos del estado de registro actual en caso de no estar activo
- Catálogo de municipios, países, clasificación de socios y naturaleza jurídica de la empresa

La estrategia tomada para este punto fue obtener los enlaces de descarga de cada conjunto de datos del portal web para su posterior descarga y extracción de archivos con extensión csv con un nombre adecuado. 

Para garantizar una solida comunicación, se describe mediante comentarios y documentación en el código, los pasos que se llevaron a cabo en este proceso, además de notas relevantes para demostrar la eficiencia y preocupación sobre la cantidad de información recopilada y analizada. 

Como en cualquier proceso ETL, se definen tres secciones relevantes en este documento:

+ [Extracción de datos (Extract)](#extracción-de-datos):
    - [Descarga de archivos comprimidos](#descarga-de-archivos-comprimidos)
    - [Extracción de archivos csv](#extracción-de-archivos-csv)
+ [Transformación de datos (Transform)](#transformación-de-información)
    - [Carga de muestras y unificación de tablas](#limpieza-básica-en-catálogos)
    - [Cantidad de información](#cantidad-de-información)
    - [Identificación de limpieza en tablas Lookup](#)
    - [Identificación de limpieza en el resto de tablas](#limpieza-básica-en-otras-tablas)
+ [Carga de información](#carga-de-información)
    - [Conexión y creación de base de datos](#conexión-y-creación-de-base-de-datos)
    - [Carga de Catálogos](#carga-de-catálogos)
    - [Carga de la tabla Simples](#carga-de-la-tabla-simples)
    - [Carga de la tabla Empresas](#carga-de-la-tabla-empresas)
    - [Carga de la tabla Estabelecimientos](#carga-de-la-tabla-estabelecimentos)
    - [Carga de la tabla Socios](#carga-de-la-tabla-socios)
+ [Desafios Encontrados y Soluciones Aplicadas](#desafíos-encontrados-y-soluciones-aplicadas)
+ [Conclusiones y posibles mejoras](#conclusiones-y-posibles-mejoras)

# Arquitectura de la Solución Propuesta

En esta propuesta se eligieron diversas tecnologías especializadas pensadas en distintas soluciones:

+ Se hizo uso de Python como lenguaje de programación para diversas tareas de manipulación de información, así como interprete o conector con otras tecnologías.
+ El uso de editores de texto potentes siempre es escencial en proyectos de análisis de datos, en este caso se utilizó Sublime tex, Visual Studio Code y el visualizador Chrome para la extracción de enlaces de descarga.
+ Para tratar las fuertes demandas de descarga de archivos y tareas de descompresión, se utilizaron funcionales de bash, el cual es un lenguaje de programación de comandos desarollado por UNIX, en general cualquier computadora puede hacer uso de esta programa y es que, gracias a su implementación en C y su optimización en diversas tareas, esta herramienta puede ser más poderosa que algunos algoritmos hechos en otros lenguajes de programación.
+ En cuestion de la administración y alojamiento de información, se decidió utilizar un sistema de bases de datos relacional (SQL) debido a los siguientes puntos:
    + Tenemos una estructura de tablas en donde se tienen muy bien definidas las variables, y arreglando los problemas de atomicidad que presenta la tabla Estabelecimentos con un nuevo catálogo, esto se puede entener en la sección, Trasnformación de información, podemos tener al menos una primera forma normal para la base de datos.
    + Las tablas principales son relacionadas entre sí mediante el CNPJ Básico, ya que en la tabla Empresas este es único y en los otros casos puede existir duplicidad sin ningún tema por ser relaciones 1 a muchos, por ejemplo una empresa puede tener más de una sucursal o más de un socio.
    + Los catálogos o tablas Lookup cumplen su función de mapear correctamente los códigos y garantizar atomicidad en el resto tablas.
    + Ya que no hay motivos para considerar que actualmente existiran estructuras complejas más allá de las recopiladas en las tablas,  no existen ventajas en comparación de un modelo no relacional.
    + Al tener múltiples relaciones en tablas normalizadas, la constante necesidad de creación de consultas y de uniones de tablas  propician a que un modelo relacional en SQL sea la opción ideal. En comparación con otros sistemas, como MongoDB en donde las    uniones de tablas no están optimizadas como en SQL.
+ Finalmente, se utilizó Jupyter Notebook como un integrador del ETL y documento principal para mostrar de manera dinámica toda la historia realizada en este reto.

# ETL

En toda esta sección, se detalla paso a paso todos los procedimientos abarcados de esta metodología. A manera de resumen, se puede sintetizar este procedimiento en los siguientes puntos clave:

+ Descarga de archivos comprimidos de gran tamaño.
+ Extracción o descompresión de información desde los archivos comprimidos.
    - Esto involucro temas de paralelismo y competencia de velocidad entre procedimientos
+ Análisis preliminar de la información
+ Limpieza de información
+ Conexión a una base de datos y creación de la misma
+ Definición e ingesta de información en la base de datos.

In [39]:
# Herramientas
from datetime import datetime # Manejo de fechas
import os # Administración de archivos
import re # Texto y regex
from itertools import chain # Operaciones sobre iteradores
from functools import reduce # Herramientas funcionales 
import csv # Lectura eficiente (lazy) de archivos csv
# Descarga de archivos
import subprocess # Uso de funciones bash wget para descarga más rápida de archivos y unzip para su descompresión
from concurrent.futures import ThreadPoolExecutor # Paralelismo
# Análisis de datos
import pandas as pd
pd.set_option('display.max_columns', None)
# Bases de datos
from sqlalchemy import create_engine, text, types, MetaData, Table, Column, Integer, String, Date, Boolean, Index, Float, PrimaryKeyConstraint
from sqlalchemy.dialects.mysql import VARCHAR
# Visualización de datos
import cufflinks as cf
# Activar gráficos offline
cf.go_offline()


## Extracción de datos

Debido a la naturaliza del sitio web, se inspeccionó su código fuente, se llevó a Visual Studio Code y se extrajeron los urls con la ayuda de expresiones regulares, tal como se muestran en la siguiente imagen:

<center>
<img src="Images/Links_from_webSite.png" width=1000 />
</center>

Estos enlaces se muestran a continuación en forma de diccionario

In [10]:
data_links = {
    "Cnaes": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Cnaes.zip",
    "Empresas0": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas0.zip",
    "Empresas1": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas1.zip",
    "Empresas2": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas2.zip",
    "Empresas3": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas3.zip",
    "Empresas4": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas4.zip",
    "Empresas5": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas5.zip",
    "Empresas6": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas6.zip",
    "Empresas7": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas7.zip",
    "Empresas8": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas8.zip",
    "Empresas9": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Empresas9.zip",
    "Estabelecimentos0": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos0.zip",
    "Estabelecimentos1": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos1.zip",
    "Estabelecimentos2": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos2.zip",
    "Estabelecimentos3": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos3.zip",
    "Estabelecimentos4": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos4.zip",
    "Estabelecimentos5": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos5.zip",
    "Estabelecimentos6": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos6.zip",
    "Estabelecimentos7": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos7.zip",
    "Estabelecimentos8": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos8.zip",
    "Estabelecimentos9": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Estabelecimentos9.zip",
    "Motivos": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Motivos.zip",
    "Municipios": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Municipios.zip",
    "Naturezas": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Naturezas.zip",
    "Paises": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Paises.zip",
    "Qualificacoes": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Qualificacoes.zip",
    "Simples": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Simples.zip",
    "Socios0": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios0.zip",
    "Socios1": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios1.zip",
    "Socios2": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios2.zip",
    "Socios3": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios3.zip",
    "Socios4": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios4.zip",
    "Socios5": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios5.zip",
    "Socios6": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios6.zip",
    "Socios7": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios7.zip",
    "Socios8": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios8.zip",
    "Socios9": "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/2025-04/Socios9.zip"
}

### Descarga de archivos comprimidos

La estrategia utilizada para la descarga masiva de archivos comprimidos buscó reducir el tiempo de descarga y automatizar la propia descarga mediante el uso de paralelísmo. 

Para esto, se consideró el uso de la herramienta `wget` de bash por sus virtudes de descarga en grandes archivos, además de que su implementación en C mejora los tiempos de ejecución, administración de recursos y tiempos de descarga, y el uso del paquete `concurrent` de python que permite realizar paralelismo. Con estas herramientas, mediante las unidades lógicas disponibles, se realizó, de manera paralela la descarga de cada uno de los archivos.

A pesar de contar con 12 unidades lógicas, se consideró tomar 8 por cuestiones de saturamiento computacional local.

In [30]:
#Numero de CPU lógicos:
os.cpu_count()

12

Primero, se crea la función `download_wget()` que permite realizar el paralelismo y distribuir la descarga. 

In [None]:
def download_wget(info_url: tuple) -> None:
    """Descarga de archivos mediante un enlace.

    Args:
        info_url (tuple): Tupla con la siguiente estructura: (Nombre para el archivo, URL de descarga).
    """
    # Se obtiene la información de descarga
    name, url = info_url
    # Se asigna el nombre que tendrá el archivo descargado
    output = f"data/{name}.zip"
    # Mediante la función wget de bash, se descarga el archivo indicando el nombre a colocar y el enlace de descarga
    # En caso de generar un error se solicita que se mande la excepción correspondiente con el argumento check = True
    subprocess.run(["wget", "-O", output, url], check=True)

In [None]:
with ThreadPoolExecutor(max_workers=8) as executor:
    executor.map(download_wget, data_links.items())

La descarga de información de manera paralela utilizó un poco más de 4 horas debido a la descarga del archivo Estabelecimentos0.zip que consideró 4:00:32 horas debido a las condiciones tecnológicas en donde se ejecutó este código, así como del servicio de internet utilizado y del tamaño de dicho archivo. 

Mediante en análisis de logs que resultaron de la ejecución, se muestra la siguiente resumen del comportamiento de descarga en esta estrategia. El log se puede encontrar en el siguiente [enlace](https://github.com/CarlosFernandoVG/AuroraChallenge/raw/refs/heads/main/download_logs.txt)

<table>
    <tr>
        <td>Start hour</td>
        <td>File</td>
        <td>End Hour</td>
        <td>Average download speed</td>
        <td>Diff time</td>
        <td>Downloaded bytes</td>
        <td>Downloaded KB</td>
        <td>Effective bandwidth achieved during the download: <br>Size (KB) / Time consumed (S)</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas0.zip</td>
        <td>2:24:42</td>
        <td> (195 KB/s)</td>
        <td>0:35:42</td>
        <td>423143342</td>
        <td>413225.9</td>
        <td>192.92</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Cnaes.zip</td>
        <td>1:49:23</td>
        <td> (73.6 KB/s)</td>
        <td>0:00:23</td>
        <td>22078</td>
        <td>21.6</td>
        <td>0.94</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas2.zip</td>
        <td>1:57:00</td>
        <td> (169 KB/s)</td>
        <td>0:08:00</td>
        <td>79059770</td>
        <td>77206.8</td>
        <td>160.85</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas1.zip</td>
        <td>1:56:12</td>
        <td> (185 KB/s)</td>
        <td>0:07:12</td>
        <td>77820683</td>
        <td>75996.8</td>
        <td>175.92</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas3.zip</td>
        <td>1:58:20</td>
        <td> (155 KB/s)</td>
        <td>0:09:20</td>
        <td>85136574</td>
        <td>83141.2</td>
        <td>148.47</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas4.zip</td>
        <td>1:56:18</td>
        <td> (212 KB/s)</td>
        <td>0:07:18</td>
        <td>90354194</td>
        <td>88236.5</td>
        <td>201.45</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas5.zip</td>
        <td>1:58:14</td>
        <td> (179 KB/s)</td>
        <td>0:09:14</td>
        <td>97538543</td>
        <td>95252.5</td>
        <td>171.94</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas6.zip</td>
        <td>1:56:50</td>
        <td> (206 KB/s)</td>
        <td>0:07:50</td>
        <td>94506460</td>
        <td>92291.5</td>
        <td>196.36</td>
    </tr>
    <tr>
        <td>1:49:00</td>
        <td>Empresas7.zip</td>
        <td>1:58:03</td>
        <td> (186 KB/s)</td>
        <td>0:09:03</td>
        <td>99129477</td>
        <td>96806.1</td>
        <td>178.28</td>
    </tr>
    <tr>
        <td>1:56:00</td>
        <td>Empresas8.zip</td>
        <td>2:04:45</td>
        <td> (189 KB/s)</td>
        <td>0:08:45</td>
        <td>99321249</td>
        <td>96993.4</td>
        <td>184.75</td>
    </tr>
    <tr>
        <td>1:56:00</td>
        <td>Empresas9.zip</td>
        <td>2:04:21</td>
        <td> (192 KB/s)</td>
        <td>0:08:21</td>
        <td>94865385</td>
        <td>92642.0</td>
        <td>184.91</td>
    </tr>
    <tr>
        <td>1:56:00</td>
        <td>Estabelecimentos0.zip</td>
        <td>5:56:32</td>
        <td> (113 KB/s)</td>
        <td>4:00:32</td>
        <td>1662216578</td>
        <td>1623258.4</td>
        <td>112.48</td>
    </tr>
    <tr>
        <td>1:57:00</td>
        <td>Estabelecimentos1.zip</td>
        <td>2:26:44</td>
        <td> (185 KB/s)</td>
        <td>0:29:44</td>
        <td>338182671</td>
        <td>330256.5</td>
        <td>185.12</td>
    </tr>
    <tr>
        <td>1:58:00</td>
        <td>Estabelecimentos2.zip</td>
        <td>2:26:14</td>
        <td> (194 KB/s)</td>
        <td>0:28:14</td>
        <td>334782054</td>
        <td>326935.6</td>
        <td>193.00</td>
    </tr>
    <tr>
        <td>1:58:00</td>
        <td>Estabelecimentos3.zip</td>
        <td>2:28:34</td>
        <td> (184 KB/s)</td>
        <td>0:30:34</td>
        <td>343259798</td>
        <td>335214.6</td>
        <td>182.78</td>
    </tr>
    <tr>
        <td>1:58:00</td>
        <td>Estabelecimentos4.zip</td>
        <td>2:31:20</td>
        <td> (178 KB/s)</td>
        <td>0:33:20</td>
        <td>360974709</td>
        <td>352514.4</td>
        <td>176.26</td>
    </tr>
    <tr>
        <td>2:04:00</td>
        <td>Estabelecimentos5.zip</td>
        <td>2:33:29</td>
        <td> (188 KB/s)</td>
        <td>0:29:29</td>
        <td>335757075</td>
        <td>327887.8</td>
        <td>185.35</td>
    </tr>
    <tr>
        <td>2:04:00</td>
        <td>Estabelecimentos6.zip</td>
        <td>2:38:29</td>
        <td> (161 KB/s)</td>
        <td>0:34:29</td>
        <td>334029218</td>
        <td>326200.4</td>
        <td>157.66</td>
    </tr>
    <tr>
        <td>2:24:00</td>
        <td>Estabelecimentos7.zip</td>
        <td>2:56:31</td>
        <td> (183 KB/s)</td>
        <td>0:32:31</td>
        <td>357453817</td>
        <td>349076.0</td>
        <td>178.92</td>
    </tr>
    <tr>
        <td>2:26:00</td>
        <td>Estabelecimentos8.zip</td>
        <td>3:04:38</td>
        <td> (147 KB/s)</td>
        <td>0:38:38</td>
        <td>346787532</td>
        <td>338659.7</td>
        <td>146.10</td>
    </tr>
    <tr>
        <td>2:26:00</td>
        <td>Estabelecimentos9.zip</td>
        <td>2:55:52</td>
        <td> (187 KB/s)</td>
        <td>0:29:52</td>
        <td>334294276</td>
        <td>326459.3</td>
        <td>182.18</td>
    </tr>
    <tr>
        <td>2:28:00</td>
        <td>Motivos.zip</td>
        <td>2:28:35</td>
        <td> (540 MB/s)</td>
        <td>0:00:35</td>
        <td>1133</td>
        <td>1.1</td>
        <td>0.03</td>
    </tr>
    <tr>
        <td>2:28:00</td>
        <td>Municipios.zip</td>
        <td>2:28:36</td>
        <td> (92.5 MB/s)</td>
        <td>0:00:36</td>
        <td>43444</td>
        <td>42.4</td>
        <td>1.18</td>
    </tr>
    <tr>
        <td>2:28:00</td>
        <td>Naturezas.zip</td>
        <td>2:28:37</td>
        <td> (161 MB/s)</td>
        <td>0:00:37</td>
        <td>1523</td>
        <td>1.5</td>
        <td>0.04</td>
    </tr>
    <tr>
        <td>2:28:00</td>
        <td>Paises.zip</td>
        <td>2:28:38</td>
        <td> (654 MB/s)</td>
        <td>0:00:38</td>
        <td>2745</td>
        <td>2.7</td>
        <td>0.07</td>
    </tr>
    <tr>
        <td>2:28:00</td>
        <td>Qualificacoes.zip</td>
        <td>2:28:39</td>
        <td> (234 MB/s)</td>
        <td>0:00:39</td>
        <td>980</td>
        <td>1.0</td>
        <td>0.02</td>
    </tr>
    <tr>
        <td>2:28:00</td>
        <td>Simples.zip</td>
        <td>2:52:30</td>
        <td> (175 KB/s)</td>
        <td>0:24:30</td>
        <td>256683787</td>
        <td>250667.8</td>
        <td>170.52</td>
    </tr>
    <tr>
        <td>2:31:00</td>
        <td>Socios0.zip</td>
        <td>2:44:33</td>
        <td> (226 KB/s)</td>
        <td>0:13:33</td>
        <td>183171943</td>
        <td>178878.9</td>
        <td>220.02</td>
    </tr>
    <tr>
        <td>2:33:00</td>
        <td>Socios1.zip</td>
        <td>2:35:58</td>
        <td> (326 KB/s)</td>
        <td>0:02:58</td>
        <td>49527458</td>
        <td>48366.7</td>
        <td>271.72</td>
    </tr>
    <tr>
        <td>2:35:00</td>
        <td>Socios2.zip</td>
        <td>2:37:19</td>
        <td> (604 KB/s)</td>
        <td>0:02:19</td>
        <td>49129154</td>
        <td>47977.7</td>
        <td>345.16</td>
    </tr>
    <tr>
        <td>2:37:00</td>
        <td>Socios3.zip</td>
        <td>2:38:46</td>
        <td> (559 KB/s)</td>
        <td>0:01:46</td>
        <td>49302694</td>
        <td>48147.2</td>
        <td>454.22</td>
    </tr>
    <tr>
        <td>2:38:00</td>
        <td>Socios4.zip</td>
        <td>2:40:44</td>
        <td> (357 KB/s)</td>
        <td>0:02:44</td>
        <td>48974348</td>
        <td>47826.5</td>
        <td>291.63</td>
    </tr>
    <tr>
        <td>2:38:00</td>
        <td>Socios5.zip</td>
        <td>2:41:36</td>
        <td> (285 KB/s)</td>
        <td>0:03:36</td>
        <td>49405570</td>
        <td>48247.6</td>
        <td>223.37</td>
    </tr>
    <tr>
        <td>2:40:00</td>
        <td>Socios6.zip</td>
        <td>2:42:05</td>
        <td> (607 KB/s)</td>
        <td>0:02:05</td>
        <td>49351197</td>
        <td>48194.5</td>
        <td>385.56</td>
    </tr>
    <tr>
        <td>2:41:00</td>
        <td>Socios7.zip</td>
        <td>2:45:50</td>
        <td> (189 KB/s)</td>
        <td>0:04:50</td>
        <td>49118060</td>
        <td>47966.9</td>
        <td>165.40</td>
    </tr>
    <tr>
        <td>2:42:00</td>
        <td>Socios8.zip</td>
        <td>2:43:25</td>
        <td> (606 KB/s)</td>
        <td>0:01:25</td>
        <td>49222292</td>
        <td>48068.6</td>
        <td>565.51</td>
    </tr>
    <tr>
        <td>2:43:00</td>
        <td>Socios9.zip</td>
        <td>2:44:45</td>
        <td> (607 KB/s)</td>
        <td>0:01:45</td>
        <td>49049043</td>
        <td>47899.5</td>
        <td>456.19</td>
    </tr>
</table>

Esta información se plasma aquí para dar un criterio más justo sobre la estrategia de descarga, así puede compararse, y considerarse como un factor, el ancho de banda real que se consiguio por las limitaciones tecnológicas y de servicio de internet. Se puede mencionar que, sin considerar al archivo Estabelecimentos0.zip que cuenta con 1.66 GB, el tiempo máximo que tardaron el resto de archivos fue de 38 minutos con un tiempo promedio de descarga de 13 minutos. 

Si consideramos que el tiempo que tardó en descargarse cada uno de estos archivos se aproxima al tiempo real que se hubiera tomado realizando de manera secuencial y manual la descarga, estaríamos hablando de cerca de 8 horas de espera (7:42:35 hrs), sin tomar en cuenta el archivo Estabelecimentos0.zip, lo cual muestra la eficiencia del paralelismo en estos casos.

Como comentario adicional, véase que tardó más la descarga de Estabelecimentos8.zip que Empresas0.zip a pesar de que el primero es 76.3 MB más pequeño. Esto muestra que las fluctuaciones de internet pudieron afectar a este proceso.

### Extracción de archivos csv

Para este punto, se volvió a utilizar una función de bash, `unzip`, para extraer y organizar los archivos almacenados en los distintos ficheros con extensión zip. 

La dinámica en este caso se pueden resumir en los siguientes pasos ejecutados para cada archivo:

1. Extraer el archivo comprimido en una carpeta temporal
2. Renombrar el archivo resultante por un nombre adecuado y colocarlo una ubicación de fácil acceso
3. Eliminar la carpeta temporal

In [None]:
# La siguiente función realizará los pasos mencionados

def unzip_file(zip_file: str, input_dir: str, output_dir: str, new_name: str) -> None:
    """Extracción y renombramiento de archivos comprimidos con un nombre indicado.

    Args:
        zip_file (str): Archivo a descomprimir.
        input_dir (str): Directorio en donde se ubican los archivos comprimidos.
        output_dir (str): Directorio en donde se guardarán los archivos con extensión csv.
        new_name (str): Nombre con el que se guardará el archivo.
    """
    # Por temas de identificación del archivo extraido, se guarda en un folder temporal
    temp_dir = "temp_dir/"
    # Descrompresión en la carpeta temporal
    subprocess.run(["unzip", "-q", os.path.join(input_dir, zip_file), "-d", temp_dir], check=True)
    # Solo hay un archivo en dicho folder, el extraído anteriormente
    extracted_file = os.listdir("temp_dir/")[0]  
    # Cambiamos la extensión a csv y lo movemos a la carpeta destino
    os.rename(os.path.join(temp_dir, extracted_file), os.path.join(output_dir, new_name))
    # Se elimina el directorio temporal
    os.rmdir(temp_dir)

# Creación de una lista de archivos ZIP
zip_files = os.listdir("data/ZIP files/")
# Auxiliar de nombres para el uso de la función unzip_parallel
new_names = {zip_name: name for zip_name, name in zip(zip_files, ["{0}.csv".format(file) for file in [z[:-4] for z in zip_files]])}

# Llamar a la función para descomprimir y renombrar los archivos en paralelo
for zip_name, csv_name in new_names.items():
    unzip_file(zip_name, "data/ZIP files/", "data/", csv_name)

La descompresión de un archivo no tiene comparación, en tiempo de ejecución a la descarga de información previa, pero con el código anterior se automatiza este proceso. Como último proceso de verificación, se evalua si se tienen la misma cantidad de archivos comprimidos y descomprimidos, así como la cantidad de enlaces, lo cual indicaría que se realizó con éxito la etapa de extracción de información.

In [77]:
# Utilizamos set() para eliminar duplicados
n_zip_files = len(list(set(os.listdir("data/ZIP files/"))))
n_csv_files = len(list(set(list(filter(lambda x: bool(re.match(".+csv$", x)), os.listdir("data"))))))
n_links = len(data_links)

if (n_zip_files == n_csv_files) & (n_csv_files == n_links):
    print("Extracción de datos exitosa")
else:
    print("Extracción de datos fallida")

Extracción de datos exitosa


Esto concluye con nuestro proceso de extracción de información.

<center>
<img src="Images/CSV_Files.png" width=500 />
</center>

## Transformación de información

Para decidir que sistema de base de datos considerar (más alla del software, decidir si será una estructura relacional o no relacional) y como parte de esta etapa en un ETL, se analizarón muestras de los datos obtenidos realizando las siguientes actividades:

+ Conteos para tener un panorama general de toda la información
+ Correciones tipográficas y unificación de formatos
+ Modificación o creación de diseño relacional

Referente al último punto, si se muestra un comportamiento que lleve a un modelo relacional de datos, se modificarón las tablas para cumplir alguna forma normal y garantizar la mayoría de los principios del diseño relacional 

Es importante mencionar que en este punto, la cantidad de información que se tiene hace que la limpieza sea más eficiente si se realiza desde una base de datos y que las muestras que se toman en esta etapa buscarán solo dar mayor contexto y un poco de idenficación en problemas de texto, así como en definir los tipos de datos en el modelo relacional

### Carga de muestras y unificación de tablas

Se van a unficar pequeñas muestras de cada archivo relacionado a un tabla para tener un vistazo general de toda la estructura de la información por tabla. Cabe mencionar que algunas tablas, por su tamaño no ameritan una unificación o toma de muestras, por lo que se cargará la información completa

Se comienza cargando los conjuntos de datos que no necesitan un tratamiento de unificación

In [300]:
# Debido al idioma de la información, se utiliza latin-1 para que no existan problemas de carga
# Por otra parte, tal como mencionan los metadatos, se utiliza el separador ';'
# Ya que no se tienen encabezados en los archivos, se colocará un header genérico pero descriptivo a cada catálogo
# Además, se enfatiza en no convertir los datos a un tipo específico para no perder información (dtype=str)

catalogues_header = ["Code", "Description"]
Cnaes = pd.read_csv("data/Cnaes.csv", encoding='latin-1',  sep=';', header=None, names=catalogues_header, dtype=str)
Motivos = pd.read_csv("data/Motivos.csv", encoding='latin-1',  sep=';', header=None, names=catalogues_header, dtype=str)
Municipios = pd.read_csv("data/Municipios.csv", encoding='latin-1',  sep=';', header=None, names=catalogues_header, dtype=str)
Naturezas = pd.read_csv("data/Naturezas.csv", encoding='latin-1',  sep=';', header=None, names=catalogues_header, dtype=str)
Paises = pd.read_csv("data/Paises.csv", encoding='latin-1',  sep=';', header=None, names=catalogues_header, dtype=str)
Qualificacoes = pd.read_csv("data/Qualificacoes.csv", encoding='latin-1',  sep=';', header=None, names=catalogues_header, dtype=str)

In [186]:
print(Cnaes.shape)
print(Motivos.shape)
print(Municipios.shape)
print(Naturezas.shape)
print(Paises.shape)
print(Qualificacoes.shape)

(1359, 2)
(61, 2)
(5572, 2)
(90, 2)
(255, 2)
(68, 2)


Ahora los datos segregados. Para esta sección cargaremos lotes de registros de cada grupo de tablas para posteriormente obtener una muestra de cada lote en cada segregación y unir dichas muestras, para obtener un panorama general de todas las necesidades de limpieza. Véase en la siguiente función el uso del parámetro `chunksize` que carga por lotes garantizando que no se ocupe toda la memoria.

In [176]:
#Creamos un generador que nos permitirá iterar sin cargar todo en memoria sobre los data sets y evitar el problema de big data
def read_large_file(file_path: str, chunk_size: int, colnames: list):
    """Generador para cargar por lotes un archivo csv

    Args:
        file_path (str): Ruta del archivo
        chunk_size (int): Número de registros a considerar por lote
        colnames (list): Nombres de las columnas

    Yields:
        pd.DataFrame: DataFrame con n (chunk_size) registros
    """
    for chunk in pd.read_csv(file_path, chunksize=chunk_size, encoding='latin-1',  sep=';', names=colnames, header=None, dtype=str):
        yield chunk

In [168]:
def sampling_a_big_file(name: str, col_names: list, input_dir: str, size_sample: int = 50, chunk_size: int = 10000) -> pd.DataFrame:
    """Función de muestro en un solo archivo de tamaño considerable.

    Args:
        name (str): Nombre del archivo.
        col_names (list): Nombres de las columnas
        input_dir (str):  Directorio en donde se ubica el archivo 
        size_sample (int, optional): Tamaño de la muestra. Defaults to 50.
        chunk_size (int, optional): Número de registros a considerar por lote. Defaults to 10000.

    Returns:
        pd.DataFrame: Muestra de el archivo de ingesta
    """
    # En resumen, para cada segmento de el archivo de ingesta, se obtiene una muestra y se va uniendo iterativamente las muestras
    # en un solo data frame. 
    return(
        reduce(
            lambda data1, data2: pd.concat([data1, data2], ignore_index=True, axis=0), 
            map(lambda batch_data: batch_data.sample(n=size_sample, random_state=9), read_large_file(os.path.join(input_dir, name), chunk_size, col_names)))
        )

In [169]:
def sampling_some_big_files(list_names: list, col_names: list, input_dir: str, size_sample: int = 50, chunk_size: int = 10000) -> pd.DataFrame:
    """Función de muestre de múltiples archivos de tamaño considerable

    Args:
        list_names (list): Lista de nombres de archivos.
        col_names (list): Nombres de las columnas
        input_dir (str):  Directorio en donde se ubica el archivo 
        size_sample (int, optional): Tamaño de la muestra. Defaults to 50.
        chunk_size (int, optional): Número de registros a considerar por lote. Defaults to 10000.

    Returns:
        pd.DataFrame: Union de muestras de diferentes archivos de ingesta
    """
    # Esta función aplica de manera iterativa la función sampling_a_big_file() con el propósito de que todos los archivos
    # relacionados a una tabla (las particiones) sea muestradas y después unificadas para lograr una muestra generan de toda
    # la tabla.
    return(
        reduce(
            lambda x, y: pd.concat([x, y], ignore_index=True, axis=0), 
            map(
                lambda name: sampling_a_big_file(name, col_names, input_dir, size_sample, chunk_size), 
                list_names
                ))
        )

Previo a ejecutar estas funciones, veamos la magnitud de los datos que estamos tratando, para se define la siguiente función que solo lee el archivo pero no lo carga a memoria evitando utilizar recursos de manera innecesaria. Esto se consigue con la paquetería `csv`

In [17]:
def count_lines_vars(path: str, header: bool = False) -> int:
    """Función para contar filas de un archivo sin necesidad de cargarlo a memoria

    Args:
        path (str): Ruta del archivo
        header (bool, optional): Indicador de encabezado. Defaults to False.

    Returns:
        int: Número de registros en el archivo
    """
    # Con lo siguiente se cuenta el número de renglones 
    with open(path, "r", encoding="latin-1") as f:
        # Básicamente, se recorre cada renglón y por cada renglón se va registrando un 1 que se va sumando iterativamente
        total = sum(1 for _ in f)
    n_rows = total - 1 if header else total
    # Con lo siguiente se cuenta el número de columnas
    count_columns = set()
    with open(path, encoding="latin-1") as f:
        reader = csv.reader(f, delimiter=';')
        for row in reader:
            count_columns.add(len(row))
    # Se devuelve el número de renglones y de columnas
    return n_rows, count_columns

In [15]:
def count_lines_vars_allTable(list_path: list) -> None:
    """Función para mostrar el número de registros por archivo, y el total de registros de los documentos indicados en list_path

    Args:
        list_path (list): Lista de rutas
    """
    total_rows = 0
    for file in [os.path.join("data/", path) for path in list_path]:    
        current_n_rows, number_columns = count_lines_vars(file)
        print(f"{file}: Registros: {current_n_rows:,}, Columnas de tamaño {number_columns}")
        total_rows += current_n_rows
    print(f"Total de columnas = {total_rows:,}")

In [152]:
Empresas_files = sorted(list(filter(lambda x: bool(re.match(r"Empresas\d\.csv$", x)), os.listdir("data"))))
count_lines_vars_allTable(Empresas_files)

data/Empresas0.csv: Registros: 22,018,101, Columnas de tamaño {7}
data/Empresas1.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas2.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas3.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas4.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas5.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas6.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas7.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas8.csv: Registros: 4,494,860, Columnas de tamaño {7}
data/Empresas9.csv: Registros: 4,494,860, Columnas de tamaño {7}
Total de columnas = 62,471,841


In [153]:
Estabelecimentos_files = sorted(list(filter(lambda x: bool(re.match(r"Estabelecimentos\d\.csv$", x)), os.listdir("data"))))
count_lines_vars_allTable(Estabelecimentos_files)

data/Estabelecimentos0.csv: Registros: 22,784,095, Columnas de tamaño {30}
data/Estabelecimentos1.csv: Registros: 4,753,435, Columnas de tamaño {30}
data/Estabelecimentos2.csv: Registros: 4,753,435, Columnas de tamaño {30}
data/Estabelecimentos3.csv: Registros: 4,753,435, Columnas de tamaño {30}
data/Estabelecimentos4.csv: Registros: 4,753,435, Columnas de tamaño {30}
data/Estabelecimentos5.csv: Registros: 4,753,436, Columnas de tamaño {30}
data/Estabelecimentos6.csv: Registros: 4,753,435, Columnas de tamaño {30}
data/Estabelecimentos7.csv: Registros: 4,753,436, Columnas de tamaño {30}
data/Estabelecimentos8.csv: Registros: 4,753,435, Columnas de tamaño {30}
data/Estabelecimentos9.csv: Registros: 4,753,435, Columnas de tamaño {30}
Total de columnas = 65,565,012


In [18]:
Socios_files = sorted(list(filter(lambda x: bool(re.match(r"Socios\d\.csv$", x)), os.listdir("data"))))
count_lines_vars_allTable(Socios_files)

data/Socios0.csv: Registros: 7,512,423, Columnas de tamaño {11}
data/Socios1.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios2.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios3.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios4.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios5.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios6.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios7.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios8.csv: Registros: 2,019,150, Columnas de tamaño {11}
data/Socios9.csv: Registros: 2,019,150, Columnas de tamaño {11}
Total de columnas = 25,684,773


En resumen, tenemos 62,471,841 registros de empresas, asociados a 65,565,012 de establecimientos y 25,684,773 de socios. Un punto a favor es que todos conjuntos de datos tienen un tamaño bien definido (mismo número de columnas/variables).

Con esta información podemos dar una cantidad más inteligente de registros en los lotes para obtener las muestras. Esta será la cantidad de renglones por lote para cada tabla

+ Empresas: 62,471,841*0.0001 = 62,472
+ Estabelecimentos: 65,565,012*0.0001 = 65,565
+ Socios: 25,684,773*0.0001 = 25,685

Es relevante mencionar que todos los nombres de las columnas fueron derivados de la metadata proporcionada

In [177]:
columns_Empresas = ["CNPJ_BASICO", "RAZON_SOCIAL/NOMBRE_EMPRESA", "NATURALEZA_JURIDICA", "CUALIFICACIÓN_RESPONSABLE", "CAPITAL_SOCIAL_EMPRESA", "CÓDIGO_TAMAÑO_EMPRESA", "ENTIDAD_FEDERATIVA_RESPONSABLE"]
sampling_Empresas = sampling_some_big_files(Empresas_files, columns_Empresas, "data", size_sample = 100, chunk_size = 62472)
sampling_Empresas.shape

(100100, 7)

In [None]:
columns_Estabelecimentos = ["CNPJ_BASICO", "ORDEN_CNPJ", "CNPJ_DV", "ID_MATRIZ/FILIAL", "NOMBRE_COMERCIAL", "COD_ESTADO_REGISTRO", "FECHA_ESTADO_REGISTRO", "MOTIVO_ESTADO_REGISTRO", "NOMBRE_CIUDAD_EXTRANJERO", "COD_PAIS", "INICIO_ACTIVIDAD", "CNAE_FISCAL_PRINCIPAL", "CNAE_FISCAL_SECUNDARIO", "TIPO_CALLE", "CALLE", "NÚMERO", "COMPLEMENTO", "VECINDARIO", "COD_POSTAL", "UF", "MUNICIPIO", "DDD_1", "TELÉFONO_1", "DDD_2", "TELÉFONO_2", "DDD_DO_FAX", "FAX", "EMAIL", "SITUACION_ESPECIAL", "FECHA_SITUACION_ESPECIAL"]
sampling_Estabelecimentos = sampling_some_big_files(Estabelecimentos_files, columns_Estabelecimentos, "data", size_sample = 100, chunk_size = 65565)
sampling_Estabelecimentos.shape

In [181]:
columns_Socios = ["CNPJ_BASICO", "COD_IDENTIFICACION_MIEMBRO", "NOMBRE_SOCIO/RAZON_SOCIAL", "CNPJ/CPF_SOCIO", "CALIFICACION_MIEMBROS", "FECHA_ENTRADASOCIEDAD", "PAIS", "REPRESENTANTE_LEGAL", "NOMBRE_REPRESENTANTE", "CALIF_REPRESENTANTE_LEGAL", "GRUPO_EDAD"]
sampling_Socios = sampling_some_big_files(Socios_files, columns_Socios, "data", size_sample = 100, chunk_size = 25685)
sampling_Socios.shape

(100400, 11)

Finalmente el caso de Simples

In [182]:
count_lines_vars("data/Simples.csv")

(43165479, {7})

In [None]:
columns_Simples = ["CNPJ_BASICO", "OPCION_SIMPLES", "FECHA_OPCION_SIMPLES", "FECHA_EXCLUSION_SIMPLES", "OPCION_MEI", "FECHA_OPCION_MEI", "FECHA_EXCLUSION_MEI"]
sampling_Simples = sampling_a_big_file("Simples.csv", columns_Simples, "data", size_sample = 100, chunk_size = 43165)
sampling_Simples.shape

(100100, 7)

### Cantidad de información

La siguiente tabla resume y condensa las diferentes cifras obtenidas en proceso anteriores

<table>
    <tr>
        <td>Tabla</td>
        <td>Cantidad de registros</td>
        <td>Cantidad variables</td>
        <td>Particiones</td>
        <td>Tamaño de muestra</td>
    </tr>
    <tr>
        <td>Empresas</td>
        <td>62,471,841</td>
        <td>7</td>
        <td>10</td>
        <td>100,100</td>
    </tr>
    <tr>
        <td>Estabelecimentos</td>
        <td>65,565,012</td>
        <td>30</td>
        <td>10</td>
        <td>100,500</td>
    </tr>
    <tr>
        <td>Socios</td>
        <td>25,684,773</td>
        <td>11</td>
        <td>10</td>
        <td>100,400</td>
    </tr>
    <tr>
        <td>Simples</td>
        <td>43,165,479</td>
        <td>7</td>
        <td>1</td>
        <td>100,100</td>
    </tr>
    <tr>
        <td>Cnaes</td>
        <td>1,359</td>
        <td>2</td>
        <td>1</td>
        <td>N/A</td>
    </tr>
    <tr>
        <td>Motivos</td>
        <td>61</td>
        <td>2</td>
        <td>1</td>
        <td>N/A</td>
    </tr>
    <tr>
        <td>Municipios</td>
        <td>5,572</td>
        <td>2</td>
        <td>1</td>
        <td>N/A</td>
    </tr>
    <tr>
        <td>Naturezas</td>
        <td>90</td>
        <td>2</td>
        <td>1</td>
        <td>N/A</td>
    </tr>
    <tr>
        <td>Paises</td>
        <td>255</td>
        <td>2</td>
        <td>1</td>
        <td>N/A</td>
    </tr>
    <tr>
        <td>Qualificacoes </td>
        <td>68</td>
        <td>2</td>
        <td>1</td>
        <td>N/A</td>
    </tr>
</table>

### Identificación de limpieza en tablas Lookup

Los catálogos, al haberse cargado por completo por su corto tamaño, pueden ser sujetos a la limpieza desde este punto previo a su carga

In [None]:
# Identificación de valores perdidos
print(Cnaes.isna().sum())
print(Motivos.isna().sum())
print(Municipios.isna().sum())
print(Naturezas.isna().sum())
print(Paises.isna().sum())
print(Qualificacoes.isna().sum())

Code           0
Description    0
dtype: int64
Code           0
Description    0
dtype: int64
Code           0
Description    0
dtype: int64
Code           0
Description    0
dtype: int64
Code           0
Description    0
dtype: int64
Code           0
Description    0
dtype: int64


No se tienen valores perdidos en los catálogos, por lo tanto tampoco se tienen variables nulas

In [None]:
# Identificación de valores repetidos el código
print((Cnaes["Code"].value_counts() == Motivos.shape[0]).sum())
print((Motivos["Code"].value_counts() == Motivos.shape[0]).sum())
print((Municipios["Code"].value_counts() == Municipios.shape[0]).sum())
print((Naturezas["Code"].value_counts() == Naturezas.shape[0]).sum())
print((Paises["Code"].value_counts() == Paises.shape[0]).sum())
print((Qualificacoes["Code"].value_counts() == Qualificacoes.shape[0]).sum())

0
0
0
0
0


Todos los códigos son únicos en los catálogos

In [None]:
# Capitalización de descripciones
Cnaes["Description"] = Cnaes["Description"].str.upper()
Motivos["Description"] = Motivos["Description"].str.upper()
Municipios["Description"] = Municipios["Description"].str.upper()
Naturezas["Description"] = Naturezas["Description"].str.upper()
Paises["Description"] = Paises["Description"].str.upper()
Qualificacoes["Description"] = Qualificacoes["Description"].str.upper()

Se modificaron aquellas descripciones que estaban en minúsculas a mayúsculas para tener una unificación de formato entre los catálogos

In [None]:
# Identificación de valores repetidos la descripción
print((Cnaes["Description"].value_counts() > 1).sum())
print((Motivos["Description"].value_counts() > 1).sum())
print((Municipios["Description"].value_counts() > 1).sum())
print((Naturezas["Description"].value_counts() > 1).sum())
print((Paises["Description"].value_counts() > 1).sum())
print((Qualificacoes["Description"].value_counts() > 1).sum())

0
241
1
1
0


En el caso de unicidad de valores en los catálogos se tienen los siguientes comentarios:

+ En Paises y Qualificacoes se tienen repetidas las categorías excluyentes, es decir que no se informó la naturaleza jurídica, para Qualificacoes, y el código cuando el país no se declaro. En estos casos se va tomar el primer código y, posteriormente en base de datos llevaremos los códigos necesarios al código único

+ En el caso de los municipios, se tienen varios registros de municipios con el mismo nombre pero con diferentes códigos, lo cual no es un caso erróneo ya que no existe alguna prohibición legal en Brasil que lo prohiba. Por lo que tener código distinto podría implicar municipios diferentes con el mismo nombre.

In [None]:
count_Naturezas = Naturezas.merge(Naturezas["Description"].value_counts().reset_index())
count_Paises = Paises.merge(Paises["Description"].value_counts().reset_index())
count_Municipios = Municipios.merge(Municipios["Description"].value_counts().reset_index())

In [248]:
count_Naturezas[count_Naturezas["count"] > 1]

Unnamed: 0,Code,Description,count
0,0,NATUREZA JURÍDICA NÃO INFORMADA,2
1,8885,NATUREZA JURÍDICA NÃO INFORMADA,2


In [None]:
[count_Paises["count"] > 1]

Unnamed: 0,Code,Description,count
252,997,NAO DECLARADOS,2
253,999,NAO DECLARADOS,2


### Identificación de limpieza en el resto de tablas

In [256]:
sampling_Empresas.describe()

Unnamed: 0,CNPJ_BASICO,RAZON_SOCIAL/NOMBRE_EMPRESA,NATURALEZA_JURIDICA,CUALIFICACIÓN_RESPONSABLE,CAPITAL_SOCIAL_EMPRESA,CÓDIGO_TAMAÑO_EMPRESA,ENTIDAD_FEDERATIVA_RESPONSABLE
count,100100,100100,100100,100100,100100,99988,114
unique,100100,100023,54,24,1489,3,109
top,41284803,DIRETORIO MUNICIPAL DO PARTIDO DA FRENTE LIBERAL,2135,50,0,1,UNIÃO
freq,1,4,64014,63991,27866,73979,6


Aquí se puede observar que hay algunas variables, como la razón social, que podrían parecer que son únicas, pero, al menos en esta muestra representativa, cada caso tiene algún valor diferente en otra columna, por lo que se consideraremos que no existe redundancia

In [273]:
sampling_Empresas_auxiliar = sampling_Empresas.merge(sampling_Empresas[["RAZON_SOCIAL/NOMBRE_EMPRESA"]].value_counts().reset_index())
sampling_Empresas_auxiliar[sampling_Empresas_auxiliar["count"] >1]

Unnamed: 0,CNPJ_BASICO,RAZON_SOCIAL/NOMBRE_EMPRESA,NATURALEZA_JURIDICA,CUALIFICACIÓN_RESPONSABLE,CAPITAL_SOCIAL_EMPRESA,CÓDIGO_TAMAÑO_EMPRESA,ENTIDAD_FEDERATIVA_RESPONSABLE,count
3577,43550761,ANTONIO FABIO DE OLIVEIRA MAIA 75697483287,2135,50,3000000,01,,2
3578,18901881,ANTONIO FABIO DE OLIVEIRA MAIA 75697483287,2135,50,500000,01,,2
3735,43699904,FUNDO MUNICIPAL DOS DIREITOS DA CRIANCA E DO A...,1333,05,000,05,ITACAMBIRA - MG,4
3736,22812384,FUNDO MUNICIPAL DOS DIREITOS DA CRIANCA E DO A...,1333,05,000,05,SALES OLIVEIRA - SP,4
3737,28760819,FUNDO MUNICIPAL DOS DIREITOS DA CRIANCA E DO A...,1333,05,000,05,SAO PEDRO DA ALDEIA - RJ,4
...,...,...,...,...,...,...,...,...
67484,24128568,IGREJA EVANGELICA ASSEMBLEIA DE DEUS,3220,16,000,05,,2
73221,24167265,GLEISY LUCIA DE ARAUJO LUIZ SILVA 02571746723,2135,50,5000,01,,2
73222,26057346,GLEISY LUCIA DE ARAUJO LUIZ SILVA 02571746723,2135,50,10000,01,,2
73782,24506084,SOLIDARIEDADE,3271,16,000,05,,2


Nuevamente, no parece haber casos, en esta muestra representativa, de variables totalmente vacías o unarias

In [281]:
sampling_Estabelecimentos.describe()

Unnamed: 0,CNPJ_BASICO,ORDEN_CNPJ,CNPJ_DV,ID_MATRIZ/FILIAL,NOMBRE_COMERCIAL,COD_ESTADO_REGISTRO,FECHA_ESTADO_REGISTRO,MOTIVO_ESTADO_REGISTRO,NOMBRE_CIUDAD_EXTRANJERO,COD_PAIS,INICIO_ACTIVIDAD,CNAE_FISCAL_PRINCIPAL,CNAE_FISCAL_SECUNDARIO,TIPO_CALLE,CALLE,NÚMERO,COMPLEMENTO,VECINDARIO,COD_POSTAL,UF,MUNICIPIO,DDD_1,TELÉFONO_1,DDD_2,TELÉFONO_2,DDD_DO_FAX,FAX,EMAIL,SITUACION_ESPECIAL,FECHA_SITUACION_ESPECIAL
count,100500,100500,100500,100500,36722,100500,100500,100500,69,2317,100500,100500,46304,99148,100496,100497,41618,99950,100252,100500,100500,81359,81357,8337,8326,11678,11687,66689,40,40
unique,100141,473,100,2,35929,5,9560,38,52,40,14011,1078,22949,214,60142,7773,18664,22179,61454,28,4984,203,74662,77,6281,165,8049,65109,5,40
top,62955505,1,0,1,CRUZADA NACIONAL DE EVANGELIZACAO,8,20081231,0,DOVER,105,20160815,4781400,94936009499500,RUA,BRASIL,S/N,CASA,CENTRO,8210040,SP,7107,11,0,0,0,0,0,MEUCNPJ@CONTABILIZEI.COM.BR,ESPOLIO EV 407,20160411
freq,21,95705,3276,95712,20,46268,6166,39533,5,2065,283,5538,819,66000,356,4778,4746,18601,207,29123,8989,12029,3948,1771,1773,2999,2999,178,16,1


In [282]:
sampling_Estabelecimentos

Unnamed: 0,CNPJ_BASICO,ORDEN_CNPJ,CNPJ_DV,ID_MATRIZ/FILIAL,NOMBRE_COMERCIAL,COD_ESTADO_REGISTRO,FECHA_ESTADO_REGISTRO,MOTIVO_ESTADO_REGISTRO,NOMBRE_CIUDAD_EXTRANJERO,COD_PAIS,INICIO_ACTIVIDAD,CNAE_FISCAL_PRINCIPAL,CNAE_FISCAL_SECUNDARIO,TIPO_CALLE,CALLE,NÚMERO,COMPLEMENTO,VECINDARIO,COD_POSTAL,UF,MUNICIPIO,DDD_1,TELÉFONO_1,DDD_2,TELÉFONO_2,DDD_DO_FAX,FAX,EMAIL,SITUACION_ESPECIAL,FECHA_SITUACION_ESPECIAL
0,27306453,0001,05,1,,04,20190515,63,,,20170310,4619200,,AVENIDA,GEREMARIO DANTAS,807,SALA 826,PECHINCHA,22743011,RJ,6001,21,24401671,,,,,,,
1,26944275,0001,77,1,CABELOS.DIVAS,08,20170704,01,,,20170124,4772500,,RUA,DAS LETRAS,24,QUADRA 03,COHAFUMA,65074780,MA,0921,98,82912222,,,,,krsantos2@hotmail.com,,
2,27479382,0001,34,1,,04,20210408,63,,,20170406,4520008,,RUA,JOAO RODRIGUES DA SILVA,bl 115,APT 306;BLOCO 115,SANTA CRUZ,27288160,RJ,5925,24,33461617,,,,,,,
3,26904497,0001,66,1,,08,20241112,01,,,20170119,4723700,47890994729601,RUA,NOSSA SENHORA DA CONCEICAO,771,,DAS LARANJEIRAS,29175583,ES,5699,73,88157023,,,,,ELFCONTABILIDADE@GMAIL.COM,,
4,27576157,0001,16,1,CAPOEIRA EVENTOS & ALIMENTOS,08,20170724,01,,,20170422,8230001,562010285929995620101,RUA,DO CENACULO,49,,CIDADE DE DEUS,22772380,RJ,6001,21,24358176,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
100495,26874411,0001,08,1,,08,20210519,01,,,20170116,4321500,,RUA,TEIXEIRA DE MELO,355,BLOCO 2 APT 2145,TATUAPE,03067000,SP,7107,11,42275762,,,,,FLARUSSIA@GLOBO.COM,,
100496,26767259,0001,56,1,,04,20210219,63,,,20161227,4723700,47296014789001478900556112044772500,QUADRA,K-12 (CJ ANTONIO TEIXEIRA GUEIROS),60,CASA,TAPANA (ICOARACI),66825243,PA,0427,91,32483044,,,,,ganso6060@gmail.com,,
100497,26758362,0001,30,1,,02,20250407,00,,,20161223,4781400,47555034782201,RUA,JOAQUIM NABUCO,113,,PARQUE SAO PAULO,85803600,PR,7493,45,98492141,,,,,,,
100498,26865952,0001,61,1,,02,20170113,00,,,20170113,1091102,,10A AVENIDA,ROBERTO IWAKICHI URAGUCHI,205,APT 63 BLOCO B,PRAIA DO SONHO,11740000,SP,6543,11,83076064,,,,,ROBERTA.SBERNARDI@GMAIL.COM,,


En este caso se identifica lo siguiente:
+ Se puede unificar el formato de los correos a minúsuculas
+ Correccion en formato de fechas
+ En la variable CNAE_FISCAL_SECUNDARIO, no es una variable atómica, es decir que tiene más de un valor asociado con lo cual no tendríamos una buen diseño básico en la base de datos. En este caso se sugiere hacer una nueva relación en donde se lleve esta información a un formato atómico

In [283]:
sampling_Socios.describe()

Unnamed: 0,CNPJ_BASICO,COD_IDENTIFICACION_MIEMBRO,NOMBRE_SOCIO/RAZON_SOCIAL,CNPJ/CPF_SOCIO,CALIFICACION_MIEMBROS,FECHA_ENTRADASOCIEDAD,PAIS,REPRESENTANTE_LEGAL,NOMBRE_REPRESENTANTE,CALIF_REPRESENTANTE_LEGAL,GRUPO_EDAD
count,100400,100400,100390,100359,100400,100400,348,100400,2562,100400,100400
unique,99886,3,99056,92889,31,10773,49,2479,2484,9,10
top,4770650,2,ANDERSON COSTA REIS,***042745**,49,20050912,249,***000000**,SURYA GUEDES MENDONCA,0,5
freq,12,97961,10,10,62747,2106,75,97838,10,97690,24927


In [284]:
sampling_Socios

Unnamed: 0,CNPJ_BASICO,COD_IDENTIFICACION_MIEMBRO,NOMBRE_SOCIO/RAZON_SOCIAL,CNPJ/CPF_SOCIO,CALIFICACION_MIEMBROS,FECHA_ENTRADASOCIEDAD,PAIS,REPRESENTANTE_LEGAL,NOMBRE_REPRESENTANTE,CALIF_REPRESENTANTE_LEGAL,GRUPO_EDAD
0,04114788,2,DOUGLAS CELSO BAZO,***514219**,49,20001027,,***000000**,,00,5
1,05724726,2,ANA PAULA VIEIRA DOS SANTOS ESTEVES,***527267**,49,20030526,,***000000**,,00,6
2,05811199,2,LUCIA LIA CHASSOT DIEDRICH,***292610**,22,20191106,,***000000**,,00,7
3,02446016,2,NILTON PINTO DUARTE,***322548**,10,19980210,,***000000**,,00,8
4,04111960,2,IRIS DE ARAUJO LIMA,***214746**,49,20001023,,***000000**,,00,9
...,...,...,...,...,...,...,...,...,...,...,...
100395,03992918,2,ADALBERTO ALVES DE ALMEIDA,***913858**,05,20230919,,***000000**,,00,5
100396,05696392,2,GERALDO LUCAS,***589667**,49,20040921,,***000000**,,00,9
100397,04033418,2,DANIEL ROCHA,***952009**,49,20000904,,***000000**,,00,6
100398,04001114,2,FRANCISCO ANTONIO PONTELLO,***909256**,49,20000818,,***000000**,,00,8


En este caso se identifica lo siguiente:
+ Correccion en formato de fechas

In [277]:
sampling_Simples.describe()

Unnamed: 0,CNPJ_BASICO,OPCION_SIMPLES,FECHA_OPCION_SIMPLES,FECHA_EXCLUSION_SIMPLES,OPCION_MEI,FECHA DE OPCIÓN PARA EL MEI,FECHA DE EXCLUSIÓN DE MEI
count,100100,100100,100100,100100,100100,100100,100100
unique,100100,2,5956,4791,2,5441,4155
top,199853,S,20070701,0,N,0,0
freq,1,53673,5338,53663,63318,27650,64426


In [279]:
sampling_Simples

Unnamed: 0,CNPJ_BASICO,OPCION_SIMPLES,FECHA_OPCION_SIMPLES,FECHA_EXCLUSION_SIMPLES,OPCION_MEI,FECHA DE OPCIÓN PARA EL MEI,FECHA DE EXCLUSIÓN DE MEI
0,00199853,N,20070701,20081231,N,00000000,00000000
1,00185073,S,20070701,00000000,N,00000000,00000000
2,00189078,S,20230101,00000000,N,00000000,00000000
3,00184115,N,20070701,20130418,N,00000000,00000000
4,00251542,S,20250101,00000000,N,20170101,20171231
...,...,...,...,...,...,...,...
100095,98414386,N,20070701,20140226,N,00000000,00000000
100096,98313976,N,20070701,20081231,N,00000000,00000000
100097,98102502,S,20120101,00000000,N,00000000,00000000
100098,97554485,S,20110712,00000000,N,00000000,00000000


En este caso no se identifican anomalias más allá de las reportadas anteriormente en otras tablas

## Carga de información a Base de Datos

Para este apartado, vamos a utilizar el software MySQL para cargar las tablas desde python aunque en la carga se van a realizar las siguientes actividades previas:

+ Formato de fechas adecuados (en caso de tener fechas que no tengan coherencia se analizaran)
+ Extracción de la variable `CNAE_FISCAL_SECUNDARIO` en la tabla Estabelecimentos y creación de catálogo para esta variable

### Conexión y creación de base de datos

In [3]:
# Establecemos los parámetros de conexión
usuario = "root"
contraseña = "H72n01923h-s1"
host = "localhost"
puerto = "3306"
name_bd = "CNPJ_DATABASE"

# Establencemos la conexión
motor_inicial = create_engine(f"mysql+pymysql://{usuario}:{contraseña}@{host}:{puerto}")

#En este caso que no existe la base de datos, la creamos
with motor_inicial.connect() as conexion:
    conexion.execute(text(f"CREATE DATABASE IF NOT EXISTS {name_bd}"))
    print(f"Base de datos '{name_bd}' verificada o creada.")

Base de datos 'CNPJ_DATABASE' verificada o creada.


  conexion.execute(text(f"CREATE DATABASE IF NOT EXISTS {name_bd}"))


In [4]:
# Ya que existe la base de datos en el servidor de MySQL, creamos una conexión para poder subir la información
motor = create_engine(f"mysql+pymysql://{usuario}:{contraseña}@{host}:{puerto}/{name_bd}")

Como evidencia visual, en MySQL Workbench, estos cambios ya son visibles como se muestra en la siguiente imagen:

<center>
<img src="Images/MySQLWorkbech_evidence.png" width=200 />
</center>

### Carga de catálogos

Comenzamos con la carga y creación de las tablas Lookup.
Aquí solo se tienen dos variables y cada una de ellas se colocó como un tipo de dato VARCHAR. Por un lado, en la parte de la descripción se establecio un espacio de 100 caracteres para tener felixibidad por si existiera algún otro rubro, es relevante mencionar que algunos rubros actuales superan los 60 cacacteres. Por otra parte, el código se dejo flexibilidad de un caracter adicional al existente.

In [None]:
#Qualificacoes["Code"].map(lambda x: len(x)).value_counts()
#Qualificacoes["Description"].map(lambda x: len(x)).value_counts()

In [None]:
#Modificamos estos catálogos antes de subir la información
Paises = Paises[Paises["Code"] != "999"].reset_index(drop=True)
Naturezas = Naturezas[Naturezas["Code"] != "8885"].reset_index(drop=True)

In [402]:
# Creación de las tablas lookup
Cnaes.to_sql("Cnaes", con=motor, if_exists="replace", index=False,
          dtype={
              "Code": types.VARCHAR(8),
              "Description": types.VARCHAR(150),
          })
print("Tabla Cnaes OK")

Tabla Cnaes OK


In [None]:
Municipios.to_sql("Municipios", con=motor, if_exists="replace", index=False,
          dtype={
              "Code": types.VARCHAR(5),
              "Description": types.VARCHAR(100),
          })
print("Tabla Municipios OK")

In [319]:
# Creación de las tablas lookup
Motivos.to_sql("Motivos", con=motor, if_exists="replace", index=False,
          dtype={
              "Code": types.VARCHAR(3),
              "Description": types.VARCHAR(100),
          })
print("Tabla Motivos OK")

Tabla Motivos OK


In [None]:
Municipios.to_sql("Municipios", con=motor, if_exists="replace", index=False,
          dtype={
              "Code": types.VARCHAR(5),
              "Description": types.VARCHAR(100),
          })
print("Tabla Municipios OK")

Tabla Municipios OK


In [None]:
Naturezas.to_sql("Naturezas", con=motor, if_exists="replace", index=False,
          dtype={
              "Code": types.VARCHAR(5),
              "Description": types.VARCHAR(100),
          })
print("Tabla Naturezas OK")

Tabla Naturezas OK


In [328]:
Paises.to_sql("Paises", con=motor, if_exists="replace", index=False,
          dtype={
              "Code": types.VARCHAR(4),
              "Description": types.VARCHAR(100),
          })
print("Tabla Paises OK")

Tabla Paises OK


In [331]:
Qualificacoes.to_sql("Qualificacoes", con=motor, if_exists="replace", index=False,
          dtype={
              "Code": types.VARCHAR(3),
              "Description": types.VARCHAR(100),
          })
print("Tabla Qualificacoes OK")

Tabla Qualificacoes OK


Agregamos índices a los Lookup

In [None]:
with motor.connect() as conn:
    conn.execute(text("CREATE INDEX id_Code ON Cnaes (Code);"))
with motor.connect() as conn:
    conn.execute(text("CREATE INDEX id_Code ON Motivos (Code);"))
with motor.connect() as conn:
    conn.execute(text("CREATE INDEX id_Code ON Municipios (Code);"))
with motor.connect() as conn:
    conn.execute(text("CREATE INDEX id_Code ON Naturezas (Code);"))
with motor.connect() as conn:
    conn.execute(text("CREATE INDEX id_Code ON Paises (Code);"))
with motor.connect() as conn:
    conn.execute(text("CREATE INDEX id_Code ON Qualificacoes (Code);"))

### Carga de la tabla Simples

Para el caso de la tabla Simples se insertará la información por fragmentos para evitar problemas de memoria y se corregiran los formatos de fecha. Los siguientes códigos se utilzaron para visualizar e identificar la estrategia para el tratamiento de fechas

In [None]:
fechas_Simples_FECHA_OPCION_SIMPLES = list()
fechas_Simples_FECHA_EXCLUSION_SIMPLES = list()
fechas_Simples_FECHA_OPCION_MEI = list()
fechas_Simples_FECHA_EXCLUSION_MEI = list()

for chunk in pd.read_csv("data/Simples.csv", sep=";", chunksize=100_000, encoding='latin-1', names=columns_Simples, header=None, dtype=str):
    fechas_Simples_FECHA_OPCION_SIMPLES.append(chunk["FECHA_OPCION_SIMPLES"].unique())
    fechas_Simples_FECHA_EXCLUSION_SIMPLES.append(chunk["FECHA_EXCLUSION_SIMPLES"].unique())
    fechas_Simples_FECHA_OPCION_MEI.append(chunk["FECHA_OPCION_MEI"].unique())
    fechas_Simples_FECHA_EXCLUSION_MEI.append(chunk["FECHA_EXCLUSION_MEI"].unique())
    
fechas_Simples_FECHA_OPCION_SIMPLES = list(chain.from_iterable(fechas_Simples_FECHA_OPCION_SIMPLES))
pd.DataFrame({"Fechas_unicas" : fechas_Simples_FECHA_OPCION_SIMPLES}).sort_values("Fechas_unicas")
fechas_Simples_FECHA_EXCLUSION_SIMPLES = list(chain.from_iterable(fechas_Simples_FECHA_EXCLUSION_SIMPLES))
pd.DataFrame({"Fechas_unicas" : fechas_Simples_FECHA_EXCLUSION_SIMPLES}).drop_duplicates().sort_values("Fechas_unicas")
fechas_Simples_FECHA_OPCION_MEI = list(chain.from_iterable(fechas_Simples_FECHA_OPCION_MEI))
pd.DataFrame({"Fechas_unicas" : fechas_Simples_FECHA_OPCION_MEI}).drop_duplicates().sort_values("Fechas_unicas")
fechas_Simples_FECHA_EXCLUSION_MEI = list(chain.from_iterable(fechas_Simples_FECHA_EXCLUSION_MEI))
pd.DataFrame({"Fechas_unicas" : fechas_Simples_FECHA_EXCLUSION_MEI}).drop_duplicates().sort_values("Fechas_unicas")

## En estos tres últimos casos solo se debe ser cuidadoso de llevar las fechas 00000000 a nulos

In [10]:
def fix_date(date: str):
    """Función corregir fechas y llevar las que tengan formato 00000000 a nulos
    Args:
        date (str): Fecha  
    """
    if date == 00000000 or pd.isna(date):
        return pd.NaT
    try:
        return pd.to_datetime(date, format="%Y%m%d", errors="coerce")
    except:
        return pd.NaT

In [None]:
metadata = MetaData()
# Definición de la tabla Simples
Simples_toMySQL = Table("Simples", metadata,
     Column("CNPJ_BASICO", VARCHAR(8)),
     Column("OPCION_SIMPLES", VARCHAR(1)),
     Column("FECHA_OPCION_SIMPLES", Date),
     Column("FECHA_EXCLUSION_SIMPLES", Date),
     Column("OPCION_MEI", VARCHAR(1)),
     Column("FECHA_OPCION_MEI", Date),
     Column("FECHA_EXCLUSION_MEI", Date)
)

Index("idx_CNPJ_BASICO", Simples_toMySQL.c.CNPJ_BASICO)

# Crear la tabla en la base de datos
metadata.create_all(motor)
print("Tabla Simples OK")

Tabla Simples OK


In [393]:
for chunk in pd.read_csv("data/Simples.csv", sep=";", chunksize=100_000, encoding='latin-1', names=columns_Simples, header=None, dtype=str):
    chunk["FECHA_OPCION_SIMPLES"] = chunk["FECHA_OPCION_SIMPLES"].apply(fix_date)
    chunk["FECHA_OPCION_SIMPLES"] = chunk["FECHA_OPCION_SIMPLES"].dt.date
    chunk["FECHA_EXCLUSION_SIMPLES"] = chunk["FECHA_EXCLUSION_SIMPLES"].apply(fix_date)
    chunk["FECHA_EXCLUSION_SIMPLES"] = chunk["FECHA_EXCLUSION_SIMPLES"].dt.date
    chunk["FECHA_OPCION_MEI"] = chunk["FECHA_OPCION_MEI"].apply(fix_date)
    chunk["FECHA_OPCION_MEI"] = chunk["FECHA_OPCION_MEI"].dt.date
    chunk["FECHA_EXCLUSION_MEI"] = chunk["FECHA_EXCLUSION_MEI"].apply(fix_date)
    chunk["FECHA_EXCLUSION_MEI"] = chunk["FECHA_EXCLUSION_MEI"].dt.date
    chunk.to_sql("Simples", con=motor, if_exists="append", index=False)

### Carga de la tabla Empresas

In [416]:
metadata = MetaData()
# Definición de la tabla Empresas
Empresas_toMySQL = Table("Empresas", metadata,
    Column("CNPJ_BASICO", VARCHAR(8)),
	Column("RAZON_SOCIAL/NOMBRE_EMPRESA", VARCHAR(150)),
	Column("NATURALEZA_JURIDICA", VARCHAR(9)),
	Column("CUALIFICACIÓN_RESPONSABLE", VARCHAR(3)),
	Column("CAPITAL_SOCIAL_EMPRESA", Float),
	Column("CÓDIGO_TAMAÑO_EMPRESA", VARCHAR(3)),
	Column("ENTIDAD_FEDERATIVA_RESPONSABLE", VARCHAR(50))
)

Index("CNPJ_BASICO", Empresas_toMySQL.c.CNPJ_BASICO)

# Crear la tabla en la base de datos
metadata.create_all(motor)
print("Tabla Empresas OK")

Tabla Empresas OK


In [417]:
for archivo in Empresas_files:
    print(f"Cargando {archivo}...")
    for chunk in pd.read_csv(f"data/{archivo}", sep=";", chunksize=100_000, encoding='latin-1', names=columns_Empresas, header=None, dtype=str):
        chunk["CAPITAL_SOCIAL_EMPRESA"] = chunk["CAPITAL_SOCIAL_EMPRESA"].str.replace(",", ".").astype(float)
        chunk.to_sql("Empresas", con=motor, if_exists="append", index=False)

Cargando Empresas0.csv...
Cargando Empresas1.csv...
Cargando Empresas2.csv...
Cargando Empresas3.csv...
Cargando Empresas4.csv...
Cargando Empresas5.csv...
Cargando Empresas6.csv...
Cargando Empresas7.csv...
Cargando Empresas8.csv...
Cargando Empresas9.csv...


### Carga de la tabla Estabelecimentos

In [None]:
#Determinación de llave compuesta
#sampling_Estabelecimentos.groupby(["CNPJ_BASICO", "CNPJ_DV", "ORDEN_CNPJ", "ID_MATRIZ/FILIAL"]).count()

In [470]:
metadata = MetaData()
# Definición de la tabla Estabelecimentos
Estabelecimentos_toMySQL = Table("Estabelecimentos", metadata,
	Column("CNPJ_BASICO", VARCHAR(8)),
	Column("ORDEN_CNPJ", VARCHAR(4)),
	Column("CNPJ_DV", VARCHAR(2)),
	Column("ID_MATRIZ/FILIAL", VARCHAR(1)),
	Column("NOMBRE_COMERCIAL", VARCHAR(100)),
	Column("COD_ESTADO_REGISTRO", VARCHAR(2)),
	Column("FECHA_ESTADO_REGISTRO", Date),
	Column("MOTIVO_ESTADO_REGISTRO", VARCHAR(3)),
	Column("NOMBRE_CIUDAD_EXTRANJERO", VARCHAR(100)),
	Column("COD_PAIS", VARCHAR(4)),
	Column("INICIO_ACTIVIDAD", Date),
	Column("CNAE_FISCAL_PRINCIPAL", VARCHAR(10)),
	Column("TIPO_CALLE", VARCHAR(20)),
	Column("CALLE", VARCHAR(100)),
	Column("NÚMERO", VARCHAR(10)),
	Column("COMPLEMENTO", VARCHAR(200)),
	Column("VECINDARIO", VARCHAR(50)),
	Column("COD_POSTAL", VARCHAR(10)),
	Column("UF", VARCHAR(2)),
	Column("MUNICIPIO", VARCHAR(5)),
	Column("DDD_1", VARCHAR(10)),
	Column("TELÉFONO_1", VARCHAR(15)),
	Column("DDD_2", VARCHAR(10)),
	Column("TELÉFONO_2", VARCHAR(15)),
	Column("DDD_DO_FAX", VARCHAR(10)),
	Column("FAX", VARCHAR(10)),
	Column("EMAIL", VARCHAR(200)),
	Column("SITUACION_ESPECIAL", VARCHAR(50)),
	Column("FECHA_SITUACION_ESPECIAL", Date),
 PrimaryKeyConstraint("CNPJ_BASICO", "ORDEN_CNPJ", "CNPJ_DV", "ID_MATRIZ/FILIAL",name="pk_cnpj_compuesta")
)

# Crear la tabla en la base de datos
metadata.create_all(motor)
print("Tabla Estabelecimentos OK")

Tabla Estabelecimentos OK


Vamos a hacer un catálogo de CNAE_FISCAL_SECUNDARIO y a vaciar la información en la base de datos

In [471]:
CNAE_FISCAL_SECUNDARIO_list = list()
for archivo in Estabelecimentos_files:
    print(f"Cargando {archivo}...")
    for chunk in pd.read_csv(f"data/{archivo}", sep=";", chunksize=100_000, encoding='latin-1', names=columns_Estabelecimentos, header=None, dtype=str):
        CNAE_FISCAL_SECUNDARIO_list.append(chunk[["CNPJ_BASICO", "ORDEN_CNPJ", "CNPJ_DV", "ID_MATRIZ/FILIAL", "CNAE_FISCAL_SECUNDARIO"]])
        chunk["FECHA_ESTADO_REGISTRO"] = chunk["FECHA_ESTADO_REGISTRO"].apply(fix_date)
        chunk["FECHA_ESTADO_REGISTRO"] = chunk["FECHA_ESTADO_REGISTRO"].dt.date
        chunk["INICIO_ACTIVIDAD"] = chunk["INICIO_ACTIVIDAD"].apply(fix_date)
        chunk["INICIO_ACTIVIDAD"] = chunk["INICIO_ACTIVIDAD"].dt.date
        chunk["FECHA_SITUACION_ESPECIAL"] = chunk["FECHA_SITUACION_ESPECIAL"].apply(fix_date)
        chunk["FECHA_SITUACION_ESPECIAL"] = chunk["FECHA_SITUACION_ESPECIAL"].dt.date
        chunk.drop(columns=['CNAE_FISCAL_SECUNDARIO']).to_sql("Estabelecimentos", con=motor, if_exists="append", index=False)

Cargando Estabelecimentos0.csv...
Cargando Estabelecimentos1.csv...
Cargando Estabelecimentos2.csv...
Cargando Estabelecimentos3.csv...
Cargando Estabelecimentos4.csv...
Cargando Estabelecimentos5.csv...
Cargando Estabelecimentos6.csv...
Cargando Estabelecimentos7.csv...
Cargando Estabelecimentos8.csv...
Cargando Estabelecimentos9.csv...


Aplicamos algunas operaciones en los datos del nuevo catálogo para agregarlo a nuestra base de datos

In [473]:
CNAE_fiscal_secundario = pd.concat(CNAE_FISCAL_SECUNDARIO_list, ignore_index=True)

In [475]:
CNAE_fiscal_secundario = CNAE_fiscal_secundario.dropna(subset=["CNAE_FISCAL_SECUNDARIO"]).reset_index(drop=True)
CNAE_fiscal_secundario["CNAE_FISCAL_SECUNDARIO"] = CNAE_fiscal_secundario["CNAE_FISCAL_SECUNDARIO"].str.split(",")
CNAE_fiscal_secundario = CNAE_fiscal_secundario.explode("CNAE_FISCAL_SECUNDARIO").reset_index(drop=True)

In [None]:
metadata = MetaData()
CNAE_FISCAL_SECUNDARIO_toMySQL = Table("CNAE_fiscal_secundario", metadata,
	Column("CNPJ_BASICO", VARCHAR(8)),
	Column("ORDEN_CNPJ", VARCHAR(4)),
	Column("CNPJ_DV", VARCHAR(2)),
	Column("ID_MATRIZ/FILIAL", VARCHAR(1)),
  Column("CNAE_FISCAL_SECUNDARIO", VARCHAR(10)),
 PrimaryKeyConstraint("CNPJ_BASICO", "ORDEN_CNPJ", "CNPJ_DV", "ID_MATRIZ/FILIAL",name="pk_cnpj_compuesta")
)
metadata.create_all(motor)
print("Tabla CNAE_fiscal_secundario OK")

Tabla CNAE_fiscal_secundario OK


In [None]:
CNAE_fiscal_secundario.to_sql("CNAE_fiscal_secundario", con=motor, if_exists="append", index=False)

### Carga de la tabla Socios

In [25]:
metadata = MetaData()
# Definición de la tabla Socios
Socios_toMySQL = Table("Socios", metadata,
	Column("CNPJ_BASICO", VARCHAR(8)),
	Column("COD_IDENTIFICACION_MIEMBRO", Integer),
	Column("NOMBRE_SOCIO/RAZON_SOCIAL", VARCHAR(1500)),
	Column("CNPJ/CPF_SOCIO", VARCHAR(50)),
	Column("CALIFICACION_MIEMBROS", VARCHAR(3)),
	Column("FECHA_ENTRADASOCIEDAD", Date),
	Column("PAIS", VARCHAR(4)),
	Column("REPRESENTANTE_LEGAL", VARCHAR(50)),
	Column("NOMBRE_REPRESENTANTE", VARCHAR(100)),
	Column("CALIF_REPRESENTANTE_LEGAL", VARCHAR(10)),
	Column("GRUPO_EDAD", Integer)
)

# Crear la tabla en la base de datos
metadata.create_all(motor)
print("Tabla Socios OK")

Tabla Socios OK


In [26]:
for archivo in Socios_files:
    print(f"Cargando {archivo}...")
    for chunk in pd.read_csv(f"data/{archivo}", sep=";", chunksize=100_000, encoding='latin-1', names=columns_Socios, header=None, dtype=str):
        chunk["FECHA_ENTRADASOCIEDAD"] = chunk["FECHA_ENTRADASOCIEDAD"].apply(fix_date)
        chunk["FECHA_ENTRADASOCIEDAD"] = chunk["FECHA_ENTRADASOCIEDAD"].dt.date
        chunk.to_sql("Socios", con=motor, if_exists="append", index=False)

Cargando Socios0.csv...
Cargando Socios1.csv...
Cargando Socios2.csv...
Cargando Socios3.csv...
Cargando Socios4.csv...
Cargando Socios5.csv...
Cargando Socios6.csv...
Cargando Socios7.csv...
Cargando Socios8.csv...
Cargando Socios9.csv...


Con la siguiente imagen comprobamos que todas las tablas han sido añadidas a la base de datos y las siguientes consultas reforzarán la conectividad


<center>
<img src="Images/all_tables.png" width=200 />
</center>

# Consultas solicitadas

Como parte de este análisis, surgen las siguientes preguntas naturales:

1. ¿Cuál es la cantidad total de empresas que están con la situación catastral “ATIVA” (ACTIVA)?
2. ¿Cuáles son las los diez CNAEs (Clasificación Nacional de Actividades Económicas) principales más frecuentes?
3. ¿Cuál es el promedio del capital social de las empresas, agrupado por el tamaño de la empresa?

A continuación daremos respuesta a estas preguntas apoyandonos en que los datos están en un sitema gestor de base de datos, lo cual hace toda la diferencia, ya que los métodos de búsqueda y optimización de consultas no es comparable al que nos puede otorgar un programa que envía todo a la memoria RAM, como pandas, sin hacer algún técnica.


## Cantidad total de empresas que están con la situación catastral “ATIVA”

In [None]:
query_1 = """
SELECT COUNT(Empresas.CNPJ_BASICO) AS total_empresas_activas
FROM Estabelecimentos
JOIN Empresas ON Estabelecimentos.CNPJ_BASICO=Empresas.CNPJ_BASICO
WHERE COD_ESTADO_REGISTRO = 2
"""
df_activas = pd.read_sql(query_1, motor)

In [35]:
df_activas

Unnamed: 0,total_empresas_activas
0,25686166


Es importante mencionar que al tener una situación catastral activo indica que la empresa está regularizada y no hay asuntos pendientes. Lo cual se esperaría que este número fuera creciente a medida que avanza el tiempo

¿Cuáles son las los diez CNAEs (Clasificación Nacional de Actividades Económicas) principales más frecuentes?

In [36]:
# Consulta 2
query_2 = """
SELECT Cnaes.Description, COUNT(Cnaes.Description) AS Cnaes_freq
FROM Estabelecimentos
JOIN Cnaes ON Estabelecimentos.CNAE_FISCAL_PRINCIPAL=Cnaes.Code
JOIN Empresas ON Estabelecimentos.CNPJ_BASICO=Empresas.CNPJ_BASICO
GROUP BY Cnaes.Description
ORDER BY Cnaes_freq DESC
LIMIT 10 
"""
df_cnae = pd.read_sql(query_2, motor)

In [37]:
df_cnae

Unnamed: 0,Description,Cnaes_freq
0,COMÉRCIO VAREJISTA DE ARTIGOS DO VESTUÁRIO E A...,3506998
1,ATIVIDADES DE ORGANIZAÇÕES POLÍTICAS,3304577
2,"LANCHONETES, CASAS DE CHÁ, DE SUCOS E SIMILARES",1907658
3,ATIVIDADE ECONÔNICA NÃO INFORMADA,1807009
4,"CABELEIREIROS, MANICURE E PEDICURE",1801680
5,"COMÉRCIO VAREJISTA DE MERCADORIAS EM GERAL, CO...",1654123
6,PROMOÇÃO DE VENDAS,1488575
7,OBRAS DE ALVENARIA,1229759
8,RESTAURANTES E SIMILARES,1170721
9,PREPARAÇÃO DE DOCUMENTOS E SERVIÇOS ESPECIALIZ...,956483


In [42]:
# Consulta 3
query_3 = """
SELECT CÓDIGO_TAMAÑO_EMPRESA, AVG(CAPITAL_SOCIAL_EMPRESA) AS promedio_capital_social
FROM EMPRESAS
GROUP BY CÓDIGO_TAMAÑO_EMPRESA;
"""
df_capital = pd.read_sql(query_3, motor)

In [43]:
df_capital

Unnamed: 0,CÓDIGO_TAMAÑO_EMPRESA,promedio_capital_social
0,1.0,848024.9
1,5.0,6938322.0
2,3.0,77174330.0
3,,1724.587


Es clara que a medida que la empresa crece, su capital social también lo hace.

# Desafíos Encontrados y Soluciones Aplicadas

+ Uno de los principales problemas a los que me enfrente en este reto fue la infrestructura computacional y de internet, por lo que hubo una busqueda constante y competente sobre distintas metodologías para obtener resultados más rápidos y precisos.
+ Considerar diversas herramientas siempre otorga una mejor experencia, por ejemplo el uso de Bash resulto más eficiente que algunos métodos de python
+ La cantidad de información siempre será un desafío interesante y aquí la cantidad de información me hizo buscar e investigar sobre los distintos parámetros que pueden tener otras librerías para lidiar con estos problemas

# Conclusiones y Posibles Mejoras

+ De manera personal, puedo decir que me resulto muy entretenido aplicar para este desafió ya que siempre resulta interesante otra perspectiva a la que buscar soluciones eficientes.
+ Esto solo se aplico a Abril, pero se puede aplicar a toda la información de la página mediante el uso de web Scrapping y hasta creando un pipeline más optimizado, por ejemplo con Ariflow que permite integrar bien los comandos de python y eso puede resultar escalable. 