In [None]:
pip install --upgrade pip

!pip -q install gspread google-auth

In [None]:
from google.colab import auth
auth.authenticate_user()


In [None]:
FILTERS_FILE = "filters.json"

DEFAULT_FILTERS = {
    "imap_search": "ALL",
    "include": {
        "from_contains": [],
        "subject_contains": [],
        "body_contains": []
    },
    "exclude": {
        "from_contains": [],
        "subject_contains": [],
        "body_contains": []
    }
}

def load_filters():
    if not os.path.exists(FILTERS_FILE):
        return DEFAULT_FILTERS

    with open(FILTERS_FILE, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    # merge simples (garante chaves)
    out = DEFAULT_FILTERS.copy()
    out["imap_search"] = cfg.get("imap_search", out["imap_search"])

    out["include"] = out["include"].copy()
    out["include"]["from_contains"] = cfg.get("include", {}).get("from_contains", [])
    out["include"]["subject_contains"] = cfg.get("include", {}).get("subject_contains", [])

    out["exclude"] = out["exclude"].copy()
    out["exclude"]["from_contains"] = cfg.get("exclude", {}).get("from_contains", [])
    out["exclude"]["subject_contains"] = cfg.get("exclude", {}).get("subject_contains", [])
    out["include"]["body_contains"] = cfg.get("include", {}).get("body_contains", [])
    out["exclude"]["body_contains"] = cfg.get("exclude", {}).get("body_contains", [])


    return out

def contains_any(text: str, needles) -> bool:
    text_low = (text or "").lower()
    return any((n or "").lower() in text_low for n in needles)

def passes_filters(subject: str, from_header: str, body: str, filters: dict) -> bool:
    inc = filters.get("include", {})
    exc = filters.get("exclude", {})

    # INCLUDE
    if inc.get("from_contains") and not contains_any(from_header, inc["from_contains"]):
        return False

    if inc.get("subject_contains") and not contains_any(subject, inc["subject_contains"]):
        return False

    if inc.get("body_contains") and not contains_any(body, inc["body_contains"]):
        return False

    # EXCLUDE
    if exc.get("from_contains") and contains_any(from_header, exc["from_contains"]):
        return False

    if exc.get("subject_contains") and contains_any(subject, exc["subject_contains"]):
        return False

    if exc.get("body_contains") and contains_any(body, exc["body_contains"]):
        return False

    return True


In [None]:
import imaplib
import email
from email.header import decode_header
import csv
import json
import os
import re
import time
import socket
from datetime import datetime

import gspread
from google.auth import default

IMAP_HOST = "imap.gmail.com"
IMAP_PORT = 993

STATE_FILE = "state.json"
CSV_FILE = "emails.csv"
CSV_HEADERS = ["id", "subject", "body"]

FILTERS_FILE = "filters.json"

# ====== Robustez ======
SOCKET_TIMEOUT_SECONDS = 45
IMAP_FETCH_RETRIES = 3
IMAP_RETRY_SLEEP_SECONDS = 2
MAX_EMAILS_PER_RUN = 50
ONLY_HEADERS = False
TRUNCATE_BODY_CHARS = 20000  # pode aumentar depois

# ====== Google Sheets config ======
ENABLE_SHEETS = True
SPREADSHEET_ID = "1PhL-0EjUfy6UlKHh0PeKdQ8Mi-yKqGBllT637pi1C2c"
WORKSHEET_NAME = "emails"


def log(msg: str):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")


# ---------- Filters ----------
DEFAULT_FILTERS = {
    "imap_search": "ALL",
    "include": {
        "from_contains": [],
        "subject_contains": [],
        "body_contains": [],
    },
    "exclude": {
        "from_contains": [],
        "subject_contains": [],
        "body_contains": [],
    }
}


def load_filters():
    if not os.path.exists(FILTERS_FILE):
        return DEFAULT_FILTERS

    with open(FILTERS_FILE, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    out = {
        "imap_search": cfg.get("imap_search", DEFAULT_FILTERS["imap_search"]),
        "include": {
            "from_contains": cfg.get("include", {}).get("from_contains", []),
            "subject_contains": cfg.get("include", {}).get("subject_contains", []),
            "body_contains": cfg.get("include", {}).get("body_contains", []),
        },
        "exclude": {
            "from_contains": cfg.get("exclude", {}).get("from_contains", []),
            "subject_contains": cfg.get("exclude", {}).get("subject_contains", []),
            "body_contains": cfg.get("exclude", {}).get("body_contains", []),
        }
    }
    return out


def contains_any(text: str, needles) -> bool:
    text_low = (text or "").lower()
    return any((n or "").lower() in text_low for n in needles)


def passes_filters(subject: str, from_header: str, body: str, filters: dict) -> bool:
    inc = filters.get("include", {})
    exc = filters.get("exclude", {})

    # INCLUDE (AND)
    if inc.get("from_contains") and not contains_any(from_header, inc["from_contains"]):
        return False
    if inc.get("subject_contains") and not contains_any(subject, inc["subject_contains"]):
        return False
    if inc.get("body_contains") and not contains_any(body, inc["body_contains"]):
        return False

    # EXCLUDE (OR)
    if exc.get("from_contains") and contains_any(from_header, exc["from_contains"]):
        return False
    if exc.get("subject_contains") and contains_any(subject, exc["subject_contains"]):
        return False
    if exc.get("body_contains") and contains_any(body, exc["body_contains"]):
        return False

    return True


# ---------- State ----------
def load_state():
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            return json.load(f)
    return {"last_uid": 0}


def save_state(state):
    with open(STATE_FILE, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)


# ---------- Decoding ----------
def safe_decode_bytes(payload: bytes, charset: str) -> str:
    charset = (charset or "utf-8").strip().lower()
    if charset in ("unknown-8bit", "x-unknown", "unknown"):
        charset = "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        return payload.decode("utf-8", errors="replace")


def decode_mime_words(s):
    if not s:
        return ""
    parts = decode_header(s)
    out = []
    for text, enc in parts:
        if isinstance(text, bytes):
            enc = (enc or "utf-8").strip().lower()
            if enc in ("unknown-8bit", "x-unknown", "unknown"):
                enc = "utf-8"
            try:
                out.append(text.decode(enc, errors="replace"))
            except LookupError:
                out.append(text.decode("utf-8", errors="replace"))
        else:
            out.append(text)
    return "".join(out)


def clean_text(s):
    s = s.replace("\r\n", "\n").replace("\r", "\n").strip()
    s = re.sub(r"\n{3,}", "\n\n", s)
    if len(s) > TRUNCATE_BODY_CHARS:
        s = s[:TRUNCATE_BODY_CHARS] + " ...[truncado]"
    return s


def extract_body(msg):
    text_plain = None
    text_html = None

    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            disp = str(part.get("Content-Disposition") or "")
            if "attachment" in disp.lower():
                continue

            payload = part.get_payload(decode=True)
            if not payload:
                continue

            charset = part.get_content_charset() or "utf-8"
            content = safe_decode_bytes(payload, charset)

            if ctype == "text/plain" and text_plain is None:
                text_plain = content
            elif ctype == "text/html" and text_html is None:
                text_html = content
    else:
        ctype = msg.get_content_type()
        payload = msg.get_payload(decode=True) or b""
        charset = msg.get_content_charset() or "utf-8"
        content = safe_decode_bytes(payload, charset)

        if ctype == "text/plain":
            text_plain = content
        elif ctype == "text/html":
            text_html = content

    if text_plain:
        return clean_text(text_plain)

    if text_html:
        no_script = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", text_html, flags=re.S | re.I)
        no_tags = re.sub(r"<[^>]+>", " ", no_script)
        no_tags = re.sub(r"\s+", " ", no_tags).strip()
        return clean_text(no_tags)

    return ""


# ---------- CSV ----------
def ensure_csv_headers():
    if not os.path.exists(CSV_FILE):
        with open(CSV_FILE, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(CSV_HEADERS)
        log(f"CSV criado com cabeçalho: {CSV_FILE}")


def append_rows_csv(rows):
    if not rows:
        return
    with open(CSV_FILE, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerows(rows)
    log(f"CSV: escreveu {len(rows)} linhas.")


# ---------- IMAP fetch retry ----------
def imap_uid_fetch_with_retry(mail, uid_b: bytes, what: str):
    last_err = None
    for attempt in range(1, IMAP_FETCH_RETRIES + 1):
        try:
            status, msg_data = mail.uid("fetch", uid_b, what)
            if status == "OK" and msg_data and msg_data[0]:
                return status, msg_data
            last_err = (status, msg_data)
        except (imaplib.IMAP4.abort, imaplib.IMAP4.error, socket.timeout) as e:
            last_err = e
            log(f"⚠️ fetch falhou (tentativa {attempt}/{IMAP_FETCH_RETRIES}) uid={uid_b!r}: {e}")
            time.sleep(IMAP_RETRY_SLEEP_SECONDS)
    raise RuntimeError(f"Falha ao buscar mensagem (uid={uid_b!r}). Último erro: {last_err}")


# ---------- Sheets ----------
def connect_sheets_oauth_colab(spreadsheet_id: str, worksheet_name: str):
    creds, _ = default()
    gc = gspread.authorize(creds)
    sh = gc.open_by_key(spreadsheet_id)

    try:
        ws = sh.worksheet(worksheet_name)
    except gspread.WorksheetNotFound:
        ws = sh.add_worksheet(title=worksheet_name, rows=2000, cols=10)

    ws.update(values=[CSV_HEADERS], range_name="A1:C1")
    return ws


def main():
    filters = load_filters()
    log(f"Filtros carregados: imap_search={filters['imap_search']}")

    # Credenciais IMAP (hardcoded, com opção env)
    GMAIL_USER = os.environ.get("GMAIL_USER") or "seuemail@gmail.com"
    GMAIL_PASS = os.environ.get("GMAIL_PASS") or "SUA_SENHA_DE_APP"

    GMAIL_PASS = GMAIL_PASS.replace(" ", "")

    socket.setdefaulttimeout(SOCKET_TIMEOUT_SECONDS)

    log("Iniciando...")
    state = load_state()
    last_uid = int(state.get("last_uid", 0))
    log(f"Checkpoint atual (last_uid): {last_uid}")

    ensure_csv_headers()

    ws = None
    if ENABLE_SHEETS:
        try:
            log("Conectando no Google Sheets (OAuth do Colab)...")
            ws = connect_sheets_oauth_colab(SPREADSHEET_ID, WORKSHEET_NAME)
            log("Sheets conectado ✅")
        except Exception as e:
            log(f"⚠️ Falha ao conectar no Sheets ({e}). Vou salvar só no CSV.")
            ws = None

    # IMAP
    log("Conectando no IMAP...")
    mail = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
    log("Fazendo login...")
    mail.login(GMAIL_USER, GMAIL_PASS)

    log("Selecionando INBOX...")
    mail.select("INBOX")

    start = last_uid + 1
    imap_search = filters.get("imap_search", "ALL").strip() or "ALL"

    log(f"Buscando UIDs a partir de {start} com filtro IMAP: {imap_search}")
    # aplicando UNSEEN/ALL/SINCE etc no servidor
    status, data = mail.uid("search", None, f"(UID {start}:* {imap_search})")
    if status != "OK":
        mail.logout()
        raise RuntimeError(f"Falha ao buscar emails via IMAP: status={status}, data={data}")

    uids = data[0].split()
    total = len(uids)
    log(f"Encontrados {total} emails candidatos (UID >= {start} + {imap_search}).")

    if not uids:
        log("Nenhum email novo. Finalizando.")
        mail.logout()
        return

    if total > MAX_EMAILS_PER_RUN:
        log(f"⚠️ Limitando processamento a {MAX_EMAILS_PER_RUN} emails nesta execução.")
        uids = uids[:MAX_EMAILS_PER_RUN]

    rows = []
    max_uid = last_uid

    for i, uid_b in enumerate(uids, start=1):
        uid = int(uid_b.decode("utf-8"))
        log(f"[{i}/{len(uids)}] Processando UID {uid}...")

        # header (subject/from)
        _, msg_data = imap_uid_fetch_with_retry(mail, uid_b, "(BODY.PEEK[HEADER])")
        raw_header = msg_data[0][1]
        msg_header = email.message_from_bytes(raw_header)
        subject = decode_mime_words(msg_header.get("Subject", ""))
        from_header = decode_mime_words(msg_header.get("From", ""))

        # body (precisa pro filtro body_contains)
        _, msg_data = imap_uid_fetch_with_retry(mail, uid_b, "(RFC822)")
        raw = msg_data[0][1]
        msg_full = email.message_from_bytes(raw)
        body = extract_body(msg_full)

        if not passes_filters(subject, from_header, body, filters):
            log(f"Pulando UID {uid} (não bateu nos filtros)")
            continue

        rows.append([str(uid), subject, body])
        if uid > max_uid:
            max_uid = uid

    mail.logout()

    append_rows_csv(rows)

    if ws is not None and rows:
        ws.append_rows(rows, value_input_option="RAW")
        log(f"Sheets: adicionadas {len(rows)} linhas ✅")
    else:
        log("Sheets: nada pra adicionar nesta execução (ou não conectado).")

    state["last_uid"] = max_uid
    state["last_run"] = {
        "processed": len(rows),
        "max_uid": max_uid,
        "timestamp": datetime.now().isoformat(),
        "only_headers": ONLY_HEADERS,
        "sheets_connected": ws is not None,
        "imap_search": imap_search,
    }
    save_state(state)

    log(f"✅ Concluído. Gravados {len(rows)} emails filtrados. Novo checkpoint UID: {max_uid}")


if __name__ == "__main__":
    main()
