In [2]:
import aiohttp
import asyncio
import pandas as pd
import random
import time
from datetime import datetime, timedelta, timezone
import logging

In [3]:
# === –ù–∞—Å—Ç—Ä–æ–π–∫–∏ ===
header = {'User-Agent': 'HH-Data-Coll/v2.0 (contact 135861v@mail.ru)'}
area_id_russia = 113
per_page = 100
max_pages = 100
semaphore = asyncio.Semaphore(5)

# –û–ø—ã—Ç
experiences = ['noExperience', 'between1And3', 'between3And6', 'moreThan6']

# –ù–∞—Å—Ç—Ä–æ–π–∫–∏ –ª–æ–≥–≥–∏—Ä–æ–≤–∞–Ω–∏—è
log_filename = f"hh_parser_log_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",  
    handlers=[
        logging.FileHandler(log_filename, encoding="utf-8"), 
        logging.StreamHandler()
    ]
)

def log_elapsed_time(start, task="Code"):
    end = time.perf_counter()
    elapsed = end - start
    mins, secs = divmod(elapsed, 60)
    hrs, mins = divmod(mins, 60)
    logging.info(f"‚è± –í—Ä–µ–º—è –≤—ã–ø–æ–ª–Ω–µ–Ω–∏—è ({task}): {int(hrs):02d}:{int(mins):02d}:{secs:.2f} —Å–µ–∫—É–Ω–¥")


# –ü–µ—Ä–∏–æ–¥: –ø–æ—Å–ª–µ–¥–Ω–∏–µ 30 –¥–Ω–µ–π —Å —à–∞–≥–æ–º 7 –¥–Ω–µ–π
def get_date_ranges(days=30, step=7):
    today = datetime.now(timezone.utc).date()
    start_date = today - timedelta(days=days)
    ranges = []
    current = start_date
    while current < today:
        end = min(current + timedelta(days=step), today)
        ranges.append((current.isoformat(), end.isoformat()))
        current = end
    return ranges

# –ó–∞–≥—Ä—É–∑–∫–∞ —Å–ø—Ä–∞–≤–æ—á–Ω–∏–∫–æ–≤ –¥–ª—è –≤–∞–ª–∏–¥–∞—Ü–∏–∏ –¥–∞–Ω–Ω—ã—Ö
async def fetch_reference_data():
    async with aiohttp.ClientSession(headers=header) as session:
        async with session.get('https://api.hh.ru/dictionaries') as resp:
            dict_data = await resp.json()

        async with session.get('https://api.hh.ru/areas') as resp:
            area_data = await resp.json()
            area_ids = set()

            def extract_area_ids(areas):
                for a in areas:
                    area_ids.add(str(a['id']))
                    if 'areas' in a and a['areas']:
                        extract_area_ids(a['areas'])

            extract_area_ids(area_data)

        return {
            'experience': {e['name'] for e in dict_data['experience']},
            'employment': {e['name'] for e in dict_data['employment']},
            'schedule': {e['name'] for e in dict_data['schedule']},
            'area_ids': area_ids
        }

# RateLimiter
class RateLimiter:
    def __init__(self, max_rps=3, long_pause_every=500, long_pause_duration=60):
        self.max_rps = max_rps
        self.long_pause_every = long_pause_every
        self.long_pause_duration = long_pause_duration
        self.last_request = None
        self.request_count = 0
        self.lock = asyncio.Lock()

    async def wait(self):
        async with self.lock:
            now = time.monotonic()
            if self.last_request is not None:
                elapsed = now - self.last_request
                delay = max(0, 1.0 / self.max_rps - elapsed)
                if delay > 0:
                    await asyncio.sleep(delay)
            self.last_request = time.monotonic()
            self.request_count += 1
            if self.request_count % self.long_pause_every == 0:
                logging.info(f"–î–æ–ª–≥–∞—è –ø–∞—É–∑–∞ {self.long_pause_duration} —Å–µ–∫ –ø–æ—Å–ª–µ {self.request_count} –∑–∞–ø—Ä–æ—Å–æ–≤...")
                await asyncio.sleep(self.long_pause_duration)

limiter = RateLimiter()


In [4]:
#=== –§—É–Ω–∫—Ü–∏–∏ —Å–±–æ—Ä–∞ –≤–∞–∫–∞–Ω—Å–∏–π ===
# –ü–æ–ª—É—á–µ–Ω–∏–µ IT-—Ä–æ–ª–µ–π
async def fetch_it_roles():
    url = 'https://api.hh.ru/professional_roles'
    async with aiohttp.ClientSession(headers=header) as session:
        async with session.get(url) as response:
            response.raise_for_status()
            data = await response.json()
            for cat in data['categories']:
                if int(cat['id']) == 11:
                    roles = [
                        {'id': int(role['id']), 'name': role['name']}
                        for role in cat['roles']
                        if int(role['id']) not in [12, 25, 34, 155]
                    ]
                    logging.info(f'üîπ –ù–∞–π–¥–µ–Ω–æ {len(roles)} IT-—Ä–æ–ª–µ–π')
                    return roles
            return []

# –ó–∞–≥—Ä—É–∑–∫–∞ –æ–¥–Ω–æ–π —Å—Ç—Ä–∞–Ω–∏—Ü—ã —Å –ø–æ–≤—Ç–æ—Ä–Ω—ã–º–∏ –ø–æ–ø—ã—Ç–∫–∞–º–∏
async def fetch_page(session, params, page, desc, max_retries=3):
    params['page'] = page
    for attempt in range(1, max_retries + 1):
        await limiter.wait()
        try:
            async with semaphore:
                async with session.get('https://api.hh.ru/vacancies', params=params) as response:
                    if response.status == 403:
                        logging.warning(f"üö´ –û—à–∏–±–∫–∞ 403 –Ω–∞ —Å—Ç—Ä–∞–Ω–∏—Ü–µ {page} ‚Äî {desc} (–ø–æ–ø—ã—Ç–∫–∞ {attempt})")
                    elif response.status >= 400:
                        logging.warning(f"‚ö†Ô∏è –û—à–∏–±–∫–∞ {response.status} –Ω–∞ —Å—Ç—Ä–∞–Ω–∏—Ü–µ {page} ‚Äî {desc} (–ø–æ–ø—ã—Ç–∫–∞ {attempt})")
                    response.raise_for_status()
                    return await response.json()
        except Exception as e:
            logging.warning(f"‚ö†Ô∏è –ò—Å–∫–ª—é—á–µ–Ω–∏–µ –ø—Ä–∏ –∑–∞–≥—Ä—É–∑–∫–µ —Å—Ç—Ä–∞–Ω–∏—Ü—ã {page} ‚Äî {desc} (–ø–æ–ø—ã—Ç–∫–∞ {attempt}): {e}")
            await asyncio.sleep(random.uniform(1.0, 2.0))

    error_msg = f"‚ùå –û—à–∏–±–∫–∞: –Ω–µ —É–¥–∞–ª–æ—Å—å –∑–∞–≥—Ä—É–∑–∏—Ç—å —Å—Ç—Ä–∞–Ω–∏—Ü—É {page} ‚Äî {desc} –ø–æ—Å–ª–µ {max_retries} –ø–æ–ø—ã—Ç–æ–∫. –û—Å—Ç–∞–Ω–æ–≤–∫–∞."
    logging.error(error_msg)
    raise RuntimeError(error_msg)

# –°–±–æ—Ä –≤–∞–∫–∞–Ω—Å–∏–π –ø–æ –∫–æ–º–±–∏–Ω–∞—Ü–∏–∏ —Ä–æ–ª–µ–π, –æ–ø—ã—Ç–∞ –∏ –¥–∞—Ç
async def fetch_vacancies(session, role_id, role_name, experience, date_from, date_to):
    desc = f"—Ä–æ–ª—å '{role_name}', –æ–ø—ã—Ç '{experience}', {date_from}‚Äì{date_to}"
    vacancies = []
    params = {
        'professional_role': role_id,
        'area': area_id_russia,
        'experience': experience,
        'date_from': date_from,
        'date_to': date_to,
        'per_page': per_page,
    }

    for page in range(max_pages):
        data = await fetch_page(session, params, page, desc)
        if data is None:
            break
        items = data.get('items', [])
        if not items:
            break
        vacancies.extend(items)
        if page >= data.get('pages', 0) - 1:
            break
        await asyncio.sleep(random.uniform(0.3, 0.6))

    logging.info(f"‚úÖ –°–æ–±—Ä–∞–Ω–æ {len(vacancies)} –≤–∞–∫–∞–Ω—Å–∏–π ‚Äî {desc}")
    return vacancies

# –û—Å–Ω–æ–≤–Ω–æ–π —Å–±–æ—Ä –¥–∞–Ω–Ω—ã—Ö
async def collect_vacancies_async():
    all_vacancies = []
    date_ranges = get_date_ranges()
    roles = await fetch_it_roles()

    try:
        async with aiohttp.ClientSession(headers=header) as session:
            for role in roles:
                logging.info(f"\n== –†–æ–ª—å: {role['name']} (ID {role['id']}) ==")
                for exp in experiences:
                    for date_from, date_to in date_ranges:
                        batch = await fetch_vacancies(session, role['id'], role['name'], exp, date_from, date_to)
                        all_vacancies.extend(batch)

    except RuntimeError as critical_error:
        logging.critical(f"üö® –ü–∞—Ä—Å–∏–Ω–≥ –ø—Ä–µ—Ä–≤–∞–Ω: {critical_error}")
        raise  

    # logging.info(f"\nüöÄ –í—Å–µ–≥–æ —Å–æ–±—Ä–∞–Ω–æ {len(all_vacancies)} –≤–∞–∫–∞–Ω—Å–∏–π")
    return all_vacancies

# –ü—Ä–µ–æ–±—Ä–∞–∑–æ–≤–∞–Ω–∏–µ –≤–∞–∫–∞–Ω—Å–∏–π –≤ DataFrame
def extract_vacancy_data(raw_vacancies):
    extracted = []
    for v in raw_vacancies:
        area_info = v.get('area', {})
        salary_range = v.get('salary_range') or {}
        
        extracted.append({
            'load_datetime': datetime.now(),
            'professional_role': v.get('professional_roles', [{}])[0].get('name') 
                                 if v.get('professional_roles') else None,
            'id': v.get('id'),
            'name': v.get('name'),
            'area': area_info.get('name'),
            'area_id': area_info.get('id'),
            'employer': v.get('employer', {}).get('name'),
            'accredited_it_employer': v.get('employer', {}).get('accredited_it_employer'),
            'trusted_employer': v.get('employer', {}).get('trusted'),
            'published_at': v.get('published_at'),
            'created_at': v.get('created_at'),
            'closed_at': v.get('closed_at'),
            'archived': v.get('archived'),
            'url': v.get('alternate_url'),
            'salary_from': v.get('salary', {}).get('from') if v.get('salary') else None,
            'salary_to': v.get('salary', {}).get('to') if v.get('salary') else None,
            'currency': v.get('salary', {}).get('currency') if v.get('salary') else None,
            'salary_mode': (salary_range.get('mode') or {}).get('name'),        # –ó–∞ –º–µ—Å—è—Ü, –∑–∞ —á–∞—Å...
            'salary_frequency': (salary_range.get('frequency') or {}).get('name'),  # –†–∞–∑ –≤ –º–µ—Å—è—Ü –∏ —Ç.–ø.
            'experience': v.get('experience', {}).get('name'),
            'schedule': v.get('schedule', {}).get('name'),
            'employment': v.get('employment', {}).get('name'),
        })
    return pd.DataFrame(extracted).drop_duplicates('id', keep='last')


# –í–∞–ª–∏–¥–∞—Ü–∏—è –¥–∞–Ω–Ω—ã—Ö
def validate_vacancies(df: pd.DataFrame, dicts: dict):
    required_fields = ['id', 'name', 'area', 'area_id', 'published_at']
    errors = []

    def is_empty(val):
        return val is None or val == "" or (isinstance(val, list) and len(val) == 0)

    invalid_rows = []

    for i, row in df.iterrows():
        row_errors = []

        # –ü—Ä–æ–≤–µ—Ä–∫–∞ –æ–±—è–∑–∞—Ç–µ–ª—å–Ω—ã—Ö –ø–æ–ª–µ–π –Ω–∞ –ø—É—Å—Ç–æ—Ç—É
        for field in required_fields:
            if field not in row or is_empty(row[field]):
                row_errors.append(f"–ü—É—Å—Ç–æ–µ –æ–±—è–∑–∞—Ç–µ–ª—å–Ω–æ–µ –ø–æ–ª–µ '{field}'")

        # –ü—Ä–æ–≤–µ—Ä–∫–∞ –∑–Ω–∞—á–µ–Ω–∏–π –∏–∑ —Å–ø—Ä–∞–≤–æ—á–Ω–∏–∫–æ–≤
        if 'experience' in row and row['experience'] not in dicts['experience'] and not is_empty(row['experience']):
            row_errors.append(f"–ù–µ–∏–∑–≤–µ—Å—Ç–Ω–æ–µ experience: {row['experience']}")
        if 'employment' in row and row['employment'] not in dicts['employment'] and not is_empty(row['employment']):
            row_errors.append(f"–ù–µ–∏–∑–≤–µ—Å—Ç–Ω–æ–µ employment: {row['employment']}")
        if 'schedule' in row and row['schedule'] not in dicts['schedule'] and not is_empty(row['schedule']):
            row_errors.append(f"–ù–µ–∏–∑–≤–µ—Å—Ç–Ω–æ–µ schedule: {row['schedule']}")
        if 'area_id' in row and str(row['area_id']) not in dicts['area_ids'] and not is_empty(row['area_id']):
            row_errors.append(f"–ù–µ–∏–∑–≤–µ—Å—Ç–Ω—ã–π area_id: {row['area_id']}")

        if row_errors:
            errors.append((i, row_errors))
            invalid_rows.append(i)

    if errors:
        for i, errs in errors:
            logging.warning(f"–°—Ç—Ä–æ–∫–∞ {i} –æ—Ç–∫–ª–æ–Ω–µ–Ω–∞: {', '.join(errs)}")

    reject_df = df.loc[invalid_rows].copy()
    valid_df = df.drop(index=invalid_rows).reset_index(drop=True)

    return valid_df.reset_index(drop=True), reject_df.reset_index(drop=True)



In [5]:
# –ó–∞–ø—É—Å–∫ —Å–±–æ—Ä–∞ –≤–∞–∫–∞–Ω—Å–∏–π
start = time.perf_counter()

raw_vacancies = await collect_vacancies_async()
df = extract_vacancy_data(raw_vacancies)

reference_data = await fetch_reference_data()
valid_df, reject_df = validate_vacancies(df, reference_data)

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
reject_filename = f"rejected_vacancies_{timestamp}_reject.csv"
if not reject_df.empty:
    reject_df.to_csv(reject_filename, index=False)
    logging.info(f"\nüö´ {len(reject_df)} –≤–∞–∫–∞–Ω—Å–∏–π –æ—Ç–∫–ª–æ–Ω–µ–Ω–æ –∏ —Å–æ—Ö—Ä–∞–Ω–µ–Ω–æ –≤ —Ñ–∞–π–ª {reject_filename}")
else:
    logging.info("\n‚úÖ –í—Å–µ –≤–∞–∫–∞–Ω—Å–∏–∏ –ø—Ä–æ—à–ª–∏ –≤–∞–ª–∏–¥–∞—Ü–∏—é. Reject-—Ñ–∞–π–ª –Ω–µ —Å–æ–∑–¥–∞–Ω.")


logging.info(f"\nüöÄ –í—Å–µ–≥–æ —Å–æ–±—Ä–∞–Ω–æ –≤–∞–∫–∞–Ω—Å–∏–π: {len(raw_vacancies)}")
logging.info(f"üì¶ –ó–∞–≥—Ä—É–∂–µ–Ω–æ —É–Ω–∏–∫–∞–ª—å–Ω—ã—Ö id: {valid_df['id'].nunique()}")

log_elapsed_time(start, "hh_parser")


2025-08-25 12:40:40,050 [INFO] üîπ –ù–∞–π–¥–µ–Ω–æ 21 IT-—Ä–æ–ª–µ–π
2025-08-25 12:40:40,056 [INFO] 
== –†–æ–ª—å: BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö (ID 156) ==
2025-08-25 12:40:40,456 [INFO] ‚úÖ –°–æ–±—Ä–∞–Ω–æ 14 –≤–∞–∫–∞–Ω—Å–∏–π ‚Äî —Ä–æ–ª—å 'BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö', –æ–ø—ã—Ç 'noExperience', 2025-07-26‚Äì2025-08-02
2025-08-25 12:40:40,798 [INFO] ‚úÖ –°–æ–±—Ä–∞–Ω–æ 45 –≤–∞–∫–∞–Ω—Å–∏–π ‚Äî —Ä–æ–ª—å 'BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö', –æ–ø—ã—Ç 'noExperience', 2025-08-02‚Äì2025-08-09
2025-08-25 12:40:41,248 [INFO] ‚úÖ –°–æ–±—Ä–∞–Ω–æ 27 –≤–∞–∫–∞–Ω—Å–∏–π ‚Äî —Ä–æ–ª—å 'BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö', –æ–ø—ã—Ç 'noExperience', 2025-08-09‚Äì2025-08-16
2025-08-25 12:40:41,630 [INFO] ‚úÖ –°–æ–±—Ä–∞–Ω–æ 38 –≤–∞–∫–∞–Ω—Å–∏–π ‚Äî —Ä–æ–ª—å 'BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö', –æ–ø—ã—Ç 'noExperience', 2025-08-16‚Äì2025-08-23
2025-08-25 12:40:41,855 [INFO] ‚úÖ –°–æ–±—Ä–∞–Ω–æ 21 –≤–∞–∫–∞–Ω—Å–∏–π ‚Äî —Ä–æ–ª

In [None]:
valid_df.info()

In [6]:
valid_df.head()

Unnamed: 0,load_datetime,professional_role,id,name,area,area_id,employer,accredited_it_employer,trusted_employer,published_at,...,archived,url,salary_from,salary_to,currency,salary_mode,salary_frequency,experience,schedule,employment
0,2025-08-23 21:39:22.792162,"BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö",123259817,–ú–ª–∞–¥—à–∏–π –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö,–ú–æ—Å–∫–≤–∞,1,Topface Media,False,True,2025-07-25T14:49:47+0300,...,False,https://hh.ru/vacancy/123259817,40000.0,60000.0,RUR,–ó–∞¬†–º–µ—Å—è—Ü,,–ù–µ—Ç –æ–ø—ã—Ç–∞,–£–¥–∞–ª–µ–Ω–Ω–∞—è —Ä–∞–±–æ—Ç–∞,–ü–æ–ª–Ω–∞—è –∑–∞–Ω—è—Ç–æ—Å—Ç—å
1,2025-08-23 21:39:22.792257,"BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö",123363422,Intern Data Analyst / –°—Ç–∞–∂–µ—Ä –ê–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö,–ú–æ—Å–∫–≤–∞,1,Okkam,False,True,2025-07-29T12:37:43+0300,...,False,https://hh.ru/vacancy/123363422,,,,,,–ù–µ—Ç –æ–ø—ã—Ç–∞,–ü–æ–ª–Ω—ã–π –¥–µ–Ω—å,–ß–∞—Å—Ç–∏—á–Ω–∞—è –∑–∞–Ω—è—Ç–æ—Å—Ç—å
2,2025-08-23 21:39:22.792311,"BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö",123233941,–°—Ç–∞–∂–µ—Ä Data-–∞–Ω–∞–ª–∏—Ç–∏–∫,–ú–æ—Å–∫–≤–∞,1,T2,False,True,2025-07-25T11:08:17+0300,...,False,https://hh.ru/vacancy/123233941,,,,,,–ù–µ—Ç –æ–ø—ã—Ç–∞,–ì–∏–±–∫–∏–π –≥—Ä–∞—Ñ–∏–∫,–ß–∞—Å—Ç–∏—á–Ω–∞—è –∑–∞–Ω—è—Ç–æ—Å—Ç—å
3,2025-08-23 21:39:22.792351,"BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö",123418505,–ú–ª–∞–¥—à–∏–π –ø—Ä–æ–≥—Ä–∞–º–º–∏—Å—Ç-–∞–Ω–∞–ª–∏—Ç–∏–∫ (Junior Program A...,–°—Ç–∞–≤—Ä–æ–ø–æ–ª—å,84,–ù–∏–∫–æ–ª–µ–Ω–∫–æ –°–µ—Ä–≥–µ–π –ê–ª–µ–∫—Å–∞–Ω–¥—Ä–æ–≤–∏—á,False,True,2025-07-30T12:08:41+0300,...,False,https://hh.ru/vacancy/123418505,30000.0,50000.0,RUR,–ó–∞¬†–º–µ—Å—è—Ü,,–ù–µ—Ç –æ–ø—ã—Ç–∞,–£–¥–∞–ª–µ–Ω–Ω–∞—è —Ä–∞–±–æ—Ç–∞,–ü–æ–ª–Ω–∞—è –∑–∞–Ω—è—Ç–æ—Å—Ç—å
4,2025-08-23 21:39:22.792439,"BI-–∞–Ω–∞–ª–∏—Ç–∏–∫, –∞–Ω–∞–ª–∏—Ç–∏–∫ –¥–∞–Ω–Ω—ã—Ö",123415001,–ê–Ω–∞–ª–∏—Ç–∏–∫ —Ä–∞–∑–≤–∏—Ç–∏—è DWH (–Ø—Ä–æ—Å–ª–∞–≤–ª—å),–Ø—Ä–æ—Å–ª–∞–≤–ª—å,112,–õ–∏–≥–∞ –¶–∏—Ñ—Ä–æ–≤–æ–π –≠–∫–æ–Ω–æ–º–∏–∫–∏,True,True,2025-07-30T11:21:16+0300,...,False,https://hh.ru/vacancy/123415001,,,,,,–ù–µ—Ç –æ–ø—ã—Ç–∞,–ü–æ–ª–Ω—ã–π –¥–µ–Ω—å,–ü–æ–ª–Ω–∞—è –∑–∞–Ω—è—Ç–æ—Å—Ç—å


In [2]:
# ============ –ü–µ—Ä–≤–∏—á–Ω–∞—è –∑–∞–≥—Ä—É–∑–∫–∞ key_skills ============
import asyncio
import aiohttp
import logging
import random
import time
from datetime import datetime
from collections import deque
import pandas as pd


# --- RateLimiter (–æ–≥—Ä–∞–Ω–∏—á–µ–Ω–∏–µ –ø–æ RPS)
class RateLimiter:
    def __init__(self, max_calls: int, period: float = 1.0):
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()

    async def __aenter__(self):
        while len(self.calls) >= self.max_calls and time.monotonic() - self.calls[0] < self.period:
            await asyncio.sleep(self.period - (time.monotonic() - self.calls[0]))
        self.calls.append(time.monotonic())
        return self

    async def __aexit__(self, exc_type, exc, tb):
        pass


# --- –§—É–Ω–∫—Ü–∏—è –ø–æ–ª—É—á–µ–Ω–∏—è key_skills –¥–ª—è –æ–¥–Ω–æ–π –≤–∞–∫–∞–Ω—Å–∏–∏ —Å retry
async def fetch_keyskills(session, vacancy_id, limiter, sem, logger, max_retries=3):
    url = f"https://api.hh.ru/vacancies/{vacancy_id}"
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (X11; Ubuntu; Linux x86_64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/127.0 Safari/537.36"
        )
    }

    for attempt in range(1, max_retries + 1):
        async with sem:
            async with limiter:
                # –Ω–µ–±–æ–ª—å—à–∞—è —Å–ª—É—á–∞–π–Ω–∞—è –ø–∞—É–∑–∞ –¥–ª—è —Å–≥–ª–∞–∂–∏–≤–∞–Ω–∏—è –Ω–∞–≥—Ä—É–∑–∫–∏
                await asyncio.sleep(random.uniform(0.1, 0.5))
                try:
                    async with session.get(url, headers=headers) as resp:
                        if resp.status == 200:
                            data = await resp.json()
                            key_skills = data.get("key_skills", [])
                            if not key_skills:
                                return vacancy_id, ["–Ω–µ —É–∫–∞–∑–∞–Ω–æ"]
                            return vacancy_id, [ks["name"] for ks in key_skills]

                        elif resp.status in (403, 429):
                            wait = 60 * attempt  # —ç–∫—Å–ø–æ–Ω–µ–Ω—Ü–∏–∞–ª—å–Ω–∞—è –ø–∞—É–∑–∞
                            logger.warning(
                                f"‚ö†Ô∏è –û—à–∏–±–∫–∞ {resp.status} –¥–ª—è –≤–∞–∫–∞–Ω—Å–∏–∏ {vacancy_id}, "
                                f"–ø–æ–ø—ã—Ç–∫–∞ {attempt}, –∂–¥—É {wait} —Å–µ–∫"
                            )
                            await asyncio.sleep(wait)
                            continue

                        else:
                            logger.error(f"‚ùå –û—à–∏–±–∫–∞ {resp.status} –ø—Ä–∏ –ø–æ–ª—É—á–µ–Ω–∏–∏ –≤–∞–∫–∞–Ω—Å–∏–∏ {vacancy_id}")
                            return vacancy_id, None

                except Exception as e:
                    logger.error(f"‚ùå –ò—Å–∫–ª—é—á–µ–Ω–∏–µ –¥–ª—è {vacancy_id}: {e}")
                    await asyncio.sleep(5)

    # –µ—Å–ª–∏ –≤—Å–µ –ø–æ–ø—ã—Ç–∫–∏ –Ω–µ—É–¥–∞—á–Ω—ã–µ
    return vacancy_id, None


# --- –û—Å–Ω–æ–≤–Ω–∞—è —Ñ—É–Ω–∫—Ü–∏—è –ø–µ—Ä–≤–∏—á–Ω–æ–π –∑–∞–≥—Ä—É–∑–∫–∏ key_skills
async def collect_keyskills_primary(
    vacancy_ids, batch_size=2000, inner_batch=50, pause_hours=0.5, rps=2, concurrency=2, log_file=None
):
    # –õ–æ–≥–≥–µ—Ä
    logger = logging.getLogger("keyskills_logger")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    if log_file is None:
        log_file = f"keyskills_primary_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log"
    fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    logger.info("‚ñ∂Ô∏è –°—Ç–∞—Ä—Ç –ø–µ—Ä–≤–∏—á–Ω–æ–π –∑–∞–≥—Ä—É–∑–∫–∏ key_skills")
    
    results = []
    limiter = RateLimiter(rps)
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession() as session:
        total = len(vacancy_ids)
        for i in range(0, total, batch_size):
            batch = vacancy_ids[i:i + batch_size]
            logger.info(f"üîπ –û–±—Ä–∞–±–æ—Ç–∫–∞ –±–∞—Ç—á–∞ {i // batch_size + 1}, –≤–∞–∫–∞–Ω—Å–∏–π: {len(batch)}")
            
            # --- –æ–±—Ä–∞–±–∞—Ç—ã–≤–∞–µ–º batch –ø–æ –∫—É—Å–∫–∞–º inner_batch ---
            for j in range(0, len(batch), inner_batch):
                sub_batch = batch[j:j + inner_batch]
                tasks = [fetch_keyskills(session, vid, limiter, sem, logger) for vid in sub_batch]
                sub_results = await asyncio.gather(*tasks)
                results.extend([r for r in sub_results if r is not None])

                # –ø–∞—É–∑–∞ –º–µ–∂–¥—É –ø–æ–¥–±–∞—Ç—á–∞–º–∏, —á—Ç–æ–±—ã –Ω–µ –Ω–∞–≥—Ä—É–∂–∞—Ç—å API
                await asyncio.sleep(random.uniform(1, 3))

            logger.info(f"‚úÖ –ó–∞–≤–µ—Ä—à–µ–Ω –±–∞—Ç—á {i // batch_size + 1}/{(total - 1) // batch_size + 1}")
            print(f"‚úÖ –ó–∞–≤–µ—Ä—à–µ–Ω –±–∞—Ç—á {i // batch_size + 1}/{(total - 1) // batch_size + 1}")

            if i + batch_size < total:
                logger.info(f"‚è∏ –ü–∞—É–∑–∞ {pause_hours} —á –ø–µ—Ä–µ–¥ —Å–ª–µ–¥—É—é—â–∏–º –±–∞—Ç—á–µ–º...")
                print(f"‚è∏ –ü–∞—É–∑–∞ {pause_hours} —á –ø–µ—Ä–µ–¥ —Å–ª–µ–¥—É—é—â–∏–º –±–∞—Ç—á–µ–º...")
                await asyncio.sleep(pause_hours * 3600)

    logger.info("üèÅ –ü–µ—Ä–≤–∏—á–Ω–∞—è –∑–∞–≥—Ä—É–∑–∫–∞ key_skills –∑–∞–≤–µ—Ä—à–µ–Ω–∞")
    print("üèÅ –ü–µ—Ä–≤–∏—á–Ω–∞—è –∑–∞–≥—Ä—É–∑–∫–∞ key_skills –∑–∞–≤–µ—Ä—à–µ–Ω–∞")

    # –ü—Ä–µ–æ–±—Ä–∞–∑—É–µ–º –≤ DataFrame
    data = []
    for vacancy_id, skills in results:
        if skills is None:
            continue
        for skill in skills:
            data.append({"vacancy_id": vacancy_id, "key_skill": skill})

    df_keyskills = pd.DataFrame(data)
    return df_keyskills


In [9]:
# --- –ó–∞–ø—É—Å–∫ –ø–µ—Ä–≤–∏—á–Ω–æ–≥–æ —Å–±–æ—Ä–∞ key_skills---
vacancy_ids = valid_df["id"].dropna().astype(str).tolist()

df_keyskills = await collect_keyskills_primary(
    vacancy_ids,
    batch_size=2000,  # –∫—Ä—É–ø–Ω—ã–π –±–∞—Ç—á (—Ä–∞–∑–¥–µ–ª–∏—Ç—Å—è –Ω–∞ inner_batch)
    inner_batch=50,   # –≤–Ω—É—Ç—Ä–∏ –æ–±—Ä–∞–±–∞—Ç—ã–≤–∞–µ–º –ø–æ 50 –≤–∞–∫–∞–Ω—Å–∏–π
    rps=3,            # –Ω–µ –±–æ–ª–µ–µ 3 –∑–∞–ø—Ä–æ—Å–æ–≤/—Å–µ–∫
    concurrency=2     # –º–∞–∫—Å–∏–º—É–º 2 –æ–¥–Ω–æ–≤—Ä–µ–º–µ–Ω–Ω–æ
)

df_keyskills.head()


2025-08-25 14:47:34,707 [INFO] ‚ñ∂Ô∏è –°—Ç–∞—Ä—Ç –ø–µ—Ä–≤–∏—á–Ω–æ–π –∑–∞–≥—Ä—É–∑–∫–∏ key_skills
2025-08-25 14:47:34,707 [INFO] üîπ –û–±—Ä–∞–±–æ—Ç–∫–∞ –±–∞—Ç—á–∞ 1, –≤–∞–∫–∞–Ω—Å–∏–π: 2000


üîπ –û–±—Ä–∞–±–æ—Ç–∫–∞ –±–∞—Ç—á–∞ 1, –≤–∞–∫–∞–Ω—Å–∏–π: 2000




CancelledError: 

In [None]:
# vacancy_ids = valid_df["id"].unique().tolist()
# results = await collect_keyskills_primary(vacancy_ids, batch_size=7500, pause_hours=1)
# key_skills_all = []
# for v in results:
#     if not v:
#         continue
#     if v.get("key_skills"):
#         for skill in v["key_skills"]:
#             key_skills_all.append({"vacancy_id": v["id"], "key_skill": skill["name"]})
#     else:
#         key_skills_all.append({"vacancy_id": v["id"], "key_skill": "–Ω–µ —É–∫–∞–∑–∞–Ω–æ"})

# df_key_skills = pd.DataFrame(key_skills_all)


In [None]:
# –°–±–æ—Ä key_skills - —á–µ—Ä–Ω–æ–≤–∏–∫

# async def extract_key_skills(raw_vacancies: list, session: aiohttp.ClientSession, rate_limiter, logger) -> pd.DataFrame:
#     all_key_skills = []
#     total = len(raw_vacancies)

#     for idx, vac in enumerate(raw_vacancies, 1):
#         vac_id = vac.get("id")
#         if not vac_id:
#             continue

#         url = f"https://api.hh.ru/vacancies/{vac_id}"

#         await rate_limiter.wait()

#         try:
#             async with session.get(url) as response:
#                 if response.status != 200:
#                     logger.warning(f"[{idx}/{total}] –ù–µ —É–¥–∞–ª–æ—Å—å –ø–æ–ª—É—á–∏—Ç—å key_skills –¥–ª—è –≤–∞–∫–∞–Ω—Å–∏–∏ {vac_id}, —Å—Ç–∞—Ç—É—Å {response.status}")
#                     all_key_skills.append({"vacancy_id": vac_id, "key_skill": "–Ω–µ —É–∫–∞–∑–∞–Ω—ã"})
#                     continue
#                 vacancy_detail = await response.json()
#         except Exception as e:
#             logger.warning(f"[{idx}/{total}] –û—à–∏–±–∫–∞ –ø—Ä–∏ –∑–∞–ø—Ä–æ—Å–µ key_skills –¥–ª—è {vac_id}: {e}")
#             all_key_skills.append({"vacancy_id": vac_id, "key_skill": "–Ω–µ —É–∫–∞–∑–∞–Ω—ã"})
#             continue

#         key_skills = vacancy_detail.get("key_skills", [])
#         if not key_skills:
#             all_key_skills.append({"vacancy_id": vac_id, "key_skill": "–Ω–µ —É–∫–∞–∑–∞–Ω—ã"})
#         else:
#             for skill_entry in key_skills:
#                 skill = skill_entry.get("name", "").strip()
#                 all_key_skills.append({"vacancy_id": vac_id, "key_skill": skill})

#         if idx % 500 == 0 or idx == total:
#             logger.info(f"üîπ –û–±—Ä–∞–±–æ—Ç–∞–Ω–æ {idx}/{total} –≤–∞–∫–∞–Ω—Å–∏–π –¥–ª—è key_skills")

#     return pd.DataFrame(all_key_skills)


In [None]:
# # –°–±–æ—Ä –≤–∞–∫–∞–Ω—Å–∏–π –∏ key_skills - —á–µ—Ä–Ω–æ–≤–∏–∫

# start = time.perf_counter()
# raw_vacancies = await collect_vacancies_async()
# df = extract_vacancy_data(raw_vacancies)

# # –í–∞–ª–∏–¥–∞—Ü–∏—è –≤–∞–∫–∞–Ω—Å–∏–π
# reference_data = await fetch_reference_data()
# valid_df, reject_df = validate_vacancies(df, reference_data)

# # –ó–∞–≥—Ä—É–∑–∫–∞ key_skills
# async with aiohttp.ClientSession(headers=header) as session:
#     df_key_skills = await extract_key_skills(raw_vacancies, session, limiter, logging)

# # –ò–Ω—Ñ–æ—Ä–º–∞—Ü–∏—è –ø–æ —Ä–µ–∑—É–ª—å—Ç–∞—Ç–∞–º
# logging.info(f"\nüöÄ –í—Å–µ–≥–æ —Å–æ–±—Ä–∞–Ω–æ –≤–∞–∫–∞–Ω—Å–∏–π: {len(raw_vacancies)}")
# logging.info(f"üì¶ –ó–∞–≥—Ä—É–∂–µ–Ω–æ —É–Ω–∏–∫–∞–ª—å–Ω—ã—Ö id: {valid_df['id'].nunique()}")
# logging.info(f"üîë –°—Ñ–æ—Ä–º–∏—Ä–æ–≤–∞–Ω–æ —Å—Ç—Ä–æ–∫ –≤ key_skills: {len(df_key_skills)}")

# log_elapsed_time(start, "hh_parser + key_skills")
