# Telegram Monitor

Kaggle notebook for scanning Telegram sources and exporting `telegram_results.json`.


In [None]:
!pip install -q telethon google-generativeai requests pillow imagehash python-dotenv cryptography supabase


In [None]:
import asyncio
import hashlib
import io
import json
import logging
import os
import random
import re
import statistics
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from pathlib import Path

import requests
from PIL import Image
import imagehash
import google.generativeai as genai
from telethon import TelegramClient
from telethon.sessions import StringSession
from telethon.tl.types import Channel, Chat

# Configure logging via basicConfig and create the module-level logger used below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('telegram_monitor')
# Imported here only to report the installed Telethon version in the run log.
import telethon
logger.info('tg_monitor.telethon version=%s', getattr(telethon, '__version__', 'unknown'))


In [None]:
KAGGLE_INPUT = Path('/kaggle/input')

def _find_file(filename: str) -> Path | None:
    if not KAGGLE_INPUT.exists():
        return None
    for path in KAGGLE_INPUT.rglob(filename):
        if path.is_file():
            return path
    return None

def _load_json(path: Path) -> dict:
    """Read the UTF-8 encoded file at *path* and return its parsed JSON content."""
    with path.open(encoding='utf-8') as handle:
        return json.loads(handle.read())

def load_config() -> dict:
    """Locate config.json via _find_file and return its parsed content.

    Raises:
        RuntimeError: if no config.json is found.
    """
    config_path = _find_file('config.json')
    if config_path:
        return _load_json(config_path)
    raise RuntimeError('config.json not found in /kaggle/input')

def load_secrets() -> dict:
    """Decrypt secrets.enc with the Fernet key from fernet.key and parse it as JSON.

    Raises:
        RuntimeError: if either secrets.enc or fernet.key cannot be found.
    """
    enc_path, key_path = _find_file('secrets.enc'), _find_file('fernet.key')
    if not (enc_path and key_path):
        raise RuntimeError('secrets.enc/fernet.key not found in /kaggle/input')
    # Imported lazily, only once both input files are known to exist.
    from cryptography.fernet import Fernet
    key = key_path.read_bytes().strip()
    plaintext = Fernet(key).decrypt(enc_path.read_bytes())
    return json.loads(plaintext.decode('utf-8'))

# Load the run configuration and the decrypted secrets from the Kaggle inputs.
config = load_config()
secrets = load_secrets()
# Export secrets as environment variables; a variable that is already set wins.
for k, v in (secrets or {}).items():
    if k and v and not os.getenv(k):
        os.environ[k] = str(v)

# Credentials are read back from the environment so either source can supply them.
TG_SESSION = os.getenv('TG_SESSION', '')
TG_API_ID = os.getenv('TG_API_ID', '')
TG_API_HASH = os.getenv('TG_API_HASH', '')
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY', '')

# Fail fast: nothing below can work without these credentials.
if not TG_SESSION or not TG_API_ID or not TG_API_HASH:
    raise RuntimeError('Missing TG credentials after secrets load')
if not GOOGLE_API_KEY:
    raise RuntimeError('Missing GOOGLE_API_KEY after secrets load')

# Log only lengths / presence flags, never the secret values themselves.
logger.info(
    'tg_monitor.secrets tg_session_len=%s tg_api_id_set=%s tg_api_hash_set=%s google_api_key_set=%s',
    len(TG_SESSION) if TG_SESSION else 0,
    bool(TG_API_ID),
    bool(TG_API_HASH),
    bool(GOOGLE_API_KEY),
)

# Gemma models: the vision model and both fallbacks default to the text model.
TEXT_MODEL = os.getenv('TG_MONITORING_TEXT_MODEL', 'gemma-3-27b-it')
VISION_MODEL = os.getenv('TG_MONITORING_VISION_MODEL', TEXT_MODEL)
FALLBACK_TEXT_MODEL = os.getenv('TG_MONITORING_TEXT_MODEL_FALLBACK', TEXT_MODEL)
FALLBACK_VISION_MODEL = os.getenv('TG_MONITORING_VISION_MODEL_FALLBACK', VISION_MODEL)

# Scan limits (int() raises ValueError if an override is not a valid integer).
MAX_MESSAGES_PER_SOURCE = int(os.getenv('TG_MONITORING_LIMIT', '50'))
MAX_DAYS_BACK = int(os.getenv('TG_MONITORING_DAYS_BACK', '3'))
MAX_IMAGES_PER_MESSAGE = int(os.getenv('TG_MONITORING_MAX_IMAGES', '4'))
# OCR is enabled only when the variable is exactly '1'.
ENABLE_OCR = os.getenv('TG_MONITORING_ENABLE_OCR', '1') == '1'

# Human-like delays, in seconds: a short pause after every message, a longer
# pause every HUMAN_LONG_PAUSE_EVERY messages, and a pause around each source.
HUMAN_DELAY_MIN = float(os.getenv('TG_MONITORING_DELAY_MIN', '0.8'))
HUMAN_DELAY_MAX = float(os.getenv('TG_MONITORING_DELAY_MAX', '2.2'))
HUMAN_LONG_PAUSE_EVERY = int(os.getenv('TG_MONITORING_LONG_PAUSE_EVERY', '7'))
HUMAN_LONG_PAUSE_MIN = float(os.getenv('TG_MONITORING_LONG_PAUSE_MIN', '4'))
HUMAN_LONG_PAUSE_MAX = float(os.getenv('TG_MONITORING_LONG_PAUSE_MAX', '9'))
SOURCE_PAUSE_MIN = float(os.getenv('TG_MONITORING_SOURCE_PAUSE_MIN', '2'))
SOURCE_PAUSE_MAX = float(os.getenv('TG_MONITORING_SOURCE_PAUSE_MAX', '6'))

# Gemma rate limits (single limiter for all requests).
# The *_MARGIN values are the fraction of each quota that is held back:
# RateLimitConfig multiplies each limit by (1 - margin).
RATE_RPM = int(os.getenv('TG_GEMMA_RPM', '30'))
RATE_TPM = int(os.getenv('TG_GEMMA_TPM', '15000'))
RATE_RPD = int(os.getenv('TG_GEMMA_RPD', '14400'))
RATE_MINUTE_MARGIN = float(os.getenv('TG_GEMMA_MINUTE_MARGIN', '0.45'))
RATE_DAILY_MARGIN = float(os.getenv('TG_GEMMA_DAILY_MARGIN', '0.85'))

# Record the effective configuration in the run log for later troubleshooting.
logger.info(
    'tg_monitor.config sources=%d run_id=%s',
    len(config.get('sources') or []),
    config.get('run_id') or 'auto',
)
logger.info(
    'tg_monitor.limits max_messages=%d max_days_back=%d max_images=%d ocr=%s',
    MAX_MESSAGES_PER_SOURCE,
    MAX_DAYS_BACK,
    MAX_IMAGES_PER_MESSAGE,
    ENABLE_OCR,
)
# One log line per configured source.
for src in config.get('sources') or []:
    logger.info(
        'tg_monitor.source_config username=%s last_id=%s default_location=%s trust_level=%s',
        src.get('username'),
        src.get('last_scanned_message_id'),
        src.get('default_location'),
        src.get('trust_level'),
    )


In [None]:
@dataclass
class RateLimitConfig:
    """Raw quota figures plus the safety margins applied to them.

    Field defaults come from the module-level RATE_* settings. Each
    ``effective_*`` property is the raw limit scaled by ``1 - margin`` and
    truncated to an int.
    """

    rpm: int = RATE_RPM
    tpm: int = RATE_TPM
    rpd: int = RATE_RPD
    minute_margin: float = RATE_MINUTE_MARGIN
    daily_margin: float = RATE_DAILY_MARGIN

    @staticmethod
    def _with_margin(limit: int, margin: float) -> int:
        """Return *limit* reduced by the fraction *margin*, truncated to int."""
        return int(limit * (1 - margin))

    @property
    def effective_rpm(self) -> int:
        """Requests per minute after holding back ``minute_margin``."""
        return self._with_margin(self.rpm, self.minute_margin)

    @property
    def effective_tpm(self) -> int:
        """Tokens per minute after holding back ``minute_margin``."""
        return self._with_margin(self.tpm, self.minute_margin)

    @property
    def effective_rpd(self) -> int:
        """Requests per day after holding back ``daily_margin``."""
        return self._with_margin(self.rpd, self.daily_margin)


class TokenBucket:
    """Token bucket that refills continuously at ``refill_rate`` tokens per second.

    The bucket starts full and never holds more than ``capacity`` tokens.
    Elapsed time is measured with ``time.monotonic``.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        """Credit the tokens earned since the previous refill, capped at capacity."""
        current = time.monotonic()
        replenished = self.tokens + (current - self.last_refill) * self.refill_rate
        self.tokens = replenished if replenished < self.capacity else self.capacity
        self.last_refill = current

    def consume(self, tokens: int = 1) -> bool:
        """Take *tokens* from the bucket if available; return whether it succeeded."""
        self._refill()
        enough = self.tokens >= tokens
        if enough:
            self.tokens -= tokens
        return enough

    def wait_time(self, tokens: int = 1) -> float:
        """Return the seconds until *tokens* would be available (0.0 if already so)."""
        self._refill()
        shortfall = tokens - self.tokens
        return shortfall / self.refill_rate if shortfall > 0 else 0.0


class GemmaRateLimiter:
    """Local limiter for requests/minute, tokens/minute and a daily request count.

    The per-minute limits are enforced with two token buckets sized from the
    config's effective (margin-adjusted) limits. The daily limit is only
    reported: exceeding it logs a warning but does not block the request.
    """

    def __init__(self, config: RateLimitConfig | None = None):
        self.config = config or RateLimitConfig()
        self._rpm_bucket = TokenBucket(
            capacity=self.config.effective_rpm,
            refill_rate=self.config.effective_rpm / 60.0,
        )
        self._tpm_bucket = TokenBucket(
            capacity=self.config.effective_tpm,
            refill_rate=self.config.effective_tpm / 60.0,
        )
        self._daily_requests = 0
        self._last_reset_day: str | None = None

    def _check_daily_reset(self) -> None:
        """Reset the daily request counter when the UTC calendar day changes."""
        today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
        if self._last_reset_day != today:
            self._daily_requests = 0
            self._last_reset_day = today

    async def wait_if_needed(self, estimated_tokens: int) -> None:
        """Sleep until one request and *estimated_tokens* tokens fit, then consume them.

        An estimate larger than the token bucket's capacity is charged as a
        full bucket; otherwise the wait below could never finish.
        """
        self._check_daily_reset()
        if self._daily_requests >= self.config.effective_rpd:
            logger.warning('Gemma daily request limit reached: %s', self.config.effective_rpd)
        # The bucket never holds more than its capacity, so an oversized
        # request would wait forever; clamp it to the most the bucket can hold.
        needed_tokens = min(estimated_tokens, self._tpm_bucket.capacity)
        # Sleep in slices of at most 5 seconds and re-check after each one.
        while True:
            rpm_wait = self._rpm_bucket.wait_time(1)
            if rpm_wait <= 0:
                break
            await asyncio.sleep(min(rpm_wait, 5.0))
        while True:
            tpm_wait = self._tpm_bucket.wait_time(needed_tokens)
            if tpm_wait <= 0:
                break
            await asyncio.sleep(min(tpm_wait, 5.0))
        self._rpm_bucket.consume(1)
        self._tpm_bucket.consume(needed_tokens)
        self._daily_requests += 1

    def acquire(self, estimated_tokens: int = 500):
        """Return an async context manager that waits for capacity on entry."""
        return RateLimitContext(self, estimated_tokens)


class RateLimitContext:
    """Async context manager returned by ``GemmaRateLimiter.acquire``.

    Entering waits on the limiter for the estimated token count; leaving does
    nothing and never suppresses an exception.
    """

    def __init__(self, limiter: GemmaRateLimiter, estimated_tokens: int):
        self._limiter, self._estimated_tokens = limiter, estimated_tokens

    async def __aenter__(self):
        limiter, tokens = self._limiter, self._estimated_tokens
        await limiter.wait_if_needed(tokens)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release; a False return lets any exception propagate.
        return False


# Module-level limiter instance; _call_model acquires it for calls that do not
# go through the Supabase limiter.
rate_limiter = GemmaRateLimiter()


In [None]:
async def human_sleep(min_s: float, max_s: float) -> None:
    """Sleep for a random time in [min_s, max_s], occasionally a bit longer.

    With probability 0.12 an extra 0.8-2.5 seconds is added to the pause.
    """
    pause = random.uniform(min_s, max_s)
    hesitate = random.random() < 0.12
    if hesitate:
        pause += random.uniform(0.8, 2.5)
    await asyncio.sleep(pause)


def _estimate_tokens(text: str, has_images: bool = False) -> int:
    """Return a rough token estimate for a prompt, used for rate limiting.

    Text is counted as one token per four characters with a floor of 200.
    Attached images add a flat 800 tokens, including when the text is empty
    (previously the image overhead was dropped in that case).
    """
    base = max(200, len(text) // 4) if text else 200
    if has_images:
        base += 800
    return base


def _safe_json(text: str):
    """Best-effort JSON parse of a model reply; returns None when it fails.

    A leading Markdown code fence is stripped, then the span from the first
    opening brace/bracket to the last closing brace/bracket is parsed.
    """
    if not text:
        return None
    candidate = text.strip()
    if candidate.startswith('```'):
        candidate = re.sub(r'^```[a-zA-Z]*\n?', '', candidate).strip()
        if candidate.endswith('```'):
            candidate = candidate[:-3].strip()
    openers = [pos for pos in (candidate.find('{'), candidate.find('[')) if pos != -1]
    first = min(openers) if openers else -1
    last = max(candidate.rfind('}'), candidate.rfind(']'))
    if -1 not in (first, last) and last > first:
        candidate = candidate[first:last + 1]
    try:
        return json.loads(candidate)
    except Exception:
        return None


def _is_not_found(exc: Exception) -> bool:
    """Return True when the exception text contains 'not found' or '404'."""
    lowered = str(exc).lower()
    return any(marker in lowered for marker in ('not found', '404'))


In [None]:
# Configure the Google Generative AI SDK with the default API key.
genai.configure(api_key=GOOGLE_API_KEY)

# Per-kind model state: the active model name, the name to fall back to, and
# the GenerativeModel instance that _call_model uses.
MODEL_REGISTRY = {
    'text': {
        'name': TEXT_MODEL,
        'fallback': FALLBACK_TEXT_MODEL,
        'model': genai.GenerativeModel(TEXT_MODEL),
    },
    'vision': {
        'name': VISION_MODEL,
        'fallback': FALLBACK_VISION_MODEL,
        'model': genai.GenerativeModel(VISION_MODEL),
    },
}

# Supabase settings. The limiter is enabled only when both a URL and a key are set.
SUPABASE_URL = os.getenv('SUPABASE_URL', '').strip()
SUPABASE_SERVICE_KEY = os.getenv('SUPABASE_SERVICE_KEY', '').strip()
# The service key is preferred; SUPABASE_KEY is used when it is empty.
SUPABASE_KEY = (SUPABASE_SERVICE_KEY or os.getenv('SUPABASE_KEY', '')).strip()
SUPABASE_SCHEMA = (os.getenv('SUPABASE_SCHEMA', 'public') or 'public').strip()
SUPABASE_ENABLED = bool(SUPABASE_URL and SUPABASE_KEY)
# In strict mode (the default) a failed RPC raises instead of being logged.
SUPABASE_STRICT = os.getenv('TG_MONITORING_SUPABASE_STRICT', '1') == '1'
SUPABASE_CONSUMER = os.getenv('TG_MONITORING_CONSUMER', 'kaggle')
# May be None; sent as p_account_name in the reserve RPC.
SUPABASE_ACCOUNT = os.getenv('GOOGLE_API_LOCALNAME')
# Extra tokens added to every Supabase TPM reservation made by _call_model.
SUPABASE_TPM_RESERVE_EXTRA = int(os.getenv('TG_GEMMA_TPM_RESERVE_EXTRA', '1000'))
# Number of attempts _call_model makes per request on the Supabase path.
SUPABASE_MAX_RETRIES = int(os.getenv('TG_GEMMA_RETRIES', '2'))


def _normalize_model_id(value: str) -> str:
    """Return the model id without a 'models/' prefix or a Gemma '-it' suffix.

    The '-it' suffix is removed only from ids starting with 'gemma-'. An empty
    result falls back to 'gemma-3-27b'.
    """
    model_id = (value or '').strip().removeprefix('models/')
    if model_id.startswith('gemma-') and model_id.endswith('-it'):
        model_id = model_id.removesuffix('-it')
    return model_id if model_id else 'gemma-3-27b'


def _supabase_rpc(fn_name: str, payload: dict):
    """POST *payload* to the Supabase RPC endpoint *fn_name* and return the result.

    Returns None when Supabase is disabled. On success the first element of a
    non-empty list response is returned, otherwise the decoded body (or {}).
    On an HTTP error status: raises RuntimeError in strict mode; otherwise
    logs a warning and returns {'fallback_local': True} for a 404 carrying
    PGRST202, or None for anything else.
    """
    if not SUPABASE_ENABLED:
        return None
    endpoint = SUPABASE_URL.rstrip('/') + f"/rest/v1/rpc/{fn_name}"
    request_headers = {
        'apikey': SUPABASE_KEY,
        'Authorization': f'Bearer {SUPABASE_KEY}',
        'Content-Type': 'application/json',
        'Accept-Profile': SUPABASE_SCHEMA,
        'Content-Profile': SUPABASE_SCHEMA,
    }
    resp = requests.post(endpoint, headers=request_headers, json=payload, timeout=20)
    if resp.status_code < 400:
        data = resp.json()
        if isinstance(data, list) and data:
            return data[0]
        return data or {}
    if SUPABASE_STRICT:
        raise RuntimeError(f"Supabase RPC {fn_name} failed: {resp.status_code} {resp.text}")
    logger.warning('supabase rpc failed: %s %s', resp.status_code, resp.text[:200])
    missing_function = resp.status_code == 404 and 'PGRST202' in resp.text
    return {'fallback_local': True} if missing_function else None


class SupabaseLimiter:
    def __init__(self):
        self.enabled = SUPABASE_ENABLED

    def reserve(self, request_uid: str, attempt_no: int, model_id: str, reserved_tpm: int):
        if not self.enabled:
            return {'ok': True, 'env_var_name': 'GOOGLE_API_KEY'}
        payload = {
            'p_request_uid': request_uid,
            'p_attempt_no': attempt_no,
            'p_consumer': SUPABASE_CONSUMER,
            'p_account_name': SUPABASE_ACCOUNT,
            'p_model': model_id,
            'p_reserved_tpm': reserved_tpm,
            'p_candidate_key_ids': None,
        }
        result = _supabase_rpc('google_ai_reserve', payload)
        if isinstance(result, dict) and result.get('fallback_local'):
            return {'ok': True, 'fallback_local': True, 'env_var_name': 'GOOGLE_API_KEY'}
        return result or {'ok': False}

    def mark_sent(self, request_uid: str, attempt_no: int):
        if not self.enabled:
            return
        _supabase_rpc('google_ai_mark_sent', {
            'p_request_uid': request_uid,
            'p_attempt_no': attempt_no,
        })

    def finalize(self, request_uid: str, attempt_no: int, *, usage=None, error=None, duration_ms: int | None = None, model_id: str | None = None):
        if not self.enabled:
            return
        payload = {
            'p_request_uid': request_uid,
            'p_attempt_no': attempt_no,
            'p_usage_input_tokens': None,
            'p_usage_output_tokens': None,
            'p_usage_total_tokens': None,
            'p_duration_ms': duration_ms,
            'p_provider_status': 'succeeded' if error is None else 'failed',
            'p_error_type': None,
            'p_error_code': None,
            'p_error_message': None,
        }
        if usage:
            payload['p_usage_input_tokens'] = usage.get('input_tokens')
            payload['p_usage_output_tokens'] = usage.get('output_tokens')
            payload['p_usage_total_tokens'] = usage.get('total_tokens')
        if error is not None:
            payload['p_error_type'] = type(error).__name__
            payload['p_error_message'] = str(error)[:500]
        result = _supabase_rpc('google_ai_finalize', payload)
        if isinstance(result, dict) and result.get('fallback_local'):
            legacy_payload = {
                'p_request_uid': request_uid,
                'p_api_key_id': None,
                'p_model': model_id,
                'p_actual_input_tokens': payload.get('p_usage_input_tokens'),
                'p_actual_output_tokens': payload.get('p_usage_output_tokens'),
                'p_status': 'success' if error is None else 'failed',
            }
            _supabase_rpc('finalize_google_ai_usage', legacy_payload)

# Shared Supabase limiter used by _call_model. Its `enabled` flag is switched
# off for the rest of the run when the reserve RPC asks for local fallback.
supabase_limiter = SupabaseLimiter()


def _set_model(kind: str, name: str) -> None:
    """Point MODEL_REGISTRY[kind] at the model *name*, updating the entry in place."""
    state = MODEL_REGISTRY[kind]
    state['name'] = name
    state['model'] = genai.GenerativeModel(name)
    # JSON-mode support is per model, so forget what was learned about the old one.
    state.pop('json_mode', None)


async def _call_model(kind: str, prompt: str, images=None):
    """Send *prompt* (plus optional *images*) to the registered model and return the response.

    Args:
        kind: key into MODEL_REGISTRY ('text' or 'vision').
        prompt: the text prompt.
        images: optional iterable of images sent after the prompt.

    Behaviour:
        * JSON output mode is requested first. If the model rejects it, the
          call is retried without it and the rejection is remembered so that
          later calls skip the failing request.
        * If the model is reported as not found and a different fallback is
          configured, the registry switches to the fallback and the call is
          repeated.
        * When the Supabase limiter is enabled, each attempt is reserved,
          marked as sent and finalized through it, and retryable transport
          errors are retried up to SUPABASE_MAX_RETRIES attempts. Otherwise
          the local rate limiter is used.

    Raises:
        RuntimeError: if Supabase blocks the reservation, no API key is
            available, or no attempt could be made.
        Exception: whatever the SDK raises once retries are exhausted.
    """
    model_state = MODEL_REGISTRY[kind]
    has_images = bool(images)
    contents = [prompt, *images] if images else prompt
    max_output_tokens = 800

    def _gen_config(use_json: bool) -> dict:
        cfg = {
            'temperature': 0,
            'max_output_tokens': max_output_tokens,
        }
        if use_json:
            cfg['response_mime_type'] = 'application/json'
        return cfg

    async def _generate(use_json: bool):
        # generate_content is a blocking call, so run it in a worker thread to
        # keep the event loop (and the Telegram client on it) responsive. The
        # model is read from the registry on every call so that a fallback
        # switch takes effect immediately.
        return await asyncio.to_thread(
            model_state['model'].generate_content,
            contents,
            generation_config=_gen_config(use_json),
        )

    async def _do_call():
        if not model_state.get('json_mode', True):
            return await _generate(False)
        try:
            return await _generate(True)
        except Exception as exc:
            msg = str(exc).lower()
            if 'json mode' in msg or 'response_mime_type' in msg:
                logger.warning('json mode disabled for model=%s', model_state['name'])
                model_state['json_mode'] = False
                return await _generate(False)
            fallback_name = model_state.get('fallback')
            if fallback_name and _is_not_found(exc) and fallback_name != model_state['name']:
                logger.warning('model not found, fallback to %s', fallback_name)
                _set_model(kind, fallback_name)
                # After the switch the active name equals the fallback, so
                # this recursion cannot trigger a second fallback.
                return await _do_call()
            raise

    use_supabase = supabase_limiter.enabled
    if use_supabase:
        request_uid = uuid.uuid4().hex
        model_id = _normalize_model_id(model_state['name'])
        reserved_tpm = max_output_tokens + SUPABASE_TPM_RESERVE_EXTRA
        for attempt in range(1, SUPABASE_MAX_RETRIES + 1):
            reserve = supabase_limiter.reserve(request_uid, attempt, model_id, reserved_tpm)
            if reserve.get('fallback_local'):
                # The reserve RPC is unavailable: use the local limiter from now on.
                supabase_limiter.enabled = False
                use_supabase = False
                break
            if not reserve or not reserve.get('ok'):
                blocked = reserve.get('blocked_reason') if reserve else 'unknown'
                raise RuntimeError(f'Supabase rate limit blocked: {blocked}')
            env_var = reserve.get('env_var_name') or 'GOOGLE_API_KEY'
            api_key = os.getenv(env_var) or os.getenv('GOOGLE_API_KEY')
            if not api_key:
                raise RuntimeError(f'Missing API key for {env_var}')
            genai.configure(api_key=api_key)
            supabase_limiter.mark_sent(request_uid, attempt)
            # Monotonic clock: the duration must not be affected by clock changes.
            start = time.monotonic()
            try:
                resp = await _do_call()
                usage_meta = getattr(resp, 'usage_metadata', None)
                usage = None
                if usage_meta is not None:
                    usage = {
                        'input_tokens': getattr(usage_meta, 'prompt_token_count', None),
                        'output_tokens': getattr(usage_meta, 'candidates_token_count', None),
                        'total_tokens': getattr(usage_meta, 'total_token_count', None),
                    }
                supabase_limiter.finalize(
                    request_uid,
                    attempt,
                    usage=usage,
                    duration_ms=int((time.monotonic() - start) * 1000),
                    model_id=model_id,
                )
                return resp
            except Exception as exc:
                supabase_limiter.finalize(
                    request_uid,
                    attempt,
                    error=exc,
                    duration_ms=int((time.monotonic() - start) * 1000),
                    model_id=model_id,
                )
                msg = str(exc).lower()
                retryable = any(x in msg for x in ['timeout', 'connection', 'temporary', '503', '502', '504'])
                if retryable and attempt < SUPABASE_MAX_RETRIES:
                    await asyncio.sleep(0.4 * attempt)
                    continue
                raise

    if use_supabase:
        # Reached only when the retry loop ran zero times (SUPABASE_MAX_RETRIES < 1).
        raise RuntimeError('Supabase limiter enabled but no call succeeded')

    async with rate_limiter.acquire(_estimate_tokens(prompt, has_images=has_images)):
        return await _do_call()


def _compute_hash(image_bytes: bytes) -> str:
    """Return the SHA-256 hex digest of *image_bytes*."""
    digest = hashlib.sha256()
    digest.update(image_bytes)
    return digest.hexdigest()


def _compute_phash(image_bytes: bytes) -> str | None:
    try:
        img = Image.open(io.BytesIO(image_bytes))
        return str(imagehash.phash(img))
    except Exception:
        return None


def upload_to_catbox(image_bytes: bytes) -> str | None:
    """Upload the image to catbox.moe and return the stripped response text.

    Returns None when the response status is not 200 or the request raises;
    a raised exception is logged as a warning.
    """
    form = {'reqtype': 'fileupload'}
    attachment = {'fileToUpload': ('image.jpg', image_bytes)}
    try:
        resp = requests.post(
            'https://catbox.moe/user/api.php',
            data=form,
            files=attachment,
            timeout=30,
        )
        return resp.text.strip() if resp.status_code == 200 else None
    except Exception as exc:
        logger.warning('catbox upload failed: %s', exc)
        return None


def _message_date_iso(msg):
    """Return ``msg.date`` as a UTC ISO-8601 string, or None when it is unset.

    A naive datetime is treated as already being in UTC.
    """
    stamp = msg.date
    if not stamp:
        return None
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)
    return stamp.astimezone(timezone.utc).isoformat()


def _message_likes(msg) -> int | None:
    reactions = getattr(msg, 'reactions', None)
    if not reactions or not getattr(reactions, 'results', None):
        return None
    return sum(r.count for r in reactions.results if getattr(r, 'count', None))


def _source_type(entity) -> str:
    """Classify an entity as 'channel', 'supergroup', 'group' or 'unknown'.

    A Channel whose ``broadcast`` attribute is truthy is a 'channel', any
    other Channel is a 'supergroup', and a Chat is a 'group'.
    """
    if isinstance(entity, Channel):
        if getattr(entity, 'broadcast', False):
            return 'channel'
        return 'supergroup'
    return 'group' if isinstance(entity, Chat) else 'unknown'


async def extract_events(text: str, ocr_text: str | None = None):
    content = (text or '').strip()
    if ocr_text:
        content = (content + '\n\nOCR:\n' + ocr_text).strip()
    if not content or len(content) < 10:
        return []
    prompt = (
        'You extract events from a Telegram message. '
        'Return strict JSON array of event objects. '
        'If there are no events, return [] only. '
        'Fields per event: title, date (YYYY-MM-DD), time (HH:MM or empty), '
        'end_date (YYYY-MM-DD or null), location_name, location_address, city, '
        'ticket_link, ticket_price_min, ticket_price_max, ticket_status, raw_excerpt, '
        'event_type, emoji, is_free, pushkin_card, search_digest, festival. '
        'Use null for unknown fields. '
        'Message text:\n' + content
    )
    try:
        resp = await _call_model('text', prompt)
    except Exception as exc:
        logger.warning('extract_events failed: %s', exc)
        return []
    data = _safe_json(getattr(resp, 'text', '') if resp else '')
    if data is None:
        fix_prompt = (
            'Fix and return valid JSON only. '
            'Do not include any extra text. '
            'Input:\n' + (getattr(resp, 'text', '') if resp else '')
        )
        try:
            resp_fix = await _call_model('text', fix_prompt)
            data = _safe_json(getattr(resp_fix, 'text', '') if resp_fix else '')
        except Exception as exc:
            logger.warning('extract_events json_fix failed: %s', exc)
    if isinstance(data, dict) and isinstance(data.get('events'), list):
        return data['events']
    if isinstance(data, list):
        return data
    return []


async def ocr_image(image_bytes: bytes):
    """Run OCR on an image through the vision model; return ``(text, title)``.

    Returns ``(None, None)`` when OCR is disabled, the bytes cannot be opened
    as an image, the model call fails, or no JSON object can be recovered.
    When the model supplies text but no title, the first line of the text is
    used as the title.
    """
    nothing = (None, None)
    if not ENABLE_OCR:
        return nothing
    try:
        img = Image.open(io.BytesIO(image_bytes))
    except Exception:
        return nothing

    def _reply_text(response) -> str:
        return getattr(response, 'text', '') if response else ''

    prompt = (
        'Extract readable text from the image. '
        'Return JSON: {"text": "...", "title": "..."}. '
        'If no text, return {"text": ""}. '
    )
    try:
        resp = await _call_model('vision', prompt, images=[img])
    except Exception as exc:
        logger.warning('ocr_image failed: %s', exc)
        return nothing

    data = _safe_json(_reply_text(resp))
    if data is None:
        # Ask the model once to repair its own malformed output.
        fix_prompt = (
            'Fix and return valid JSON only. '
            'Do not include any extra text. '
            'Input:\n' + _reply_text(resp)
        )
        try:
            resp_fix = await _call_model('vision', fix_prompt, images=[img])
            data = _safe_json(_reply_text(resp_fix))
        except Exception as exc:
            logger.warning('ocr_image json_fix failed: %s', exc)

    if not isinstance(data, dict):
        return nothing
    text = data.get('text') or ''
    title = data.get('title') or None
    if text and not title:
        title = text.split('\n', 1)[0].strip()
    return text or None, title


In [None]:
async def scan_source(client: TelegramClient, source: dict) -> list[dict]:
    """Scan one Telegram source and return one output record per message read.

    Args:
        client: a connected Telethon client.
        source: source settings; reads ``username``,
            ``last_scanned_message_id``, ``default_location`` and
            ``default_ticket_link``.

    Returns:
        A list of message dicts (text, metrics, posters, extracted events).
        Empty when the username is missing or there are no new messages.

    Notes:
        * At most MAX_MESSAGES_PER_SOURCE messages with an id greater than
          ``last_scanned_message_id`` are read.
        * The MAX_DAYS_BACK cutoff applies only to sources with no
          ``last_scanned_message_id``.
        * A photo is OCR-probed only when the text alone produced no events.
          The photo is downloaded once and reused for the poster record.
    """
    username = (source.get('username') or '').strip()
    if not username:
        logger.warning('source.skip reason=missing_username')
        return []
    entity = await client.get_entity(username)
    s_type = _source_type(entity)
    last_id = source.get('last_scanned_message_id') or 0
    default_location = source.get('default_location')
    default_ticket_link = source.get('default_ticket_link')

    cutoff = datetime.now(timezone.utc) - timedelta(days=MAX_DAYS_BACK)

    # Peek at the newest message so an unchanged source can be skipped cheaply.
    latest_id = None
    latest_date = None
    try:
        latest = await client.get_messages(entity, limit=1)
        if latest:
            latest_msg = latest[0]
            latest_id = latest_msg.id
            latest_date = _message_date_iso(latest_msg)
    except Exception as exc:
        logger.warning('source.latest_failed %s: %s', username, exc)

    logger.info(
        'source.start username=%s type=%s last_id=%s latest_id=%s latest_date=%s cutoff=%s',
        username,
        s_type,
        last_id or 0,
        latest_id,
        latest_date,
        cutoff.isoformat(),
    )
    if last_id and latest_id and latest_id <= last_id:
        logger.info(
            'source.skip reason=no_new_messages username=%s last_id=%s latest_id=%s',
            username,
            last_id,
            latest_id,
        )
        return []

    messages_out = []
    views_vals = []
    likes_vals = []
    processed = 0
    messages_with_events = 0
    events_total = 0
    first_id = None
    last_id_seen = None
    first_date = None
    last_date = None
    cutoff_hit = False

    async for msg in client.iter_messages(entity, limit=MAX_MESSAGES_PER_SOURCE, min_id=last_id or 0):
        if not last_id and msg.date:
            # Only tag naive datetimes as UTC; overwriting the tzinfo of an
            # aware datetime would shift the instant being compared.
            msg_dt = msg.date if msg.date.tzinfo else msg.date.replace(tzinfo=timezone.utc)
            if msg_dt < cutoff:
                cutoff_hit = True
                break
        if first_id is None:
            first_id = msg.id
            first_date = _message_date_iso(msg)
        last_id_seen = msg.id
        last_date = _message_date_iso(msg)

        text = msg.message or ''
        views = getattr(msg, 'views', None)
        likes = _message_likes(msg)
        if isinstance(views, int):
            views_vals.append(views)
        if isinstance(likes, int):
            likes_vals.append(likes)

        events = await extract_events(text)

        posters = []
        probe_bytes = None
        probe_text = None
        probe_title = None
        if not events and msg.photo:
            # The text alone gave nothing: try the text printed on the poster.
            try:
                probe_bytes = await client.download_media(msg, bytes)
                if probe_bytes:
                    probe_text, probe_title = await ocr_image(probe_bytes)
                    if probe_text:
                        events = await extract_events(text, probe_text)
            except Exception as exc:
                # Best effort: a failed probe must not abort the scan, but it
                # should be visible in the run log.
                logger.warning('ocr probe failed for %s/%s: %s', username, msg.id, exc)

        if events and msg.photo:
            try:
                # Reuse the probe download instead of fetching the photo again.
                image_bytes = probe_bytes if probe_bytes else await client.download_media(msg, bytes)
                if image_bytes:
                    sha = _compute_hash(image_bytes)
                    phash = _compute_phash(image_bytes)
                    # The upload is a blocking HTTP call; run it off the event loop.
                    catbox_url = await asyncio.to_thread(upload_to_catbox, image_bytes)
                    if probe_text:
                        ocr_text, ocr_title = probe_text, probe_title
                    else:
                        ocr_text, ocr_title = await ocr_image(image_bytes)
                    posters.append({
                        'catbox_url': catbox_url,
                        'sha256': sha,
                        'phash': phash,
                        'ocr_text': ocr_text,
                        'ocr_title': ocr_title,
                    })
            except Exception as exc:
                logger.warning('media download failed for %s/%s: %s', username, msg.id, exc)

        # Drop non-dict entries and fill in the source-level defaults.
        cleaned_events = []
        for ev in events or []:
            if not isinstance(ev, dict):
                continue
            if default_location and not ev.get('location_name'):
                ev['location_name'] = default_location
            if default_ticket_link and not ev.get('ticket_link'):
                ev['ticket_link'] = default_ticket_link
            cleaned_events.append(ev)

        if cleaned_events:
            messages_with_events += 1
            events_total += len(cleaned_events)

        messages_out.append({
            'source_username': username,
            'source_type': s_type,
            'source_chat_id': getattr(entity, 'id', None),
            'message_id': msg.id,
            'message_date': _message_date_iso(msg),
            'source_link': f'https://t.me/{username}/{msg.id}',
            'text': text,
            'metrics': {
                'views': views,
                'likes': likes,
            },
            'posters': posters,
            'events': cleaned_events,
        })

        processed += 1
        await human_sleep(HUMAN_DELAY_MIN, HUMAN_DELAY_MAX)
        if HUMAN_LONG_PAUSE_EVERY > 0 and processed % HUMAN_LONG_PAUSE_EVERY == 0:
            await human_sleep(HUMAN_LONG_PAUSE_MIN, HUMAN_LONG_PAUSE_MAX)

    # Medians over the messages read in this scan, attached to every record.
    median_views = int(statistics.median(views_vals)) if views_vals else None
    median_likes = int(statistics.median(likes_vals)) if likes_vals else None
    for record in messages_out:
        record['metrics']['channel_median_views'] = median_views
        record['metrics']['channel_median_likes'] = median_likes

    if not messages_out:
        logger.info(
            'source.empty username=%s last_id=%s latest_id=%s cutoff_hit=%s',
            username,
            last_id or 0,
            latest_id,
            cutoff_hit,
        )

    logger.info(
        'source.done username=%s messages=%d processed=%d messages_with_events=%d events=%d first_id=%s last_id=%s cutoff_hit=%s',
        username,
        len(messages_out),
        processed,
        messages_with_events,
        events_total,
        first_id,
        last_id_seen,
        cutoff_hit,
    )
    if first_date or last_date:
        logger.info(
            'source.dates username=%s first_date=%s last_date=%s',
            username,
            first_date,
            last_date,
        )
    return messages_out


In [None]:
async def main():
    """Scan every configured source and write the results to telegram_results.json.

    A failure while scanning one source is logged and does not stop the
    remaining sources from being scanned.
    """
    sources = config.get('sources') or []
    run_id = config.get('run_id') or f'kaggle_{uuid.uuid4().hex[:8]}'

    logger.info('tg_monitor.run start run_id=%s sources=%d', run_id, len(sources))
    if not sources:
        logger.warning('tg_monitor.run no sources configured')

    collected = []
    # Device details passed through to the Telegram client.
    client_kwargs = dict(
        device_model='Samsung S22 Ultra',
        system_version='13.0',
        app_version='9.6.6',
    )
    session = StringSession(TG_SESSION)

    async with TelegramClient(session, int(TG_API_ID), TG_API_HASH, **client_kwargs) as client:
        for source in sources:
            try:
                await human_sleep(SOURCE_PAUSE_MIN, SOURCE_PAUSE_MAX)
                msgs = await scan_source(client, source)
                collected.extend(msgs)
                logger.info('scanned %s messages for %s', len(msgs), source.get('username'))
            except Exception as exc:
                logger.exception('scan failed for %s: %s', source.get('username'), exc)
            # Pause after every source, whether or not the scan succeeded.
            await human_sleep(SOURCE_PAUSE_MIN, SOURCE_PAUSE_MAX)

    messages_with_events = sum(1 for item in collected if item.get('events'))
    events_extracted = sum(len(item.get('events') or []) for item in collected)

    logger.info(
        'tg_monitor.run summary run_id=%s messages=%d messages_with_events=%d events=%d',
        run_id,
        len(collected),
        messages_with_events,
        events_extracted,
    )

    stats = {
        'sources_total': len(sources),
        'messages_scanned': len(collected),
        'messages_with_events': messages_with_events,
        'events_extracted': events_extracted,
    }
    payload = {
        'schema_version': 1,
        'run_id': run_id,
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'messages': collected,
        'stats': stats,
    }

    serialized = json.dumps(payload, ensure_ascii=False, indent=2)
    Path('telegram_results.json').write_text(serialized, encoding='utf-8')
    logger.info('Saved telegram_results.json with %s messages', len(collected))

# Entry point. asyncio.run() cannot be used while an event loop is already
# running, so first check for one: get_running_loop() raises RuntimeError
# when there is none.
# NOTE(review): the top-level `await` in the else branch is only valid where
# top-level await is supported (e.g. a notebook cell); in a plain .py module
# it is a SyntaxError - confirm this file is only ever run as a notebook.
try:
    _loop = asyncio.get_running_loop()
except RuntimeError:
    asyncio.run(main())
else:
    await main()
