In [43]:
import pandas as pd
import polars as pl
import zipfile
import os
import pypyodbc as odbc
from dotenv import load_dotenv
from sqlalchemy import create_engine
import urllib

In [44]:
load_dotenv()

True

In [45]:
DRIVER_NAME = 'ODBC Driver 17 for SQL Server'
SERVER_NAME = 'DESKTOP-7IM04QA'
DATABASE_NAME = 'ReceitaFederal'

connection_string = f"""
DRIVER={{{DRIVER_NAME}}};
SERVER={SERVER_NAME};
DATABASE={DATABASE_NAME};
Trusted_Connection=yes;
"""

conn = odbc.connect(connectString=connection_string)
print(conn)

<pypyodbc.Connection object at 0x00000169354C3E30>


In [46]:
DRIVER = "ODBC Driver 17 for SQL Server"
SERVER = "DESKTOP-7IM04QA"
DATABASE = "ReceitaFederal"
USERNAME = os.environ.get("user")
PASSWORD = os.environ.get("password")

params = urllib.parse.quote_plus(
    f"DRIVER={{{DRIVER}}};"
    f"SERVER={SERVER};"
    f"DATABASE={DATABASE};"
    f"UID={USERNAME};"
    f"PWD={PASSWORD};"
)

engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

In [47]:
columnsEstabelecimentos = [
    "CNPJ BÁSICO",
    "CNPJ ORDEM",
    "CNPJ DV",
    "IDENTIFICADOR MATRIZ/FILIAL",
    "NOME FANTASIA",
    "SITUAÇÃO CADASTRAL",
    "DATA SITUAÇÃO CADASTRAL",
    "MOTIVO SITUAÇÃO CADASTRAL",
    "NOME DA CIDADE NO EXTERIOR",
    "PAIS",
    "DATA DE INÍCIO ATIVIDADE",
    "CNAE FISCAL PRINCIPAL",
    "CNAE FISCAL SECUNDÁRIA",
    "TIPO DE LOGRADOURO",
    "LOGRADOURO",
    "NÚMERO",
    "COMPLEMENTO",
    "BAIRRO",
    "CEP",
    "UF",
    "MUNICÍPIO",
    "DDD 1",
    "TELEFONE 1",
    "DDD 2",
    "TELEFONE 2",
    "DDD DO FAX",
    "FAX",
    "CORREIO ELETRÔNICO",
    "SITUAÇÃO ESPECIAL",
    "DATA DA SITUAÇÃO ESPECIAL"
]

columnsEmpresas = [
    "CNPJ BÁSICO",
    "RAZÃO SOCIAL / NOME EMPRESARIAL",
    "NATUREZA JURÍDICA",
    "QUALIFICAÇÃO DO RESPONSÁVEL",
    "CAPITAL SOCIAL DA EMPRESA",
    "PORTE DA EMPRESA",
    "ENTE FEDERATIVO RESPONSÁVEL"
]

columnsSimples = [
    "CNPJ BÁSICO",
    "OPÇÃO PELO SIMPLES",
    "DATA DE OPÇÃO PELO SIMPLES",
    "DATA DE EXCLUSÃO DO SIMPLES",
    "OPÇÃO PELO MEI",
    "DATA DE OPÇÃO PELO MEI",
    "DATA DE EXCLUSÃO DO MEI"
]

In [48]:
pathEmpresas = "DadosRF/Empresas"
pathEstabelecimentos = "DadosRF/Estabelecimentos"
pathSimples = "DadosRF/Simples"

zipfilesEmpresas = os.listdir(pathEmpresas)
zipFilesEstabelecimentos = os.listdir(pathEstabelecimentos)
zipFilesSimples = os.listdir(pathSimples)

In [49]:
def FixPath(files, basePath):
    filesPath = []
    for file in files:
        filesPath.append(os.path.join(basePath, file)) 
        
    return filesPath 

In [50]:
filesEmpresas = FixPath(zipfilesEmpresas, pathEmpresas)
fileEstabelecimentos = FixPath(zipFilesEstabelecimentos, pathEstabelecimentos)
fileSimples = FixPath(zipFilesSimples, pathSimples)

In [51]:
for file in filesEmpresas:
    with zipfile.ZipFile(file,"r") as zip_ref:
        zip_ref.extractall("TratamentoInicial/EmpresasUnzip")

for file in fileEstabelecimentos:
    with zipfile.ZipFile(file,"r") as zip_ref:
        zip_ref.extractall("TratamentoInicial/EstabelecimentosUnzip")

for file in fileSimples:
    with zipfile.ZipFile(file,"r") as zip_ref:
        zip_ref.extractall("TratamentoInicial/SimplesUnzip")


In [52]:
filesEmpresasUnzip = os.listdir("TratamentoInicial/EmpresasUnzip")
fileEstabelecimentosUnzip = os.listdir("TratamentoInicial/EstabelecimentosUnzip")
fileSimplesUnzip = os.listdir("TratamentoInicial/SimplesUnzip")

filesEmpresas = FixPath(filesEmpresasUnzip, "TratamentoInicial/EmpresasUnzip")
fileEstabelecimentos = FixPath(fileEstabelecimentosUnzip, "TratamentoInicial/EstabelecimentosUnzip")
fileSimples = FixPath(fileSimplesUnzip, "TratamentoInicial/SimplesUnzip")


In [53]:
def infer_sql_type(series: pd.Series) -> str:
    """
    Infere o tipo SQL Server a partir de uma Series do pandas.
    """
    if pd.api.types.is_integer_dtype(series):
        return "INT"
    elif pd.api.types.is_float_dtype(series):
        return "FLOAT"
    elif pd.api.types.is_bool_dtype(series):
        return "BIT"
    else:
        # Para strings, podemos usar NVARCHAR(255) por padrão
        max_len = series.dropna().astype(str).map(len).max()
        length = max(255, max_len)  # mínimo 255
        return f"NVARCHAR({length})"

In [54]:
def bulk_insert(data_file, table, column_names):

    df_sample = pd.read_csv(data_file, sep=';', names=column_names, nrows=10, encoding='latin1')
    
    columns_def = ", ".join(f"[{col}] {infer_sql_type(df_sample[col])}" for col in column_names)
    
    sql = f"""
    IF OBJECT_ID('{table}', 'U') IS NULL
    BEGIN
        CREATE TABLE {table} ({columns_def});
    END;

    BULK INSERT {table}
    FROM '{data_file}'
    WITH
    (
        FORMAT = 'CSV',
        FIRSTROW = 2,
        FIELDTERMINATOR = ';',
        ROWTERMINATOR = '0x0a'
    );
    """.strip()
    
    return sql

In [55]:
def insertFiles(data_files, table, connection, column_names):

    cursor = connection.cursor()
    try:
        print("Begin insert operation")
        with cursor:
            print("Cursor avaliable")
            for data_file in data_files:
                print("checking", os.path.abspath(data_file))
                print("Inserting", os.path.abspath(data_file))
                cursor.execute(bulk_insert(os.path.abspath(data_file), table, column_names))
                print(data_file, "inserted")
            cursor.commit()
    except Exception as err:
        print(err)
        connection.rollback()
        print("Rollback has been made")

In [None]:
insertFiles(filesEmpresas, "Empresas", conn, columnsEmpresas)

insertFiles(fileEstabelecimentos, "Estabelecimentos", conn, columnsEstabelecimentos)

insertFiles(fileSimples, "Simples", conn, columnsSimples)

Begin insert operation
Cursor avaliable
checking e:\Documentos\TCC\DetectordeViabilidade\TratamentoInicial\SimplesUnzip\F.K03200$W.SIMPLES.CSV.D50913
Inserting e:\Documentos\TCC\DetectordeViabilidade\TratamentoInicial\SimplesUnzip\F.K03200$W.SIMPLES.CSV.D50913
TratamentoInicial/SimplesUnzip\F.K03200$W.SIMPLES.CSV.D50913 inserted
