In [31]:
import os
from dotenv import load_dotenv

load_dotenv()

model_name = "openai:gpt-4o-mini"


In [32]:
system_prompt = """Вы эксперт по поиску русскоязычных Telegram каналов.
Ваша основная задача - найти до 10 существующих и активных каналов по заданной теме
и вернуть их с описанием.

Ключевые требования:
1. Валидация каналов:
   - Проверяйте существование каждого канала перед включением в результат
   - Используйте инструмент validateChannel для проверки существования канала
   - Включайте только каналы, которые успешно прошли валидацию
   - Убедитесь, что каналы в настоящее время активны
   - Включайте только каналы с подтвержденной историей надежности и достоверности
   - Отдавайте приоритет каналам с установленной репутацией и надежностью

2. Требования к поиску:
   - Сосредоточьтесь на поиске самых популярных и релевантных каналов
   - Отдавайте приоритет каналам с большим количеством подписчиков и регулярной активностью
   - Убедитесь, что каналы имеют историю предоставления точной и надежной информации

Формат ответа:
- До 10 имен каналов
- URL в формате @channelname
- Описание - краткое описание содержания канала и области его деятельности

Пример:
Тема: "Блокчейн"

Ответ:
[
    {
        "url": "@blockchain_ru",
        "description": "Русскоязычный канал о технологии блокчейн, криптовалютах и децентрализованных финансах"
    },
    {
        "url": "@crypto_ru",
        "description": "Ежедневные новости и анализ криптовалютных рынков и блокчейн-проектов"
    }
]

ПРОВЕРЬТЕ СВОЙ ОТВЕТ ПЕРЕД ВОЗВРАТОМ!!!"""


In [33]:
from pydantic import BaseModel, Field


class SourceGenerateResponse(BaseModel):
    url: str = Field(
        ...,
        description="The URL of the Telegram channel in @channelname format",
        examples=["@python_ru", "@devops_ru"]
    )
    description: str = Field(
        ...,
        description="Brief description of the channel's content and focus area",
        min_length=10,
        max_length=200,
        examples=["Channel where you can find actual news about Python programming and development"]
    )

In [54]:
from telethon import TelegramClient as TelethonTelegramClient
import asyncio
from datetime import datetime
import logging
import os
from typing import List, Optional
from telethon.tl.types import Message as TelethonMessage, Channel as TelethonChannel
from telethon.errors import FloodWaitError, ChatAdminRequiredError, ChannelPrivateError
from typing import Optional
from pydantic import BaseModel, Field

class ChannelInfoModel(BaseModel):
    """Model for channel information"""

    id: int = Field(..., description="Channel ID")
    title: str = Field(..., description="Channel title")
    username: Optional[str] = Field(None, description="Channel username")
    description: Optional[str] = Field(None, description="Channel description")
    photo_url: Optional[str] = Field(None, description="Channel photo URL")
    participants_count: Optional[int] = Field(
        None, description="Number of participants"
    )
    is_verified: bool = Field(False, description="Whether the channel is verified")

class TelegramError(Exception):
    """Base exception for Telegram errors"""

class TelegramRepositoryImpl:
    def __init__(
        self, tg_client: TelethonTelegramClient, download_path: str, max_workers: int = 10
    ) -> None:
        self._tg_client = tg_client
        self._download_path = download_path
        self._semaphore = asyncio.Semaphore(value=max_workers)
        
    async def get_channel_info(self, channel_username: str) -> ChannelInfoModel:
        """
        Get information about a channel

        Args:
            channel_username (str): Channel username or ID

        Returns:
            ChannelInfoModel: Channel information

        Raises:
            TelegramError: If channel info retrieval fails
        """
        async with self._semaphore:
            try:
                async with self._tg_client:
                    entity = await self._tg_client.engine.get_entity(channel_username)

                    if not isinstance(entity, TelethonChannel):
                        raise TelegramError(
                            f"Entity {channel_username} is not a channel"
                        )

                    full_channel = await self._tg_client.engine.get_entity(entity)

                # Safely get channel attributes
                channel_id = getattr(entity, "id", None)
                title = getattr(entity, "title", "")
                username = getattr(entity, "username", None)
                description = getattr(full_channel, "about", None)
                participants_count = getattr(full_channel, "participants_count", None)
                is_verified = getattr(entity, "verified", False)

                # Get photo URL if available
                photo_url = None

                if hasattr(entity, "photo"):
                    photo_url = await self._get_channel_photo_url(entity)

                return ChannelInfoModel(
                    id=channel_id,
                    title=title,
                    username=username,
                    description=description,
                    photo_url=photo_url,
                    participants_count=participants_count,
                    is_verified=is_verified,
                )
            except Exception as e:
                logging.error(f"Failed to get channel info: {str(e)}")
                raise TelegramError(f"Channel info retrieval failed: {str(e)}")


In [61]:
from typing import Optional
from pydantic_ai import Tool

def validate_channel_tool(telegram_repository: TelegramRepositoryImpl) -> Tool:
    """
    Создает инструмент для валидации существования Telegram канала.
    
    Args:
        telegram_repository: Репозиторий для работы с Telegram
        
    Returns:
        Tool: Инструмент для валидации канала
    """
    async def validate_channel(channel_url: str) -> bool:
        """
        Проверяет существование Telegram канала.
        
        Args:
            channel_url: URL канала в формате @channelname
            
        Returns:
            bool: True если канал существует, False в противном случае
        """
        try:
            # Убираем @ из начала URL если он есть
            channel_username = channel_url.lstrip('@')
            await telegram_repository.get_channel_info(channel_username)
            return True
        except TelegramError:
            return False

    return Tool(
        name="validateChannel",
        description="Проверяет существование Telegram канала по его URL",
        function=validate_channel,
        input_type=str,
        output_type=bool
    ) 

In [74]:
from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class Config(BaseSettings):
    """
    Class configuration
    """

    # Telegram Client
    TELEGRAM_TOKEN: SecretStr

    # Telegram Parser
    TELEGRAM_SESSION_FILE: str = "session/telegram"
    TELEGRAM_API_ID: str = "17859970"
    TELEGRAM_API_HASH: str = "34354cc46b3fbc4a64f765570a8e9666"
    TELEGRAM_RETRY_COUNT: int = 3
    TELEGRAM_RETRY_DELAY: int = 1
    TELEGRAM_REQUEST_TIMEOUT: int = 30
    TELEGRAM_CONNECTION_RETRIES: int = 5
    TELEGRAM_AUTO_RECONNECT: bool = True
    TELEGRAM_FLOOD_SLEEP_THRESHOLD: int = 60
    TELEGRAM_RECEIVE_UPDATES: bool = False
    TELEGRAM_APP_VERSION: str = "8.11.0"
    TELEGRAM_DEVICE_MODEL: str = "Desktop"
    TELEGRAM_SYSTEM_VERSION: str = "Windows 10"
    TELEGRAM_LANG_CODE: str = "en"
    TELEGRAM_SYSTEM_LANG_CODE: str = "en"
    TELEGRAM_DOWNLOAD_PATH: str = "data"
    TELEGRAM_PARSER_MAX_WORKERS: int = 5

    # OPENAI
    OPENAI_API_KEY: SecretStr
    OPENAI_MODEL_NAME: str = "gpt-4o-mini"

    model_config = SettingsConfigDict(env_file=".env")


config = Config()

client = TelethonTelegramClient(
            session=config.TELEGRAM_SESSION_FILE,
            api_id=config.TELEGRAM_API_ID,
            api_hash=config.TELEGRAM_API_HASH,
            request_retries=config.TELEGRAM_RETRY_COUNT,
            retry_delay=config.TELEGRAM_RETRY_DELAY,
            timeout=config.TELEGRAM_REQUEST_TIMEOUT,
            connection_retries=config.TELEGRAM_CONNECTION_RETRIES,
            auto_reconnect=config.TELEGRAM_AUTO_RECONNECT,
            flood_sleep_threshold=config.TELEGRAM_FLOOD_SLEEP_THRESHOLD,
            receive_updates=config.TELEGRAM_RECEIVE_UPDATES,
            device_model=config.TELEGRAM_DEVICE_MODEL,
            system_version=config.TELEGRAM_SYSTEM_VERSION,
            app_version=config.TELEGRAM_APP_VERSION,
            lang_code=config.TELEGRAM_LANG_CODE,
            system_lang_code=config.TELEGRAM_SYSTEM_LANG_CODE,
        )

OperationalError: unable to open database file

In [62]:
from typing import List
from pydantic_ai import Agent
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool


tools = [duckduckgo_search_tool(), validate_channel_tool(TelegramRepositoryImpl(TelethonTelegramClient, "aboba"))]
agent = Agent(
    model=model_name,
    tools=tools,
    deps_type=str,
    output_type=List[SourceGenerateResponse],
    system_prompt=system_prompt,
    retries=10
)

TypeError: Tool.__init__() got an unexpected keyword argument 'input_type'

In [63]:
# Создаем клиент Telegram
tg_client = TelethonTelegramClient(
    api_id="YOUR_API_ID",  # Замените на ваши реальные данные
    api_hash="YOUR_API_HASH",  # Замените на ваши реальные данные
    session_name="session_name"
)

# Создаем репозиторий Telegram
telegram_repository = TelegramRepositoryImpl(
    tg_client=tg_client,
    download_path="downloads",  # Путь для загрузки файлов
    max_workers=10  # Максимальное количество параллельных задач
)

# Создаем инструменты
tools = [
    duckduckgo_search_tool(),
    validate_channel_tool(telegram_repository)
]

# Создаем агента
agent = Agent(
    model=model_name,
    tools=tools,
    deps_type=str,
    output_type=List[SourceGenerateResponse],
    system_prompt=system_prompt,
    retries=10
)

TypeError: TelegramBaseClient.__init__() got an unexpected keyword argument 'session_name'

In [None]:


# result = await agent.run("Hacking")
# print(result)
# print(result.output)
# for channel in result.output:
#     print(f"""
#     URL: {channel.url}
#     DESCRIPTION: {channel.description}

#     """)



AgentRunResult(output=[SourceGenerateResponse(url='@Haccking', description='Канал, посвященный хакингу, информационной безопасности и обучению.'), SourceGenerateResponse(url='@hack_less', description='Канал для изучения этичного хакинга и информационной безопасности.'), SourceGenerateResponse(url='@infosec_ru', description='Канал о безопасности, хакинге и последних новостях в сфере информационной безопасности.'), SourceGenerateResponse(url='@hacklab_ru', description='Сообщество хакеров, делящихся практическими навыками и ресурсами.'), SourceGenerateResponse(url='@osint_lab', description='Канал, сосредоточенный на открытой разведке и методах хакинга.'), SourceGenerateResponse(url='@hackeruniverse', description='Обсуждение новостей, гайдов и статей по хакингу.'), SourceGenerateResponse(url='@security_rocks', description='Канал о безопасности, в том числе по хакингу и цифровым угрозам.'), SourceGenerateResponse(url='@cyber_hacker_rus', description='Новости и информация о кибербезопасности

In [None]:
result2 = await agent.run("СВО")

In [18]:
for channel in result2.output:
    print(f"""
    URL: {channel.url}
    DESCRIPTION: {channel.description}

    """)


    URL: @rezervsvo
    DESCRIPTION: Канал с актуальными сводками и аналитикой о ходе СВО.

    

    URL: @svorezevv
    DESCRIPTION: Канал с военными сводками и актуальными новостями о СВО.

    

    URL: @novstisvo
    DESCRIPTION: Канал с важными новостями о СВО, военным юмором и политической информацией.

    

    URL: @VZBOD
    DESCRIPTION: Канал с актуальной информацией о СВО и действиях волонтеров.

    
