# Updating HH.ru Vacancies in ClickHouse
This notebook keeps the vacancy data in the ClickHouse table `vacancies_hh_ru` up to date:

- Selects non-archived vacancies created before a cutoff date from ClickHouse.
- Requests details for each vacancy in parallel via the HH API.
- If the description or archive status has changed, builds an update record.
- If a vacancy is not found (404), sets the `deleted=1` flag.
- Applies all changes to ClickHouse with `ALTER TABLE ... UPDATE`.

**Technologies:**  
`pandas`, `sqlalchemy`, `requests`, `tqdm`, `ThreadPoolExecutor`, ClickHouse, HH API

---

> Structure: import → load data → update → write changes

In [14]:
import json
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from urllib.parse import quote

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
from clickhouse_driver import Client
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from tqdm import tqdm

In [9]:
# Root folder of the notebook project; every other path below lives inside it.
BASE_DIR = os.path.expanduser('~/pet-projects/jupyter-notebooks/')
CONFIG_PATH = os.path.join(BASE_DIR, 'config.json')   # API + ClickHouse credentials
TOKEN_PATH = os.path.join(BASE_DIR, 'token.json')     # cached HH.ru access token
DATA_DIR = os.path.join(BASE_DIR, 'data/hh_api_data')

In [10]:
def load_config():
    """Load the notebook configuration from ``CONFIG_PATH``.

    Returns:
        The parsed JSON document. Callers in this notebook read keys such as
        ``client_id``, ``client_secret``, ``user_email``,
        ``clickhouse_host`` and ``clickhouse_password`` from it.

    Raises:
        Exception: if the file does not exist or is not valid JSON. The
            underlying error is chained as ``__cause__`` so the real reason
            (missing path, line/column of the parse error) is preserved.
    """
    try:
        # JSON is UTF-8 by spec; don't depend on the platform's locale codec.
        with open(CONFIG_PATH, 'r', encoding='utf-8') as config_file:
            return json.load(config_file)
    except FileNotFoundError as err:
        raise Exception("config.json file not found") from err
    except json.JSONDecodeError as err:
        raise Exception("Error parsing config.json") from err

def save_token(token_data):
    """Write *token_data* to ``TOKEN_PATH`` as JSON, stamped with ``saved_at``.

    ``saved_at`` is the current local time in ISO 8601 format; ``load_token``
    compares it with the current time to decide whether the cached token can
    be reused. The caller's dict is left untouched: the stamp is added to a
    copy.
    """
    record = {**token_data, 'saved_at': datetime.now().isoformat()}
    with open(TOKEN_PATH, 'w', encoding='utf-8') as token_file:
        json.dump(record, token_file)

def load_token():
    """Return the cached access token, or ``None`` if it cannot be reused.

    The token is read from ``TOKEN_PATH`` and is reused only if its
    ``saved_at`` stamp is less than one day old. A missing, unparsable or
    incomplete token file, including a malformed or missing ``saved_at``
    value, means "no cached token" rather than an error. The caller then
    simply requests a new token.
    """
    try:
        with open(TOKEN_PATH, 'r', encoding='utf-8') as token_file:
            token_data = json.load(token_file)
        saved_at = datetime.fromisoformat(token_data['saved_at'])
        # A cached token is kept for at most one day before a new one is requested.
        if datetime.now() - saved_at < timedelta(days=1):
            return token_data['access_token']
    except (FileNotFoundError, json.JSONDecodeError, KeyError, TypeError, ValueError):
        # TypeError: non-dict payload, null stamp, or tz-aware stamp vs naive now().
        # ValueError: stamp that is not an ISO 8601 timestamp.
        pass
    return None

def get_access_token(client_id, client_secret, *, timeout=30):
    """Return an HH.ru access token, reusing the on-disk cache when possible.

    If ``load_token`` finds a still-valid cached token, it is returned without
    a network call. Otherwise a new token is requested from
    ``https://hh.ru/oauth/token`` with the ``client_credentials`` grant and
    cached through ``save_token``.

    Args:
        client_id: HH.ru application client id.
        client_secret: HH.ru application client secret.
        timeout: seconds to wait for the token endpoint. Without a timeout,
            ``requests`` may block forever on a stalled connection.

    Returns:
        The access token string.

    Raises:
        Exception: if the token endpoint answers with a non-200 status.
    """
    existing_token = load_token()
    if existing_token:
        return existing_token

    # No usable cached token: request a new one.
    response = requests.post(
        'https://hh.ru/oauth/token',
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
        data={
            'grant_type': 'client_credentials',
            'client_id': client_id,
            'client_secret': client_secret,
        },
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception(f"Error getting token: {response.status_code}, {response.text}")

    access_token = response.json()['access_token']
    # save_token adds the `saved_at` stamp itself.
    save_token({'access_token': access_token})
    return access_token


def create_api_client():
    """Build the request settings for the HH.ru API from ``config.json``.

    Reads ``client_id`` and ``client_secret`` (required), ``user_email``, and
    an optional ``app_name`` from the config. ``app_name`` defaults to
    ``'Your_App_Name'``, the value previously hard-coded. An access token is
    then obtained via ``get_access_token``.

    Returns:
        dict with ``headers`` (Bearer authorization and ``HH-User-Agent``) and
        ``base_url`` (``https://api.hh.ru``).

    Raises:
        Exception: if ``client_id`` or ``client_secret`` is missing or empty.
    """
    config = load_config()

    client_id = config.get('client_id')
    client_secret = config.get('client_secret')
    if not client_id or not client_secret:
        raise Exception("client_id or client_secret missing in config.json")

    user_email = config.get('user_email')
    app_name = config.get('app_name', 'Your_App_Name')

    access_token = get_access_token(client_id, client_secret)

    return {
        'headers': {
            'Authorization': f'Bearer {access_token}',
            # Sent as "<app_name> (<user_email>)".
            'HH-User-Agent': f'{app_name} ({user_email})'
        },
        'base_url': 'https://api.hh.ru'
    }

In [51]:
def get_vacancy_details(api_client, vacancy_id, *, timeout=30):
    """Fetch the JSON card of one vacancy from ``GET {base_url}/vacancies/{id}``.

    Args:
        api_client: dict with ``base_url`` and ``headers``, as built by
            ``create_api_client``.
        vacancy_id: HH.ru vacancy id.
        timeout: seconds to wait for the API. Without it, a stalled request
            would block a worker thread of the parallel updater indefinitely.

    Returns:
        The decoded JSON response body.

    Raises:
        Exception: on any non-200 response. The message keeps the
            ``"Error getting vacancy details: <status>, <body>"`` format. The
            exception also carries a numeric ``status_code`` attribute, so
            callers can detect e.g. 404 without parsing the message text.
    """
    response = requests.get(
        f"{api_client['base_url']}/vacancies/{vacancy_id}",
        headers=api_client['headers'],
        timeout=timeout,
    )

    if response.status_code != 200:
        error = Exception(f"Error getting vacancy details: {response.status_code}, {response.text}")
        error.status_code = response.status_code
        raise error

    return response.json()

def mark_vacancy_deleted(vacancy_id, engine):
    """Set ``deleted = 1`` for one vacancy in ``vacancies_hh_ru``.

    The id is passed as a bound parameter instead of being formatted into
    the SQL text. This way a string id is quoted correctly, and a malformed
    id cannot alter the statement.
    """
    sql = text("ALTER TABLE vacancies_hh_ru UPDATE deleted = 1 WHERE id = :id")
    with engine.begin() as conn:
        conn.execute(sql, {'id': vacancy_id})
    print(f"Вакансия {vacancy_id} помечена как deleted")

def _is_vacancy_not_found(exc):
    """Return True if *exc* reports an HTTP 404 from ``get_vacancy_details``."""
    # Use a numeric `status_code` attribute if the exception carries one.
    # Otherwise match the exact message prefix that get_vacancy_details formats.
    # The former `"404" in str(exc)` test also matched unrelated failures whose
    # text merely contains "404" (e.g. a connection error for /vacancies/114045).
    # That wrongly flagged live vacancies as deleted.
    if getattr(exc, 'status_code', None) == 404:
        return True
    return str(exc).startswith("Error getting vacancy details: 404,")


def _diff_vacancy_fields(stored, data):
    """Return the columns whose fresh API value differs from the stored row."""
    new_description = data.get('description', '')
    is_archived = int(data.get('archived', False))
    changes = {}
    if stored['description'] != new_description:
        changes['description'] = new_description
    if stored['archived'] != is_archived:
        changes['archived'] = is_archived
    return changes


def get_vacancy_updates_parallel(df_vacancies, api_client, max_workers=5):
    """Fetch vacancy details in parallel and collect the changes to write back.

    Each distinct ``id`` in *df_vacancies* is requested once through
    ``get_vacancy_details``. The fresh ``description`` and ``archived``
    values are compared with the first stored row for that id.

    Args:
        df_vacancies: DataFrame with at least ``id``, ``description`` and
            ``archived`` columns.
        api_client: dict passed through to ``get_vacancy_details``.
        max_workers: number of concurrent HTTP requests.

    Returns:
        A list of dicts, each holding ``id`` plus the columns to update:
        ``description`` and/or ``archived`` for changed vacancies, or
        ``deleted: 1`` for vacancies the API reports as not found (HTTP 404).
        Any other error is printed and that vacancy is skipped.
    """
    # Index the stored rows by id once: previously every finished request
    # re-scanned the whole frame, which is quadratic in the number of rows.
    stored_by_id = {}
    for _, row in df_vacancies.iterrows():
        stored_by_id.setdefault(row['id'], row)

    updates = []
    not_found_ids = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # One request per distinct id; duplicate rows no longer cost extra API calls.
        future_to_id = {
            executor.submit(get_vacancy_details, api_client, vacancy_id): vacancy_id
            for vacancy_id in stored_by_id
        }
        for future in tqdm(as_completed(future_to_id), total=len(future_to_id), desc="Скачивание деталей", unit="vacancy"):
            vacancy_id = future_to_id[future]
            try:
                update_fields = _diff_vacancy_fields(stored_by_id[vacancy_id], future.result())
            except Exception as e:
                # Best effort: one failing vacancy must not abort the whole run.
                if _is_vacancy_not_found(e):
                    updates.append({'id': vacancy_id, 'deleted': 1})
                    not_found_ids.append(vacancy_id)
                else:
                    print(f"Ошибка для id={vacancy_id}: {e}")
                continue
            if update_fields:
                update_fields['id'] = vacancy_id
                updates.append(update_fields)
    print(f"Всего помечено deleted: {len(not_found_ids)}")
    return updates

def apply_vacancy_updates_to_clickhouse(updates, engine):
    """Apply a list of per-vacancy changes to ``vacancies_hh_ru``.

    Each dict must contain ``id`` plus one or more ``column: value`` pairs.
    It becomes one statement,
    ``ALTER TABLE vacancies_hh_ru UPDATE col = :col, ... WHERE id = :id``,
    with all values passed as bound parameters. Column names come from the
    dict keys, so the keys must be trusted column names.

    Dicts containing nothing but ``id`` are skipped, because they would
    render an empty ``UPDATE`` clause, which is invalid SQL. The final total
    counts only the statements actually executed.
    """
    applied = 0
    for upd in updates:
        columns = [key for key in upd if key != 'id']
        if not columns:
            continue
        set_clause = ', '.join(f"{key} = :{key}" for key in columns)
        sql = text(f"ALTER TABLE vacancies_hh_ru UPDATE {set_clause} WHERE id = :id")
        print(sql)
        with engine.begin() as conn:
            conn.execute(sql, upd)
        applied += 1
        print(f"Вакансия {upd['id']} обновлена: {upd}")
    print(f"Всего обновлено: {applied}")


In [48]:
config = load_config()
# Connect over ClickHouse's secure native protocol (secure=True). The port is
# 9440 unless config.json sets `clickhouse_port`.
# The password is percent-encoded (quote with safe='') so characters such as
# '@', ':', '/', '#' or spaces cannot break the URL. SQLAlchemy decodes it
# again when it parses the URL.
encoded_password = quote(str(config['clickhouse_password']), safe='')
clickhouse_port = config.get('clickhouse_port', 9440)
clickhouse_url = (
    f"clickhouse+native://default:{encoded_password}"
    f"@{config['clickhouse_host']}:{clickhouse_port}/default?secure=True"
)
engine = create_engine(clickhouse_url, connect_args={'connect_timeout': 10, 'send_receive_timeout': 10})

In [None]:
# Re-check only vacancies that are not archived yet and were created before the cutoff.
cutoff_date = '2025-04-15'
query = f"SELECT * FROM vacancies_hh_ru WHERE archived = 0 AND created_at < '{cutoff_date}'"
df_vacancies = pd.read_sql(query, engine)
df_vacancies.info()

In [53]:
# Load credentials from config.json, obtain (or reuse the cached) HH.ru access
# token, and build the headers/base URL used by every API request below.
api_client = create_api_client()

In [None]:
# Show the distinct vacancy ids that were loaded (duplicates collapse here).
unique_ids = df_vacancies['id'].unique()
print(f"Unique ids: {unique_ids}")

In [None]:
# Request every loaded vacancy from the HH API in parallel (5 workers by default)
# and collect the description/archived changes and deleted (404) flags.
updates = get_vacancy_updates_parallel(df_vacancies, api_client)

In [None]:
# Write the collected changes back to ClickHouse, one ALTER TABLE ... UPDATE per vacancy.
apply_vacancy_updates_to_clickhouse(updates, engine)