# Data Ingestion & Preprocessing

In [1]:
import os
import pandas as pd
import numpy as np
from pathlib import Path
from dotenv import load_dotenv
import asyncio
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaPhoto, MessageMediaDocument
import logging
from datetime import datetime
import re
from typing import List, Dict, Any, Optional
# Add src to Python path to enable imports
import sys
from pathlib import Path

# Add the src directory to Python path
current_dir = Path.cwd()
src_path = current_dir.parent / 'src'
sys.path.insert(0, str(src_path))

## Load environment variables

In [2]:
# Load environment variables
load_dotenv('../.env')

# Create necessary directories
Path('../data/processed').mkdir(parents=True, exist_ok=True)
Path('../data/raw/photos').mkdir(parents=True, exist_ok=True)

# Telegram API configuration - load directly from environment
API_ID = os.getenv('TELEGRAM_API_ID')
API_HASH = os.getenv('TELEGRAM_API_HASH')
PHONE = os.getenv('TELEGRAM_PHONE_NUMBER')

# File paths
CHANNELS_FILE = '../data/raw/channels_to_crawl.xlsx'
OUTPUT_RAW_FILE = '../data/raw/telegram_data.csv'
OUTPUT_PROCESSED_FILE = '../data/processed/processed_telegram_data.csv'
PHOTOS_DIR = '../data/raw/photos'


## Read Telegram channel names from Excel file

In [3]:
def read_channel_list(file_path: str) -> List[str]:
    """
    Read Telegram channel names from Excel file
    """
    # Use only the specified channels
    channels = [
        '@ZemenExpress',
        '@ethio_brand_collection', 
        '@Leyueqa',
        '@modernshoppingcenter',
        '@qnashcom',
        '@MerttEka'
    ]
    print(f"Found {len(channels)} channels: {channels}")
    return channels

## Scrape messages from Telegram channels using Telethon

In [4]:
async def scrape_telegram_channels(channels: List[str]) -> pd.DataFrame:
    """
    Scrape messages from Telegram channels using Telethon
    """
    client = TelegramClient('scraping_session', API_ID, API_HASH)
    
    all_messages = []
    
    try:
        await client.start()
        print("Connected to Telegram")
        
        for channel in channels:
            try:
                print(f"Scraping channel: {channel}")
                entity = await client.get_entity(channel)
                channel_title = entity.title
                
                async for message in client.iter_messages(entity, limit=1000):
                    if message.message:  # Only process messages with text
                        media_path = None
                        
                        # Handle media downloads
                        if message.media:
                            if hasattr(message.media, 'photo'):
                                filename = f"{channel.replace('@', '')}_{message.id}.jpg"
                                media_path = os.path.join(PHOTOS_DIR, filename)
                                await client.download_media(message.media, media_path)
                        
                        message_data = {
                            'channel': channel_title,
                            'channel_username': channel,
                            'sender_id': message.sender_id,
                            'timestamp': message.date,
                            'message_text': message.message,
                            'message_id': message.id,
                            'media_path': media_path
                        }
                        all_messages.append(message_data)
                        
            except Exception as e:
                print(f"Error scraping channel {channel}: {e}")
                continue
                
    finally:
        await client.disconnect()
    
    # Convert to DataFrame
    df = pd.DataFrame(all_messages)
    print(f"Scraped {len(df)} messages from {len(channels)} channels")
    return df


## Clean and normalize Amharic text

In [5]:
def clean_amharic_text(text: str) -> str:
    """
    Clean and normalize Amharic text
    """
    if pd.isna(text) or not text:
        return ""
    
    # Convert to string if not already
    text = str(text)
    
    # Remove URLs
    text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
    
    # Remove emojis and special characters (keeping Amharic characters)
    # Amharic Unicode range: \u1200-\u137F
    text = re.sub(r'[^\u1200-\u137F\u0020\u0027\u0022\u002E\u002C\u003F\u0021\u003A\u003B\u0028\u0029\u002D\u002F\u005C\u0040\u0023\u0024\u0025\u005E\u0026\u002A\u002B\u003D\u005B\u005D\u007B\u007D\u007C\u003C\u003E\u007E\u0060\u0027\u0022]', '', text)
    
    # Normalize whitespace
    text = re.sub(r'\s+', ' ', text)
    
    # Remove leading/trailing whitespace
    text = text.strip()
    
    return text

def tokenize_amharic_text(text: str) -> List[str]:
    """
    Basic tokenization for Amharic text using whitespace splitting
    """
    if not text:
        return []
    
    # Split by whitespace and filter out empty tokens
    tokens = [token.strip() for token in text.split() if token.strip()]
    return tokens

## Preprocess the scraped Telegram data

In [6]:
def preprocess_telegram_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Preprocess the scraped Telegram data
    """
    # Create a copy to avoid modifying original data
    processed_df = df.copy()
    
    # Clean the message text
    processed_df['cleaned_text'] = processed_df['message_text'].apply(clean_amharic_text)
    
    # Tokenize the cleaned text
    processed_df['tokens'] = processed_df['cleaned_text'].apply(tokenize_amharic_text)
    
    # Add token count
    processed_df['token_count'] = processed_df['tokens'].apply(len)
    
    # Filter out messages with no meaningful content after cleaning
    processed_df = processed_df[processed_df['cleaned_text'].str.len() > 0]
    
    # Select final columns for output
    final_columns = ['channel', 'sender_id', 'timestamp', 'message_text', 'cleaned_text', 'token_count']
    processed_df = processed_df[final_columns]
    
    return processed_df

## Main pipeline for data ingestion and preprocessing

In [7]:
async def main_ingestion_pipeline():
    """
    Main pipeline for data ingestion and preprocessing
    """
    print("Starting Telegram data ingestion pipeline...")
    
    # Step 1: Read channel list
    channels = read_channel_list(CHANNELS_FILE)
    if not channels:
        print("No channels found. Exiting.")
        return
    
    # Step 2: Scrape Telegram data
    print("Scraping Telegram channels...")
    raw_df = await scrape_telegram_channels(channels)
    
    if raw_df.empty:
        print("No data scraped. Exiting.")
        return
    
    # Save raw data
    raw_df.to_csv(OUTPUT_RAW_FILE, index=False, encoding='utf-8')
    print(f"Raw data saved to {OUTPUT_RAW_FILE}")
    
    # Step 3: Preprocess data
    print("Preprocessing data...")
    processed_df = preprocess_telegram_data(raw_df)
    
    # Save processed data
    processed_df.to_csv(OUTPUT_PROCESSED_FILE, index=False, encoding='utf-8')
    print(f"Processed data saved to {OUTPUT_PROCESSED_FILE}")
    
    # Print summary statistics
    print("\n=== Pipeline Summary ===")
    print(f"Channels processed: {len(channels)}")
    print(f"Raw messages scraped: {len(raw_df)}")
    print(f"Processed messages: {len(processed_df)}")
    print(f"Average tokens per message: {processed_df['token_count'].mean():.2f}")
    
    return processed_df

## Run the pipeline

In [8]:
# In Jupyter notebooks, we can use await directly instead of asyncio.run()
try:
    result_df = await main_ingestion_pipeline()
    
    if result_df is not None and not result_df.empty:
        print("\nFirst few processed messages:")
        print(result_df.head())
    else:
        print("Pipeline completed but no data was returned.")
        
except Exception as e:
    print(f"Error running pipeline: {e}")
    print("Please check your API credentials and try again.")


Starting Telegram data ingestion pipeline...
Found 6 channels: ['@ZemenExpress', '@ethio_brand_collection', '@Leyueqa', '@modernshoppingcenter', '@qnashcom', '@MerttEka']
Scraping Telegram channels...
Signed in successfully as Olyad; remember to not break the ToS or you will risk an account ban!
Connected to Telegram
Scraping channel: @ZemenExpress
Scraping channel: @ethio_brand_collection
Scraping channel: @Leyueqa
Scraping channel: @modernshoppingcenter
Scraping channel: @qnashcom
Scraping channel: @MerttEka
Scraped 3689 messages from 6 channels
Raw data saved to ../data/raw/telegram_data.csv
Preprocessing data...
Processed data saved to ../data/processed/processed_telegram_data.csv

=== Pipeline Summary ===
Channels processed: 6
Raw messages scraped: 3689
Processed messages: 3683
Average tokens per message: 43.66

First few processed messages:
          channel      sender_id                 timestamp  \
0  Zemen Express® -1001307493052 2025-06-21 16:35:51+00:00   
1  Zemen Express®