# 6.9.1 Проект - решение


In [None]:
import asyncio
import urllib.request
from typing import Any, Dict, List

RAW_URL = "https://raw.githubusercontent.com/nefelsay/asyncio/main/call_company.py"

def load_companies() -> List[Dict[str, Any]]:
    # Загружаем список компаний по ссылке из задания.
    # Обычно в файле содержится строка вида: companies = [...]
    with urllib.request.urlopen(RAW_URL, timeout=30) as resp:
        text = resp.read().decode("utf-8")

    import re, ast
    m = re.search(r"companies\s*=\s*(\[[\s\S]*\])", text)
    if not m:
        # если файл просто содержит список
        m = re.search(r"(\[[\s\S]*\])", text)
    if not m:
        raise ValueError("Не удалось найти список компаний в файле call_company.py")

    return ast.literal_eval(m.group(1))

async def call_company(company: Dict[str, Any]) -> str:
    # Имитируем звонок компании. Ждём company['call_time'] секунд.
    name = company.get("name") or company.get("company") or company.get("Company") or company.get("title") or "Company"
    phone = company.get("phone") or company.get("phone_number") or company.get("Phone") or company.get("tel") or ""
    call_time = int(company.get("call_time", 0))

    try:
        await asyncio.sleep(call_time)
        return f"Company {name}: {phone} дозвон успешен"
    except asyncio.CancelledError:
        # Компания не ответила / задача отменена
        raise

async def main():
    companies = load_companies()
    tasks = [asyncio.create_task(call_company(c)) for c in companies]

    # Общий лимит времени на дозвон: 10 секунд
    async def cancel_after_10s():
        await asyncio.sleep(10)
        for t in tasks:
            if not t.done():
                t.cancel()

    canceller = asyncio.create_task(cancel_after_10s())

    # Индивидуальный лимит на ответ: 5 секунд
    for t in tasks:
        try:
            result = await asyncio.wait_for(t, timeout=5)
            print(result)
        except asyncio.TimeoutError:
            t.cancel()
            try:
                await t
            except asyncio.CancelledError:
                pass
        except asyncio.CancelledError:
            pass

    await canceller

asyncio.run(main())
