In [3]:
from telethon import TelegramClient
import csv
import os
from dotenv import load_dotenv
import sqlite3
import time

# Load Telegram credentials from the environment (optionally via a local .env file).
# Secrets must not live in source control: the API id/hash/phone that used to be
# embedded here as fallback values should be treated as compromised and rotated.
load_dotenv('.env')


def _require_env(name):
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing
            credential is reported up front instead of failing later
            inside the Telegram client.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Missing required environment variable {name}; "
            "set it in the environment or in the .env file."
        )
    return value


api_id = _require_env("TELEGRAM_API_ID")
api_hash = _require_env("TELEGRAM_API_HASH")
# The phone number is not referenced by the live code in this cell, so it is
# left optional (None when TELEGRAM_PHONE is unset or empty).
phone = os.getenv("TELEGRAM_PHONE") or None

# Initialize the client once
client = TelegramClient('scraping_session', api_id, api_hash)

# Function to scrape data from a single channel
async def scrape_channel(client, channel_username, writer, media_dir, limit=10000):
    """Write one row per message of a channel to *writer*, downloading photos.

    Args:
        client: Telegram client exposing ``get_entity``, ``iter_messages``
            and ``download_media`` (awaited / async-iterated here).
        channel_username: Channel identifier passed to ``get_entity``; also
            used as the prefix of downloaded photo filenames.
        writer: Object with a ``writerow(list)`` method (e.g. ``csv.writer``).
        media_dir: Directory in which downloaded photos are stored.
        limit: Maximum number of messages to iterate (default 10000, the
            value that was previously hard-coded).

    Each row is ``[channel title, channel username, message id, message text,
    message date, media path]``. The media path is ``None`` when the message
    has no photo or when nothing was actually downloaded.
    """
    entity = await client.get_entity(channel_username)
    channel_title = entity.title  # Extract the channel's title

    async for message in client.iter_messages(entity, limit=limit):
        media_path = None
        if message.media and hasattr(message.media, 'photo'):
            # Create a unique filename for the photo
            filename = f"{channel_username}_{message.id}.jpg"
            target_path = os.path.join(media_dir, filename)
            # Record the path reported by download_media rather than the
            # requested one: it returns None when nothing was saved and may
            # return a path that differs from the one requested.
            media_path = await client.download_media(message.media, target_path)

        # Write the channel title along with other data
        writer.writerow([
            channel_title,
            channel_username,
            message.id,
            message.message,
            message.date,
            media_path,
        ])

def save_to_database(df, table_name='telegram_data', db_path='telegram_data.db',
                     retries=30, delay=5):
    """Replace table *table_name* in a SQLite database with the rows of *df*.

    Args:
        df: Object exposing a pandas-style ``to_sql`` method (a DataFrame).
        table_name: Name of the table to (re)create.
        db_path: Path of the SQLite database file.
        retries: Maximum number of attempts while the database is locked.
        delay: Seconds to wait between attempts.

    Raises:
        sqlite3.OperationalError: immediately for any error other than
            "database is locked", or after *retries* locked attempts.

    No separate ``CREATE TABLE`` is issued: ``to_sql(if_exists='replace')``
    drops and recreates the table itself, and the hand-built statement broke
    on column names containing spaces.
    """
    for attempt in range(1, retries + 1):
        conn = sqlite3.connect(db_path, timeout=30)
        try:
            df.to_sql(table_name, conn, if_exists='replace', index=False)
        except sqlite3.OperationalError as e:
            if 'database is locked' not in str(e):
                raise
            print(f"Database is locked, retrying in {delay} seconds... (Attempt {attempt}/{retries})")
        else:
            return
        finally:
            # Always release the connection, including on failure, so a
            # failed attempt does not itself keep the database locked.
            conn.close()
        time.sleep(delay)

    raise sqlite3.OperationalError("Failed to save data to database after multiple retries")

# Entry point of the cell: enter the client as an async context manager and
# run main() inside it.
# This is a module-level `async with` / `await`, which is only accepted where
# top-level await is enabled (e.g. an IPython/Jupyter cell), not by a plain
# `python file.py` run.
# NOTE(review): `main` is not defined by any live code visible in this cell; the
# only visible definition sits inside a string literal further down. It is
# presumably defined by an earlier notebook cell — confirm before running on a
# fresh kernel.
async with client:
    await main()
""" 
# 1. Data Ingestion Pipeline
# Objective: Fetch data from Telegram channels and preprocess it.
# Libraries/Tools: telethon (for Telegram scraping), pandas, re.

# %pip install telethon
# %pip install python-dotenv

from telethon.sync import TelegramClient
import pandas as pd
import re
import telethon
import os
import csv
from dotenv import load_dotenv

# Load environment variables once
load_dotenv('.env')
api_id = os.getenv("TELEGRAM_API_ID") or '21483974'
api_hash = os.getenv("TELEGRAM_API_HASH") or 'ecf78ee84312a3e7368578e5e29da1f8'
phone = os.getenv("TELEGRAM_PHONE") or '251911699986'

# Define the client and phone variables
# api_id = 21483974
# api_hash = 'ecf78ee84312a3e7368578e5e29da1f8'
# phone = '251911699986'
client = TelegramClient('session_name', api_id, api_hash)

# Configure Telegram API credentials
# api_id, api_hash, and phone are already defined in the notebook

# Connect to Telegram client
# client is already defined in the notebook

import asyncio
import nest_asyncio
nest_asyncio.apply()

# Ensure the client is connected and authenticated
async def ensure_client_connected():
    if not client.is_connected():
        await client.connect()
    if not await client.is_user_authorized():
        try:
            await client.send_code_request(phone)
            try:
                while True:
                    try:
                        await client.sign_in(phone, input('Enter the code: '))
                        break
                    except telethon.errors.PhoneCodeInvalidError:
                        print("The phone code entered was invalid. Please try again.")
            except telethon.errors.SessionPasswordNeededError:
                await client.sign_in(password=input('Two-step verification enabled. Please enter your password: '))
        except telethon.errors.SendCodeUnavailableError:
            print("SendCodeUnavailableError: All available options for this type of number were already used. Please try again later.")
            return

""" """ # Fetch messages from a channel
async def fetch_channel_data(channel_username, limit=100):
    await ensure_client_connected()
    messages = []
    async for message in client.iter_messages(channel_username, limit=limit):
        if message.text:
            messages.append({'text': message.text, 'date': message.date, 'sender': message.sender.username})
    return pd.DataFrame(messages)

# Preprocess text
def preprocess_amharic_text(df):
    # Tokenize and clean text
    df['clean_text'] = df['text'].apply(lambda x: re.sub(r'[^\w\s]', '', x))
    return df """

""" # Example usage
async def main():
    channel_data = await fetch_channel_data('valid_channel_username')
    clean_data = preprocess_amharic_text(channel_data)
    import sqlite3
    import time

    def execute_with_retry(query, params=(), retries=30, delay=15):
        for attempt in range(retries):
            try:
                conn = sqlite3.connect('telegram_data.db')
                cursor = conn.cursor()
                cursor.execute(query, params)
                conn.commit()
                cursor.close()
                conn.close()
                break
            except sqlite3.OperationalError as e:
                if 'database is locked' in str(e):
                    print(f"Database is locked, retrying in {delay} seconds...")
                    time.sleep(delay)
                else:
                    raise
        else:
            raise sqlite3.OperationalError("Failed to execute query after multiple retries") """

"""     # Save the clean data to the database
    def save_to_database(df, table_name='telegram_data'):
        query = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(df.columns)})"
        execute_with_retry(query)
        
        for attempt in range(30):
            try:
                conn = sqlite3.connect('telegram_data.db')
                df.to_sql(table_name, conn, if_exists='replace', index=False)
                conn.close()
                break
            except sqlite3.OperationalError as e:
                if 'database is locked' in str(e):
                    print(f"Database is locked, retrying in 5 seconds... (Attempt {attempt + 1}/30)")
                    time.sleep(5)
                else:
                    raise
        else:
            raise sqlite3.OperationalError("Failed to save data to database after multiple retries")
    
    save_to_database(clean_data)
    await client.disconnect() """

""" # Run the main function
if __name__ == "__main__":
    nest_asyncio.apply()
    asyncio.run(main()) """
"""
async def main():
    await client.start()
    
    # Create a directory for media files
    media_dir = 'photos'
    os.makedirs(media_dir, exist_ok=True)

    # Open the CSV file and prepare the writer
    with open('telegram_data.csv', 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(['Channel Title', 'Channel Username', 'ID', 'Message', 'Date', 'Media Path'])  # Include channel title in the header
        
        # List of channels to scrape
        channels = [
            '@Shageronlinestore',  # Existing channel
                 # You can add more channels here
            
        ]
        
        # Iterate over channels and scrape data into the single CSV file
        for channel in channels:
            await scrape_channel(client, channel, writer, media_dir)
            print(f"Scraped data from {channel}")

async with client:
    await main()
 """

  """ """ # Fetch messages from a channel


'\nasync def main():\n    await client.start()\n    \n    # Create a directory for media files\n    media_dir = \'photos\'\n    os.makedirs(media_dir, exist_ok=True)\n\n    # Open the CSV file and prepare the writer\n    with open(\'telegram_data.csv\', \'w\', newline=\'\', encoding=\'utf-8\') as file:\n        writer = csv.writer(file)\n        writer.writerow([\'Channel Title\', \'Channel Username\', \'ID\', \'Message\', \'Date\', \'Media Path\'])  # Include channel title in the header\n        \n        # List of channels to scrape\n        channels = [\n            \'@Shageronlinestore\',  # Existing channel\n                 # You can add more channels here\n            \n        ]\n        \n        # Iterate over channels and scrape data into the single CSV file\n        for channel in channels:\n            await scrape_channel(client, channel, writer, media_dir)\n            print(f"Scraped data from {channel}")\n\nasync with client:\n    await main()\n '