<a href="https://colab.research.google.com/github/base-datos-empresas/Email-verifier/blob/main/Kaiju_Email_Verifier_by_Central_%26_CompaniesData.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =============================================================================
# Centraldecomunicacion.es & CompaniesData.cloud – Email Quality Validation Tool
# Google Colab – single-cell version (Markdown-style comments + Python code)
# =============================================================================
#
# ABOUT THIS CELL
# ----------------
# This Google Colab cell implements the email validation pipeline used internally by:
#   - Centraldecomunicacion.es → https://www.centraldecomunicacion.es/
#   - CompaniesData.cloud      → https://companiesdata.cloud/
#
# to audit and clean B2B email databases before delivering them to clients.
#
# Main features:
#   - High-concurrency DNS checks (A / AAAA / MX)
#   - Optional SPF, DMARC and DKIM inspection (ultra mode)
#   - Disposable domain detection
#   - Syntax validation via `pyisemail`
#   - Automatic detection of the email column in CSV / Excel files
#   - Generation of two Excel outputs:
#       * *_VALIDATED.xlsx → full dataset with technical fields + QualityScore
#       * *_CLEAN.xlsx     → campaign-ready email list with low estimated bounce rate
#
# This is the SAME method used when we contract and validate “valid email” datasets
# for Centraldecomunicacion.es and CompaniesData.cloud. That is why our B2B email lists
# are known for their maximum technical quality and very low bounce rates.
#
# HOW TO USE (STEP BY STEP)
# -------------------------
# 1) Run this cell in Google Colab.
# 2) Follow the console instructions:
#       - Upload an Excel (.xls/.xlsx) or CSV file.
#       - Choose validation mode:
#            1 = Normal   → Syntax + domain DNS (fast).
#            2 = Advanced → Normal + MX records (slower, more precise).
#            3 = Ultra    → Advanced + SPF/DMARC/DKIM + disposable check (slowest).
# 3) The tool will:
#       - Auto-detect the email column (no need to specify it).
#       - Validate all emails in parallel (DNS/MX/SPF/DMARC/DKIM, depending on mode).
#       - Compute a quality score and a label for each email.
# 4) At the end it will generate TWO Excel files and trigger their download:
#       - <name>_VALIDATED.xlsx → Full dataset + technical columns + QualityScore + QualityLabel.
#       - <name>_CLEAN.xlsx     → Only high-quality, non-disposable emails
#                                 (Status == "Valid" and QualityScore ≥ CLEAN_MIN_SCORE).
#
# CONSOLE SUMMARY
# ---------------
# When the process finishes, the console will show:
#   - An ASCII bar chart with the distribution of Status values.
#   - A detailed deliverability summary.
#   - A CLEAR BOUNCE RATE (%) indicator:
#         non-valid emails / emails with non-empty value * 100
#
# You can show this console output to clients as a transparency report:
# “This is the same technical validation we use at Centraldecomunicacion.es
#  and CompaniesData.cloud to guarantee maximum email quality in our databases.”
#
# NOTE (GitHub-friendly):
# -----------------------
# All documentation is kept as Python comments in this single cell.
# We do NOT use Jupyter widgets here, so the notebook should not contain
# any `metadata.widgets` block that can break GitHub’s renderer.
# =============================================================================

!pip install --quiet dnspython tqdm pyisemail openpyxl

import os
import smtplib
import unicodedata
from functools import lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Tuple

import pandas as pd
import dns.resolver
import pyisemail
from tqdm import tqdm

# -----------------------
# Global parameters
# -----------------------
TOOL_BRAND = "Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool"

MAX_WORKERS = 32          # Number of concurrent DNS checks
MAX_CONSOLE_MSGS = 0      # 0 = no limit → print every email; set to e.g. 1000 to truncate logs
SMTP_CHECK = False        # Optional SMTP check (usually better disabled in bulk)

DNS_TIMEOUT = 3
DNS_LIFETIME = 3
SMTP_TIMEOUT = 5

SHOW_ASCII_STATS = True
CLEAN_MIN_SCORE = 70      # Minimum score for an email to be kept in CLEAN file

resolver = dns.resolver.Resolver()
resolver.timeout = DNS_TIMEOUT
resolver.lifetime = DNS_LIFETIME


# -----------------------
# Intro banner
# -----------------------
def print_intro_banner() -> None:
    print("=" * 80)
    print(f"{TOOL_BRAND}")
    print("=" * 80)
    print("High-concurrency DNS-based validator for B2B email lists.")
    print("This is the SAME internal method used by Centraldecomunicacion.es")
    print("and CompaniesData.cloud to validate and contract 'valid email' datasets.")
    print("- Very low bounce → thanks to DNS / MX / SPF / DMARC / DKIM analysis.")
    print("- High transparency → all steps and results visible in the console.")
    print("- Two Excel outputs → FULL VALIDATED + CLEAN high-quality subset.")
    print("=" * 80)
    print("STEP 1 → Upload your Excel/CSV file when requested.")
    print("STEP 2 → Choose validation mode (Normal / Advanced / Ultra).")
    print("STEP 3 → Wait until the progress bar reaches 100%.")
    print("STEP 4 → Download the two generated Excel files.")
    print("=" * 80)
    print()


# -----------------------
# DNS utilities with cache
# -----------------------
@lru_cache(maxsize=20_000)
def dns_query(name: str, record_type: str):
    """DNS query with LRU cache and controlled timeouts."""
    try:
        return resolver.resolve(name, record_type)
    except dns.exception.DNSException:
        return None


# -----------------------
# Individual checks
# -----------------------
def check_email_syntax(email: str) -> bool:
    """Syntax validation only, without DNS checks."""
    return pyisemail.is_email(email, check_dns=False)


@lru_cache(maxsize=20_000)
def check_domain_has_any_record(domain: str) -> bool:
    """Return True if the domain has at least one A, AAAA or MX record."""
    for rtype in ("A", "AAAA", "MX"):
        if dns_query(domain, rtype):
            return True
    return False


@lru_cache(maxsize=20_000)
def check_mx_records(domain: str) -> bool:
    return dns_query(domain, "MX") is not None


@lru_cache(maxsize=2_000)
def check_spf_records(domain: str) -> bool:
    txt_records = dns_query(domain, "TXT")
    return any("v=spf1" in str(r).lower() for r in (txt_records or []))


@lru_cache(maxsize=2_000)
def check_dmarc_records(domain: str) -> bool:
    txt_records = dns_query(f"_dmarc.{domain}", "TXT")
    return any("v=dmarc1" in str(r).lower() for r in (txt_records or []))


@lru_cache(maxsize=2_000)
def check_dkim_records(domain: str) -> bool:
    selectors = ["default", "dkim", "selector1", "selector2", "mail"]
    for selector in selectors:
        txt_records = dns_query(f"{selector}._domainkey.{domain}", "TXT")
        if any("v=dkim1" in str(r).lower() for r in (txt_records or [])):
            return True
    return False


@lru_cache(maxsize=20_000)
def is_disposable_domain(domain: str) -> bool:
    """
    Very small sample list of disposable email domains.
    For real use, extend this list with your own dataset.
    """
    disposable_domains = {
        "mailinator.com",
        "trashmail.com",
        "tempmail.com",
        "10minutemail.com",
    }
    return domain in disposable_domains


@lru_cache(maxsize=20_000)
def check_smtp_server(domain: str) -> bool:
    """Optional SMTP check. Disabled by default because it is slow and unstable at scale."""
    if not SMTP_CHECK:
        return False
    try:
        mx_records = dns_query(domain, "MX")
        if not mx_records:
            return False
        mx_host = str(min(mx_records, key=lambda r: r.preference).exchange)
        with smtplib.SMTP(mx_host, timeout=SMTP_TIMEOUT) as server:
            server.noop()
        return True
    except Exception:
        return False


# -----------------------
# Single email verification
# -----------------------
def verify_single_email(email_raw: str, mode: str = "normal") -> Dict[str, str]:
    """
    Validate a single email and return a dictionary of checks.

    Keys may include:
      - Syntax: "Valid" / "Invalid"
      - Domain: "Valid" / "No DNS"
      - MX: "Valid" / "No MX"
      - SPF: "Valid" / "No SPF"
      - DMARC: "Valid" / "No DMARC"
      - DKIM: "Valid" / "No DKIM"
      - Disposable: "Yes" / "No"
      - SMTP: "Up" / "Down" / "Skipped"
    """
    email = (email_raw or "").strip()
    result: Dict[str, str] = {}

    # Syntax
    if not check_email_syntax(email):
        result["Syntax"] = "Invalid"
        return result
    result["Syntax"] = "Valid"

    # Domain DNS
    domain = email.split("@")[-1].lower()
    if not check_domain_has_any_record(domain):
        result["Domain"] = "No DNS"
        return result
    result["Domain"] = "Valid"

    # MX check (advanced, ultra)
    if mode in ("advanced", "ultra"):
        if not check_mx_records(domain):
            result["MX"] = "No MX"
            return result
        result["MX"] = "Valid"

    # Extra checks (ultra)
    if mode == "ultra":
        result["SPF"] = "Valid" if check_spf_records(domain) else "No SPF"
        result["DMARC"] = "Valid" if check_dmarc_records(domain) else "No DMARC"
        result["DKIM"] = "Valid" if check_dkim_records(domain) else "No DKIM"
        result["Disposable"] = "Yes" if is_disposable_domain(domain) else "No"
        result["SMTP"] = (
            "Up" if check_smtp_server(domain) else
            ("Down" if SMTP_CHECK else "Skipped")
        )

    return result


# -----------------------
# Scoring & labels
# -----------------------
def compute_quality_score(result: Dict[str, str]) -> int:
    """
    Compute a simple quality score (0–100) based on the technical checks.
    This is heuristic; adjust weights to your business logic if needed.
    """
    if result.get("Status", "").startswith("Error:"):
        return 0

    # Hard fails
    if result.get("Syntax") == "Invalid":
        return 0

    score = 0

    # Syntax ok
    if result.get("Syntax") == "Valid":
        score += 40

    # Domain
    if result.get("Domain") == "Valid":
        score += 30
    elif result.get("Domain"):
        # Some DNS problem
        return 10

    # MX
    if result.get("MX") == "Valid":
        score += 20

    # Extra technical signals
    if result.get("SPF") == "Valid":
        score += 5
    if result.get("DMARC") == "Valid":
        score += 3
    if result.get("DKIM") == "Valid":
        score += 2

    # Disposable domains penalised
    if result.get("Disposable") == "Yes":
        score = max(score - 40, 0)

    return max(0, min(100, score))


def derive_status(result: Dict[str, str]) -> str:
    """
    Human-readable status for each email.

    Examples:
      - "Valid"
      - "Invalid syntax"
      - "Domain error"
      - "No MX records"
      - "Disposable domain"
      - "Error: <exception>"
    """
    if result.get("Status", "").startswith("Error:"):
        return result["Status"]
    if result.get("Syntax") == "Invalid":
        return "Invalid syntax"
    if result.get("Domain") == "No DNS":
        return "Domain error"
    if result.get("MX") == "No MX":
        return "No MX records"
    if result.get("Disposable") == "Yes":
        return "Disposable domain"

    return "Valid"


def label_from_score(score: int, result: Dict[str, str]) -> str:
    """
    Convert score + result into a quality label.

    Labels:
      - "High"    → very strong technical signals.
      - "Medium"  → acceptable, but not perfect.
      - "Low"     → risky but not completely broken.
      - "Bad"     → unusable / broken.
      - "Invalid" / "Domain error" / "Error" for hard fails.
    """
    if result.get("Status", "").startswith("Error:"):
        return "Error"
    if result.get("Syntax") == "Invalid":
        return "Invalid"
    if result.get("Domain") == "No DNS":
        return "Domain error"

    if score >= 80:
        return "High"
    if score >= 60:
        return "Medium"
    if score > 0:
        return "Low"
    return "Bad"


# -----------------------
# ASCII statistics
# -----------------------
def print_ascii_stats(stats: Dict[str, int], title: str = "Validation stats") -> None:
    """Print an ASCII bar chart for a dictionary of counts."""
    if not stats:
        print("(no statistics)")
        return
    total = sum(stats.values())
    max_len = max(len(k) for k in stats)
    print(f"\n=== {title} ===")
    for label, count in stats.items():
        pct = count * 100 / total
        bars = "█" * int(pct / 2)
        print(f"{label.ljust(max_len)} | {count} | {bars} ({pct:.2f}%)")
    print(f"Total: {total}\n")


# -----------------------
# Email column auto-detection
# -----------------------
def normalize_column_name(name: str) -> str:
    """
    Normalize a column name:
      - remove accents
      - lowercase
      - remove spaces, hyphens, dots and underscores
    """
    if not isinstance(name, str):
        name = str(name)
    # Remove accents
    name = unicodedata.normalize("NFKD", name)
    name = "".join(ch for ch in name if not unicodedata.combining(ch))
    # Lowercase and strip
    name = name.lower().strip()
    # Remove common separators
    for ch in (" ", "-", ".", "_"):
        name = name.replace(ch, "")
    return name


def auto_detect_email_column(df: pd.DataFrame) -> str:
    """
    Automatically detect the email column by scanning all headers and looking
    for common patterns like:
      - email, e-mail, e_mail
      - correo, correo electronico
      - mail, mailing, etc.
    """
    if df.empty or len(df.columns) == 0:
        raise ValueError("The DataFrame has no columns to inspect.")

    columns = list(df.columns)
    normalized_map = {col: normalize_column_name(col) for col in columns}

    # Exact candidates (normalized)
    exact_candidates = [
        "email",
        "emails",
        "emailaddress",
        "correo",
        "correos",
        "correoelectronico",
        "correoselectronicos",
        "contactemail",
        "contactoemail",
        "contactocorreo",
        "contactocorreoelectronico",
        "mail",
        "mails",
    ]

    # 1) Exact matches on normalized names
    for target in exact_candidates:
        for col, norm in normalized_map.items():
            if norm == target:
                print(f"[{TOOL_BRAND}] Auto-detected email column by exact match: '{col}'")
                return col

    # 2) Fuzzy matches: column names containing key tokens
    tokens = ["email", "correo", "mail"]
    for col, norm in normalized_map.items():
        if any(tok in norm for tok in tokens):
            print(f"[{TOOL_BRAND}] Auto-detected email column by fuzzy match: '{col}'")
            return col

    # If we reach this point, no column looks like an email column.
    raise ValueError(
        "Could not auto-detect an email column. "
        "Please rename one of your columns to something like 'email', 'correo', 'e-mail', etc."
    )


# -----------------------
# Bulk processing
# -----------------------
def verify_and_tag_emails(
    df: pd.DataFrame,
    email_column: str,
    mode: str = "normal",
) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """
    Validate all emails in a DataFrame and add:
      - Status
      - QualityScore
      - QualityLabel
      - Technical columns (Syntax, Domain, MX, SPF, DMARC, DKIM, Disposable, SMTP)

    Returns:
      (validated_df, status_statistics)
    """
    if email_column not in df.columns:
        raise ValueError(f"Column '{email_column}' not found in the DataFrame.")

    df_res = df.copy()
    df_res["Status"] = "Pending"
    df_res["QualityScore"] = 0
    df_res["QualityLabel"] = "Unknown"

    email_series = df_res[email_column].fillna("").astype(str)
    email_index = {
        idx: email.strip()
        for idx, email in email_series.items()
        if email.strip()
    }

    status_stats: Dict[str, int] = {}
    total = len(email_index)
    processed = 0

    if total == 0:
        print("No non-empty emails found in the selected column.")
        empty_mask = email_series == ""
        df_res.loc[empty_mask, "Status"] = "No email"
        df_res.loc[empty_mask, "QualityScore"] = 0
        df_res.loc[empty_mask, "QualityLabel"] = "No email"
        return df_res, status_stats

    print(f"\n[{TOOL_BRAND}] Starting validation of {total} non-empty email(s)...")
    print(f"Mode: {mode.upper()}  |  Threads: {MAX_WORKERS}")
    print("Step A → DNS / MX / SPF / DMARC / DKIM checks (depending on mode)")
    print("Step B → Compute QualityScore and QualityLabel for each email")
    print("Step C → Build VALIDATED and CLEAN Excel files\n")

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {
            executor.submit(verify_single_email, email, mode): idx
            for idx, email in email_index.items()
        }

        with tqdm(total=total, desc="Validating", unit="email") as pbar:
            for future in as_completed(futures):
                idx = futures[future]
                email = email_index[idx]
                try:
                    result = future.result()
                except Exception as exc:
                    result = {"Status": f"Error: {exc}"}

                # Add technical columns
                for key, value in result.items():
                    df_res.at[idx, key] = value

                status = derive_status(result)
                score = compute_quality_score(result)
                label = label_from_score(score, result)

                df_res.at[idx, "Status"] = status
                df_res.at[idx, "QualityScore"] = score
                df_res.at[idx, "QualityLabel"] = label

                status_stats[status] = status_stats.get(status, 0) + 1

                processed += 1
                if MAX_CONSOLE_MSGS == 0 or processed <= MAX_CONSOLE_MSGS:
                    print(f"[{TOOL_BRAND}] [{processed}/{total}] {email} → {status} (score={score})")

                pbar.update()

    # Mark rows with empty emails
    empty_mask = email_series == ""
    df_res.loc[empty_mask, "Status"] = "No email"
    df_res.loc[empty_mask, "QualityScore"] = 0
    df_res.loc[empty_mask, "QualityLabel"] = "No email"

    print(f"\n[{TOOL_BRAND}] Validation finished for {total} email(s).")
    return df_res, status_stats


# -----------------------
# File I/O helpers (Colab)
# -----------------------
def detect_file_type(file_name: str) -> str:
    _, ext = os.path.splitext(file_name.lower())
    if ext == ".csv":
        return "csv"
    if ext in (".xls", ".xlsx"):
        return "excel"
    raise ValueError(f"Unsupported file extension: {ext}")


def load_dataframe(file_name: str) -> pd.DataFrame:
    file_type = detect_file_type(file_name)
    if file_type == "csv":
        print(f"[{TOOL_BRAND}] Detected CSV file. Using pandas.read_csv().")
        return pd.read_csv(file_name)
    print(f"[{TOOL_BRAND}] Detected Excel file. Using pandas.read_excel().")
    return pd.read_excel(file_name, sheet_name=0)


def save_dataframe_to_excel(df: pd.DataFrame, file_name: str) -> None:
    """Save a DataFrame to Excel using openpyxl engine (no index)."""
    df.to_excel(file_name, index=False, engine="openpyxl")


def print_deliverability_summary(
    df_validated: pd.DataFrame,
    df_clean: pd.DataFrame,
    email_column: str,
    validated_output: str,
    clean_output: str,
) -> None:
    """
    Print a detailed summary of list quality, including bounce rate.

    Bounce rate is calculated as:
        non-valid emails / emails with non-empty value * 100
    """
    total_rows = len(df_validated)

    email_series = df_validated[email_column].fillna("").astype(str)
    has_email_mask = email_series.str.strip() != ""
    total_with_email = has_email_mask.sum()

    valid_mask = has_email_mask & (df_validated["Status"] == "Valid")
    valid_count = valid_mask.sum()

    bounced_count = total_with_email - valid_count
    bounce_rate = (bounced_count * 100 / total_with_email) if total_with_email > 0 else 0.0

    clean_count = len(df_clean)

    valid_pct = (valid_count * 100 / total_with_email) if total_with_email > 0 else 0.0
    clean_pct = (clean_count * 100 / total_with_email) if total_with_email > 0 else 0.0

    print("\n=== Deliverability summary (email quality) ===")
    print(f"Tool: {TOOL_BRAND}")
    print("This is the SAME technical method used by Centraldecomunicacion.es")
    print("and CompaniesData.cloud to validate and contract email datasets.")
    print("It is focused on technical deliverability (DNS/MX/SPF/DMARC/DKIM/disposable).")
    print()
    print(f"Total rows in original file:              {total_rows}")
    print(f"Rows with non-empty emails:              {total_with_email}")
    print(f"Valid emails (Status == 'Valid'):        {valid_count} ({valid_pct:.2f}%)")
    print(f"Potentially bouncing/problematic emails: {bounced_count} ({bounce_rate:.2f}%)")
    print(f"Clean emails kept in CLEAN file:         {clean_count} ({clean_pct:.2f}% of emails with value)")
    print("\nBOUNCE RATE (non-valid / emails with value): "
          f"{bounce_rate:.2f}%  <-- THIS IS YOUR ESTIMATED BOUNCE PERCENTAGE\n")

    print("OUTPUT FILES (Excel):")
    print(f"  - Validated dataset (full technical view): {validated_output}")
    print(f"  - Clean dataset (ready for campaigns):     {clean_output}")
    print("\nGenerated by:")
    print("  - Centraldecomunicacion.es (Central de Comunicación, Spain)")
    print("  - CompaniesData.cloud (European companies database platform)")
    print("\nInterpretation for clients and internal teams:")
    print("  - A lower bounce rate means a safer, more reliable list.")
    print("  - The CLEAN file only keeps high-quality, non-disposable emails "
          f"(QualityScore ≥ {CLEAN_MIN_SCORE} and Status == 'Valid').")
    print("  - This is the same internal auditing logic used before shipping any")
    print("    email database from Centraldecomunicacion.es / CompaniesData.cloud.")
    print("  - You can adjust CLEAN_MIN_SCORE or compute_quality_score() if you want")
    print("    to tighten or relax what you consider 'campaign-ready'.")


def process_file_in_colab(
    file_name: str,
    mode: str = "normal",
) -> None:
    from google.colab import files

    base, _ = os.path.splitext(file_name)
    validated_output = f"{base}_VALIDATED.xlsx"
    clean_output = f"{base}_CLEAN.xlsx"

    print(f"\n[{TOOL_BRAND}] Loading file: {file_name}")
    df = load_dataframe(file_name)
    print(f"[{TOOL_BRAND}] File loaded successfully.")
    print(f"Rows: {len(df)}, Columns: {len(df.columns)}")
    print(f"Columns: {list(df.columns)}\n")

    # Auto-detect email column
    email_column = auto_detect_email_column(df)
    print(f"[{TOOL_BRAND}] Using email column: '{email_column}'")

    # Validation
    df_validated, status_stats = verify_and_tag_emails(
        df, email_column=email_column, mode=mode
    )

    # Save full validated dataset to Excel
    save_dataframe_to_excel(df_validated, validated_output)
    print(f"\n[{TOOL_BRAND}] Saved full validated dataset to: {validated_output}")

    # Build CLEAN dataset (high-quality, non-disposable emails)
    if "Disposable" in df_validated.columns:
        clean_mask = (
            (df_validated["QualityScore"] >= CLEAN_MIN_SCORE)
            & (df_validated["Status"] == "Valid")
            & (df_validated["Disposable"] != "Yes")
        )
    else:
        clean_mask = (
            (df_validated["QualityScore"] >= CLEAN_MIN_SCORE)
            & (df_validated["Status"] == "Valid")
        )

    df_clean = df_validated[clean_mask].copy()
    save_dataframe_to_excel(df_clean, clean_output)
    print(f"[{TOOL_BRAND}] Saved CLEAN dataset to: {clean_output}")
    print(f"Original rows: {len(df_validated)} | Clean rows: {len(df_clean)}\n")

    # Print statistics and detailed summary (including bounce rate)
    if SHOW_ASCII_STATS:
        print_ascii_stats(status_stats, title="Status distribution")
    print_deliverability_summary(
        df_validated=df_validated,
        df_clean=df_clean,
        email_column=email_column,
        validated_output=validated_output,
        clean_output=clean_output,
    )

    # Trigger downloads in Colab
    print("\nStarting downloads of Excel files...")
    files.download(validated_output)
    files.download(clean_output)
    print("Downloads triggered. You can now save the reports locally.")
    print(f"\n[{TOOL_BRAND}] Process completed successfully.\n")


# -----------------------
# Interactive entrypoint
# -----------------------
def run_interactive() -> None:
    from google.colab import files

    print_intro_banner()
    print(">>> STEP 1: File upload")
    print("Please select an Excel (.xls/.xlsx) or CSV file from your system.")
    print("Once the upload finishes, the tool will auto-detect the email column.")
    uploaded = files.upload()
    if not uploaded:
        print("No file uploaded. Process aborted.")
        return

    print("\n>>> STEP 2: Choose validation mode")
    print("1 = Normal   → Syntax + domain DNS (fast)")
    print("2 = Advanced → Normal + MX records")
    print("3 = Ultra    → Advanced + SPF/DMARC/DKIM + disposable check")
    mode_input = input("Choose mode [1/2/3, default 1]: ").strip() or "1"
    mode = {"1": "normal", "2": "advanced", "3": "ultra"}.get(mode_input, "normal")
    print(f"\n[{TOOL_BRAND}] Selected mode: {mode.upper()}")

    print("\n>>> STEP 3: Validation will start for each uploaded file.")
    print("You will see:")
    print("  - A progress bar with the number of emails validated.")
    print("  - Per-email log lines with Status and QualityScore (for early inspection).")
    print("  - A final bounce rate and summary report.\n")

    for file_name in uploaded.keys():
        process_file_in_colab(file_name, mode=mode)

    print(">>> END: You now have two Excel files per input:")
    print("  - *_VALIDATED.xlsx = full technical view")
    print("  - *_CLEAN.xlsx     = high-quality subset, ready for campaigns")
    print("These outputs reflect the same validation pipeline used internally by")
    print("Centraldecomunicacion.es and CompaniesData.cloud for client datasets.")


# Auto-run in Colab
if __name__ == "__main__":
    run_interactive()


Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool
High-concurrency DNS-based validator for B2B email lists.
This is the SAME internal method used by Centraldecomunicacion.es
and CompaniesData.cloud to validate and contract 'valid email' datasets.
- Very low bounce → thanks to DNS / MX / SPF / DMARC / DKIM analysis.
- High transparency → all steps and results visible in the console.
- Two Excel outputs → FULL VALIDATED + CLEAN high-quality subset.
STEP 1 → Upload your Excel/CSV file when requested.
STEP 2 → Choose validation mode (Normal / Advanced / Ultra).
STEP 3 → Wait until the progress bar reaches 100%.
STEP 4 → Download the two generated Excel files.

>>> STEP 1: File upload
Please select an Excel (.xls/.xlsx) or CSV file from your system.
Once the upload finishes, the tool will auto-detect the email column.


Saving Belgium - Special Quality package_CALIDADVERIFICADA.xlsx to Belgium - Special Quality package_CALIDADVERIFICADA.xlsx

>>> STEP 2: Choose validation mode
1 = Normal   → Syntax + domain DNS (fast)
2 = Advanced → Normal + MX records
3 = Ultra    → Advanced + SPF/DMARC/DKIM + disposable check
Choose mode [1/2/3, default 1]: 3

[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] Selected mode: ULTRA

>>> STEP 3: Validation will start for each uploaded file.
You will see:
  - A progress bar with the number of emails validated.
  - Per-email log lines with Status and QualityScore (for early inspection).
  - A final bounce rate and summary report.


[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] Loading file: Belgium - Special Quality package_CALIDADVERIFICADA.xlsx
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] Detected Excel file. Using pandas.read_excel().
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] File

Validating:   0%|          | 0/216151 [00:00<?, ?email/s]

[1;30;43mSe han truncado las últimas 5000 líneas del flujo de salida.[0m
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] [211200/216151] ga30260@der-winnie.de → Valid (score=95)
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] [211201/216151] svf.mechelen@acerta.be → Valid (score=95)
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] [211202/216151] soc.mechelen@acerta.be → Valid (score=95)
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] [211203/216151] erikbuys@telenet.be → Valid (score=98)
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] [211204/216151] carrein@sip.be → Valid (score=95)
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] [211205/216151] cyl@skynet.be → Valid (score=95)
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] [211206/216151] directie@sjt-kleuter.be → Valid (score=95)
[Centraldecomunicacion.es & CompaniesData.cloud Email Quality T

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloads triggered. You can now save the reports locally.

[Centraldecomunicacion.es & CompaniesData.cloud Email Quality Tool] Process completed successfully.

>>> END: You now have two Excel files per input:
  - *_VALIDATED.xlsx = full technical view
  - *_CLEAN.xlsx     = high-quality subset, ready for campaigns
These outputs reflect the same validation pipeline used internally by
Centraldecomunicacion.es and CompaniesData.cloud for client datasets.
