1. Importações

In [1]:
import pandas as pd
from sqlalchemy import (
    create_engine, Table, Column, Integer, String, Float, Numeric, Date,
    CheckConstraint, ForeignKey, func, MetaData
)
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from datetime import datetime
import os
import logging

import argparse


Opcional -> CONFIGURAÇÕES DE LOGGING

In [2]:
# Configuração básica do logging
# logging.basicConfig(
#     level=logging.INFO,
#     format='%(asctime)s - %(levelname)s - %(message)s',
#     handlers=[
#         logging.FileHandler("processamento.log"),
#         logging.StreamHandler()
#     ]
# )

logger = logging.getLogger(__name__)

2. CONFIGURAÇÕES DE BANCO DE DADOS

In [3]:
# Configurações da conexão com o PostgreSQL na GCP usando variáveis de ambiente
DATABASE_CONFIG = {
    'username': os.getenv('DB_USERNAME', 'postgres'),
    'password': os.getenv('DB_PASSWORD', 'U7urkInVDg[(D^{&'),  # Ajuste conforme necessário
    'host': os.getenv('DB_HOST', '34.130.95.218'),
    'port': os.getenv('DB_PORT', '5432'),
    'database': os.getenv('DB_NAME', 'postgres'),
}

def get_database_url(config):
    """
    Constrói a URL de conexão para o PostgreSQL.
    """
    return f"postgresql://{config['username']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"

# Criação do engine de conexão
DATABASE_URL = get_database_url(DATABASE_CONFIG)
engine = create_engine(DATABASE_URL, echo=False)  # echo=False para reduzir a verbosidade dos logs
metadata = MetaData()

3. DEFINIÇÃO DAS TABELAS

In [4]:
# Definição da tabela marketplaces
marketplaces = Table(
    'marketplaces', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('nome', String(255), nullable=False),
    Column('descricao', String, nullable=True),
    extend_existing=True  # Permite redefinir a tabela se já existir no MetaData
)

# Definição da tabela arquivos_processados
arquivos_processados = Table(
    'arquivos_processados', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('nome_arquivo', String(255), nullable=False, unique=True),
    Column('data_processo', Date, nullable=False, server_default=func.current_date()),
    Column('status', String(50), nullable=False),
    Column('observacoes', String, nullable=True),
    CheckConstraint("status IN ('PENDENTE', 'PROCESSADO', 'ERRO')", name='status_check'),
    extend_existing=True
)

# Definição da tabela sku_marketplace
sku_marketplace = Table(
    'sku_marketplace', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('marketplace_id', Integer, ForeignKey('marketplaces.id'), nullable=False),
    Column('numero_pedido', String(50), nullable=False),
    Column('valor_liquido', Float, nullable=False),
    Column('valor_bruto', Float, nullable=True),
    Column('valor_final', Float, nullable=False),
    Column('valor_frete', Float, nullable=True),
    extend_existing=True
)

# Definição da tabela comissoes_pedido
comissoes_pedido = Table(
    'comissoes_pedido', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('data', Date, nullable=False),
    Column('porcentagem', Numeric(5,4), nullable=False),
    Column('sku_marketplace_id', Integer, ForeignKey('sku_marketplace.id'), nullable=False),
    CheckConstraint("porcentagem >= 0 AND porcentagem <= 1", name='comissoes_pedido_porcentagem_check'),
    extend_existing=True
)

# Definição da tabela comissoes_periodo
comissoes_periodo = Table(
    'comissoes_periodo', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('data_inicio', Date, nullable=False),
    Column('data_fim', Date, nullable=False),
    Column('porcentagem', Numeric(5,4), nullable=False),
    Column('sku_marketplace', String(255), nullable=True),
    Column('marketplace_id', Integer, ForeignKey('marketplaces.id'), nullable=True),
    extend_existing=True
)

evento_centauro = Table(
    'evento_centauro', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('numero_pedido', String(50), nullable=False),
    Column('tipo_evento', String(255), nullable=True),
    Column('repasse_liquido_evento', Float, nullable=False),
    Column('data', Date, nullable=False),
    extend_existing=True
)
# Criação das tabelas no banco (se não existirem)
metadata.create_all(engine)

4. CONFIGURAÇÃO DO SQLALCHEMY ORM

In [5]:
Session = sessionmaker(bind=engine)
session = Session()

5. FUNÇÕES AUXILIARES

In [6]:


def verificar_marketplace_id(marketplace_id=5):
    """
    Verifica se o marketplace_id existe na tabela marketplaces.
    """
    try:
        query = marketplaces.select().where(marketplaces.c.id == marketplace_id)
        result = session.execute(query).fetchone()
        if result:
            logger.info(f"Marketplace com ID {marketplace_id} encontrado.")
            return True
        else:
            logger.error(f"Marketplace com ID {marketplace_id} não encontrado.")
            return False
    except Exception as e:
        logger.error(f"Erro ao verificar marketplace_id {marketplace_id}: {e}")
        return False


def parse_date_from_filename(filename):
    base = os.path.basename(filename)
    try:
        date_part = base.split(' - ')[0]  # Exemplo: "2024-09"
        year, month = map(int, date_part.split('-'))
        return datetime(year, month, 1).date()
    except ValueError as ve:
        logger.error(f"Erro ao extrair data do nome '{filename}': {ve}")
        return None
    except Exception as e:
        logger.error(f"Erro inesperado ao extrair data do nome '{filename}': {e}")
        return None


def inserir_arquivos_processados(nome_arquivo, data_processo, status, observacoes=None):
    """
    Insere ou verifica o processamento de arquivos na tabela 'arquivos_processados'.
    
    Parâmetros:
    - nome_arquivo (str): Nome do arquivo CSV.
    - data_processo (date): Data de processamento.
    - status (str): Status do processamento ('PENDENTE', 'PROCESSADO', 'ERRO', 'REPROCESSANDO').
    - observacoes (str, optional): Observações adicionais.
    
    Retorna:
    - bool: True se o arquivo foi inserido com sucesso, False se já estava processado ou houve erro.
    """
    if isinstance(data_processo, datetime):
        data_processo = data_processo.date()
    if data_processo is None:
        data_processo = datetime.now().date()

    try:
        with engine.begin() as conn:
            # Verifica se o arquivo já foi processado
            sel = text("""
                SELECT id, status
                FROM arquivos_processados
                WHERE nome_arquivo = :na
            """)
            row = conn.execute(sel, {"na": nome_arquivo}).fetchone()

            if row:
                # Acessa o status usando o índice correto (1)
                logger.warning(f"[arquivos_processados] Arquivo '{nome_arquivo}' já foi processado com status '{row[1]}'.")
                return False  # Indica que o arquivo já foi processado

            # Insere o arquivo como processado
            sql = text("""
                INSERT INTO arquivos_processados (nome_arquivo, data_processo, status, observacoes)
                VALUES (:na, :dp, :st, :obs)
            """)
            conn.execute(
                sql, {"na": nome_arquivo, "dp": data_processo, "st": status, "obs": observacoes})
        logger.info(f"[arquivos_processados] Inserido => '{nome_arquivo}' / {status}")
        return True  # Indica que o arquivo foi inserido com sucesso
    except Exception as e:
        logger.error(f"Erro ao inserir em 'arquivos_processados': {e}")
        return False


def inserir_comissoes_pedido(data_pedido, porcentagem_comissao, sku_marketplace_id):
    """
    Insere uma comissão na tabela 'comissoes_pedido'.
    
    Parâmetros:
    - data_pedido (date): A data do pedido.
    - porcentagem_comissao (float): A porcentagem da comissão (0-1).
    - sku_marketplace_id (int): ID do SKU no marketplace.
    """
    pc = round(porcentagem_comissao, 4)
    if not (0 <= pc <= 1):
        raise ValueError("Porcentagem de comissão fora do intervalo (0-1).")

    try:
        with engine.begin() as conn:
            sql = text("""
                INSERT INTO comissoes_pedido (data, porcentagem, sku_marketplace_id)
                VALUES (:dt, :pc, :sid)
            """)
            conn.execute(sql, {"dt": data_pedido, "pc": pc,
                             "sid": sku_marketplace_id})
        logger.info(f"[comissoes_pedido] Inserido => sku_marketplace_id={sku_marketplace_id}, pc={pc*100:.2f}%.")
    except Exception as e:
        logger.error(f"Erro inserir_comissoes_pedido: {e}")


def upsert_sku_marketplace(pedido, marketplace_id=5):
    """
    Garante que o pedido exista na tabela sku_marketplace.
    Se não existir, insere com valores padrão (0 ou NULL).
    Retorna o ID do sku_marketplace.
    """
    try:
        with engine.begin() as conn:
            sel = text("""
                SELECT id
                FROM sku_marketplace
                WHERE numero_pedido = :ped
            """)
            row = conn.execute(sel, {"ped": pedido}).fetchone()

            if row:
                sku_id = row[0]
                logger.info(f"[sku_marketplace] Pedido existente => pedido={pedido}, ID={sku_id}")
                return sku_id
            else:
                ins = text("""
                    INSERT INTO sku_marketplace
                    (marketplace_id, numero_pedido, valor_liquido, valor_frete, valor_final)
                    VALUES (:mk, :ped, 0, 0, 0)
                    RETURNING id
                """)
                ret = conn.execute(ins, {
                    "mk": marketplace_id,
                    "ped": pedido
                })
                sku_id = ret.fetchone()[0]
                logger.info(f"[sku_marketplace] INSERT => pedido={pedido}, ID={sku_id}")
                return sku_id
    except Exception as e:
        logger.error(f"Erro upsert_sku_marketplace: {e}")
        return None


def atualizar_sku_marketplace_repasse(pedido, valor_pedido, valor_frete, valor_final, marketplace_id=5):
    """
    Atualiza os valores de repasse normal na tabela sku_marketplace.
    Se o pedido não existir, insere com os valores fornecidos.
    """
    try:
        with engine.begin() as conn:
            sel = text("""
                SELECT id
                FROM sku_marketplace
                WHERE numero_pedido = :ped
            """)
            row = conn.execute(sel, {"ped": pedido}).fetchone()

            if row:
                sku_id = row[0]
                upd = text("""
                    UPDATE sku_marketplace
                    SET
                        valor_liquido = :vliq,
                        valor_frete   = :vfre,
                        valor_final   = :vfin
                    WHERE id = :sid
                """)
                conn.execute(upd, {
                    "vliq": valor_pedido,
                    "vfre": valor_frete,
                    "vfin": valor_final,
                    "sid":  sku_id
                })
                logger.info(f"[sku_marketplace] UPDATE => pedido={pedido}, ID={sku_id}")
                return sku_id
            else:
                ins = text("""
                    INSERT INTO sku_marketplace
                    (marketplace_id, numero_pedido, valor_liquido, valor_frete, valor_final)
                    VALUES (:mk, :ped, :vliq, :vfre, :vfin)
                    RETURNING id
                """)
                ret = conn.execute(ins, {
                    "mk": marketplace_id,
                    "ped": pedido,
                    "vliq": valor_pedido,
                    "vfre": valor_frete,
                    "vfin": valor_final
                })
                sku_id = ret.fetchone()[0]
                logger.info(f"[sku_marketplace] INSERT => pedido={pedido}, ID={sku_id}")
                return sku_id
    except Exception as e:
        logger.error(f"Erro atualizar_sku_marketplace_repasse: {e}")
        return None


def upsert_evento_centauro(numero_pedido, tipo_evento, repasse_liquido, data, data_repasse=None):
    """
    Upsert em evento_centauro. 
    - Se (numero_pedido, tipo_evento) já existir, atualiza repasse_liquido_evento, data, data_repasse.
    - Se o tipo_evento for diferente de 'Repasse Normal' e já existir um pedido com o mesmo numero_pedido,
      soma repasse_liquido_evento ao existente.
    - Senão, insere um novo registro.
    """
    try:
        with engine.begin() as conn:
            # Verifica se já existe um evento com o mesmo numero_pedido e tipo_evento
            sel = text("""
                SELECT id, repasse_liquido_evento
                FROM evento_centauro
                WHERE numero_pedido = :ped
                  AND tipo_evento = :tevt
            """)
            row = conn.execute(sel, {
                "ped": numero_pedido,
                "tevt": tipo_evento
            }).fetchone()

            if row:
                ec_id, repasse_existente = row
                if tipo_evento not in ["Repasse Normal", "Repasse - Normal"]:
                    # Converte repasse_existente para float antes de somar
                    novo_repasse = float(repasse_existente) + repasse_liquido
                    upd = text("""
                        UPDATE evento_centauro
                        SET repasse_liquido_evento = :rle,
                            data = :dt,
                            data_repasse = :dr
                        WHERE id = :id
                    """)
                    conn.execute(upd, {
                        "rle": novo_repasse,
                        "dt":  data,
                        "dr":  data_repasse,
                        "id":  ec_id
                    })
                    logger.info(f"[evento_centauro] UPDATE (soma repasse) => pedido={numero_pedido}, tipo={tipo_evento}, ID={ec_id}, novo_repasse={novo_repasse}")
                else:
                    # Atualiza normalmente para 'Repasse Normal'
                    upd = text("""
                        UPDATE evento_centauro
                        SET repasse_liquido_evento = :rle,
                            data = :dt,
                            data_repasse = :dr
                        WHERE id = :id
                    """)
                    conn.execute(upd, {
                        "rle": repasse_liquido,
                        "dt":  data,
                        "dr":  data_repasse,
                        "id":  ec_id
                    })
                    logger.info(f"[evento_centauro] UPDATE => pedido={numero_pedido}, tipo={tipo_evento}, ID={ec_id}")
                return ec_id
            else:
                # Se não existir, insere um novo registro
                ins = text("""
                    INSERT INTO evento_centauro
                    (numero_pedido, tipo_evento, repasse_liquido_evento, data, data_repasse)
                    VALUES (:ped, :tevt, :rle, :dt, :dr)
                    RETURNING id
                """)
                ret = conn.execute(ins, {
                    "ped": numero_pedido,
                    "tevt": tipo_evento,
                    "rle": repasse_liquido,
                    "dt":  data,
                    "dr":  data_repasse
                })
                ec_id = ret.fetchone()[0]
                logger.info(f"[evento_centauro] INSERT => pedido={numero_pedido}, tipo={tipo_evento}, ID={ec_id}")
                return ec_id
    except Exception as e:
        logger.error(f"Erro upsert_evento_centauro: {e}")
        return None


def processar_csv(file_path, reprocessar=True):
    """
    Lê o CSV, interpretando 'Pedido' como string (ex. '00000000002').
    - Insere todos os pedidos na tabela sku_marketplace.
    - Se status for 'Repasse Normal' ou 'Repasse - Normal':
      -> atualiza os valores do repasse normal.
    - Se for outro status, apenas registra o evento.
    - SEMPRE faz upsert_evento_centauro, registrando o evento e data.
    - Se for repasse normal, busca comissão e insere comissoes_pedido.
    """
    nome_arquivo = os.path.basename(file_path)
    data_processo = parse_date_from_filename(nome_arquivo)

    if not data_processo:
        # Atualiza o status para 'ERRO' e adiciona observações
        try:
            with engine.begin() as conn:
                upd = text("""
                    UPDATE arquivos_processados
                    SET status = 'ERRO', observacoes = 'Erro ao extrair data do nome do arquivo.'
                    WHERE nome_arquivo = :na
                """)
                conn.execute(upd, {"na": nome_arquivo})
            logger.info(f"[arquivos_processados] Atualizado => '{nome_arquivo}' / ERRO")
        except Exception as e:
            logger.error(f"Erro ao atualizar status para 'ERRO' em '{nome_arquivo}': {e}")
        return

    try:
        df = pd.read_csv(file_path, sep=';')

        # Tratar '-' como '0'
        for col in ["ValorPedido", "ValorFrete", "Comissao", "RepasseLiquido"]:
            if col in df.columns:
                df[col] = df[col].replace('-', '0')

        logger.info(f"Lendo CSV '{nome_arquivo}'. Registros: {len(df)}")

        for idx, row in df.iterrows():
            pedido_str = str(row.get("Pedido", "")).strip()
            data_pedido = None
            if "DataPedido" in row:
                dtp = pd.to_datetime(row["DataPedido"], errors='coerce')
                if pd.notnull(dtp):
                    data_pedido = dtp.date()
            data_repasse = None
            if "Ciclo" in row:
                drp = pd.to_datetime(row["Ciclo"], errors='coerce')
                if pd.notnull(drp):
                    data_repasse = drp.date()

            status_venda = str(row.get("StatusAtendimento", "")).strip()
            valor_pedido = float(row.get("ValorPedido", 0.0))
            valor_frete = float(row.get("ValorFrete", 0.0))
            valor_final = float(row.get("RepasseLiquido", 0.0))

            # 1) Garantir que o pedido exista na tabela sku_marketplace
            sku_id = upsert_sku_marketplace(pedido=pedido_str, marketplace_id=5)

            # 2) Se status for 'Repasse Normal' ou 'Repasse - Normal', atualiza os valores do repasse
            if status_venda in ["Repasse Normal", "Repasse - Normal"]:
                if sku_id:
                    # Antes de atualizar, verificar se os valores já correspondem para evitar duplicação
                    try:
                        with engine.begin() as conn:
                            sel = text("""
                                SELECT valor_liquido, valor_frete, valor_final
                                FROM sku_marketplace
                                WHERE id = :sid
                            """)
                            row_sku = conn.execute(sel, {"sid": sku_id}).fetchone()
                            if row_sku:
                                current_vliq, current_vfre, current_vfin = row_sku
                                if (current_vliq != valor_pedido or
                                    current_vfre != valor_frete or
                                    current_vfin != valor_final):
                                    atualizar_sku_marketplace_repasse(
                                        pedido=pedido_str,
                                        valor_pedido=valor_pedido,
                                        valor_frete=valor_frete,
                                        valor_final=valor_final,
                                        marketplace_id=5
                                    )
                                else:
                                    logger.info(f"[sku_marketplace] Valores já atualizados para pedido={pedido_str}, ID={sku_id}")
                    except Exception as e:
                        logger.error(f"Erro ao verificar valores para pedido={pedido_str}: {e}")

            # 3) upsert_evento_centauro (sempre)
            upsert_evento_centauro(
                numero_pedido=pedido_str,
                tipo_evento=status_venda,
                repasse_liquido=valor_final,
                data=data_pedido,
                data_repasse=data_repasse
            )

            # 4) Se status repasse normal, busca comissão e insere se não existir
            if status_venda in ["Repasse Normal", "Repasse - Normal"] and data_pedido and sku_id:
                comissao_encontrada = buscar_comissao_periodo(
                    data_pedido, pedido_str)
                if comissao_encontrada is not None:
                    try:
                        with engine.begin() as conn:
                            sel = text("""
                                SELECT COUNT(*)
                                FROM comissoes_pedido
                                WHERE data = :dt
                                  AND porcentagem = :pc
                                  AND sku_marketplace_id = :sid
                            """)
                            count = conn.execute(sel, {
                                "dt": data_pedido,
                                "pc": comissao_encontrada,
                                "sid": sku_id
                            }).scalar()
                            if count == 0:
                                inserir_comissoes_pedido(
                                    data_pedido, comissao_encontrada, sku_id)
                            else:
                                logger.info(f"[comissoes_pedido] Comissão já inserida para pedido={pedido_str}, ID={sku_id}")
                    except Exception as e:
                        logger.error(f"Erro ao verificar comissão para pedido={pedido_str}: {e}")
                else:
                    logger.info(
                        f"Sem comissão p/ '{pedido_str}' data={data_pedido}.")

        # Ao final, marcar o arquivo como processado ou reprocessado
        try:
            with engine.begin() as conn:
                status_final = 'PROCESSADO'
                observacoes_final = None
                if reprocessar:
                    observacoes_final = 'Reprocessamento concluído.'
                upd = text("""
                    UPDATE arquivos_processados
                    SET status = :st, observacoes = :obs
                    WHERE nome_arquivo = :na
                """)
                conn.execute(upd, {"st": status_final, "obs": observacoes_final, "na": nome_arquivo})
            logger.info(f"Processamento do arquivo '{nome_arquivo}' concluído com sucesso.")
        except Exception as e:
            logger.error(f"Erro ao marcar arquivo '{nome_arquivo}' como PROCESSADO: {e}")
    except:
        print("ERROU AQQ")

def buscar_comissao_periodo(data_pedido, sku):
    """
    Busca a comissão correspondente ao período e SKU no banco de dados.
    
    Parâmetros:
    - data_pedido (date): A data do pedido.
    - sku (str): O SKU do marketplace (numero_pedido).
    
    Retorna:
    - float: A porcentagem da comissão se encontrada, caso contrário, None.
    """
    sql_union = text("""
        SELECT porcentagem
        FROM comissoes_periodo cp
        WHERE :data_pedido BETWEEN cp.data_inicio AND cp.data_fim
          AND cp.sku_marketplace = :sku
          
        UNION
        
        SELECT porcentagem
        FROM comissoes_periodo cp
        WHERE :data_pedido BETWEEN cp.data_inicio AND cp.data_fim
          AND cp.sku_marketplace IS NULL
          AND NOT EXISTS (
              SELECT 1
              FROM comissoes_periodo cp2
              WHERE :data_pedido BETWEEN cp2.data_inicio AND cp2.data_fim
                AND cp2.sku_marketplace = :sku
          )
    """)
    try:
        with engine.begin() as conn:
            row = conn.execute(
                sql_union, {"data_pedido": data_pedido, "sku": sku}).fetchone()
            if row:
                return float(row[0])
            else:
                return None
    except Exception as e:
        logger.error(f"Erro buscar_comissao_periodo p/ sku='{sku}': {e}")
        return None

In [7]:
def reprocessar_planilha(file_path):
    """
    Reprocessa uma planilha já inserida no banco de dados.
    - Garante que todos os pedidos existam na tabela sku_marketplace.
    - Atualiza os valores de repasse normal sem somar repetidamente.
    - Atualiza eventos sem duplicar.
    """
    nome_arquivo = os.path.basename(file_path)
    data_processo = parse_date_from_filename(nome_arquivo)

    if not data_processo:
        inserir_arquivos_processados(
            nome_arquivo, None, 'ERRO', "Erro ao extrair data do arquivo.")
        return

    try:
        # Atualiza o status para 'REPROCESSANDO'
        with engine.begin() as conn:
            # Em vez de 'REPROCESSANDO', use 'PENDENTE' ou outro status apropriado
            upd = text("""
                UPDATE arquivos_processados
                SET status = 'PENDENTE', observacoes = 'Reprocessamento solicitado.'
                WHERE nome_arquivo = :na
            """)
            
            conn.execute(upd, {"na": nome_arquivo})

        df = pd.read_csv(file_path, sep=';')

        # Tratar '-' como '0'
        for col in ["ValorPedido", "ValorFrete", "Comissao", "RepasseLiquido"]:
            if col in df.columns:
                df[col] = df[col].replace('-', '0')

        logger.info(f"Reprocessando CSV '{nome_arquivo}'. Registros: {len(df)}")

        for idx, row in df.iterrows():
            pedido_str = str(row.get("Pedido", "")).strip()
            data_pedido = None
            if "DataPedido" in row:
                dtp = pd.to_datetime(row["DataPedido"], errors='coerce')
                if pd.notnull(dtp):
                    data_pedido = dtp.date()
            data_repasse = None
            if "Ciclo" in row:
                drp = pd.to_datetime(row["Ciclo"], errors='coerce')
                if pd.notnull(drp):
                    data_repasse = drp.date()

            status_venda = str(row.get("StatusAtendimento", "")).strip()
            valor_pedido = float(row.get("ValorPedido", 0.0))
            valor_frete = float(row.get("ValorFrete", 0.0))
            valor_final = float(row.get("RepasseLiquido", 0.0))

            # 1) Garantir que o pedido exista na tabela sku_marketplace
            sku_id = upsert_sku_marketplace(pedido=pedido_str, marketplace_id=5)

            # 2) Se status for 'Repasse Normal' ou 'Repasse - Normal', atualiza os valores do repasse
            if status_venda in ["Repasse Normal", "Repasse - Normal"]:
                if sku_id:
                    # Antes de atualizar, verificar se os valores já correspondem para evitar duplicação
                    try:
                        with engine.begin() as conn:
                            sel = text("""
                                SELECT valor_liquido, valor_frete, valor_final
                                FROM sku_marketplace
                                WHERE id = :sid
                            """)
                            row_sku = conn.execute(sel, {"sid": sku_id}).fetchone()
                            if row_sku:
                                current_vliq, current_vfre, current_vfin = row_sku
                                if (current_vliq != valor_pedido or
                                    current_vfre != valor_frete or
                                    current_vfin != valor_final):
                                    atualizar_sku_marketplace_repasse(
                                        pedido=pedido_str,
                                        valor_pedido=valor_pedido,
                                        valor_frete=valor_frete,
                                        valor_final=valor_final,
                                        marketplace_id=5
                                    )
                                else:
                                    logger.info(f"[sku_marketplace] Valores já atualizados para pedido={pedido_str}, ID={sku_id}")
                    except Exception as e:
                        logger.error(f"Erro ao verificar valores para pedido={pedido_str}: {e}")

            # 3) upsert_evento_centauro (sempre)
            upsert_evento_centauro(
                numero_pedido=pedido_str,
                tipo_evento=status_venda,
                repasse_liquido=valor_final,
                data=data_pedido,
                data_repasse=data_repasse
            )

            # 4) Se status repasse normal, busca comissão
            if status_venda in ["Repasse Normal", "Repasse - Normal"] and data_pedido and sku_id:
                comissao_encontrada = buscar_comissao_periodo(
                    data_pedido, pedido_str)
                if comissao_encontrada is not None:
                    # Verificar se a comissão já foi inserida para evitar duplicação
                    try:
                        with engine.begin() as conn:
                            sel = text("""
                                SELECT COUNT(*)
                                FROM comissoes_pedido
                                WHERE data = :dt
                                  AND porcentagem = :pc
                                  AND sku_marketplace_id = :sid
                            """)
                            count = conn.execute(sel, {
                                "dt": data_pedido,
                                "pc": comissao_encontrada,
                                "sid": sku_id
                            }).scalar()
                            if count == 0:
                                inserir_comissoes_pedido(
                                    data_pedido, comissao_encontrada, sku_id)
                            else:
                                logger.info(f"[comissoes_pedido] Comissão já inserida para pedido={pedido_str}, ID={sku_id}")
                    except Exception as e:
                        logger.error(f"Erro ao verificar comissão para pedido={pedido_str}: {e}")
                else:
                    logger.info(
                        f"Sem comissão p/ '{pedido_str}' data={data_pedido}.")

        # Ao final, marcar o arquivo como reprocessado
        with engine.begin() as conn:
            upd = text("""
                UPDATE arquivos_processados
                SET status = 'PROCESSADO', observacoes = 'Reprocessamento concluído.'
                WHERE nome_arquivo = :na
            """)
            conn.execute(upd, {"na": nome_arquivo})
        logger.info(f"Reprocessamento do arquivo '{nome_arquivo}' concluído com sucesso.")
    except Exception as e:
        logger.error(f"Erro ao reprocessar '{nome_arquivo}': {e}")
        inserir_arquivos_processados(
            nome_arquivo, data_processo, 'ERRO', str(e))


6. EXECUÇÃO PRINCIPAL

In [8]:
def main(reprocessar=True):
    """
    Função principal para processar ou reprocessar arquivos CSV na pasta 'Repasse Centauro'.
    
    Parâmetros:
    - reprocessar (bool): Se True, reprocessa todos os arquivos, mesmo os já processados.
    """
    # Verificar se o marketplace_id=5 existe
    if not verificar_marketplace_id(5):
        logger.error("Marketplace ID 5 não encontrado. Encerrando o processamento.")
        return

    # Ajuste o caminho abaixo para a pasta onde estão seus CSVs
    base_dir = os.getcwd()
    folder_path_centauro = os.path.join(base_dir, 'Repasse Centauro')

    if not os.path.isdir(folder_path_centauro):
        logger.error(f"Pasta '{folder_path_centauro}' não encontrada.")
        inserir_arquivos_processados(
            nome_arquivo="Centauro",
            data_processo=None,
            status='ERRO',
            observacoes=f"Pasta '{folder_path_centauro}' não encontrada."
        )
        return

    # Iterar sobre todos os arquivos na pasta
    for filename in os.listdir(folder_path_centauro):
        # Processar apenas arquivos .csv
        if filename.lower().endswith('.csv'):
            file_path = os.path.join(folder_path_centauro, filename)
            if not os.path.exists(file_path):
                logger.error(f"Arquivo '{file_path}' não encontrado.")
                inserir_arquivos_processados(
                    nome_arquivo=os.path.basename(file_path),
                    data_processo=None,
                    status='ERRO',
                    observacoes="Arquivo não encontrado."
                )
                continue

            if reprocessar:
                reprocessar_planilha(file_path)
            else:
                # Tentar inserir no 'arquivos_processados'. Se retornar False, pular o arquivo.
                data_processo = parse_date_from_filename(filename)
                processado = inserir_arquivos_processados(
                    nome_arquivo=filename,
                    data_processo=data_processo,
                    status='PENDENTE',
                    observacoes="Início do processamento."
                )
                if not processado:
                    # Arquivo já foi processado, pular
                    continue

                # Processar o arquivo
                processar_csv(file_path, reprocessar=reprocessar)
        else:
            logger.info(f"Ignorando arquivo que não é CSV: {filename}")

if __name__ == "__main__":
    main()