In [1]:
import os, mailbox, email, hashlib, re
from email.utils import getaddresses, parsedate_to_datetime
from datetime import timezone
from bs4 import BeautifulSoup
from tqdm import tqdm
import clickhouse_connect

In [2]:
# ClickHouse connection settings.
# Every value can be overridden through an environment variable, so the
# address and credentials do not have to be edited (or stored) in the notebook.
CH_HOST = os.getenv('CH_HOST', '84.201.160.255')   # server address; use '127.0.0.1' when connecting through an SSH tunnel
CH_PORT = int(os.getenv('CH_PORT', '8123'))        # ClickHouse HTTP port (use the local tunnel port when tunnelling)
CH_USER = os.getenv('CH_USER', 'peter')            # ClickHouse user
# NOTE(review): the literal fallback is kept only so existing runs behave the
# same; set CH_PASSWORD and drop the literal before sharing this notebook.
CH_PASS = os.getenv('CH_PASSWORD', '1234')         # user's password ('' if the user has none)

In [4]:
# ingest_pst_to_clickhouse.py
# -*- coding: utf-8 -*-
import os
from pathlib import Path
from datetime import datetime, timezone
from email import policy
from email.parser import HeaderParser
from email.utils import getaddresses, parsedate_to_datetime

from tqdm import tqdm
import clickhouse_connect

# ---- Path settings ----
PST_DIR = r"E:\outlook"                    # directory that is searched for .pst files
ATTACH_DIR = r"E:\outlook\attachments"     # where attachment files are written (optional)
SAVE_ATTACHMENTS = True                    # True = write attachment files to disk

# ---- ClickHouse connection ----
# NOTE(review): these assignments replace the CH_HOST / CH_PORT / CH_USER /
# CH_PASS values set in an earlier cell of this notebook, so every later cell
# connects to localhost:8123 as "default" — confirm that this is intended.
CH_HOST = "localhost"      # keep as is when connecting through an SSH tunnel
CH_PORT = 8123
CH_USER = "default"        # replace with your own user
CH_PASS = ""               # replace with your password, if there is one
CH_DB   = "mailkb"         # database name (expected to exist already)

# The client is created when the cell runs and is used by ingest() as a
# module-level global.
client = clickhouse_connect.get_client(
    host=CH_HOST, port=CH_PORT, username=CH_USER, password=CH_PASS, database=CH_DB
)

# ---- Header parser setup ----
# Parses only the header section of a message, using the modern email policy.
header_parser = HeaderParser(policy=policy.default)

# ---- Helpers ----
def _parse_addrs(value):
    """Парсит список адресов в массив 'email' без имён."""
    if not value:
        return []
    return [addr for _, addr in getaddresses([value]) if addr]

def _parse_date_to_utc(value):
    """Date: → datetime (UTC). Если не получилось — вернём epoch."""
    if not value:
        return datetime(1970, 1, 1, tzinfo=timezone.utc)
    try:
        dt = parsedate_to_datetime(value)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.astimezone(timezone.utc)
    except Exception:
        return datetime(1970, 1, 1, tzinfo=timezone.utc)

def _ensure_dir(p: Path):
    p.mkdir(parents=True, exist_ok=True)

# ---- Main pass over the PST files ----
def iter_pst_files(base: str):
    """Yield each regular file matching ``*.pst`` under `base`, searched recursively.

    Directories whose name happens to match the pattern are skipped.
    """
    yield from (
        candidate
        for candidate in Path(base).rglob("*.pst")
        if candidate.is_file()
    )

# Column order used for every insert into mailkb.emails from the PST pass.
_PST_EMAIL_COLUMNS = [
    "id","message_id","subject",
    "from_addr","to_addr","cc_addr","bcc_addr",
    "sent_at_utc","sent_at_raw","folder",
    "body_text","body_html"
]
# Column order used for every insert into mailkb.attachments from the PST pass.
_PST_ATTACH_COLUMNS = ["email_id","filename","path","size_bytes"]

# Translation table that replaces characters Windows forbids in file names.
_PST_UNSAFE_NAME_CHARS = str.maketrans({ch: "_" for ch in '\\/:*?"<>|'})


def _flush_pst_batch(table, rows, column_names):
    """Insert the buffered `rows` into `table` and empty the buffer; no-op when empty."""
    if rows:
        client.insert(table, rows, column_names=column_names)
        rows.clear()


def _save_pst_attachment(att, fname, size, pst_path, message):
    """Write one attachment to disk; return its path, or "" if saving failed."""
    try:
        data = att.read_buffer(size) if size else att.read()
        dest_dir = Path(ATTACH_DIR) / f"{pst_path.stem}" / f"{message.identifier}"
        _ensure_dir(dest_dir)
        # Only the on-disk name is sanitised further; the name stored in the
        # database stays exactly as before.
        dest_fp = dest_dir / fname.translate(_PST_UNSAFE_NAME_CHARS)
        dest_fp.write_bytes(data or b"")
        return str(dest_fp)
    except Exception as exc:
        # Saving stays best-effort, but the failure is reported rather than
        # silently dropped. tqdm.write keeps the progress bar intact.
        tqdm.write(f"WARNING: could not save attachment {fname!r} from {pst_path.name}: {exc!r}")
        return ""


def ingest():
    """Read every PST file under PST_DIR and load its messages into ClickHouse.

    For each message the transport headers are parsed and one row is buffered
    for ``mailkb.emails``. One row per attachment is buffered for
    ``mailkb.attachments``. When SAVE_ATTACHMENTS is true the attachment
    payload is also written below ATTACH_DIR; if that fails, a warning is
    printed and the row is stored with an empty path. Buffers are sent to
    ClickHouse whenever they reach BATCH rows and once more at the end.

    Uses the module-level ``client`` and ``header_parser``. Errors raised by
    libratom or by the ClickHouse client propagate to the caller.
    """
    # libratom is imported lazily, only when ingestion actually starts.
    from libratom.lib.core import open_mail_archive

    emails_rows = []       # pending rows for mailkb.emails
    attach_rows = []       # pending rows for mailkb.attachments
    BATCH = 500

    if SAVE_ATTACHMENTS:
        _ensure_dir(Path(ATTACH_DIR))

    for pst_path in iter_pst_files(PST_DIR):
        print(f"\nPST: {pst_path}")
        # The archive is opened directly, without converting it first.
        with open_mail_archive(pst_path) as archive:
            for message in tqdm(archive.messages(), desc=pst_path.name):
                # Raw transport headers; may be missing for some messages.
                th = message.transport_headers or ""
                headers = header_parser.parsestr(th)

                # Header lookup is case-insensitive, so one lookup covers
                # both "Message-ID" and "Message-Id".
                message_id = headers.get("Message-ID", "") or ""
                subject    = headers.get("Subject", "") or ""
                from_addr  = _parse_addrs(headers.get("From"))
                to_addr    = _parse_addrs(headers.get("To"))
                cc_addr    = _parse_addrs(headers.get("Cc"))
                bcc_addr   = _parse_addrs(headers.get("Bcc"))
                sent_raw   = headers.get("Date", "") or ""
                sent_utc   = _parse_date_to_utc(sent_raw)

                # Message bodies as exposed by the message object.
                body_text = getattr(message, "plain_text_body", None) or ""
                body_html = getattr(message, "html_body", None) or ""

                # Stable id: the Message-ID when present, otherwise
                # "<pst file name>::<internal identifier>".
                stable_id = message_id.strip() or f"{pst_path.name}::{message.identifier}"

                # The folder is not always available on the message object.
                folder_name = getattr(message, "folder_name", None) or "unknown"

                emails_rows.append((
                    stable_id,
                    message_id or "",
                    subject,
                    from_addr,
                    to_addr,
                    cc_addr,
                    bcc_addr,
                    sent_utc,
                    sent_raw,
                    folder_name,
                    body_text,
                    body_html,
                ))

                # Attachments: always recorded, optionally saved to disk.
                for att in getattr(message, "attachments", []):
                    fname = (att.name or f"att_{att.identifier}").replace("\\", "_").replace("/", "_")
                    size  = getattr(att, "size", 0) or 0
                    fpath = ""
                    if SAVE_ATTACHMENTS:
                        fpath = _save_pst_attachment(att, fname, size, pst_path, message)

                    attach_rows.append((stable_id, fname, fpath, int(size)))

                # Flush full batches so memory use stays bounded.
                if len(emails_rows) >= BATCH:
                    _flush_pst_batch("mailkb.emails", emails_rows, _PST_EMAIL_COLUMNS)

                if len(attach_rows) >= BATCH:
                    _flush_pst_batch("mailkb.attachments", attach_rows, _PST_ATTACH_COLUMNS)

    # Send whatever is left in the buffers.
    _flush_pst_batch("mailkb.emails", emails_rows, _PST_EMAIL_COLUMNS)
    _flush_pst_batch("mailkb.attachments", attach_rows, _PST_ATTACH_COLUMNS)

# Entry point: run the PST ingestion, then print a completion message.
if __name__ == "__main__":
    ingest()
    print("Done.")


Unexpected Http Driver Exception


OperationalError: Error HTTPConnectionPool(host='localhost', port=8123): Max retries exceeded with url: /? (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001A53FC63910>: Failed to establish a new connection: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение')) executing HTTP request attempt 1 (http://localhost:8123)

In [5]:
# Connection overrides for an SSH tunnel, currently disabled.
# NOTE(review): while these stay commented out, main() uses whatever
# CH_HOST / CH_PORT / CH_USER / CH_PASS values an earlier cell left in the
# kernel — confirm which server this cell is meant to talk to.
# CH_HOST = '127.0.0.1'
# CH_PORT = 18123               # local port of the tunnel to the HTTP interface
# CH_USER = os.getenv('CH_USER', 'default')
# CH_PASS = os.getenv('CH_PASSWORD', '')

# Target database and table names used by main().
DB     = 'mailkb'
TABLEE = 'emails'
TABLEA = 'attachments'

MBOX_ROOT   = r"D:\outlook_mb\out_mbox"         # directory where readpst is expected to put the .mbox files
ATTACH_ROOT = r"D:\outlook_mb\attachments"      # directory where attachments are saved
# Create the attachment directory up front (no error if it already exists).
os.makedirs(ATTACH_ROOT, exist_ok=True)

def addr_array(value: str):
    """Return the non-empty e-mail addresses found in an address header value.

    Display names are discarded. A missing value is treated as an empty header.
    """
    collected = []
    for _display_name, address in getaddresses([value or ""]):
        if address:
            collected.append(address)
    return collected

def to_utc_iso(date_raw: str):
    """Parse a Date header and return ``(utc_datetime, raw_value)``.

    Returns ``(None, None)`` when no value is given and ``(None, date_raw)``
    when the value cannot be converted. A parsed date without timezone
    information is taken to be UTC.
    """
    if not date_raw:
        return None, None
    try:
        parsed = parsedate_to_datetime(date_raw)
        aware = parsed.replace(tzinfo=timezone.utc) if parsed.tzinfo is None else parsed
        outcome = (aware.astimezone(timezone.utc), date_raw)
    except Exception:
        outcome = (None, date_raw)
    return outcome

def _decode_part_text(part):
    """Return the decoded text payload of a MIME part, or None if it cannot be read."""
    try:
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='ignore')
        except LookupError:
            # The part declares a charset Python does not know. Fall back to
            # UTF-8 instead of losing the whole body.
            return payload.decode('utf-8', errors='ignore')
    except Exception:
        return None


def choose_body(msg):
    """Extract the plain-text and HTML bodies of a message.

    For a multipart message every ``text/plain`` and ``text/html`` part that
    is not marked as an attachment is collected. For a single-part message
    the payload counts as HTML when its content type is ``text/html`` and as
    plain text otherwise. Several parts of the same kind are joined with a
    ``-----`` separator line.

    Returns:
        ``(body_text, body_html)``. ``body_html`` is None when there is no
        HTML part. ``body_text`` falls back to the text extracted from the
        HTML when there is no plain-text part, and is None when there is
        neither.
    """
    text_parts, html_parts = [], []
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_disposition() == "attachment":
                continue
            # Check the type first so that inline images and other non-text
            # parts are never decoded as text just to be thrown away.
            content_type = part.get_content_type()
            if content_type not in ("text/plain", "text/html"):
                continue
            content = _decode_part_text(part)
            if content is None:
                continue
            if content_type == "text/plain":
                text_parts.append(content)
            else:
                html_parts.append(content)
    else:
        content = _decode_part_text(msg)
        if content is not None:
            if msg.get_content_type() == "text/html":
                html_parts.append(content)
            else:
                text_parts.append(content)

    body_html = "\n\n-----\n\n".join(html_parts) if html_parts else None
    if text_parts:
        body_text = "\n\n-----\n\n".join(text_parts)
    elif body_html:
        # No plain-text part: derive the text from the HTML.
        body_text = BeautifulSoup(body_html, "lxml").get_text("\n")
    else:
        body_text = None
    return body_text, body_html

def make_id(msg, sent_dt, subject):
    """Return an identifier for a message.

    The stripped Message-Id header is used when it is present. Otherwise the
    identifier is the SHA-1 hex digest of the sent timestamp (ISO format, or
    empty), the subject, and the From and To headers, joined with ``|``.
    """
    identifier = (msg.get('Message-Id') or '').strip()
    if not identifier:
        timestamp = sent_dt.isoformat() if sent_dt else ''
        fingerprint = f"{timestamp}|{subject}|{msg.get('From','')}|{msg.get('To','')}"
        identifier = hashlib.sha1(fingerprint.encode('utf-8','ignore')).hexdigest()
    return identifier

def save_attachments(msg, email_id, root=None):
    """Write every attachment of `msg` to disk and describe what was saved.

    Files go into ``<root>/<sanitised email_id>/<sanitised filename>``.
    Characters that are not allowed in Windows file names are replaced with
    ``_`` in both path components. This matters for `email_id`, which is
    normally a Message-ID of the form ``<id@host>``: used unchanged, the
    angle brackets make directory creation fail on Windows, and every
    attachment of the message would be lost.

    Args:
        msg: message whose parts are scanned for attachments.
        email_id: identifier of the message; stored unchanged in the result.
        root: base directory for attachments; defaults to ATTACH_ROOT.

    Returns:
        A list of ``(email_id, original_filename, path, size_in_bytes)``
        tuples, one per attachment that was written. An attachment that
        cannot be written is reported with a warning and skipped.
    """
    base_dir = ATTACH_ROOT if root is None else root
    folder = os.path.join(base_dir, re.sub(r"[\\/:*?\"<>|]", "_", email_id))

    saved = []
    for part in msg.walk():
        if part.get_content_disposition() != "attachment":
            continue
        filename = part.get_filename() or "attachment.bin"
        try:
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            os.makedirs(folder, exist_ok=True)
            safe_name = re.sub(r"[\\/:*?\"<>|]", "_", filename)
            path = os.path.join(folder, safe_name)
            with open(path, "wb") as f:
                f.write(payload)
            sz = os.path.getsize(path)
            saved.append((email_id, filename, path, sz))
        except Exception as exc:
            # Saving stays best-effort, but the failure is reported rather
            # than silently dropped.
            tqdm.write(f"WARNING: could not save attachment {filename!r} of {email_id!r}: {exc!r}")
            continue
    return saved

def iter_mbox_files(root):
    """Walk `root` and yield ``(file_path, folder)`` for each ``*.mbox`` file.

    ``folder`` is the containing directory's path relative to `root`
    (``"."`` for files directly in `root`).
    """
    for dirpath, _subdirs, filenames in os.walk(root):
        for name in filter(lambda candidate: candidate.endswith('.mbox'), filenames):
            yield os.path.join(dirpath, name), os.path.relpath(dirpath, root)

# Column order used for every insert into the e-mails table from the mbox pass.
_MBOX_EMAIL_COLUMNS = [
    'id','message_id','subject','from_addr','to_addr','cc_addr','bcc_addr',
    'sent_at_utc','sent_at_raw','folder','body_text','body_html'
]
# Column order used for every insert into the attachments table from the mbox pass.
_MBOX_ATTACH_COLUMNS = ['email_id','filename','path','size_bytes']


def _flush_mbox_batch(client, table, rows, column_names):
    """Insert the buffered `rows` into `table` and empty the buffer; no-op when empty."""
    if rows:
        client.insert(table, rows, column_names=column_names)
        rows.clear()


def main():
    """Load every mbox file under MBOX_ROOT into ClickHouse.

    Each message becomes one row in ``{DB}.{TABLEE}``. Its attachments are
    written to disk by save_attachments() and recorded in ``{DB}.{TABLEA}``.
    Rows are buffered and inserted in batches of BATCH, and the remainder is
    inserted at the end. Every mailbox and the ClickHouse client are closed
    even if processing fails part-way.
    """
    client = clickhouse_connect.get_client(
        host=CH_HOST, port=CH_PORT, username=CH_USER, password=CH_PASS, database=DB
    )
    emails_table = f'{DB}.{TABLEE}'
    attach_table = f'{DB}.{TABLEA}'

    batch_rows = []
    batch_atts = []
    BATCH = 500

    try:
        for mbox_path, folder in tqdm(list(iter_mbox_files(MBOX_ROOT)), desc="MBOX files"):
            mbox = mailbox.mbox(mbox_path)
            try:
                for msg in mbox:
                    subject = (msg.get('Subject') or '').strip()
                    from_   = addr_array(msg.get('From'))
                    to_     = addr_array(msg.get('To'))
                    cc_     = addr_array(msg.get('Cc'))
                    bcc_    = addr_array(msg.get('Bcc'))

                    # NOTE(review): sent_dt and sent_raw are None when the
                    # Date header is missing or unparsable — confirm that the
                    # target columns accept NULL.
                    sent_dt, sent_raw = to_utc_iso(msg.get('Date'))
                    body_text, body_html = choose_body(msg)
                    message_id = (msg.get('Message-Id') or '').strip()
                    email_id = make_id(msg, sent_dt, subject)

                    # Buffer the e-mail row.
                    batch_rows.append([
                        email_id,
                        message_id,
                        subject,
                        from_,
                        to_,
                        cc_,
                        bcc_,
                        sent_dt,               # passed as a datetime; conversion is left to the driver
                        sent_raw,
                        folder,
                        body_text or '',
                        body_html or ''
                    ])

                    # Save the attachments and buffer one row for each.
                    for rec in save_attachments(msg, email_id):
                        batch_atts.append(list(rec))

                    if len(batch_rows) >= BATCH:
                        _flush_mbox_batch(client, emails_table, batch_rows, _MBOX_EMAIL_COLUMNS)

                    if len(batch_atts) >= BATCH:
                        _flush_mbox_batch(client, attach_table, batch_atts, _MBOX_ATTACH_COLUMNS)
            finally:
                # Release the file handle before moving on to the next mailbox.
                mbox.close()

        # Insert whatever is left in the buffers.
        _flush_mbox_batch(client, emails_table, batch_rows, _MBOX_EMAIL_COLUMNS)
        _flush_mbox_batch(client, attach_table, batch_atts, _MBOX_ATTACH_COLUMNS)
    finally:
        client.close()

# Entry point: run the mbox ingestion.
if __name__ == "__main__":
    main()

Unexpected Http Driver Exception


OperationalError: Error HTTPConnectionPool(host='localhost', port=8123): Max retries exceeded with url: /? (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001A53FBFCEB0>: Failed to establish a new connection: [WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение')) executing HTTP request attempt 1 (http://localhost:8123)

In [7]:
import sqlite3

# Open the SQLite database that holds the e-mails table.
conn = sqlite3.connect(r"E:\outlook\mail_local.db")

# Total number of rows in the e-mails table.
row_count = conn.execute("SELECT count(*) FROM emails").fetchone()[0]
print("rows:", row_count)

# Subject and sent time of the five rows with the largest sent_at_utc.
latest_rows = conn.execute("SELECT subject, sent_at_utc FROM emails ORDER BY sent_at_utc DESC LIMIT 5").fetchall()
print(latest_rows)


rows: 28935
[('Постановка оценки за 6 месяцев работы стажера', '2021-02-28T20:22:34+00:00'), ('SAPSP-18265 Открыть доступ к поставкам Лесосибирского ЛДК', '2021-02-28T20:02:00+00:00'), ('[JIRA] (SAPSP-18265) Открыть доступ к поставкам Лесосибирского ЛДК', '2021-02-28T20:01:00+00:00'), ('RE: еще раз про логистику', '2021-02-28T16:51:43+00:00'), ('[JIRA] (SAPSP-17588) Замечание ОПЭ СЦБК №62', '2021-02-26T14:57:00+00:00')]
