In [7]:
import os
import zipfile
import duckdb
import polars as pl
from time import time

conn = duckdb.connect('cnpj.db')

# Extracting Files

In [8]:
# unzip all .zip files in the current directory and save them with the same name in the unzipped folder in a new directory called unzipped
# Source: https://dados.gov.br/dados/conjuntos-dados/cadastro-nacional-da-pessoa-juridica---cnpj
folders = ()

# if there is a folder called unzipped, delete it
if os.path.exists('unzipped'):
    os.system('rm -r unzipped')
    print("unzipped folder deleted")

#for file in 'zipped'folder

for file in os.listdir(os.getcwd() + '/zipped'):
    if file.endswith('.zip'):
        new_file_name = file.replace('.zip', '')
        folder_name = new_file_name[0:-1]
        folders += (folder_name,)
        with zipfile.ZipFile(f'zipped/{file}', 'r') as zip_ref:
            zip_ref.extractall(f'unzipped/{folder_name}')

folders = list(set(folders))
print(folders)

unzipped folder deleted
['Natureza', 'Empresas', 'Simple', 'Estabelecimentos', 'Socios', 'Cnae']


In [9]:
for folder in folders:
    counter = 0
    file_list = os.listdir(f'unzipped/{folder}')
    for file in file_list:
        os.rename(f'unzipped/{folder}/{file}', f'unzipped/{folder}/{counter}.csv')
        counter = counter + 1
print('unzipped files renamed')

unzipped files renamed


# Converting Filtes to parquet format

In [10]:
current_dir = os.getcwd()
source = 'unzipped'
source_path = os.path.join(current_dir, source)
folders_data = os.listdir(source_path)

In [11]:
empresas_pl_schema = { 'cnpj_base': pl.Utf8, 'social_name': pl.Utf8, 'legal_nature': pl.Utf8, 'responsible_qualification': pl.Utf8, 'capital': pl.Utf8, 'company_size': pl.Utf8, 'federal_entity_responsible': pl.Utf8 }
cnae_pl_schema = { 'code': pl.Utf8, 'description': pl.Utf8 }
estabelecimentos_pl_schema = { 'cnpj_base': pl.Utf8, 'cnpj_order': pl.Utf8, 'cnpj_dv': pl.Utf8, 'identifier': pl.Utf8, 'fantasy_name': pl.Utf8, 'registration_status': pl.Utf8, 'registration_date': pl.Utf8, 'registration_reason': pl.Utf8, 'city_name': pl.Utf8, 'country_code': pl.Utf8, 'activity_start_date': pl.Utf8, 'main_activity': pl.Utf8, 'secondary_activity': pl.Utf8, 'street_type': pl.Utf8, 'street_name': pl.Utf8, 'number': pl.Utf8, 'complement': pl.Utf8, 'neighborhood': pl.Utf8, 'zip_code': pl.Utf8, 'state': pl.Utf8, 'city_code': pl.Utf8, 'ddd1': pl.Utf8, 'phone1': pl.Utf8, 'ddd2': pl.Utf8, 'phone2': pl.Utf8, 'fax_ddd': pl.Utf8, 'fax': pl.Utf8, 'email': pl.Utf8, 'special_status': pl.Utf8, 'special_status_date': pl.Utf8 }
socios_pl_schema = { 'cnpj_base': pl.Utf8, 'socio_identifier': pl.Utf8, 'socio_name': pl.Utf8, 'socio_cnpj_cpf': pl.Utf8, 'socio_qualification': pl.Utf8, 'entry_date': pl.Utf8, 'country_code': pl.Utf8, 'legal_representative_cpf': pl.Utf8, 'legal_representative_name': pl.Utf8, 'legal_representative_qualification': pl.Utf8, 'age_range': pl.Utf8 }
simple_pl_schema = { 'cnpj_base': pl.Utf8, 'simples_option' : pl.Utf8, 'simples_option_date' : pl.Utf8, 'simples_exclusion_date' : pl.Utf8, 'mei_option' : pl.Utf8, 'mei_option_date' : pl.Utf8, 'mei_exclusion_date' : pl.Utf8}
natureza_pl_schema = { 'code' : pl.Utf8, 'description' : pl.Utf8}

In [12]:
for folder in folders_data:
    if folder == 'Empresas':
        schema = empresas_pl_schema
    elif folder == 'Cnae':
        schema = cnae_pl_schema
    elif folder == 'Estabelecimentos':
        schema = estabelecimentos_pl_schema
    elif folder == 'Socios':
        schema = socios_pl_schema
    elif folder == 'Simple':
        schema = simple_pl_schema
    elif folder == 'Natureza':
        schema = natureza_pl_schema
    print(folder)
    
    df_list = []
    data_files = os.listdir(os.path.join(source_path, folder))
    for file in data_files:
        time_0 = time()
        file_path = os.path.join(source_path, folder, file)
        df = pl.read_csv(file_path, encoding='latin1', separator=';',ignore_errors=True,schema= schema)
        time_1 = time()
        print(f'Time to read {file}: {round(time_1 - time_0,2)}')
        #exporting to parquet
        df.write_parquet(f'{source_path}/{folder}/{file}.parquet')
        time_2 = time()
        print(f'Time to write {file}: {round(time_2 - time_1,2)}')

Cnae
Time to read 0.csv: 0.03
Time to write 0.csv: 0.0
Natureza
Time to read 0.csv: 0.0
Time to write 0.csv: 0.0
Socios
Time to read 6.csv: 0.2
Time to write 6.csv: 0.61
Time to read 7.csv: 0.33
Time to write 7.csv: 0.67
Time to read 5.csv: 0.27
Time to write 5.csv: 0.64
Time to read 4.csv: 0.24
Time to write 4.csv: 0.6
Time to read 0.csv: 0.22
Time to write 0.csv: 0.6
Time to read 1.csv: 0.2
Time to write 1.csv: 0.59
Time to read 3.csv: 0.18
Time to write 3.csv: 0.59
Time to read 2.csv: 0.21
Time to write 2.csv: 0.63
Time to read 9.csv: 0.66
Time to write 9.csv: 1.68
Time to read 8.csv: 0.17
Time to write 8.csv: 0.59
Simple
Time to read 0.csv: 2.71
Time to write 0.csv: 3.2
Empresas
Time to read 6.csv: 0.45
Time to write 6.csv: 1.13
Time to read 7.csv: 0.32
Time to write 7.csv: 1.15
Time to read 5.csv: 0.34
Time to write 5.csv: 1.19
Time to read 4.csv: 0.35
Time to write 4.csv: 1.17
Time to read 0.csv: 0.35
Time to write 0.csv: 1.21
Time to read 1.csv: 0.34
Time to write 1.csv: 1.21
Ti

# Creating and populating tables

In [13]:
empresas_columns = [ x for x in empresas_pl_schema.keys() ]
cnae_columns = [ x for x in cnae_pl_schema.keys() ]
estabelecimentos_columns = [ x for x in estabelecimentos_pl_schema.keys() ]
socios_columns = [ x for x in socios_pl_schema.keys() ]
simple_columns = [ x for x in simple_pl_schema.keys() ]
natureza_columns = [ x for x in natureza_pl_schema.keys() ]

In [14]:
# drop tables if they exist
conn.execute('DROP TABLE IF EXISTS empresas_raw')
conn.execute('DROP TABLE IF EXISTS cnae_raw')
conn.execute('DROP TABLE IF EXISTS estabelecimentos_raw')
conn.execute('DROP TABLE IF EXISTS socios_raw')
conn.execute('DROP TABLE IF EXISTS simple_raw')
conn.execute('DROP TABLE IF EXISTS natureza_raw')

# create tables
conn.execute(f'CREATE TABLE IF NOT EXISTS cnae_raw ({", ".join([f"{column} STRING" for column in cnae_columns])})')
conn.execute(f'CREATE TABLE IF NOT EXISTS empresas_raw ({", ".join([f"{column} STRING" for column in empresas_columns])})')
conn.execute(f'CREATE TABLE IF NOT EXISTS estabelecimentos_raw ({", ".join([f"{column} STRING" for column in estabelecimentos_columns])})')
conn.execute(f'CREATE TABLE IF NOT EXISTS socios_raw ({", ".join([f"{column} STRING" for column in socios_columns])})')
conn.execute(f'CREATE TABLE IF NOT EXISTS simple_raw ({", ".join([f"{column} STRING" for column in simple_columns])})')
conn.execute(f'CREATE TABLE IF NOT EXISTS natureza_raw ({", ".join([f"{column} STRING" for column in natureza_columns])})')

<duckdb.duckdb.DuckDBPyConnection at 0x1109deb30>

In [15]:
for folder in folders_data:
    if folder == 'Empresas':
        columns = empresas_columns
    elif folder == 'Cnae':
        columns = cnae_columns
    elif folder == 'Estabelecimentos':
        columns = estabelecimentos_columns
    elif folder == 'Socios':
        columns = socios_columns
    elif folder == 'Simple':
        columns = simple_columns
    elif folder == 'Natureza':
        columns = natureza_columns
    time_0 = time()
    conn.execute(f"INSERT INTO {folder}_raw SELECT * FROM read_parquet('./unzipped/{folder}/*.parquet')")
    time_1 = time()
    print(f'Time to insert {folder}: {round(time_1 - time_0,2)} seconds')
    row_count = conn.execute(f"SELECT COUNT(*) FROM {folder}_raw").fetchall()
    time_2 = time()
    print(f'Time to count {folder}: {round(time_2 - time_1,2)} seconds - {row_count[0][0]} rows x {len(columns)} columns')

Time to insert Cnae: 0.01 seconds
Time to count Cnae: 0.0 seconds - 1358 rows x 2 columns
Time to insert Natureza: 0.0 seconds
Time to count Natureza: 0.0 seconds - 89 rows x 2 columns
Time to insert Socios: 7.86 seconds
Time to count Socios: 0.0 seconds - 23765603 rows x 11 columns
Time to insert Simple: 5.01 seconds
Time to count Simple: 0.0 seconds - 38371644 rows x 7 columns
Time to insert Empresas: 10.78 seconds
Time to count Empresas: 0.01 seconds - 56554309 rows x 7 columns
Time to insert Estabelecimentos: 49.96 seconds
Time to count Estabelecimentos: 0.04 seconds - 59495409 rows x 30 columns


In [16]:
conn.close()

Time to insert Cnae: 0.01 seconds
Time to count Cnae: 0.0 seconds - 1358 rows x 2 columns
Time to insert Natureza: 0.0 seconds
Time to count Natureza: 0.0 seconds - 89 rows x 2 columns
Time to insert Socios: 7.86 seconds
Time to count Socios: 0.0 seconds - 23765603 rows x 11 columns
Time to insert Simple: 5.01 seconds
Time to count Simple: 0.0 seconds - 38371644 rows x 7 columns
Time to insert Empresas: 10.78 seconds
Time to count Empresas: 0.01 seconds - 56554309 rows x 7 columns
Time to insert Estabelecimentos: 49.96 seconds
Time to count Estabelecimentos: 0.04 seconds - 59495409 rows x 30 columns