Импорты и подготовка окружения

In [1]:
import asyncio
import ast
import json
import os
from functools import partial
from time import sleep
from typing import Iterator

import dashscope
import requests
from tqdm import tqdm

# Переход в корневую директорию проекта
os.chdir('..')

from dataset.src.repository import ClipRepository

Класс ApiKeys (для перебора ключей и отключения перегруженных)

In [2]:
class ApiKeys(Iterator[str]):
    """
    Класс для управления списком API-ключей с поддержкой ротации и исключения невалидных ключей.
    """

    def __init__(self, items: list[str]) -> None:
        self.items: list[str] = items
        self.index: int = 0

    def __next__(self) -> str:
        value = self.items[self.index]
        self.index = (self.index + 1) % len(self.items)
        return value

    def __bool__(self) -> bool:
        return bool(self.items)

    def __len__(self) -> int:
        return len(self.items)

    def remove(self, item: str) -> None:
        if item not in self.items:
            return
        idx = self.items.index(item)
        self.items.remove(item)
        if not self.items:
            raise RuntimeError("Список API-ключей пуст")
        if idx < self.index:
            self.index -= 1
        self.index %= len(self.items)

    def __iter__(self) -> Iterator[str]:
        return self


api_keys = ApiKeys(json.load(open('dataset/qwen_api_keys.json'))) # Загрузка API-ключей

Загрузка изображений

In [3]:
img_urls = []
for i in tqdm(range(100)):
    for attempt in range(3):
        try:
            response = requests.get(f'https://picsum.photos/1000/1000?random={i}', timeout=10)
            if response.status_code == 200:
                img_urls.append(response.url)
                break
        except requests.exceptions.SSLError as e:
            sleep(1)
        except requests.exceptions.RequestException as e:
            print(response)
            continue
    else:
        print(f'Failed to load image {i} after 3 attempts')

100%|██████████| 100/100 [03:12<00:00,  1.93s/it]


Промпт для генерации описаний

In [4]:
prompt = '''
Cоздай список из 10 коротких описаний (3–7 слов каждое) на русском языке,
описывающих это изображение. Используй формат Python-списка строк, например:
["Собака бежит по снегу", "Пёс играет на улице", ..., "Активная прогулка в зимнем лесу"].
'''

Асинхронный вызов модели qwen

In [5]:
semaphore = asyncio.Semaphore(len(api_keys))

async def get_label_of_image(img_url) -> str:
    async with semaphore:
        loop = asyncio.get_event_loop()
        current_key = next(api_keys)
        dashscope.api_key = current_key
        messages = [{
            'role': 'user',
            'content': [
                {'image': img_url},
                {'text': prompt},
            ]
        }]
        response = await loop.run_in_executor(
            None,
            partial(dashscope.MultiModalConversation.call,
                    model='qwen2.5-vl-72b-instruct',
                    messages=messages)
        )
        if response['status_code'] != 200:
            api_keys.remove(current_key)
            return await get_label_of_image(img_url)
        ClipRepository.add(img_url, ast.literal_eval(response.output.choices[0].message.content[0]['text']))

Запуск асинхронных задач

In [None]:
tasks = [get_label_of_image(url) for url in img_urls]
try:
    await asyncio.gather(*tasks)
except RuntimeError as e:
    print(e)