# Pipeline Completo de ETL para o Banco de Dados `test_4_DBMM`

**Autor:** Luiz Felipe Monteiro Darzé


## Descrição
Este notebook implementa o pipeline completo de Extração, Transformação e importação para o projeto de mieloma múltiplo. O fluxo de trabalho é dividido em três etapas sequenciais e verificáveis:

1.  **Organização dos Arquivos:** Conta os arquivos nas fontes de dados (`Google Drive`, `Mendeley`), **move-os** para uma estrutura de diretório padronizada em `REPOSITÓRIO_DBMM` e verifica se todos os arquivos foram movidos corretamente.
2.  **Extração para CSV:** Lê a estrutura organizada, extrai metadados relevantes e gera 9 arquivos `.csv`, um para cada tabela do banco de dados, na pasta `Output_CSV`.
3.  **Importação do Banco de Dados:** Carrega os dados dos arquivos `.csv` para o banco de dados MySQL `test_4_DBMM` e valida a contagem de registros para confirmar a integridade da importação.

In [31]:
# IMPORTAÇÕES E CONFIGURAÇÃO

import os
import shutil
import pandas as pd
import numpy as np
import json
import xml.etree.ElementTree as ET
from pathlib import Path
from datetime import datetime
from PIL import Image
from sqlalchemy import create_engine, text
import logging

# Configuração do Logging 
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

#  CONFIGURAÇÃO DE CAMINHOS 
GDRIVE_SOURCE_PATH = Path("/home/ludrz/Documentos/Trabalhos/IC/Data projeto Mieloma Caio/Dados test_4_DBMM/DADOS PCMMD - Google drive")
MENDELEY_SOURCE_PATH = Path("/home/ludrz/Documentos/Trabalhos/IC/Data projeto Mieloma Caio/Dados test_4_DBMM/DADOS PCMMD - Mendeley/data/segmentation")
REPO_DEST_PATH = Path("/home/ludrz/Documentos/Trabalhos/IC/Data projeto Mieloma Caio/Dados test_4_DBMM/REPOSITÓRIO_DBMM")
ORGANIZED_DATA_PATH = REPO_DEST_PATH / "data" / "slide_and_cells"
CSV_OUTPUT_PATH = Path("/home/ludrz/Documentos/Trabalhos/IC/Data projeto Mieloma Caio/Dados test_4_DBMM/Output_CSV")

#  CONFIGURAÇÃO DO BANCO DE DADOS
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'root',
    'database': 'test_4_DBMM'
}

#  MAPEAMENTO DE PACIENTES 
PATIENT_FOLDER_MAP = {
    "Lâmina Mieloma ALBC001": "ALBC001",
    "Lâmina Mieloma MJ001": "MJ001",
    "Lâmina Mieloma RP001": "RP001"
}

## Etapa 1: Contagem e Organização dos Arquivos de Origem

Esta seção contém as funções para:
1.  Contar todos os arquivos relevantes nas pastas de origem.
2.  Mover os arquivos para a estrutura de diretório em `REPOSITÓRIO_DBMM`.
3.  Verificar as contagens antes e depois para garantir que nenhum dado foi perdido.

In [32]:
# FUNÇÕES DA ETAPA 1 (CONTAGEM E ORGANIZAÇÃO)

def count_source_files(gdrive_path, mendeley_path, patient_map):
    """Conta os arquivos relevantes nas pastas de origem antes de movê-los."""
    logging.info("Contando arquivos nas fontes de dados originais...")
    counts = {'slides_jpg': 0, 'slides_xml': 0, 'plasma_jpg': 0, 'plasma_json': 0, 'non_plasma_jpg': 0, 'non_plasma_json': 0}
    if not gdrive_path.exists() or not mendeley_path.exists():
        logging.error("Um ou mais diretórios de origem não existem.")
        return None

    # Itera pelos pacientes para encontrar lâminas e depois contar as células correspondentes
    for patient_folder_name in os.listdir(gdrive_path):
        if patient_folder_name not in patient_map: continue
        
        patient_gdrive_folder = gdrive_path / patient_folder_name
        processed_slides = set()
        for gdrive_file in os.listdir(patient_gdrive_folder):
            gdrive_file_path = patient_gdrive_folder / gdrive_file
            if gdrive_file_path.is_file() and gdrive_file_path.suffix.lower() in ['.jpg', '.xml']:
                slide_base_name = gdrive_file_path.stem
                if slide_base_name in processed_slides: continue
                processed_slides.add(slide_base_name)
                
                if (patient_gdrive_folder / f"{slide_base_name}.jpg").exists(): counts['slides_jpg'] += 1
                if (patient_gdrive_folder / f"{slide_base_name}.xml").exists(): counts['slides_xml'] += 1

                # Conta células correspondentes em Mendeley
                for cell_type in ['plasma cells', 'non-plasma cells']:
                    cell_images_source = mendeley_path / cell_type / 'images'
                    cell_masks_source = mendeley_path / cell_type / 'masks'
                    if cell_images_source.exists():
                        for cell_file in os.listdir(cell_images_source):
                            if cell_file.lower().startswith(f"{slide_base_name.lower()}_p_") and cell_file.lower().endswith('.jpg'):
                                key_jpg = 'plasma_jpg' if 'plasma' in cell_type else 'non_plasma_jpg'
                                key_json = 'plasma_json' if 'plasma' in cell_type else 'non_plasma_json'
                                counts[key_jpg] += 1
                                if (cell_masks_source / f"{Path(cell_file).stem}.json").exists():
                                    counts[key_json] += 1
    return counts

def organize_files_and_get_paths(gdrive_path, mendeley_path, output_repo_path, patient_map):
    """Move os arquivos das fontes para a nova estrutura organizada."""
    logging.info(f"Movendo arquivos para a nova estrutura em {output_repo_path}...")
    output_repo_path.mkdir(parents=True, exist_ok=True)
    
    for patient_folder_name, patient_id in patient_map.items():
        patient_source_folder = gdrive_path / patient_folder_name
        if not patient_source_folder.exists(): continue

        patient_dest_folder = output_repo_path / patient_id
        
        processed_slides = set()
        for gdrive_file in os.listdir(patient_source_folder):
            gdrive_file_path = patient_source_folder / gdrive_file
            if gdrive_file_path.is_file() and gdrive_file_path.suffix.lower() in ['.jpg', '.xml']:
                slide_base_name = gdrive_file_path.stem
                if slide_base_name in processed_slides: continue
                processed_slides.add(slide_base_name)
                
                slide_dest_folder = patient_dest_folder / slide_base_name
                slide_files_dest = slide_dest_folder / "slide"
                slide_files_dest.mkdir(parents=True, exist_ok=True)

                for ext in ['.jpg', '.xml']:
                    source_file = patient_source_folder / f"{slide_base_name}{ext}"
                    if source_file.exists(): shutil.move(str(source_file), str(slide_files_dest))

                for cell_type in ['plasma cells', 'non-plasma cells']:
                    cell_dest_folder = slide_dest_folder / cell_type
                    cell_dest_folder.mkdir(exist_ok=True)
                    cell_images_source = mendeley_path / cell_type / 'images'
                    cell_masks_source = mendeley_path / cell_type / 'masks'

                    if cell_images_source.exists():
                        for cell_img_filename in list(os.listdir(cell_images_source)):
                            if cell_img_filename.lower().startswith(f"{slide_base_name.lower()}_p_") and cell_img_filename.lower().endswith('.jpg'):
                                shutil.move(str(cell_images_source / cell_img_filename), str(cell_dest_folder))

                    if cell_masks_source.exists():
                         for mask_filename in list(os.listdir(cell_masks_source)):
                            if mask_filename.lower().startswith(f"{slide_base_name.lower()}_p_") and mask_filename.lower().endswith('.json'):
                                shutil.move(str(cell_masks_source / mask_filename), str(cell_dest_folder))
    logging.info("Movimentação de arquivos concluída.")
    return output_repo_path

def count_organized_files(organized_path):
    """Conta os arquivos na nova estrutura organizada."""
    logging.info("Contando arquivos no repositório organizado final...")
    counts = {'slides_jpg': 0, 'slides_xml': 0, 'plasma_jpg': 0, 'plasma_json': 0, 'non_plasma_jpg': 0, 'non_plasma_json': 0}
    for root, _, files in os.walk(organized_path):
        p_root = Path(root)
        for file in files:
            if p_root.name == 'slide' and file.endswith('.jpg'): counts['slides_jpg'] += 1
            elif p_root.name == 'slide' and file.endswith('.xml'): counts['slides_xml'] += 1
            elif p_root.name == 'plasma cells' and file.endswith('.jpg'): counts['plasma_jpg'] += 1
            elif p_root.name == 'plasma cells' and file.endswith('.json'): counts['plasma_json'] += 1
            elif p_root.name == 'non-plasma cells' and file.endswith('.jpg'): counts['non_plasma_jpg'] += 1
            elif p_root.name == 'non-plasma cells' and file.endswith('.json'): counts['non_plasma_json'] += 1
    return counts

In [33]:
# EXECUÇÃO E VERIFICAÇÃO DA ETAPA 1

# 1. Contar arquivos ANTES de mover
source_counts = count_source_files(GDRIVE_SOURCE_PATH, MENDELEY_SOURCE_PATH, PATIENT_FOLDER_MAP)
if source_counts:
    logging.info(f"Contagem inicial de arquivos na origem: {source_counts}")

    # 2. Executar a movimentação dos arquivos
    organize_files_and_get_paths(GDRIVE_SOURCE_PATH, MENDELEY_SOURCE_PATH, ORGANIZED_DATA_PATH, PATIENT_FOLDER_MAP)
    
    # 3. Contar arquivos DEPOIS de mover
    organized_counts = count_organized_files(ORGANIZED_DATA_PATH)
    logging.info(f"Contagem final de arquivos no repositório: {organized_counts}")
    
    # 4. Verificação final
    if source_counts == organized_counts:
        logging.info("VERIFICAÇÃO ETAPA 1: SUCESSO! As contagens de arquivos antes e depois da movimentação correspondem.")
    else:
        logging.error("VERIFICAÇÃO ETAPA 1: FALHA! As contagens de arquivos não correspondem.")
else:
    logging.error("Não foi possível iniciar a Etapa 1. Verifique os caminhos de origem.")

2025-06-30 14:45:34,334 - INFO - Contando arquivos nas fontes de dados originais...
2025-06-30 14:45:35,234 - INFO - Contagem inicial de arquivos na origem: {'slides_jpg': 436, 'slides_xml': 433, 'plasma_jpg': 1826, 'plasma_json': 1826, 'non_plasma_jpg': 0, 'non_plasma_json': 0}
2025-06-30 14:45:35,235 - INFO - Movendo arquivos para a nova estrutura em /home/ludrz/Documentos/Trabalhos/IC/Data projeto Mieloma Caio/Dados test_4_DBMM/REPOSITÓRIO_DBMM/data/slide_and_cells...
2025-06-30 14:45:36,962 - INFO - Movimentação de arquivos concluída.
2025-06-30 14:45:36,963 - INFO - Contando arquivos no repositório organizado final...
2025-06-30 14:45:37,005 - INFO - Contagem final de arquivos no repositório: {'slides_jpg': 436, 'slides_xml': 433, 'plasma_jpg': 1087, 'plasma_json': 1087, 'non_plasma_jpg': 739, 'non_plasma_json': 739}
2025-06-30 14:45:37,006 - ERROR - VERIFICAÇÃO ETAPA 1: FALHA! As contagens de arquivos não correspondem.


## Etapa 2: Extração de Metadados e Geração dos Arquivos CSV

Com os arquivos agora organizados, esta seção percorre o `REPOSITÓRIO_DBMM`, extrai todos os metadados e os estrutura em 9 arquivos `.csv` na pasta `Output_CSV`. A criação destes arquivos serve como verificação  antes da importação para o banco de dados.

In [34]:

# FUNÇÕES DA ETAPA 2 (EXTRAÇÃO E GERAÇÃO DE CSVs)


# --- FUNÇÕES DE PARSING
def get_image_dimensions(image_path):
    try:
        with Image.open(image_path) as img: width, height = img.size; return width, height, width * height
    except Exception as e:
        logging.warning(f"Não foi possível ler dimensões de {image_path}: {e}")
        return None, None, None

def parse_xml_objects(xml_file_path, slide_id_for_fk, counters):
    objects = []
    try:
        tree = ET.parse(xml_file_path); root = tree.getroot();
        for i, obj_node in enumerate(root.findall('.//object')):
            counters['slide_object_id'] += 1; bndbox_node = obj_node.find('bndbox')
            objects.append({
                'slide_object_id': counters['slide_object_id'], 'slide_id': slide_id_for_fk,
                'object_identifier_str': f"{xml_file_path.stem}_obj_{i+1}",
                'object_name': (n.text if (n := obj_node.find('name')) is not None else None),
                'xmin': (int(n.text) if bndbox_node is not None and (n := bndbox_node.find('xmin')) is not None else None),
                'ymin': (int(n.text) if bndbox_node is not None and (n := bndbox_node.find('ymin')) is not None else None),
                'xmax': (int(n.text) if bndbox_node is not None and (n := bndbox_node.find('xmax')) is not None else None),
                'ymax': (int(n.text) if bndbox_node is not None and (n := bndbox_node.find('ymax')) is not None else None),
                'pose': (n.text if (n := obj_node.find('pose')) is not None else None),
                'truncated': (int(n.text) if (n := obj_node.find('truncated')) is not None and n.text is not None else None),
                'difficult': (int(n.text) if (n := obj_node.find('difficult')) is not None and n.text is not None else None),
            })
    except Exception as e: logging.error(f"Erro ao processar XML {xml_file_path}: {e}")
    return objects

def parse_json_mask(json_file_path):
    try:
        with open(json_file_path, 'r') as f: data = json.load(f)
        if data.get('shapes') and len(data['shapes']) > 0:
            shape = data['shapes'][0]; return shape.get('label'), shape.get('shape_type'), json.dumps(shape.get('points'))
    except Exception as e: logging.warning(f"Não foi possível ler JSON {json_file_path}: {e}")
    return None, None, None

# --- FUNÇÃO DE EXTRAÇÃO
def extract_data_and_write_csvs(organized_repo_path, csv_output_path):
    logging.info("Iniciando a Etapa 2: Extração de dados e geração de CSVs...")
    data_storage = { 'image_repository': [], 'patient': [], 'image_metadata': [], 'slide': [], 'slide_object': [], 'slide_cell_info': [], 'cell': [], 'group_image_info': [], 'group_multiple_image': [] }
    counters = { 'image_repository_id': 0, 'patient_id': 0, 'image_metadata_id': 0, 'slide_id': 0, 'slide_object_id': 0, 'slide_cell_info_id': 0, 'cell_id': 0, 'group_image_info_id': 0, 'group_multiple_image_id': 0 }
    patient_identifier_to_id, slide_id_to_slide_cell_info_id = {}, {}
    
    counters['image_repository_id'] += 1
    data_storage['image_repository'].append({'image_repository_id': counters['image_repository_id'], 'repository_name': 'REPOSITORIO_DBMM', 'root_path': str(REPO_DEST_PATH), 'require_accesses': 1, 'type_accesses': 'local_filesystem'})

    for patient_folder in organized_repo_path.iterdir():
        if not patient_folder.is_dir(): continue
        patient_id_str = patient_folder.name; counters['patient_id'] += 1; patient_id = counters['patient_id']; patient_identifier_to_id[patient_id_str] = patient_id
        patient_record = {'patient_id': patient_id, 'label': None, 'name': patient_id_str, 'number_of_slides': 0, 'diagnostic_suspicion': None, 'diagnosis': None, 'prognosis': None, 'multiple_myeloma_diagnose': 0}; data_storage['patient'].append(patient_record)
        
        for slide_folder in patient_folder.iterdir():
            if not slide_folder.is_dir(): continue
            slide_id_str = slide_folder.name; counters['slide_id'] += 1; slide_id = counters['slide_id'];
            
            counters['slide_cell_info_id'] += 1; slide_cell_info_id = counters['slide_cell_info_id']; slide_id_to_slide_cell_info_id[slide_id] = slide_cell_info_id
            slide_cell_info_record = {'slide_cell_info_id': slide_cell_info_id, 'slide_id': slide_id, 'calculated_cell_count': 0, 'other_notes': None}; data_storage['slide_cell_info'].append(slide_cell_info_record)
            
            counters['group_image_info_id'] += 1; group_id_for_slide = counters['group_image_info_id']
            data_storage['group_image_info'].append({
                'group_image_info_id': group_id_for_slide,
                'description': f'Files for slide {slide_id_str} of patient {patient_id_str}',
                'relative_group_path': str(slide_folder.relative_to(ORGANIZED_DATA_PATH.parent)),
                'image_repository_id': counters['image_repository_id']
            })

            slide_main_image_metadata_id = None
            slide_content_path = slide_folder / "slide"
            if slide_content_path.exists():
                for item in slide_content_path.iterdir():
                    if not item.is_file(): continue
                    if item.suffix.lower() == '.jpg':
                        counters['image_metadata_id'] += 1; metadata_id = counters['image_metadata_id']; width, height, num_pixels = get_image_dimensions(item)
                        data_storage['image_metadata'].append({'image_metadata_id': metadata_id, 'image_name': item.name, 'image_type': 'jpg', 'image_date': datetime.fromtimestamp(item.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'image_size_kb': round(item.stat().st_size / 1024), 'relative_path': str(item.relative_to(REPO_DEST_PATH)), 'image_width': width, 'image_height': height, 'number_of_pixels': num_pixels})
                        slide_main_image_metadata_id = metadata_id
                        counters['group_multiple_image_id'] += 1
                        data_storage['group_multiple_image'].append({'group_multiple_image_id': counters['group_multiple_image_id'], 'group_image_info_id': group_id_for_slide, 'image_metadata_id': metadata_id})
                    elif item.suffix.lower() == '.xml':
                        data_storage['slide_object'].extend(parse_xml_objects(item, slide_id, counters))

            data_storage['slide'].append({'slide_id': slide_id, 'slide_identifier': slide_id_str, 'myeloma_is_present': None, 'patient_id': patient_id, 'image_metadata_id': slide_main_image_metadata_id, 'xml_folder_name': None, 'slide_label_content': None})

            total_cells_for_slide = 0
            for cell_type_name in ["non-plasma cells", "plasma cells"]:
                cell_folder_path = slide_folder / cell_type_name
                if not cell_folder_path.is_dir(): continue
                for file in cell_folder_path.iterdir():
                    if file.is_file() and file.suffix.lower() == '.jpg':
                        total_cells_for_slide += 1; counters['cell_id'] += 1; cell_id = counters['cell_id']
                        counters['image_metadata_id'] += 1; cell_img_metadata_id = counters['image_metadata_id']; width, height, num_pixels = get_image_dimensions(file)
                        data_storage['image_metadata'].append({'image_metadata_id': cell_img_metadata_id, 'image_name': file.name, 'image_type': 'jpg', 'image_date': datetime.fromtimestamp(file.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S'), 'image_size_kb': round(file.stat().st_size / 1024), 'relative_path': str(file.relative_to(REPO_DEST_PATH)), 'image_width': width, 'image_height': height, 'number_of_pixels': num_pixels})
                        
                        counters['group_multiple_image_id'] += 1
                        data_storage['group_multiple_image'].append({'group_multiple_image_id': counters['group_multiple_image_id'], 'group_image_info_id': group_id_for_slide, 'image_metadata_id': cell_img_metadata_id})

                        mask_path = cell_folder_path / f"{file.stem}.json"
                        mask_label, mask_shape_type, mask_points_str = parse_json_mask(mask_path) if mask_path.exists() else (None, None, None)
                        
                        cell_type_value = "plasma" if cell_type_name == "plasma cells" else "non-plasma"
                        
                        data_storage['cell'].append({
                            'cell_id': cell_id, 'cell_identifier_str': file.stem, 
                            'slide_cell_info_id': slide_cell_info_id, 
                            'image_metadata_id': cell_img_metadata_id, 
                            'cell_type': cell_type_value,
                            'mask_label': mask_label, 
                            'mask_shape_type': mask_shape_type, 
                            'mask_points_json_str': mask_points_str, 
                            'core_numbers': None
                        })
            
            slide_cell_info_record['calculated_cell_count'] = total_cells_for_slide
        patient_record['number_of_slides'] = len(list(patient_folder.glob('*')))

    logging.info(f"Escrevendo arquivos CSV em: {csv_output_path}...")
    csv_output_path.mkdir(parents=True, exist_ok=True)
    for table_name, data_list in data_storage.items():
        if not data_list: logging.warning(f"Nenhum dado para a tabela '{table_name}', CSV não será gerado."); continue
        df = pd.DataFrame(data_list); df.to_csv(csv_output_path / f"{table_name}.csv", index=False, encoding='utf-8')
        logging.info(f"✔ Arquivo '{table_name}.csv' gerado com sucesso ({len(df)} linhas).")
    
    logging.info("Geração de CSVs concluída.")
    return {name: pd.DataFrame(data) for name, data in data_storage.items() if data}

In [35]:
# EXECUÇÃO E VERIFICAÇÃO DA ETAPA 2

# Executa a função de extração e geração de CSVs
dataframes_in_memory = extract_data_and_write_csvs(ORGANIZED_DATA_PATH, CSV_OUTPUT_PATH)

# Ponto de Verificação 2: Inspecionar os DataFrames criados em memória
if dataframes_in_memory:
    logging.info("--- Ponto de Verificação 2: Verificando DataFrames extraídos ---")
    for name, df in dataframes_in_memory.items():
        logging.info(f"DataFrame '{name}': {df.shape[0]} linhas, {df.shape[1]} colunas.")
    
    if 'cell' not in dataframes_in_memory or dataframes_in_memory['cell'].empty:
         logging.error("VERIFICAÇÃO ETAPA 2: FALHA! Nenhum dado de célula foi extraído. O DataFrame 'cell' está vazio ou ausente.")
    else:
         logging.info("VERIFICAÇÃO ETAPA 2: SUCESSO! DataFrame 'cell' foi gerado com sucesso.")
    logging.info("--- Fim da Verificação 2 ---")
else:
    logging.error("Nenhum DataFrame foi gerado na Etapa 2.")

2025-06-30 14:45:37,047 - INFO - Iniciando a Etapa 2: Extração de dados e geração de CSVs...
2025-06-30 14:45:38,408 - INFO - Escrevendo arquivos CSV em: /home/ludrz/Documentos/Trabalhos/IC/Data projeto Mieloma Caio/Dados test_4_DBMM/Output_CSV...
2025-06-30 14:45:38,414 - INFO - ✔ Arquivo 'image_repository.csv' gerado com sucesso (1 linhas).
2025-06-30 14:45:38,418 - INFO - ✔ Arquivo 'patient.csv' gerado com sucesso (3 linhas).
2025-06-30 14:45:38,436 - INFO - ✔ Arquivo 'image_metadata.csv' gerado com sucesso (2262 linhas).
2025-06-30 14:45:38,441 - INFO - ✔ Arquivo 'slide.csv' gerado com sucesso (436 linhas).
2025-06-30 14:45:38,454 - INFO - ✔ Arquivo 'slide_object.csv' gerado com sucesso (1884 linhas).
2025-06-30 14:45:38,457 - INFO - ✔ Arquivo 'slide_cell_info.csv' gerado com sucesso (436 linhas).
2025-06-30 14:45:38,501 - INFO - ✔ Arquivo 'cell.csv' gerado com sucesso (1826 linhas).
2025-06-30 14:45:38,506 - INFO - ✔ Arquivo 'group_image_info.csv' gerado com sucesso (436 linhas).


## Etapa 3: Importação e Validação no Banco de Dados

O script lê os `.csv` recém-criados, se conecta no banco de dados `test_4_DBMM`, limpa as tabelas  e importa os novos dados. Ao final, uma consulta de validação compara as contagens de linhas nos CSVs com as contagens nas tabelas do banco para uma verificação final.

In [36]:
# FUNÇÕES DA ETAPA 3 (IMPORTAÇÃO E VALIDAÇÃO NO BD)

def get_db_engine(db_config):
    """Cria e retorna uma engine de conexão do SQLAlchemy."""
    try:
        connection_string = f"mysql+mysqlconnector://{db_config['user']}:{db_config['password']}@{db_config['host']}/{db_config['database']}"
        engine = create_engine(connection_string)
        with engine.connect() as connection:
            logging.info(f"Conexão com o banco '{db_config['database']}' bem-sucedida.")
        return engine
    except Exception as e:
        logging.error(f"Falha ao conectar ao banco de dados: {e}")
        return None

def load_csvs_to_db(csv_path, engine):
    """Carrega todos os arquivos CSV de uma pasta para o banco de dados."""
    TABLE_IMPORT_ORDER = [
        'image_repository', 'patient', 'image_metadata', 'group_image_info', 'slide',
        'slide_object', 'slide_cell_info', 'cell', 'group_multiple_image'
    ]
    
    logging.info("Iniciando a Etapa 3: Importação dos dados no banco...")
    try:
        with engine.connect() as connection:
            connection.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
            for table_name in reversed(TABLE_IMPORT_ORDER):
                logging.info(f"  Limpando tabela '{table_name}'...")
                connection.execute(text(f"TRUNCATE TABLE {table_name};"))
            connection.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
            logging.info("Todas as tabelas foram limpas (TRUNCATE).")
    except Exception as e:
        logging.error(f"Não foi possível limpar as tabelas. Erro: {e}")
        return

    for table_name in TABLE_IMPORT_ORDER:
        csv_file = csv_path / f"{table_name}.csv"
        if not csv_file.exists(): continue
        
        logging.info(f"  Carregando arquivo '{csv_file.name}' para a tabela '{table_name}'...")
        df = pd.read_csv(csv_file).replace({np.nan: None})
        
        # Converte colunas que podem ser float com NaN para inteiros que aceitam nulos
        for col in df.select_dtypes(include=['float64']).columns:
            # Verifica se a conversão é segura
            if df[col].dropna().mod(1).sum() == 0:
                df[col] = df[col].astype('Int64')

        df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
    logging.info("Importação de dados no banco concluída.")

def verify_db_load(engine, dataframes_to_check):
    """Compara a contagem de linhas dos DataFrames com as tabelas do banco."""
    logging.info("--- Ponto de Verificação 3: Validando dados no banco ---")
    all_match = True
    with engine.connect() as connection:
        for table_name, df in dataframes_to_check.items():
            df_count = len(df)
            db_count_result = connection.execute(text(f"SELECT COUNT(*) FROM {table_name};")).scalar_one()
            if df_count == db_count_result:
                logging.info(f"  Tabela '{table_name}': Contagem OK ({df_count} linhas).")
            else:
                logging.error(f"  Tabela '{table_name}': INCONSISTÊNCIA! DataFrame tem {df_count} linhas, BD tem {db_count_result} linhas.")
                all_match = False
    
    if all_match:
        logging.info("VERIFICAÇÃO ETAPA 3: SUCESSO! Todas as contagens de registros no banco correspondem aos dados extraídos.")
    else:
        logging.error("VERIFICAÇÃO ETAPA 3: FALHA! Inconsistências encontradas.")

In [37]:
# EXECUÇÃO E VERIFICAÇÃO DA ETAPA 3

# A variável 'dataframes_in_memory' foi preenchida
if dataframes_in_memory:
    db_engine = get_db_engine(DB_CONFIG)
    if db_engine:
        load_csvs_to_db(CSV_OUTPUT_PATH, db_engine)
        
        # A verificação usa os dataframes que geraram os CSVs para comparar
        verify_db_load(db_engine, dataframes_in_memory)
        
        db_engine.dispose()
else:
    logging.warning("Etapa 3 pulada pois não há DataFrames em memória para verificar.")

2025-06-30 14:45:38,639 - INFO - Conexão com o banco 'test_4_DBMM' bem-sucedida.
2025-06-30 14:45:38,640 - INFO - Iniciando a Etapa 3: Importação dos dados no banco...
2025-06-30 14:45:38,641 - INFO -   Limpando tabela 'group_multiple_image'...
2025-06-30 14:45:38,689 - INFO -   Limpando tabela 'cell'...
2025-06-30 14:45:38,721 - INFO -   Limpando tabela 'slide_cell_info'...
2025-06-30 14:45:38,743 - INFO -   Limpando tabela 'slide_object'...
2025-06-30 14:45:38,770 - INFO -   Limpando tabela 'slide'...
2025-06-30 14:45:38,796 - INFO -   Limpando tabela 'group_image_info'...
2025-06-30 14:45:38,816 - INFO -   Limpando tabela 'image_metadata'...
2025-06-30 14:45:38,844 - INFO -   Limpando tabela 'patient'...
2025-06-30 14:45:38,866 - INFO -   Limpando tabela 'image_repository'...
2025-06-30 14:45:38,887 - INFO - Todas as tabelas foram limpas (TRUNCATE).
2025-06-30 14:45:38,888 - INFO -   Carregando arquivo 'image_repository.csv' para a tabela 'image_repository'...
2025-06-30 14:45:38,91