In [1]:
# CÉLULA 1: Imports e Configurações Iniciais
import os
import hashlib
import logging
from pathlib import Path
from datetime import datetime

from minio import Minio
from minio.error import S3Error
import psycopg2
from psycopg2 import sql
from tqdm.notebook import tqdm # Usar tqdm.notebook para Jupyter

# --- Configurações ---
# Caminho da montagem do Google Drive dentro do contêiner Jupyter
GDRIVE_MOUNT_PATH = Path("/home/jovyan/work/gdrive_local_mount/")

# Configurações do MinIO
MINIO_ENDPOINT = "minio:9000"
MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "senhasegura"
MINIO_BUCKET_RAW = "raw" # Bucket principal para os dados

# Configurações do PostgreSQL
POSTGRES_HOST = "postgres_db"
POSTGRES_PORT = "5432"
POSTGRES_DB = "postgres"
POSTGRES_USER = "postgres"
POSTGRES_PASSWORD = "senhasegura"

# Configuração do Logging
LOG_DIR = Path("/home/jovyan/work/logs/")
LOG_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOG_DIR / f"processamento_drive_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler() # Para output no console/notebook
    ]
)

logger = logging.getLogger(__name__)

logger.info("Configurações iniciais e imports carregados.")
logger.info(f"Montagem do Google Drive esperada em: {GDRIVE_MOUNT_PATH}")
logger.info(f"Log será salvo em: {LOG_FILE}")


2025-05-25 12:04:38,023 - INFO - Configurações iniciais e imports carregados.
2025-05-25 12:04:38,024 - INFO - Montagem do Google Drive esperada em: /home/jovyan/work/gdrive_local_mount
2025-05-25 12:04:38,024 - INFO - Log será salvo em: /home/jovyan/work/logs/processamento_drive_20250525_120438.log


In [None]:
# Nova célula: 0_gdrive_to_minio_widget
# Insira esta célula logo após as configurações iniciais do notebook

import os
import logging
from tqdm.notebook import tqdm
from minio import Minio
import ipywidgets as widgets
from IPython.display import display, clear_output

# --- Configurações MinIO ---
MINIO_ENDPOINT = "minio:9000"
MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "senhasegura"
BUCKET_NAME = "teste-raw"

client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False
)
orencida 
    client.make_bucket(BUCKET_NAME)

# Caminho do mount do Google Drive (dentro do container)
DRIVE_MOUNT = os.path.join(os.getcwd(), "gdrive_local_mount")

# --- Coleta de diretórios ---
dir_list = []
for root, dirs, files in os.walk(DRIVE_MOUNT):
    for d in dirs:
        rel = os.path.relpath(os.path.join(root, d), DRIVE_MOUNT)
        dir_list.append(rel)

# --- Widgets interativos ---
dropdown = widgets.Dropdown(options=sorted(dir_list), description="Pasta:")
button = widgets.Button(description="Enviar para MinIO")
output = widgets.Output()

def upload_directory(btn):
    with output:
        clear_output()
        selected = dropdown.value
        local_dir = os.path.join(DRIVE_MOUNT, selected)
        file_list = []
        for r, _, files in os.walk(local_dir):
            for fname in files:
                full_path = os.path.join(r, fname)
                obj_path = os.path.join(selected, os.path.relpath(full_path, local_dir))
                file_list.append((full_path, obj_path))
        print(f"Enviando {len(file_list)} arquivos para bucket '{BUCKET_NAME}'...")
        for local_path, obj_path in tqdm(file_list, desc="Upload"):
            # Ajusta separador para MinIO
            object_name = obj_path.replace(os.sep, "/")
            client.fput_object(BUCKET_NAME, object_name, local_path)
        print("Upload concluído.")

button.on_click(upload_directory)
display(dropdown, button, output)


Dropdown(description='Pasta:', options=('0_PROJETOS WRMELO', '0_PROJETOS WRMELO/0. PROJETO MBA', '0_PROJETOS W…

Button(description='Enviar para MinIO', style=ButtonStyle())

Output()

In [3]:
# CÉLULA 2: Funções Utilitárias (Conexões, SHA256)

# --- MinIO Client ---
def get_minio_client():
    """Retorna um cliente MinIO configurado."""
    try:
        client = Minio(
            MINIO_ENDPOINT,
            access_key=MINIO_ACCESS_KEY,
            secret_key=MINIO_SECRET_KEY,
            secure=False # Conforme docker-compose, MinIO está em HTTP
        )
        # Verifica se o bucket 'raw' existe, senão cria
        if not client.bucket_exists(MINIO_BUCKET_RAW):
            client.make_bucket(MINIO_BUCKET_RAW)
            logger.info(f"Bucket MinIO '{MINIO_BUCKET_RAW}' criado.")
        else:
            logger.info(f"Bucket MinIO '{MINIO_BUCKET_RAW}' já existe.")
        return client
    except Exception as e:
        logger.error(f"Erro ao conectar/configurar MinIO: {e}")
        raise

# --- PostgreSQL Connection ---
def get_postgres_connection():
    """Retorna uma conexão PostgreSQL configurada."""
    try:
        conn = psycopg2.connect(
            host=POSTGRES_HOST,
            port=POSTGRES_PORT,
            dbname=POSTGRES_DB,
            user=POSTGRES_USER,
            password=POSTGRES_PASSWORD
        )
        logger.info("Conexão com PostgreSQL estabelecida.")
        return conn
    except Exception as e:
        logger.error(f"Erro ao conectar ao PostgreSQL: {e}")
        raise

def initialize_db_tables(conn):
    """Cria as tabelas 'projetos' e 'arquivos_processados' se não existirem."""
    try:
        with conn.cursor() as cur:
            # Tabela de Projetos
            cur.execute("""
                CREATE TABLE IF NOT EXISTS projetos (
                    id SERIAL PRIMARY KEY,
                    nome_projeto VARCHAR(255) UNIQUE NOT NULL,
                    data_criacao TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                );
            """)
            logger.info("Tabela 'projetos' verificada/criada.")

            # Tabela de Arquivos Processados
            cur.execute("""
                CREATE TABLE IF NOT EXISTS arquivos_processados (
                    id SERIAL PRIMARY KEY,
                    id_projeto INTEGER NOT NULL REFERENCES projetos(id),
                    diretorio_origem_drive TEXT NOT NULL,
                    caminho_relativo_arquivo_drive TEXT NOT NULL,
                    nome_arquivo_original_drive TEXT NOT NULL,
                    hash_sha256 VARCHAR(64) NOT NULL,
                    tamanho_bytes BIGINT NOT NULL,
                    data_processamento TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
                    caminho_minio_drive_structure TEXT NOT NULL,
                    caminho_minio_raw_unicos TEXT NOT NULL,
                    UNIQUE (id_projeto, caminho_minio_raw_unicos),
                    UNIQUE (id_projeto, caminho_relativo_arquivo_drive, diretorio_origem_drive) -- Para evitar reprocessamento idêntico para o mesmo projeto
                );
            """)
            logger.info("Tabela 'arquivos_processados' verificada/criada.")
        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.error(f"Erro ao inicializar tabelas no PostgreSQL: {e}")
        raise

# --- SHA256 ---
def calculate_sha256(file_path):
    """Calcula o hash SHA256 de um arquivo."""
    sha256_hash = hashlib.sha256()
    try:
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
    except Exception as e:
        logger.error(f"Erro ao calcular SHA256 para {file_path}: {e}")
        return None

# Inicializa clientes e tabelas
minio_client = get_minio_client()
pg_conn = get_postgres_connection()
if pg_conn:
    initialize_db_tables(pg_conn)


2025-05-24 19:44:57,989 - INFO - Bucket MinIO 'raw' já existe.


2025-05-24 19:44:57,995 - INFO - Conexão com PostgreSQL estabelecida.
2025-05-24 19:44:57,996 - INFO - Tabela 'projetos' verificada/criada.
2025-05-24 19:44:57,997 - INFO - Tabela 'arquivos_processados' verificada/criada.


In [None]:
# Célula 3: Função para Seleção de Diretório Fonte no Google Drive Montado

import logging
from pathlib import Path

# Supondo que GDRIVE_MOUNT_PATH já foi definido na Célula 1
# Exemplo: GDRIVE_MOUNT_PATH = Path("/home/jovyan/work/gdrive_local_mount/")
# Supondo que o logger já foi configurado na Célula 1 ou 2
# Exemplo: logger = logging.getLogger(__name__)

def list_gdrive_source_directories(base_path: Path):
    """
    Lista os diretórios na raiz do caminho base fornecido.
    Retorna uma lista de nomes de diretórios.
    """
    logger.info(f"Iniciando listagem de diretórios em: {base_path}")
    if not base_path.exists():
        logger.error(f"O caminho base {base_path} não existe.")
        print(f"ERRO: O caminho base {base_path} não existe.")
        return []
    if not base_path.is_dir():
        logger.error(f"O caminho base {base_path} não é um diretório.")
        print(f"ERRO: O caminho base {base_path} não é um diretório.")
        return []

    try:
        directories = [d.name for d in base_path.iterdir() if d.is_dir()]
        if not directories:
            logger.warning(f"Nenhum subdiretório encontrado em {base_path}.")
            print(f"Nenhum subdiretório encontrado em {base_path}.")
        else:
            logger.info(f"Diretórios encontrados: {directories}")
        return directories
    except PermissionError:
        logger.exception(f"Erro de permissão ao acessar {base_path}.")
        print(f"ERRO: Permissão negada para acessar {base_path}.")
        return []
    except Exception as e:
        logger.exception(f"Erro inesperado ao listar diretórios em {base_path}: {e}")
        print(f"ERRO inesperado ao listar diretórios em {base_path}: {e}")
        return []

def select_gdrive_source_directory():
    """
    Lista os diretórios disponíveis no GDRIVE_MOUNT_PATH e permite ao usuário selecionar um.
    Retorna o Path completo do diretório selecionado ou None se nenhum for selecionado.
    """
    logger.info("Iniciando Passo 1: Seleção da Fonte no Drive Montado.")
    print("Passo 1: Seleção da Fonte no Drive Montado")
    print("=" * 40)

    source_directories = list_gdrive_source_directories(GDRIVE_MOUNT_PATH)

    if not source_directories:
        print("Não foi possível listar os diretórios de origem. Verifique os logs.")
        logger.error("Seleção abortada: Nenhum diretório de origem encontrado ou erro na listagem.")
        return None

    print("\nDiretórios disponíveis no Google Drive Montado:")
    for i, dir_name in enumerate(source_directories):
        print(f"  {i+1}. {dir_name}")
    print("-" * 40)

    while True:
        try:
            choice = input(f"Digite o número do diretório que deseja usar (1-{len(source_directories)}) ou 'c' para cancelar: ")
            if choice.lower() == 'c':
                logger.info("Seleção de diretório cancelada pelo usuário.")
                print("Seleção cancelada.")
                return None
            
            choice_num = int(choice)
            if 1 <= choice_num <= len(source_directories):
                selected_dir_name = source_directories[choice_num - 1]
                selected_dir_path = GDRIVE_MOUNT_PATH / selected_dir_name
                logger.info(f"Usuário selecionou o diretório número {choice_num}: {selected_dir_path}")
                return selected_dir_path
            else:
                print(f"Seleção inválida. Por favor, digite um número entre 1 e {len(source_directories)}.")
                logger.warning(f"Tentativa de seleção inválida: {choice}")
        except ValueError:
            print("Entrada inválida. Por favor, digite um número.")
            logger.warning(f"Entrada não numérica para seleção: {choice}")
        except Exception as e:
            logger.exception(f"Erro durante a seleção do diretório: {e}")
            print(f"Ocorreu um erro: {e}")
            return None

# --- Execução da seleção ---
# Certifique-se de que GDRIVE_MOUNT_PATH está definido (geralmente na Célula 1)
# Exemplo: GDRIVE_MOUNT_PATH = Path("/home/jovyan/work/gdrive_local_mount/")

# Verifica se GDRIVE_MOUNT_PATH está definido antes de chamar a função
if 'GDRIVE_MOUNT_PATH' in globals() and isinstance(GDRIVE_MOUNT_PATH, Path):
    logger.info(f"GDRIVE_MOUNT_PATH está definido como: {GDRIVE_MOUNT_PATH}")
    
    selected_directory = select_gdrive_source_directory()

    if selected_directory:
        print(f"\nVocê selecionou o diretório: {selected_directory}")
        logger.info(f"Diretório fonte final selecionado: {selected_directory}")
    else:
        print("\nNenhum diretório foi selecionado ou a operação foi cancelada.")
        logger.warning("Seleção de diretório fonte não concluída.")
else:
    error_msg = "ERRO CRÍTICO: A variável GDRIVE_MOUNT_PATH não está definida ou não é um objeto Path. Verifique a Célula 1."
    print(error_msg)
    if 'logger' in globals():
        logger.critical(error_msg)
    else: # Fallback se o logger não estiver disponível
        import sys
        sys.stderr.write(error_msg + "\n")


In [5]:
# Linha para adicionar para testar a função:
selected_directory = select_gdrive_source_directory()

if selected_directory:
    print(f"\nVocê selecionou o diretório: {selected_directory}")
else:
    print("\nNenhum diretório foi selecionado ou a operação foi cancelada.")

2025-05-24 19:45:11,948 - INFO - Iniciando Passo 1: Seleção da Fonte no Drive Montado.


Diretórios disponíveis na montagem do Google Drive:
1. 0_PROJETOS WRMELO
2. ATACAMA
3. Colab Notebooks
4. DLProject
5. FIAP_PI
6. GRUPO MARINGÁ
7. GemeoDigital／Referencias
8. INFRA DATA SCIENCE
9. Logotipos
10. PAMPA SUL
11. PESSOAL
12. modelos
13. rl


2025-05-24 19:46:00,466 - INFO - Diretório fonte selecionado: /home/jovyan/work/gdrive_local_mount/FIAP_PI



Você selecionou o diretório: /home/jovyan/work/gdrive_local_mount/FIAP_PI


In [None]:
# CÉLULA 4: Passo 2 - Cópia Estruturada para o Bucket 'raw' no MinIO

def copy_to_minio_drive_structure(minio_client, source_gdrive_path: Path):
    """
    Copia todo o conteúdo do diretório do Drive selecionado para o bucket 'raw' no MinIO,
    preservando a estrutura de diretórios.
    Ex: Pasta_Cliente_X/Relatorios/anual.pdf -> s3://raw/Pasta_Cliente_X/Relatorios/anual.pdf
    """
    logger.info(f"Iniciando Passo 2: Cópia da estrutura do Drive para MinIO (s3://{MINIO_BUCKET_RAW}/{source_gdrive_path.name}/).")
    if not source_gdrive_path or not source_gdrive_path.is_dir():
        logger.error(f"Diretório fonte inválido: {source_gdrive_path}")
        return 0 # Retorna 0 arquivos copiados

    copied_files_count = 0
    files_to_copy = list(source_gdrive_path.rglob("*")) # rglob para recursividade

    for local_file_path in tqdm(files_to_copy, desc=f"Copiando '{source_gdrive_path.name}' para MinIO (estrutura Drive)"):
        if local_file_path.is_file():
            relative_path = local_file_path.relative_to(source_gdrive_path)
            # O objeto no MinIO incluirá o nome da pasta raiz selecionada do Drive
            minio_object_name = f"{source_gdrive_path.name}/{relative_path}"
            
            try:
                minio_client.fput_object(
                    MINIO_BUCKET_RAW,
                    minio_object_name,
                    str(local_file_path)
                )
                logger.debug(f"Arquivo copiado para MinIO (estrutura Drive): {local_file_path} -> s3://{MINIO_BUCKET_RAW}/{minio_object_name}")
                copied_files_count += 1
            except S3Error as e:
                logger.error(f"Erro S3 ao copiar {local_file_path} para {minio_object_name}: {e}")
            except Exception as e:
                logger.error(f"Erro ao copiar {local_file_path} para MinIO: {e}")
    
    logger.info(f"Passo 2 concluído. {copied_files_count} arquivos copiados para s3://{MINIO_BUCKET_RAW}/{source_gdrive_path.name}/")
    return copied_files_count

# Exemplo de uso (a ser chamado na célula de orquestração)
# if selected_gdrive_source_path:
#     copy_to_minio_drive_structure(minio_client, selected_gdrive_source_path)

In [None]:
# CÉLULA 5: Passo 3 - Gerenciamento de "Nomes de Projetos" no PostgreSQL

def list_existing_projects(conn):
    """Lista os projetos existentes no PostgreSQL."""
    projects = []
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT id, nome_projeto FROM projetos ORDER BY nome_projeto;")
            projects = cur.fetchall() # Lista de tuplas (id, nome_projeto)
    except Exception as e:
        logger.error(f"Erro ao listar projetos do PostgreSQL: {e}")
    return projects

def create_new_project(conn, project_name):
    """Cria um novo projeto no PostgreSQL e retorna seu ID."""
    try:
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO projetos (nome_projeto) VALUES (%s) RETURNING id;",
                (project_name,)
            )
            project_id = cur.fetchone()[0]
            conn.commit()
            logger.info(f"Novo projeto '{project_name}' criado com ID: {project_id}.")
            return project_id
    except psycopg2.errors.UniqueViolation:
        conn.rollback()
        logger.warning(f"Projeto '{project_name}' já existe. Não foi criado novamente.")
        # Buscar o ID do projeto existente
        with conn.cursor() as cur:
            cur.execute("SELECT id FROM projetos WHERE nome_projeto = %s;", (project_name,))
            project_id = cur.fetchone()[0]
            return project_id
    except Exception as e:
        conn.rollback()
        logger.error(f"Erro ao criar novo projeto '{project_name}': {e}")
        return None

def select_or_create_project(conn):
    """Permite ao usuário selecionar um projeto existente ou criar um novo."""
    logger.info("Iniciando Passo 3: Gerenciamento de Nomes de Projetos.")
    existing_projects = list_existing_projects(conn)

    print("\n--- Gerenciamento de Projetos ---")
    if existing_projects:
        print("Projetos existentes:")
        for i, (proj_id, proj_name) in enumerate(existing_projects):
            print(f"{i+1}. {proj_name} (ID: {proj_id})")
        print(f"{len(existing_projects)+1}. Criar um novo projeto")
    else:
        print("Nenhum projeto existente.")
        print("1. Criar um novo projeto")

    while True:
        try:
            if existing_projects:
                prompt = f"Selecione o número do projeto (1-{len(existing_projects)+1}) ou '0' para cancelar: "
            else:
                prompt = "Selecione '1' para criar um novo projeto ou '0' para cancelar: "
            
            choice_str = input(prompt)
            if choice_str == '0':
                logger.info("Seleção/criação de projeto cancelada pelo usuário.")
                return None, None # project_id, project_name

            choice_idx = int(choice_str) -1

            if existing_projects and 0 <= choice_idx < len(existing_projects):
                project_id, project_name = existing_projects[choice_idx]
                logger.info(f"Projeto selecionado: '{project_name}' (ID: {project_id})")
                return project_id, project_name
            elif (existing_projects and choice_idx == len(existing_projects)) or (not existing_projects and choice_idx == 0) :
                new_project_name = input("Digite o nome para o novo projeto: ").strip()
                if not new_project_name:
                    print("Nome do projeto não pode ser vazio.")
                    continue
                project_id = create_new_project(conn, new_project_name)
                if project_id:
                    return project_id, new_project_name
                else: # Erro ao criar
                    return None, None 
            else:
                print("Seleção inválida. Tente novamente.")
        except ValueError:
            print("Entrada inválida. Por favor, insira um número.")
        except Exception as e:
            logger.error(f"Erro durante a seleção/criação de projeto: {e}")
            return None, None

# Exemplo de uso (a ser chamado na célula de orquestração)
# if pg_conn:
#    selected_project_id, selected_project_name = select_or_create_project(pg_conn)
#    if selected_project_id:
#        print(f"Projeto escolhido: {selected_project_name} (ID: {selected_project_id})")

In [None]:
# CÉLULA 6: Passo 4 - Cópia para raw/raw_unicos/ no MinIO (Organizada por "Nome do Projeto")

def copy_to_minio_raw_unicos(minio_client, source_gdrive_path: Path, project_name: str, pg_conn):
    """
    Copia arquivos do diretório do Drive para s3://raw/raw_unicos/NOME_DO_PROJETO/nome_original_do_arquivo.ext.
    Garante unicidade: não copia se o objeto já existir no MinIO (nome_projeto + nome_arquivo).
    Retorna uma lista de tuplas (local_file_path, minio_object_name_raw_unicos) para os arquivos efetivamente copiados.
    """
    logger.info(f"Iniciando Passo 4: Cópia para MinIO raw_unicos (s3://{MINIO_BUCKET_RAW}/raw_unicos/{project_name}/).")
    if not source_gdrive_path or not source_gdrive_path.is_dir():
        logger.error(f"Diretório fonte inválido: {source_gdrive_path}")
        return []
    if not project_name:
        logger.error("Nome do projeto não fornecido para cópia em raw_unicos.")
        return []

    copied_files_details = [] # Lista de (local_path, minio_path_raw_unicos, sha256, size)
    skipped_files_count = 0
    
    # Lista todos os arquivos recursivamente no diretório fonte do GDrive
    all_files_in_source = [f for f in source_gdrive_path.rglob("*") if f.is_file()]

    for local_file_path in tqdm(all_files_in_source, desc=f"Copiando para MinIO raw_unicos (Projeto: {project_name})"):
        original_file_name = local_file_path.name
        minio_object_name_raw_unicos = f"raw_unicos/{project_name}/{original_file_name}"

        try:
            # Verifica se o objeto já existe no MinIO para este projeto e nome de arquivo
            minio_client.stat_object(MINIO_BUCKET_RAW, minio_object_name_raw_unicos)
            logger.info(f"Arquivo já existe em MinIO raw_unicos, pulando: s3://{MINIO_BUCKET_RAW}/{minio_object_name_raw_unicos}")
            skipped_files_count +=1
            # Mesmo pulando a cópia para o MinIO (raw_unicos), ainda pode ser necessário registrar no PG
            # se ele foi copiado na etapa 2 e ainda não está registrado para este projeto.
            # A lógica de registro no PG (Passo 5) cuidará da unicidade baseada em
            # (id_projeto, caminho_relativo_arquivo_drive, diretorio_origem_drive).
            # No entanto, para esta função, só retornamos o que foi EFETIVAMENTE copiado para raw_unicos.
            # Se a política fosse de "atualização", aqui calcularíamos o hash e compararíamos.
            continue # Pula para o próximo arquivo

        except S3Error as e:
            if e.code == "NoSuchKey": # O arquivo não existe, então podemos copiar
                file_hash = calculate_sha256(local_file_path)
                file_size = local_file_path.stat().st_size
                if not file_hash:
                    logger.warning(f"Não foi possível calcular o hash para {local_file_path}. Pulando cópia para raw_unicos.")
                    continue

                minio_client.fput_object(
                    MINIO_BUCKET_RAW,
                    minio_object_name_raw_unicos,
                    str(local_file_path)
                )
                logger.debug(f"Arquivo copiado para MinIO raw_unicos: {local_file_path} -> s3://{MINIO_BUCKET_RAW}/{minio_object_name_raw_unicos}")
                copied_files_details.append({
                    "local_path": local_file_path,
                    "minio_path_raw_unicos": minio_object_name_raw_unicos, # Ex: raw_unicos/ProjetoX/arquivo.pdf
                    "sha256": file_hash,
                    "size": file_size
                })
            else: # Outro erro S3
                logger.error(f"Erro S3 ao verificar/copiar {local_file_path} para {minio_object_name_raw_unicos}: {e}")
        except Exception as e:
            logger.error(f"Erro ao processar {local_file_path} para MinIO raw_unicos: {e}")
            
    logger.info(f"Passo 4 concluído. {len(copied_files_details)} arquivos copiados para s3://{MINIO_BUCKET_RAW}/raw_unicos/{project_name}/. {skipped_files_count} arquivos pulados (já existentes).")
    return copied_files_details


# Exemplo de uso (a ser chamado na célula de orquestração)
# if selected_gdrive_source_path and selected_project_name:
#    files_for_pg_registration = copy_to_minio_raw_unicos(minio_client, selected_gdrive_source_path, selected_project_name, pg_conn)
#    print(f"{len(files_for_pg_registration)} arquivos copiados para raw_unicos e prontos para registro.")


In [None]:
# CÉLULA 7: Passo 5 - Registro Detalhado no PostgreSQL

def register_file_in_postgres(conn, project_id, gdrive_source_dir_name, local_file_path_obj: Path, 
                              minio_path_drive_structure, minio_path_raw_unicos, file_hash, file_size):
    """
    Registra os metadados de um arquivo processado na tabela 'arquivos_processados' do PostgreSQL.
    """
    # gdrive_source_dir_name é o nome da pasta raiz selecionada no Drive (ex: Pasta_Cliente_X)
    # local_file_path_obj é o Path absoluto do arquivo local (ex: /home/jovyan/work/gdrive_local_mount/Pasta_Cliente_X/Relatorios/anual.pdf)
    # Queremos o caminho relativo DENTRO da pasta raiz do Drive
    # Ex: Se gdrive_source_dir_name = "Pasta_Cliente_X"
    # e local_file_path_obj = GDRIVE_MOUNT_PATH / "Pasta_Cliente_X" / "Relatorios" / "anual.pdf"
    # então relative_path_in_drive_source = "Relatorios/anual.pdf"
    
    try:
        # Constrói o caminho da pasta raiz do GDrive selecionada
        gdrive_root_selected_path = GDRIVE_MOUNT_PATH / gdrive_source_dir_name
        relative_path_in_drive_source = local_file_path_obj.relative_to(gdrive_root_selected_path)
    except ValueError:
        logger.error(f"Erro ao calcular caminho relativo para {local_file_path_obj} a partir de {gdrive_root_selected_path}")
        # Tenta usar apenas o nome do arquivo se o cálculo do relativo falhar (caso seja um arquivo na raiz da pasta selecionada)
        # Isso pode acontecer se gdrive_source_dir_name for o próprio arquivo, o que não deveria ocorrer
        # pela lógica de seleção de diretório. Mas como fallback:
        if local_file_path_obj.parent == gdrive_root_selected_path:
             relative_path_in_drive_source = Path(local_file_path_obj.name)
        else:
            # Se não for possível determinar o caminho relativo corretamente, logar e pular
            logger.error(f"Não foi possível determinar o caminho relativo para {local_file_path_obj} dentro de {gdrive_source_dir_name}. Pulando registro.")
            return False


    original_file_name = local_file_path_obj.name

    sql_insert = sql.SQL("""
        INSERT INTO arquivos_processados (
            id_projeto, diretorio_origem_drive, caminho_relativo_arquivo_drive, 
            nome_arquivo_original_drive, hash_sha256, tamanho_bytes, 
            caminho_minio_drive_structure, caminho_minio_raw_unicos, data_processamento
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id_projeto, caminho_relativo_arquivo_drive, diretorio_origem_drive) 
        DO NOTHING; 
    """)
    # ON CONFLICT (id_projeto, caminho_minio_raw_unicos) DO NOTHING; -- Alternativa de unicidade
    # A unicidade (id_projeto, caminho_relativo_arquivo_drive, diretorio_origem_drive) garante que
    # o mesmo arquivo da mesma origem não seja registrado múltiplas vezes para o mesmo projeto.

    try:
        with conn.cursor() as cur:
            cur.execute(sql_insert, (
                project_id,
                gdrive_source_dir_name, # Ex: "Pasta_Cliente_X"
                str(relative_path_in_drive_source), # Ex: "Relatorios/anual.pdf"
                original_file_name, # Ex: "anual.pdf"
                file_hash,
                file_size,
                minio_path_drive_structure, # Ex: "Pasta_Cliente_X/Relatorios/anual.pdf" (sem o bucket)
                minio_path_raw_unicos, # Ex: "raw_unicos/NomeDoProjeto/anual.pdf" (sem o bucket)
                datetime.now()
            ))
            conn.commit()
            if cur.rowcount > 0:
                logger.debug(f"Arquivo registrado no PostgreSQL: {original_file_name} para o projeto ID {project_id}")
                return True
            else:
                logger.info(f"Registro para '{original_file_name}' (projeto ID {project_id}, origem {gdrive_source_dir_name}/{relative_path_in_drive_source}) já existe no PostgreSQL. Nenhuma ação tomada.")
                return False # Indica que não houve inserção (conflito)
    except Exception as e:
        conn.rollback()
        logger.error(f"Erro ao registrar arquivo '{original_file_name}' no PostgreSQL: {e}")
        return False

def process_and_register_all_files(pg_conn, minio_client, selected_gdrive_source_path: Path, 
                                    project_id: int, project_name: str):
    """
    Itera sobre todos os arquivos no diretório fonte do GDrive,
    calcula hashes, tamanhos e registra no PostgreSQL.
    Esta função é chamada DEPOIS que as cópias para MinIO (Passos 2 e 4) foram feitas.
    A ideia é registrar cada arquivo encontrado na fonte GDrive, e seus respectivos caminhos no MinIO.
    """
    logger.info(f"Iniciando Passo 5: Registro Detalhado no PostgreSQL para projeto '{project_name}' (ID: {project_id}).")
    
    if not selected_gdrive_source_path or not selected_gdrive_source_path.is_dir():
        logger.error(f"Diretório fonte GDrive inválido: {selected_gdrive_source_path}")
        return 0
    
    gdrive_source_dir_name = selected_gdrive_source_path.name
    registered_count = 0
    
    all_files_in_source = [f for f in selected_gdrive_source_path.rglob("*") if f.is_file()]

    for local_file_path in tqdm(all_files_in_source, desc=f"Registrando arquivos no PostgreSQL (Projeto: {project_name})"):
        file_hash = calculate_sha256(local_file_path)
        if not file_hash:
            logger.warning(f"Não foi possível calcular hash para {local_file_path}. Pulando registro.")
            continue
        
        file_size = local_file_path.stat().st_size
        
        # Caminho no MinIO - Estrutura Drive (Passo 2)
        relative_path_for_drive_structure = local_file_path.relative_to(selected_gdrive_source_path)
        minio_path_drive_structure = f"{gdrive_source_dir_name}/{relative_path_for_drive_structure}"
        
        # Caminho no MinIO - Raw Unicos (Passo 4)
        original_file_name = local_file_path.name
        minio_path_raw_unicos = f"raw_unicos/{project_name}/{original_file_name}"
        
        if register_file_in_postgres(pg_conn, project_id, gdrive_source_dir_name, local_file_path,
                                     minio_path_drive_structure, minio_path_raw_unicos, file_hash, file_size):
            registered_count += 1
            
    logger.info(f"Passo 5 concluído. {registered_count} novos registros de arquivos inseridos no PostgreSQL para o projeto '{project_name}'.")
    return registered_count

# Exemplo de uso (a ser chamado na célula de orquestração)
# if pg_conn and selected_project_id and selected_gdrive_source_path:
#     process_and_register_all_files(pg_conn, minio_client, selected_gdrive_source_path, selected_project_id, selected_project_name)


In [None]:
# CÉLULA 8: Orquestração Principal do Processo

def main_processing_logic():
    logger.info("="*50)
    logger.info("INICIANDO NOVO PROCESSAMENTO DE DADOS DO GOOGLE DRIVE")
    logger.info("="*50)

    # Passo 1: Seleção da Fonte no Drive Montado
    selected_gdrive_source_path = select_gdrive_source_directory()
    if not selected_gdrive_source_path:
        logger.warning("Processamento encerrado: Nenhum diretório fonte do Google Drive selecionado.")
        print("Processamento encerrado.")
        return

    # Passo 2: Cópia Estruturada para o Bucket 'raw' no MinIO
    # Esta cópia é feita independentemente do projeto, baseada apenas na pasta do Drive.
    copy_to_minio_drive_structure(minio_client, selected_gdrive_source_path)

    # Passo 3: Gerenciamento de "Nomes de Projetos"
    if not pg_conn:
        logger.error("Processamento encerrado: Conexão com PostgreSQL não disponível.")
        print("Erro: Conexão com PostgreSQL não disponível.")
        return
        
    selected_project_id, selected_project_name = select_or_create_project(pg_conn)
    if not selected_project_id or not selected_project_name:
        logger.warning("Processamento encerrado: Nenhum projeto selecionado ou criado.")
        print("Processamento encerrado.")
        return

    # Passo 4: Cópia para raw/raw_unicos/ no MinIO (Organizada por "Nome do Projeto")
    # Esta função agora retorna detalhes dos arquivos COPIADOS para raw_unicos,
    # mas o registro no PG (Passo 5) vai iterar sobre TODOS os arquivos da fonte GDrive.
    # A função copy_to_minio_raw_unicos já implementa a lógica de não duplicar em raw_unicos.
    copy_to_minio_raw_unicos(minio_client, selected_gdrive_source_path, selected_project_name, pg_conn)
        
    # Passo 5: Registro Detalhado no PostgreSQL
    # Esta função irá iterar sobre todos os arquivos na fonte GDrive selecionada
    # e registrar seus metadados, incluindo os caminhos de MinIO (estrutura e raw_unicos)
    # A unicidade no PG é tratada pela constraint (id_projeto, caminho_relativo_arquivo_drive, diretorio_origem_drive)
    process_and_register_all_files(pg_conn, minio_client, selected_gdrive_source_path, 
                                   selected_project_id, selected_project_name)

    logger.info("="*50)
    logger.info("PROCESSAMENTO CONCLUÍDO COM SUCESSO!")
    logger.info("="*50)
    print("\nProcessamento concluído! Verifique os logs para detalhes.")

if __name__ == '__main__': # Para execução via script, se necessário. No Jupyter, execute a célula.
    # Esta chamada não será executada automaticamente no Jupyter,
    # você precisa chamar main_processing_logic() explicitamente em uma célula.
    pass

# Para executar o processo completo no Jupyter Notebook, chame:
# main_processing_logic()


In [None]:
# CÉLULA 9: Execução do Processo e Encerramento de Conexões

# Para executar, descomente e rode esta célula:
# main_processing_logic()

# É uma boa prática fechar conexões quando não são mais necessárias,
# especialmente se o notebook for ficar ativo por muito tempo ou
# se for convertido para um script de longa duração.
# No entanto, para uso interativo, pode-se manter as conexões abertas.

# if pg_conn:
#     pg_conn.close()
#     logger.info("Conexão com PostgreSQL fechada.")

# print("Fim do notebook.")