In [1]:
# --- Environment and dependencies ---
from telethon.sync import TelegramClient
from telethon.errors import FloodWaitError, RPCError
from dotenv import load_dotenv
import os
import json
import logging
import asyncio
from datetime import datetime, timedelta
import aiofiles
import pytz

In [2]:
# Load Telegram credentials from the .env file (falls back to the process environment).
load_dotenv()

api_id = os.getenv("TELEGRAM_API_ID")
api_hash = os.getenv("TELEGRAM_API_HASH")
phone = os.getenv("TELEGRAM_PHONE")  # Optional: for first-time login

# Fail here, in the config cell, with the variable names, rather than later
# inside the Telegram client with a less specific error.
_missing_env = [
    name
    for name, value in (("TELEGRAM_API_ID", api_id), ("TELEGRAM_API_HASH", api_hash))
    if not value
]
if _missing_env:
    raise RuntimeError(f"Missing required environment variable(s): {', '.join(_missing_env)}")
api_id = int(api_id)  # Telegram API ids are numeric; os.getenv always returns str


In [3]:
# Logging configuration: write to ../logs/scrape.log and echo to the notebook output.
LOG_DIR = '../logs'
LOG_FILE = os.path.join(LOG_DIR, 'scrape.log')

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Make sure the log directory exists before the FileHandler opens the file.
os.makedirs(LOG_DIR, exist_ok=True)

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

# Explicit utf-8 so non-ASCII text in messages does not depend on the OS locale.
file_handler = logging.FileHandler(LOG_FILE, encoding='utf-8')
console_handler = logging.StreamHandler()
for handler in (file_handler, console_handler):
    handler.setFormatter(formatter)

# Re-running this cell must not stack duplicate handlers. Close the old ones
# (not just drop them) so their open log-file descriptors are released.
for old_handler in list(logger.handlers):
    logger.removeHandler(old_handler)
    old_handler.close()

logger.addHandler(file_handler)
logger.addHandler(console_handler)


In [4]:
# Output location for raw messages and the channels to scrape.
DATA_LAKE_PATH = "../data/raw/telegram_messages"
CHANNELS = ["chemed123", "lobelia4cosmetics", "tikvahpharma"]

# Scraping date range: the last SCRAPE_WINDOW_DAYS days, as timezone-aware UTC datetimes.
SCRAPE_WINDOW_DAYS = 7
utc = pytz.UTC
end_date = datetime.now(utc)
start_date = end_date - timedelta(days=SCRAPE_WINDOW_DAYS)
# NOTE(review): start_date is only logged in this cell; the scraping cells
# visible here do not appear to filter on it. Confirm the window is enforced.

logger.info(f"Scraping window STARTED AT: {start_date} UTC")


2025-07-13 19:47:40,903 - INFO - Scraping window STARTED AT: 2025-07-06 16:47:40.903891+00:00 UTC


In [5]:
async def _download_photo(client, message, channel_path):
    """Download ``message``'s photo to ``<channel_path>/images/<message id>.jpg``.

    Download failures are logged and swallowed so one bad image does not abort
    the whole channel.

    Returns:
        The path the photo was written to, or ``None`` if the download failed.
    """
    media_path = os.path.join(channel_path, "images", f"{message.id}.jpg")
    os.makedirs(os.path.dirname(media_path), exist_ok=True)
    try:
        await client.download_media(message, media_path)
    except Exception as e:
        logger.warning(f"X Failed to download image {message.id}: {str(e)}")
        return None
    logger.info(f"Downloaded image for message {message.id}")
    return media_path


async def _save_message(client, message, channel_path):
    """Append one message as a JSON line to ``<channel_path>/<YYYY-MM-DD>.json``.

    The day file is chosen from the message's own date. Photos are downloaded
    next to it. Other media kinds are only flagged through ``has_media``.
    """
    message_data = {
        "message_id": message.id,
        "date": message.date.isoformat(),
        "text": message.text,
        "has_media": bool(message.media),
        "media_type": None,
        "media_path": None,
    }

    if message.photo:
        media_path = await _download_photo(client, message, channel_path)
        if media_path is not None:
            message_data["media_type"] = "photo"
            message_data["media_path"] = media_path

    file_path = os.path.join(channel_path, f"{message.date.strftime('%Y-%m-%d')}.json")
    # Append mode: several messages share one day file. Re-running a scrape
    # therefore appends the same messages again.
    async with aiofiles.open(file_path, "a", encoding="utf-8") as f:
        await f.write(json.dumps(message_data) + "\n")


async def scrape_channel(client, channel, limit=500, min_date=None, max_flood_retries=3):
    """Scrape up to ``limit`` of the newest messages of ``channel`` into the data lake.

    Each message is appended as one JSON line to
    ``DATA_LAKE_PATH/<channel>/<YYYY-MM-DD>.json``. Photos are saved under
    ``DATA_LAKE_PATH/<channel>/images/``. Errors are logged rather than raised,
    so one failing channel does not stop a multi-channel run.

    Args:
        client: A connected, authorized Telethon client.
        channel: Channel username. It is also used as the output sub-directory name.
        limit: Maximum number of messages to save; ``None`` means no cap.
        min_date: Optional timezone-aware datetime. Scraping stops at the first
            message older than this (messages are iterated newest first).
        max_flood_retries: How many times to wait out a ``FloodWaitError`` and
            resume before giving up on the channel.
    """
    logger.info(f"Scraping from beginning for channel: {channel}")
    channel_path = os.path.join(DATA_LAKE_PATH, channel)  # makedirs below also creates DATA_LAKE_PATH
    count = 0
    # offset_id=0 starts from the newest message. After a flood wait we resume
    # just below the last message already saved instead of starting over,
    # which would append duplicates.
    last_id = 0
    flood_retries = 0

    while True:
        try:
            entity = await client.get_entity(channel)
            os.makedirs(channel_path, exist_ok=True)

            remaining = None if limit is None else limit - count
            async for message in client.iter_messages(entity, limit=remaining, offset_id=last_id):
                if min_date is not None and message.date < min_date:
                    logger.info(f"Reached messages older than {min_date} for {channel}")
                    break

                await _save_message(client, message, channel_path)
                last_id = message.id
                count += 1

                if limit is not None and count >= limit:
                    logger.info(f"----->Reached message limit of {limit} for {channel}")
                    break

                await asyncio.sleep(0.5)  # gentle pacing between messages

            logger.info(f"=>> Finished scraping {count} messages from {channel}")
            return

        # FloodWaitError subclasses RPCError, so it must be caught first.
        except FloodWaitError as e:
            if flood_retries >= max_flood_retries:
                logger.error(
                    f"Giving up on {channel} after {flood_retries} flood waits "
                    f"({count} messages saved)"
                )
                return
            flood_retries += 1
            logger.warning(f"Rate limit: waiting {e.seconds} seconds for {channel}")
            await asyncio.sleep(e.seconds)
            # Loop around and resume from last_id.
        except RPCError as e:
            logger.error(f"Telegram API error for {channel}: {str(e)}")
            return
        except Exception as e:
            # logger.exception keeps the traceback, which str(e) alone loses.
            logger.exception(f"Unexpected error for {channel}: {str(e)}")
            return


In [6]:
async def run_scraping():
    async with TelegramClient('session', api_id, api_hash) as client:
        if not await client.is_user_authorized():
            await client.send_code_request(phone)
            code = input("Enter the code you received: ")
            await client.sign_in(phone, code)

        for channel in CHANNELS:
            await scrape_channel(client, channel)
            logger.info(f"Sleeping 30 seconds before next channel...")
            await asyncio.sleep(30)

# Trigger the scraping
await run_scraping()


2025-07-13 19:48:22,146 - INFO - Scraping from beginning for channel: chemed123


Signed in successfully as Meridius; remember to not break the ToS or you will risk an account ban!


2025-07-13 19:48:24,787 - INFO - Downloaded image for message 97
2025-07-13 19:48:28,413 - INFO - Downloaded image for message 96
2025-07-13 19:48:30,683 - INFO - Downloaded image for message 95
2025-07-13 19:48:32,735 - INFO - Downloaded image for message 94
2025-07-13 19:48:35,340 - INFO - Downloaded image for message 93
2025-07-13 19:48:38,082 - INFO - Downloaded image for message 92
2025-07-13 19:48:39,865 - INFO - Downloaded image for message 91
2025-07-13 19:48:42,949 - INFO - Downloaded image for message 90
2025-07-13 19:48:46,250 - INFO - Downloaded image for message 88
2025-07-13 19:48:48,555 - INFO - Downloaded image for message 87
2025-07-13 19:48:50,954 - INFO - Downloaded image for message 86
2025-07-13 19:48:52,451 - INFO - Downloaded image for message 85
2025-07-13 19:48:54,151 - INFO - Downloaded image for message 82
2025-07-13 19:48:56,411 - INFO - Downloaded image for message 81
2025-07-13 19:48:58,153 - INFO - Downloaded image for message 80
2025-07-13 19:49:00,448 -