In [3]:
import os
import re
import csv
import datetime
import firebirdsql

from dotenv import load_dotenv

# Carregar variáveis de ambiente
load_dotenv()

True

In [4]:


def get_firebird_connection():
    # Ajustar com os parâmetros corretos do Firebird, inclusive charset
    return firebirdsql.connect(
        host=os.getenv('HOST'),
        port=int(os.getenv('PORT', '3050')),
        database=os.getenv('DB_PATH'),
        user=os.getenv('APP_USER'),
        password=os.getenv('PASSWORD'),
        role=os.getenv('ROLE'),
        auth_plugin_name=os.getenv('AUTH'),
        wire_crypt=False,
        charset='ISO8859_1'
    )

In [2]:


def normalizar_texto(texto):
    """Normaliza o texto conforme requerido:
    - Substituir ";" interno por ","
    - Caso seja necessário, pode-se adicionar mais correções aqui.
    A remoção e reintrodução de acentos não está clara. Pressupõe-se que ler do BD já trará corretamente,
    mas iremos tratar explicitamente os caracteres conhecidos mencionados.
    """
    if texto is None:
        return ""
    # Substituir ";" por "," dentro do texto (exceto o separador, mas esse é só na hora do CSV)
    texto = texto.replace(";", ",")
    return texto.strip()

def limpar_telefone(telefone):
    """Remove espaços em branco, caracteres alfabéticos e deixa apenas números."""
    if telefone is None:
        return ""
    tel_limpo = re.sub(r'\D', '', telefone)  # remove tudo que não for dígito
    return tel_limpo

def extrair_ddd_telefone(telefone):
    """
    Extrair DDD (2 primeiros dígitos) e o restante do telefone.
    Caso não seja possível extrair, retornar ddd vazio e telefone vazio.
    """
    tel = limpar_telefone(telefone)
    if len(tel) >= 3:
        ddd = tel[:2]
        numero = tel[2:]
        return ddd, numero
    return "", ""

def classificar_item_descricao(descricao):
    """Se a descrição contiver 'ORIGINAL', 'GENUINO' ou 'ORIG', retorna 1.
    Caso contrário, retorna 5.
    Considerar que pode haver caracteres especiais. Vamos buscar as palavras em modo case-insensitive."""
    if descricao is None:
        return 5
    desc_up = descricao.upper()
    if "ORIGINAL" in desc_up or "GENUINO" in desc_up or "ORIG" in desc_up:
        return 1
    return 5

def remover_caracteres_nao_numericos(texto):
    """Remove todos os caracteres não numéricos."""
    if texto is None:
        return ""
    return re.sub(r'\D', '', texto)

def determinar_tipo_cliente(cpf_cnpj_limpo):
    """Se tiver 11 dígitos => 'F' (física), se tiver 14 dígitos => 'J' (jurídica).
    As instruções estavam confusas, mas tradicionalmente CPF = 11, CNPJ = 14."""
    length = len(cpf_cnpj_limpo)
    if length == 11:
        return 'F'  # Pessoa Física
    elif length == 14:
        return 'J'  # Pessoa Jurídica
    return ''  # caso não seja nem 11 nem 14

def recuperar_historico_numdocumento(conn, cdproduto, data_venda):
    """
    Precisa buscar nas tabelas HISTORICOPRODUTO1 a HISTORICOPRODUTO10 
    o registro mais recente anterior ou igual à data da venda com TIPO = 'NF COMPRA'.
    Como não sabemos a estrutura exata, tentaremos todas de 1 a 10.
    Vamos assumir que a estrutura das tabelas é a mesma.
    """
    # Convert data_venda para o formato do BD se necessário. Pressupondo que é datetime.date.
    # Queremos a data anterior OU IGUAL mais próxima. Vamos tentar <= data_venda.
    # De acordo com a descrição, "a data anterior mais próxima da data da venda".
    # Se não achar exatamente menor, pode igualar a data da venda.
    
    # Tentaremos do mais próximo da venda: 
    # SELECT NUMDOCUMENTO, DATA FROM HISTORICOPRODUTOX WHERE TIPO='NF COMPRA' AND CDPRODUTO=? AND DATA<=? ORDER BY DATA DESC ROWS 1
    # Caso não encontre em nenhuma das 10 tabelas, retorna None.
    
    for i in range(1,11):
        tabela = f"HISTORICOPRODUTO{i}"
        sql = f"""
            SELECT NUMDOCUMENTO, DATA
            FROM {tabela}
            WHERE TIPO = 'NF COMPRA'
              AND CDPRODUTO = ?
              AND DATA <= ?
            ORDER BY DATA DESC
            ROWS 1
        """
        cur = conn.cursor()
        cur.execute(sql, (cdproduto, data_venda))
        row = cur.fetchone()
        if row:
            return row[0]  # NUMDOCUMENTO da NF de compra encontrada
    return None

def obter_valores_custo_imposto_margem(conn, numdocumento, cdproduto, valor_final):
    """
    Para obter valor_custo, valor_imposto, valor_margem:
    1) Achar NOTACOMPRA.CDNOTACOMPRA usando NOTACOMPRA.NUMNOTA = numdocumento
    2) Achar ITENSNOTACOMPRA onde ITENSNOTACOMPRA.CDNOTACOMPRA = NOTACOMPRA.CDNOTACOMPRA e ITENSNOTACOMPRA.CDPRODUTO = cdproduto
    3) valor_custo = ITENSNOTACOMPRA.VALORTOTAL
    4) ipi = ITENSNOTACOMPRA.IPI/100
    5) icmsforn = ITENSNOTACOMPRA.ICMS/100

    valor_imposto = ((valor_custo + (((((100 + 100 * (((1 + 0.7178) * (1 - icmsforn)) / (1 - 0.205) - 1)) * (1 + ipi) * (0.205 * 100)) / 100 - 100 * icmsforn) / 100) * valor_custo)) 
                     + (valor_custo * (100 * (ipi))) / 100) - valor_custo

    valor_margem = (valor_final - 0.33 * valor_final) - valor_custo
                  = valor_final*(1 - 0.33) - valor_custo
                  = valor_final*0.67 - valor_custo
    """
    if numdocumento is None:
        # Não achamos a NF de compra
        return "", "", ""

    # Achar NOTACOMPRA
    cur = conn.cursor()
    cur.execute("SELECT CDNOTACOMPRA FROM NOTACOMPRA WHERE NUMNOTA = ?", (numdocumento,))
    nota = cur.fetchone()
    if not nota:
        return "", "", ""
    cdnotacompra = nota[0]

    # Achar ITENSNOTACOMPRA
    cur.execute("""
        SELECT VALORTOTAL, IPI, ICMS
        FROM ITENSNOTACOMPRA
        WHERE CDNOTACOMPRA = ? AND CDPRODUTO = ?
    """, (cdnotacompra, cdproduto))
    itemnc = cur.fetchone()
    if not itemnc:
        return "", "", ""

    valor_custo = itemnc[0]
    ipi = (itemnc[1] or 0) / 100.0
    icmsforn = (itemnc[2] or 0) / 100.0

    # Calcular valor_impostos
    # Fórmula fornecida (adaptar a vírgula para ponto, assumindo operações em float)
    # valor_impostos = ((valor_custo + (((((100 + 100 * (((1 + 0.7178) * (1 - icmsforn)) / (1 - 0.205) - 1)) * (1 + ipi) * (0.205 * 100)) / 100 - 100 * icmsforn) / 100) * valor_custo)) + (valor_custo * (100 * ipi) / 100)) - valor_custo

    # Vamos quebrar a fórmula:
    # fator = ((1 + 0.7178) * (1 - icmsforn)) / (1 - 0.205) - 1
    fator = ((1 + 0.7178) * (1 - icmsforn)) / (1 - 0.205) - 1
    # parte_interna = ((((100 + 100 * fator) * (1 + ipi) * (0.205 * 100)) / 100) - 100 * icmsforn) / 100
    parte_interna = ((((100 + 100 * fator) * (1 + ipi) * (0.205 * 100)) / 100) - 100 * icmsforn) / 100
    # valor_impostos = ((valor_custo + (parte_interna * valor_custo)) + (valor_custo * ipi)) - valor_custo
    valor_impostos = ((valor_custo + (parte_interna * valor_custo)) + (valor_custo * ipi)) - valor_custo

    # valor_margem = valor_final*0.67 - valor_custo
    if valor_final is None:
        valor_margem = ""
    else:
        valor_margem = valor_final * 0.67 - valor_custo

    return valor_custo, valor_impostos, valor_margem


def main():
    conn = get_firebird_connection()

    # 1) CONCESSÃO
    # Vamos supor que há apenas uma empresa principal, ou pegar a primeira. Caso precise filtrar, 
    # o usuário não especificou qual empresa. Vamos supor que é a empresa contratada.
    # Caso haja várias, poderia precisar um filtro, mas não foi especificado. Pegaremos a empresa LOJA 
    # se não for especificado. Entretanto, no faturamento citou a empresa "Loja (14.255.350/0001-03)".
    # Vamos pegar esta, pois parece ser a empresa principal.
    cnpj_loja = "14.255.350/0001-03"
    cur = conn.cursor()
    cur.execute("""
        SELECT CNPJ, INSCRICAOESTADUAL, NOMEFANTASIA, RAZAOSOCIAL, ENDERECO, NUMERO, CEP, CIDADE, UF
        FROM EMPRESA
        WHERE CNPJ = ?
    """, (cnpj_loja,))
    empresa = cur.fetchone()
    if empresa:
        (cnpj, ie, nfantasia, razao_social, ender, num, cep, cidade, uf) = empresa
    else:
        # Caso não encontre, deixa vazio
        cnpj = ie = nfantasia = razao_social = ender = num = cep = cidade = uf = ""

    # Normalizações
    cnpj = normalizar_texto(cnpj)
    ie = normalizar_texto(ie)
    nfantasia = normalizar_texto(nfantasia)
    razao_social = normalizar_texto(razao_social)
    ender = normalizar_texto(ender)
    num = normalizar_texto(num)
    cep = normalizar_texto(cep)
    cidade = normalizar_texto(cidade)
    uf = normalizar_texto(uf)

    # Cria o CSV concessão-comagro.csv
    with open("arquivos/concessao-comagro.csv", "w", encoding="utf-8", newline='') as f:
        writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_NONE, escapechar='\\')
        # Cabeçalho
        writer.writerow([
            "conta", "dms", "cnpj", "inscricao_estadual", "nome", "razao_social", "tipo_logradouro",
            "logradouro", "numero", "complemento", "cep", "cidade", "uf", "telefone_principal", "telefone_secundario"
        ])
        # Linha de dados
        # tipo_logradouro e complemento não foram informados, deixar em branco.
        # nome = NOMEFANTASIA
        writer.writerow([
            cnpj, "Proprio", cnpj, ie, nfantasia, razao_social, "", ender, num, "", cep, cidade, uf, "7732017400", "7732017410"
        ])

    # 2) USUÁRIOS
    # Pegar todos usuários com SETOR em ('VENDAS','ADM')
    # JOIN USUARIO CDFUNC = FUNCIONARIO CDFUNC
    # conta = empresa cnpj
    cur.execute("""
        SELECT U.SETOR, F.EMAIL, F.CDFUNC, F.CPF, F.NOME, F.FONE, F.CELULAR, F.NUMCNH
        FROM USUARIO U
        JOIN FUNCIONARIO F ON U.CDFUNC = F.CDFUNC
        JOIN EMPRESA E ON E.CNPJ = ?
        WHERE U.SETOR IN ('VENDAS', 'ADM')
    """, (cnpj_loja,)) 
    usuarios = cur.fetchall()

    with open("arquivos/usuarios-comagro.csv", "w", encoding="utf-8", newline='') as f:
        writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_NONE, escapechar='\\')
        # Cabeçalho
        writer.writerow(["conta", "email_login", "codigo_externo", "cpf", "nome", "telefone", "celular", "canal", "papel"])
        for (setor, email, cd_func, cpf, nome, fone, celular, num_cnh) in usuarios:
            email = normalizar_texto(email)
            cpf = normalizar_texto(cpf)
            nome = normalizar_texto(nome)
            fone = limpar_telefone(fone)
            celular = limpar_telefone(celular)
            num_cnh = normalizar_texto(num_cnh)

            # papel
            if setor == 'ADM':
                papel = 'GERENTE'
            elif setor == 'VENDAS':
                papel = 'VENDEDOR'
            else:
                papel = ''

            writer.writerow([cnpj, email, cd_func, cpf, nome, fone, celular, num_cnh, papel])

    # 3) FATURAMENTO
    # Empresa Loja (CNPJ = 14.255.350/0001-03)
    # Data: de 2022-12-18 até hoje (17/12/2024)
    # PEDIDOVENDA + ITENSPEDIDOVENDA + CLIENTE + FONE + FUNCIONARIO ...
    # Para cada item de cada pedido, uma linha.
    start_date = (datetime.now() - datetime.timedelta(days=2*365)).strftime('%Y-%m-%d')
    end_date = datetime.now().strftime('%Y-%m-%d')

    # Vamos buscar pedidos desta empresa e deste período
    # Assumindo: PEDIDOVENDA possui CDPEDIDOVENDA, DATA, NOMECLIENTE, CDCLIENTE, CDFUNC.
    # ITENSPEDIDOVENDA: CDPEDIDOVENDA, CDPRODUTO, NUMORIGINAL, QUANTIDADE, VALORCDESC, DESCRICAO
    # FUNCIONARIO: CDFUNC, NUMCNH
    # CLIENTE: CDCLIENTE, CPF_CNPJ, CEP, CIDADE, ESTADO
    # FONE: CDCLIENTE, FONE (uma entrada? se houver várias, pegar uma. Não está claro, vamos pegar a primeira.)
    # NOTA: Para cada item, buscaremos a NF de compra mais recente. Se não houver, campos vazios.
    # Canal: se NUMCNH = "TELEPECAS", então canal = "TELEP", senão canal = NUMCNH

    # Primeiro pegamos todos PEDIDOVENDA da empresa neste período
    cur.execute("""
        SELECT P.CDPEDIDOVENDA, P.DATA, P.NOMECLIENTE, P.CDCLIENTE, P.CDFUNC
        FROM PEDIDOVENDA P
        JOIN EMPRESA E ON E.CNPJ = ?
        WHERE P.DATA BETWEEN ? AND ?
    """, (cnpj_loja, start_date, end_date))
    pedidos = cur.fetchall()

    # Vamos construir um dicionário de pedidos -> itens
    # Depois buscar os dados auxiliares
    pedidos_ids = [p[0] for p in pedidos]
    if pedidos_ids:
        # Obter itens
        # Vamos montar uma condição IN se houver muitos pedidos
        format_strings = ','.join(['?']*len(pedidos_ids))
        sql_itens = f"""
            SELECT I.CDPEDIDOVENDA, I.CDPRODUTO, I.NUMORIGINAL, I.QUANTIDADE, I.VALORCDESC, I.DESCRICAO
            FROM ITENSPEDIDOVENDA I
            WHERE I.CDPEDIDOVENDA IN ({format_strings})
        """
        cur.execute(sql_itens, tuple(pedidos_ids))
        itens = cur.fetchall()
    else:
        itens = []

    # Organizar itens por pedido
    itens_por_pedido = {}
    for (cd_p, cd_prod, num_orig, qtd, valorcdesc, desc) in itens:
        if cd_p not in itens_por_pedido:
            itens_por_pedido[cd_p] = []
        itens_por_pedido[cd_p].append((cd_prod, num_orig, qtd, valorcdesc, desc))

    # Precisamos dados dos clientes dos pedidos
    cd_clientes = set(p[3] for p in pedidos if p[3] is not None)
    clientes_dict = {}
    if cd_clientes:
        format_strings = ','.join(['?']*len(cd_clientes)) # IN (?, ?, ...)
        sql_clientes = f"""
            SELECT CDCLIENTE, CPF_CNPJ, CEP, CIDADE, ESTADO
            FROM CLIENTE
            WHERE CDCLIENTE IN ({format_strings})
        """
        cur.execute(sql_clientes, tuple(cd_clientes))
        for row in cur.fetchall():
            (cdcli, cpf_cnpj_cli, cep_cli, cid_cli, uf_cli) = row
            clientes_dict[cdcli] = {
                'cpf_cnpj': normalizar_texto(cpf_cnpj_cli),
                'cep': normalizar_texto(cep_cli),
                'cidade': normalizar_texto(cid_cli),
                'uf': normalizar_texto(uf_cli)
            }

    # Dados de telefone do cliente
    # Vários clientes, precisamos pegar o primeiro fone encontrado
    # Caso tenha mais de um FONE por CLIENTE, pegamos o primeiro.
    if cd_clientes:
        sql_fones = f"""
            SELECT CDCLIENTE, FONE
            FROM FONE
            WHERE CDCLIENTE IN ({format_strings})
        """
        cur.execute(sql_fones, tuple(cd_clientes))
        fones_dict = {}
        for (cdcli, fone) in cur.fetchall():
            if cdcli not in fones_dict:  # Pega apenas o primeiro
                fones_dict[cdcli] = normalizar_texto(fone)
    else:
        fones_dict = {}

    # Dados dos funcionários (para canal)
    cd_funcs = set(p[4] for p in pedidos if p[4] is not None)
    funcs_dict = {}
    if cd_funcs:
        format_strings = ','.join(['?']*len(cd_funcs))
        sql_func = f"""
            SELECT CDFUNC, NUMCNH
            FROM FUNCIONARIO
            WHERE CDFUNC IN ({format_strings})
        """
        cur.execute(sql_func, tuple(cd_funcs))
        for (cdf, ncnh) in cur.fetchall():
            ncnh = normalizar_texto(ncnh)
            if ncnh.upper() == "TELEPECAS":
                canal = "TELEP"
            else:
                canal = ncnh.upper()
            funcs_dict[cdf] = canal

    # Montar o CSV de faturamento
    # Sem cabeçalho, sem rodapé, sem espaço extra
    with open("arquivos/faturamento-comagro.csv", "w", encoding="utf-8", newline='') as f:
        writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_NONE, escapechar='\\')
        # Sem cabeçalho

        for (cd_ped, data_ped, nome_cli, cdcli, cd_func) in pedidos:
            # Dados do cliente
            if cdcli in clientes_dict:
                cpf_cnpj_cli = clientes_dict[cdcli]['cpf_cnpj']
                cep_cli = clientes_dict[cdcli]['cep']
                cid_cli = clientes_dict[cdcli]['cidade']
                uf_cli = clientes_dict[cdcli]['uf']
            else:
                cpf_cnpj_cli = cep_cli = cid_cli = uf_cli = ""

            cpf_cnpj_limpo = remover_caracteres_nao_numericos(cpf_cnpj_cli)
            tipo_cli = determinar_tipo_cliente(cpf_cnpj_limpo)

            fone_cli = fones_dict.get(cdcli, "")
            ddd, telnum = extrair_ddd_telefone(fone_cli)

            # Canal
            canal = funcs_dict.get(cd_func, "")

            # nome cliente
            nome_cli = normalizar_texto(nome_cli)

            # Para cada item do pedido
            if cd_ped in itens_por_pedido:
                for (cd_prod, num_orig, qtd, valorcd, desc) in itens_por_pedido[cd_ped]:
                    # Normalizar campos
                    num_orig = normalizar_texto(num_orig)
                    desc = normalizar_texto(desc)
                    # Classificacao
                    classificacao_peca = classificar_item_descricao(desc)
                    # valor_final = valorcd (float?), assume que é numérico
                    valor_final = valorcd if valorcd else 0.0

                    # Procurar NF de compra anterior
                    numdocumento = recuperar_historico_numdocumento(conn, cd_prod, data_ped)
                    valor_custo, valor_impostos, valor_margem = "", "", ""
                    if numdocumento is not None:
                        # Obter custos e impostos
                        vc, vi, vm = obter_valores_custo_imposto_margem(conn, numdocumento, cd_prod, valor_final)
                        valor_custo = vc
                        valor_impostos = vi
                        valor_margem = vm

                    # Montar linha
                    # [EMPRESA]CNPJ = cnpj
                    # numero_nota = CDPEDIDOVENDA
                    # canal = canal
                    # data = data_ped
                    # nome_cliente = nome_cli
                    # tipo_cliente = tipo_cli
                    # cpf_cnpj = cpf_cnpj_limpo
                    # cep = cep_cli
                    # cidade = cid_cli
                    # uf = uf_cli
                    # ddd_telefone = ddd
                    # telefone = telnum
                    # chassi = ""
                    # modelo = ""
                    # ano = ""
                    # placa = ""
                    # codigo_peca = num_orig
                    # quantidade = qtd
                    # valor = valor_final
                    # codigo_externo = cd_func
                    # classificacao_item = "" 
                    # descricao_peca = desc
                    # valor_custo, valor_impostos, valor_margem conforme calculado
                    # A ordem não foi 100% clara no enunciado da planilha de faturamento. 
                    # O enunciado dá a ordem após o exemplo. Vamos seguir a ordem do enunciado exatamente:
                    # código_concessionária | numero_nota | canal | data | nome_cliente | tipo_cliente | cpf_cnpj | cep | cidade | uf |
                    # ddd_telefone | telefone | chassi | modelo | ano | placa | codigo_peca | quantidade | valor | codigo_externo | classificacao_item | descricao_peca |
                    # valor_custo | valor_impostos | valor_margem | classificacao_peca

                    linha = [
                        cnpj, cd_ped, canal, data_ped.strftime("%Y-%m-%d"), nome_cli, tipo_cli, cpf_cnpj_limpo,
                        cep_cli, cid_cli, uf_cli, ddd, telnum, "", "", "", "", num_orig,
                        qtd, valor_final, cd_func, "", desc, valor_custo, valor_impostos, valor_margem, classificacao_peca
                    ]

                    # Converter todos para strings e garantir que não adiciona ponto e vírgula no final extra
                    linha_str = []
                    for x in linha:
                        if x is None:
                            x = ""
                        elif isinstance(x, float):
                            x = str(x).replace('.', ',')  # Caso precise trocar ponto por vírgula no valor
                            # A instrução não disse explicitamente para trocar ponto por vírgula em valores numéricos.
                            # Apenas disse para trocar ";" por "," nos dados. Vamos manter ponto no float.
                            # Reverter ao original:
                            x = str(x)
                        else:
                            x = str(x)
                        linha_str.append(x)
                    writer.writerow(linha_str)

    conn.close()

if __name__ == "__main__":
    main()

AttributeError: module 'datetime' has no attribute 'now'

In [12]:
conn = get_firebird_connection()
curs = conn.cursor()

# Pegar todos os dados das colunas HISTORICOPRODUTO (de 1 a 10), onde TIPO = 'PEDIDO'
for i in range(1, 11):
    curs.execute(f"""
        SELECT NUMDOCUMENTO, DATA
        FROM HISTORICOPRODUTO{i}
        WHERE TIPO = 'PEDIDO'
        AND DATA BETWEEN '2024-11-01' AND '2024-12-18'
        AND CDPRODUTO = 51005
    """)
    rows = curs.fetchall()
    for row in rows:
        print(row)

('140137', datetime.date(2024, 11, 8))
