In [0]:
%run ../notebooks/configs

In [0]:
import os
import base64
import email
from email import policy
from email.parser import BytesParser

In [0]:
# Month partition to process. RAW_PATH and DESSERIALIZED_PATH are expected to be
# defined by the configs notebook pulled in with %run at the top of this notebook.
email_directory = f"{RAW_PATH}2025/01"
output_directory = f"{DESSERIALIZED_PATH}2025/01"

# Scratch directory reached through the local filesystem API (open / os.*).
tmp_directory = "/dbfs/tmp/"
os.makedirs(tmp_directory, exist_ok=True)

# The output location is written through dbutils.fs.cp later in this notebook, so
# it is a dbutils-style path, not a driver-local one. os.makedirs would only create
# a stray directory on the driver's local disk; create it through dbutils instead.
# NOTE(review): assumes DESSERIALIZED_PATH is a dbutils-resolvable path
# (dbfs:/, /mnt/..., abfss://...) — confirm against the configs notebook.
dbutils.fs.mkdirs(output_directory)

In [0]:
def decode_header_safe(header_value):
    """Decode an RFC 2047 e-mail header into text without raising on bad input.

    Args:
        header_value: The header value to decode (a ``str`` or an
            ``email.header.Header``), or a falsy value when the header is absent.

    Returns:
        str: The decoded header text. ``"Desconhecido"`` is returned when
        ``header_value`` is empty or missing. If the header cannot be parsed
        at all, its plain string form is returned unchanged.
    """
    # Imported here so the function does not depend on ``email.header`` having
    # been loaded as a side effect of some other import.
    from email.errors import HeaderParseError
    from email.header import decode_header

    if not header_value:
        return "Desconhecido"

    try:
        decoded_parts = decode_header(header_value)
    except HeaderParseError:
        # Malformed encoded word (e.g. broken base64): keep the raw text.
        return str(header_value)

    pieces = []
    for chunk, charset in decoded_parts:
        if not isinstance(chunk, bytes):
            # A header without encoded words comes back as a single ``str``.
            pieces.append(str(chunk))
            continue

        if charset:
            try:
                # "replace" keeps going on malformed bytes instead of raising
                # UnicodeDecodeError; valid input decodes exactly as before.
                pieces.append(chunk.decode(charset, errors="replace"))
                continue
            except LookupError:
                # Charset label unknown to Python: fall through to UTF-8
                # rather than emitting the ``b'...'`` repr of the bytes.
                pass

        pieces.append(chunk.decode("utf-8", errors="ignore"))

    # decode_header() preserves the whitespace of unencoded chunks and drops the
    # whitespace between adjacent encoded words, so the chunks must be
    # concatenated as-is; joining with " " would insert spurious spaces.
    return "".join(pieces)

In [0]:
def _decode_payload(payload, charset):
    """Decode a MIME payload to text, falling back to UTF-8 for unknown charsets."""
    try:
        return payload.decode(charset or "utf-8", errors="ignore")
    except LookupError:
        # The declared charset label is not known to Python (labels such as
        # "unknown-8bit" occur in real mail). Decode best-effort instead of
        # aborting the whole message.
        return payload.decode("utf-8", errors="ignore")


def process_email(file_path, output_directory):
    """Parse one e-mail file and write a cleaned plain-text version of it.

    The output file contains the message body first, followed by a
    ``--- Metadados ---`` section with subject, sender, recipient, date,
    top-level content type and attachment file names.

    Args:
        file_path: Path of the raw e-mail file, opened in binary mode.
        output_directory: Directory where the ``<base name>.txt`` file is written.

    Returns:
        str: Path of the text file that was written.

    Notes:
        For multipart messages only ``text/plain`` parts that are not
        attachments contribute to the body. Non-multipart messages use their
        single payload whatever its content type.
    """
    with open(file_path, "rb") as f:
        msg = BytesParser(policy=policy.default).parse(f)

    subject = decode_header_safe(msg["subject"])
    from_email = decode_header_safe(msg["from"])
    to_email = decode_header_safe(msg["to"])
    date = decode_header_safe(msg["date"])
    content_type = msg.get_content_type()

    email_body = ""
    attachments = []

    if msg.is_multipart():
        for part in msg.walk():
            content_disposition = str(part.get("Content-Disposition") or "").lower()
            if "attachment" in content_disposition:
                attachments.append(decode_header_safe(part.get_filename()))
            elif part.get_content_type() == "text/plain":
                payload = part.get_payload(decode=True)
                if payload:
                    email_body += _decode_payload(payload, part.get_content_charset()) + "\n"
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            email_body = _decode_payload(payload, msg.get_content_charset())

    # Same base name as the source file, with the extension forced to .txt.
    base_name, _ = os.path.splitext(os.path.basename(file_path))
    clean_filename = os.path.join(output_directory, f"{base_name}.txt")

    with open(clean_filename, "w", encoding="utf-8") as f:
        f.write(email_body.strip())  # body first
        f.write("\n\n")  # separator between body and metadata
        f.write("--- Metadados ---\n")
        f.write(f"Subject: {subject}\n")
        f.write(f"From: {from_email}\n")
        f.write(f"To: {to_email}\n")
        f.write(f"Date: {date}\n")
        f.write(f"Content-Type: {content_type}\n")
        f.write(f"Attachments: {', '.join(attachments) if attachments else 'Nenhum'}\n")

    print(f"Processed: {file_path} → {clean_filename}")

    return clean_filename


In [0]:
# Step 1: list the raw e-mails of this partition in Azure storage.
files = dbutils.fs.ls(email_directory)

# Step 2: download, convert and upload each e-mail.
for file_info in files:
    # dbutils.fs.ls also returns sub-directories. Their path ends with "/", so the
    # base name would be empty and the copy/parse below would fail. Skip them.
    if file_info.isDir():
        continue

    azure_path = file_info.path  # path in Blob Storage
    local_file = os.path.join(tmp_directory, os.path.basename(azure_path))

    # Download the raw e-mail to the scratch directory.
    dbutils.fs.cp(azure_path, f"file:{local_file}")

    # Convert it; the cleaned .txt file is written next to the downloaded copy.
    clean_filename = process_email(local_file, tmp_directory)

    # Upload using the same "file:" addressing as the download, instead of
    # stripping "/dbfs" from the path with str.replace (which replaces every
    # occurrence, not just the prefix). The destination names the target file
    # explicitly so the result does not depend on whether the directory exists.
    destination = f"{output_directory}/{os.path.basename(clean_filename)}"
    dbutils.fs.cp(f"file:{clean_filename}", destination)

print("✅ Processamento concluído!")