## TG bot 

In [1]:
#!pip install -Uq telethon 
# see https://docs.telethon.dev/en/stable/basic/installation.html#optional-dependencies
#!pip install -Uq telethon cryptg pillow
#!pip install -Uq python-dotenv

In [2]:
import asyncio
import logging
import os, json
from collections.abc import Iterable
from datetime import datetime
from pathlib import Path

from dotenv import load_dotenv
from telethon import TelegramClient, events

In [3]:
# For running in Jupyter/IPython, where the kernel already has an event loop
# running. NOTE(review): presumably needed so Telethon calls can re-enter that
# loop — confirm it is still required with top-level `await`.
import nest_asyncio
nest_asyncio.apply()

In [4]:
def setup_logging():
    """Configure root logging and return the notebook's named logger.

    Records are sent both to a per-day file (``bot_YYYYMMDD.log`` in the
    current working directory) and to the console stream, at INFO level.

    Returns:
        logging.Logger: the logger named ``'telegram_monitor'``.
    """
    # The file name carries today's date, so each day gets its own log file.
    today_stamp = datetime.now().strftime("%Y%m%d")
    log_file = f'bot_{today_stamp}.log'

    sinks = [logging.FileHandler(log_file), logging.StreamHandler()]
    logging.basicConfig(
        handlers=sinks,
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S',
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
    return logging.getLogger('telegram_monitor')

In [5]:
#! rm bot_{datetime.now().strftime("%Y%m%d")}.log
# ! rm settings*.json

## Setup

In [6]:
# test_settings = {
#     "keywords": ["python", "ai", "machine learning", "data science"],
#     "channels": ["ejdailyru", "pythonnews", "fastdotai"]
# }

# # Write to file
# Path('settings.json').write_text(json.dumps(test_settings, indent=2))
# data = json.loads(Path('settings.json').read_text())
# json.dumps(test_settings)

In [7]:
class Settings:
    """Keyword and channel sets persisted to a JSON file.

    The file holds an object with two list-valued keys, ``"keywords"`` and
    ``"channels"``. In memory both are kept as sets so duplicates collapse.

    Args:
        filepath: path of the JSON settings file (default ``settings.json``).
            If the file exists it is loaded immediately on construction.
    """

    def __init__(self, filepath='settings.json'):
        self.filepath = Path(filepath)
        self.keywords, self.channels = set(), set()
        self.load()

    def load(self):
        """Replace the in-memory sets with the file's contents, if it exists.

        A missing file leaves the current sets untouched; a missing key
        yields an empty set. Malformed JSON raises ``json.JSONDecodeError``.
        """
        if not self.filepath.exists():
            return
        # Explicit UTF-8: the platform default encoding would mis-decode
        # hand-edited non-ASCII (e.g. Cyrillic) keywords on some systems.
        data = json.loads(self.filepath.read_text(encoding='utf-8'))
        self.keywords = set(data.get('keywords', []))
        self.channels = set(data.get('channels', []))

    def save(self):
        """Write the current keywords and channels to ``self.filepath``.

        Lists are sorted so the file content is deterministic (set iteration
        order is not), and non-ASCII text is written as-is so the file stays
        human-readable.
        """
        data = {
            'keywords': sorted(self.keywords),
            'channels': sorted(self.channels),
        }
        self.filepath.write_text(
            json.dumps(data, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )

In [8]:
# Module-level wiring shared by every handler below: logger, credentials,
# persisted settings, and the two Telethon clients.
logger = setup_logging()

# Pull credentials from a local .env file into the process environment.
# Each value is None when its variable is not set.
load_dotenv()
api_id = os.getenv('TELEGRAM_API_ID')
api_hash = os.getenv('TELEGRAM_API_HASH')
bot_token = os.getenv('BOT_TOKEN')

# Loads settings.json (if present) into keyword/channel sets.
settings = Settings()
# `client` is the handle that monitor_channels is attached to;
# `bot` is the handle the /add_* and /list_* command handlers are attached to.
# NOTE(review): 'TGv2' and 'bot' look like Telethon session names — confirm.
client = TelegramClient('TGv2', api_id, api_hash)
bot = TelegramClient('bot', api_id, api_hash)  # started later in main() via bot.start(bot_token=bot_token)

### Keywords management

In [9]:
@bot.on(events.NewMessage(pattern='/add_keyword (.+)'))
async def add_keyword(event):
    """Handle ``/add_keyword <words>``: store new keywords and confirm.

    The argument text is split on commas and whitespace; each piece is
    lower-cased, merged into ``settings.keywords`` and saved. On any
    failure the error is logged and a generic error reply is sent.
    """
    try:
        raw_args = event.pattern_match.group(1)
        # Commas and whitespace are both accepted as separators.
        tokens = raw_args.replace(',', ' ').split()
        new_keywords = set()
        for token in tokens:
            new_keywords.add(token.strip().lower())

        settings.keywords.update(new_keywords)
        settings.save()

        logger.info(f'Added new keywords: {new_keywords}')
        reply = f'Added keywords: {", ".join(new_keywords)}'
        await event.respond(reply)
    except Exception as err:
        logger.error(f"Error adding keywords: {err!s}", exc_info=True)
        await event.respond("Error adding keywords. Please try again.")

In [10]:
@bot.on(events.NewMessage(pattern='/list_keywords'))
async def list_keywords(event):
    """Handle ``/list_keywords``: reply with the stored keywords.

    Replies with a comma-separated list, or ``none`` when the set is empty.
    On any failure the error is logged and a generic error reply is sent.
    """
    try:
        logger.info("Listing keywords")
        joined = ", ".join(settings.keywords)
        # An empty join result is falsy, so the fallback text is used.
        reply = f'Keywords: {joined or "none"}'
        await event.respond(reply)
    except Exception as err:
        logger.error(f"Error listing keywords: {err!s}", exc_info=True)
        await event.respond("Error listing keywords. Please try again.")

### Channels management

In [11]:
@bot.on(events.NewMessage(pattern='/add_channel (.+)'))
async def add_channel(event):
    """Handle ``/add_channel <names>``: start monitoring more channels.

    The argument text is split on commas and whitespace; each name is
    lower-cased and stripped of ``@``. The names are merged into
    ``settings.channels``, the monitoring handler is re-registered via
    ``update_monitored_channels()``, and the settings are saved. On any
    failure the error is logged and a generic error reply is sent.
    """
    try:
        candidates = (c.strip().lower().replace('@', '') for c
                      in event.pattern_match.group(1).replace(',', ' ').split())
        # A bare "@" token would otherwise become an empty channel name.
        new_channels = {name for name in candidates if name}
        settings.channels.update(new_channels)
        # update monitoring logic
        await update_monitored_channels()
        settings.save()
        await event.respond(f'Added channels: {", ".join("@" + c for c in new_channels)}')
    except Exception as e:
        # Fixed: the message was missing its f-prefix, so "{str(e)}" was
        # logged literally instead of the exception text.
        logger.error(f"Error adding channels: {str(e)}", exc_info=True)
        # Fixed: "respiond" typo raised AttributeError inside this handler,
        # so the user never received the error reply.
        await event.respond("Error adding channels. Please try again.")

In [12]:
async def update_monitored_channels():
    """Re-register ``monitor_channels`` on ``client`` for the current channels.

    Any existing registration of the handler is removed first. If
    ``settings.channels`` is non-empty the handler is added again, filtered
    to those chats; otherwise nothing is registered. Errors are logged and
    not re-raised.
    """
    try:
        # Remove existing handler if any
        client.remove_event_handler(monitor_channels)

        if not settings.channels:
            logger.info("No channels to monitor.")
            return

        # Add new handler with current channels
        chat_filter = events.NewMessage(chats=list(settings.channels))
        client.add_event_handler(monitor_channels, chat_filter)
        logger.info(f"Updated monitored channels: {settings.channels}")
    except Exception as err:
        logger.error(f"Error updating monitored channels: {err!s}", exc_info=True)

In [13]:
@bot.on(events.NewMessage(pattern='/list_channels'))
async def list_channels(event):
    """Handle ``/list_channels``: reply with the monitored channels.

    Each channel is shown with a leading ``@``; replies ``none`` when the
    set is empty. On any failure the error is logged and a generic error
    reply is sent.
    """
    try:
        logger.info("Listing channels")
        handles = ["@" + name for name in settings.channels]
        # An empty join result is falsy, so the fallback text is used.
        listing = ", ".join(handles) or "none"
        await event.respond(f'Monitored channels: {listing}')
    except Exception as err:
        logger.error(f"Error listing channels: {err!s}", exc_info=True)
        await event.respond("Error listing channels. Please try again")

### Monitoring

In [14]:
async def monitor_channels(event):
    """Log a match when a new message contains any stored keyword.

    The comparison is a case-insensitive substring test of each keyword in
    ``settings.keywords`` against ``event.text``. At most one match is
    logged per message. Errors are logged and not re-raised.
    """
    try:
        for keyword in settings.keywords:
            if keyword.lower() not in event.text.lower():
                continue
            # TODO = forward message + highlight found text
            source_chat = await event.get_chat()
            logger.info(f"Keyword match in @{source_chat.username}")
            # One log line per message is enough; stop at the first hit.
            break
    except Exception as err:
        logger.error(f"Error in monitor_channels: {err!s}", exc_info=True)

In [15]:
async def main():
    """Start both Telethon clients and run until they disconnect.

    Starts ``client`` and ``bot`` (the latter with ``bot_token``), registers
    the monitoring handler for the saved channels, then awaits both clients'
    ``run_until_disconnected()`` together. Exceptions are logged rather than
    re-raised, and both clients are disconnected on the way out.
    """
    try:
        logger.info("Starting bot...")
        await client.start()
        await bot.start(bot_token=bot_token)

        # Initial setup of monitoring
        await update_monitored_channels()
        logger.info("Bot started successfully")

        # Run both clients concurrently
        runners = (client.run_until_disconnected(), bot.run_until_disconnected())
        await asyncio.gather(*runners)
    except Exception as err:
        logger.error(f"Error in main: {err!s}", exc_info=True)
    finally:
        # Always disconnect, in the same order: user client first, then bot.
        shutdown_plan = (
            (client, "Client shutting down..."),
            (bot, "Bot shutting down..."),
        )
        for connection, farewell in shutdown_plan:
            await connection.disconnect()
            logger.info(farewell)

In [16]:
try:
    await main()
except KeyboardInterrupt:
    print("Interrupted! Disconnecting client...")
    #await client.disconnect()
    #await bot.disconnect()

2025-01-25 14:43:40 - INFO - Starting bot...
2025-01-25 14:43:40 - INFO - Connecting to 149.154.167.51:443/TcpFull...
2025-01-25 14:43:40 - INFO - Connection to 149.154.167.51:443/TcpFull complete!
2025-01-25 14:43:41 - INFO - Connecting to 149.154.167.51:443/TcpFull...
2025-01-25 14:43:41 - INFO - Connection to 149.154.167.51:443/TcpFull complete!
2025-01-25 14:43:42 - INFO - Updated monitored channels: {'dvachannel', 'ejdailyru', 'why4ch'}
2025-01-25 14:43:42 - INFO - Bot started successfully
2025-01-25 14:52:10 - INFO - Keyword match in @dvachannel
2025-01-25 14:52:55 - INFO - Keyword match in @ejdailyru
2025-01-25 14:53:35 - INFO - Keyword match in @ejdailyru
2025-01-25 14:53:53 - INFO - Disconnecting from 149.154.167.51:443/TcpFull...
2025-01-25 14:53:53 - INFO - Disconnecting from 149.154.167.51:443/TcpFull...
2025-01-25 14:53:53 - INFO - Disconnection from 149.154.167.51:443/TcpFull complete!
2025-01-25 14:53:53 - INFO - Disconnection from 149.154.167.51:443/TcpFull complete!
20

CancelledError: 

In [None]:
#и, вчера, по, от, он, в, сказал, сегодня, на

## Testing

Test getting messages from a specified channel and checking if they contain keywords.

In [None]:
# Inputs for the manual test cells below.
# Channel whose messages are fetched / listened to.
channel_username = 'ejdailyru'
# Very common short Russian words — presumably chosen so that almost any
# message produces a match during testing.
keywords = {'на', 'по', 'и', 'в'}

In [None]:
def contains_keywords(text: str, keywords: Iterable[str]) -> bool:
    """Return True if any keyword occurs in ``text``, ignoring case.

    Matching is a plain substring test, so a keyword also matches inside a
    longer word.

    Args:
        text: the message text to search.
        keywords: any iterable of keyword strings (list, set, tuple, ...).

    Returns:
        True when at least one keyword is found; False otherwise, including
        when ``keywords`` is empty.
    """
    # Lower-case the text once instead of once per keyword.
    haystack = text.lower()
    return any(keyword.lower() in haystack for keyword in keywords)

In [None]:
async def test_get_messages():
    """Fetch the latest three messages of the test channel and report matches.

    Starts ``client``, reads three messages from ``channel_username`` and
    prints the first 100 characters of each one whose text contains any of
    ``keywords``. Errors are printed, and the client is always disconnected.
    """
    try:
        await client.start()
        # Let's test with a simple message retrieval
        recent = await client.get_messages(channel_username, limit=3)
        for message in recent:
            body = message.text
            # Skip messages with no text (empty or None).
            if not body:
                continue
            if contains_keywords(body, keywords):
                print(f"Found keyword in message: {body[:100]}...")
    except Exception as err:
        print(str(err))
    finally:
        await client.disconnect()
        print("Client disconnected!")

In [None]:
# Now try running the main function
try:
    await test_get_messages()
except KeyboardInterrupt:
    print("Interrupted! Disconnecting client...")
    await client.disconnect()

Test receiving new messages from a specific channel.

In [None]:
@client.on(events.NewMessage(chats=channel_username))
async def handler(event):
    """Print the start of any new test-channel message containing a keyword.

    Uses a case-insensitive substring test against the module-level
    ``keywords``; prints at most once per message.
    """
    for keyword in keywords:
        if keyword.lower() in event.text.lower():
            print(f"Keyword found in message: {event.text[:100]}...")
            # Stop at the first hit so each message is reported once.
            break


In [None]:
async def main():
    """Test runner: start ``client`` and block until it disconnects.

    NOTE: this rebinds the name ``main``, replacing the bot runner defined
    earlier in the notebook once this cell is executed.

    Errors are printed, and the client is always disconnected on exit.
    """
    try:
        await client.start()
        await client.run_until_disconnected()
    except Exception as err:
        # print() applies str() to its argument, so the output is unchanged.
        print(err)
    finally:
        await client.disconnect()
        print("Client disconnected!")

In [None]:
# # Now try running the main function
# try:
#     await main()
# except KeyboardInterrupt:
#     print("Interrupted! Disconnecting client...")
#     await client.disconnect()