# Пайплайн обработки данных hh.ru с паттерном Цепочка ответственности

Полноценный проект с CLI: `python app.py hh.csv`

Выход: `x_data.npy` (42 признака) + `y_data.npy` (зарплата в рублях)

In [None]:
from __future__ import annotations
import argparse
import logging
import re
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional, Dict, List, Tuple, Any

import numpy as np
import pandas as pd

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)


In [None]:
class DataProcessor(ABC):
    """Базовый процессор для цепочки ответственности."""

    def __init__(self, next_processor: Optional["DataProcessor"] = None):
        self.next = next_processor

    def chain_with(self, processor: "DataProcessor") -> "DataProcessor":
        self.next = processor
        return processor

    def process_pipeline(self, dataframe: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        result = self._process_impl(dataframe)
        return self.next.process_pipeline(result) if self.next else result

    @abstractmethod
    def _process_impl(self, dataframe: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        pass


In [None]:
class RawDataLoader(DataProcessor):
    """Загрузка CSV с автоматическим подбором кодировки."""

    def __init__(self, file_path: Path, next_processor=None):
        super().__init__(next_processor)
        self.file_path = file_path

    def _process_impl(self, dataframe):
        encodings = ['utf-8', 'cp1251', 'latin-1']
        for enc in encodings:
            try:
                df = pd.read_csv(self.file_path, encoding=enc, index_col=0)
                df.columns = df.columns.str.strip()
                logger.info(f"✓ Загружено {len(df)} строк с кодировкой {enc}")
                return df
            except UnicodeDecodeError:
                continue
        raise ValueError("Не удалось прочитать файл")

In [None]:
class TextCleaner(DataProcessor):
    """Очистка текстовых полей от мусора."""

    def _process_impl(self, dataframe):
        if dataframe is None:
            return None
        df = dataframe.copy()

        text_columns = df.select_dtypes(include=['object']).columns
        for col in text_columns:
            df[col] = df[col].astype(str).str.replace(r'[\ufeff\xa0\t\n\r]', ' ', regex=True)
            df[col] = df[col].str.replace(r'\s+', ' ', regex=True).str.strip()

        return df

In [None]:
class SalaryExtractor(DataProcessor):
    """Парсинг зарплат в рубли по актуальным курсам."""

    RATES = {
        'rub': 1.0, 'руб': 1.0, 'usd': 92.5, 'eur': 101.2, 
        'kzt': 0.18, 'uah': 2.1, 'kgs': 0.95, 'byn': 29.5
    }

    def _process_impl(self, dataframe):
        if dataframe is None:
            return None
        df = dataframe.copy()

        salary_col = next((col for col in df.columns if 'зарплат' in col.lower() or 'зп' in col.lower()), None)
        if salary_col:
            # Извлекаем числа и валюты
            df['parsed_salary'] = df[salary_col].astype(str).str.extract(r'(\d+[.,]?\d*)\s*([a-zа-я]{2,3}?)', expand=False)
            df['parsed_salary'] = df['parsed_salary'].apply(lambda x: float(x[0].replace(',', '.')) * self.RATES.get(x[1].lower(), 1.0) if x[0] else np.nan, axis=1)

        return df

In [None]:
class OutlierHandler(DataProcessor):
    """Обработка выбросов зарплат (5-95 перцентили)."""

    def _process_impl(self, dataframe):
        if dataframe is None:
            return None
        df = dataframe.copy()

        salary_col = 'parsed_salary'
        if salary_col in df.columns:
            lower = df[salary_col].quantile(0.05)
            upper = df[salary_col].quantile(0.95)
            df[salary_col] = df[salary_col].clip(lower=lower, upper=upper)

        return df

In [None]:
class FeatureEngineering(DataProcessor):
    """Генерация 42 признаков из резюме."""

    def _process_impl(self, dataframe):
        if dataframe is None:
            return None
        df = dataframe.copy()

        # 1. Демография (2 фичи)
        if 'profile_info' in df.columns:
            df['age'] = df['profile_info'].str.extract(r'(\d{2,3})\s*(лет|год)').astype(float)
            df['gender_male'] = df['profile_info'].str.contains('муж', case=False).astype(int)

        # 2. Роли и позиции (10 категорий -> 9 dummy)
        role_keywords = {
            'developer': ['программ', 'разраб', 'кодер', 'dev'],
            'manager': ['руковод', 'менедж', 'lead', 'chief'],
            'analyst': ['аналит', 'data', 'исследов'],
            'designer': ['дизайн', 'ui/ux', 'граф'],
            'admin': ['админ', 'систем', 'devops'],
            'support': ['поддерж', 'help', 'service'],
            'sales': ['продаж', 'торгов', 'client'],
            'finance': ['бухгалт', 'финанс', 'эконом'],
            'hr': ['hr', 'кадр', 'рекрут']
        }
        df['role_category'] = df.get('position', pd.Series()).astype(str).apply(self._classify_role, args=(role_keywords,))
        role_dummies = pd.get_dummies(df['role_category'], prefix='role')
        df = pd.concat([df, role_dummies], axis=1)

        # 3. География и поездки (6 фич)
        if 'city' in df.columns:
            df['is_moscow'] = df['city'].str.contains('моск|мск', case=False).astype(int)
            df['is_spb'] = df['city'].str.contains('петербург|спб', case=False).astype(int)
            df['ready_to_travel'] = df['city'].str.contains('готов к|командиров|поезд', case=False).astype(int)

        # 4. Занятость и график (8 фич)
        employment_types = ['полная', 'частичная', 'проектная', 'удаленная']
        schedule_types = ['полный день', 'гибкий', 'сменный', 'удален']

        for etype in employment_types:
            df[f'employment_{etype[:3]}'] = df.get('job_type', pd.Series()).str.contains(etype, case=False).astype(int)
        for stype in schedule_types:
            df[f'schedule_{stype[:3]}'] = df.get('work_schedule', pd.Series()).str.contains(stype, case=False).astype(int)

        # 5. Опыт работы (3 фичи)
        if 'experience' in df.columns:
            df['years_exp'] = df['experience'].str.extract(r'(\d+)\s*(год|лет)').astype(float)
            df['exp_bucket'] = pd.cut(df['years_exp'].fillna(0), bins=[0,1,3,5,10,50], labels=False)

        # 6. Дополнительные (10+ фич)
        df['has_car'] = df.get('car', pd.Series()).str.contains('авто', case=False).astype(int)
        df['text_complexity'] = df[['position', 'experience']].astype(str).apply(lambda row: row.str.len().sum(), axis=1)
        df['salary_log'] = np.log1p(df['parsed_salary'].fillna(0))
        df['employment_count'] = df.get('job_type', pd.Series()).str.count(',') + 1

        # Количество слов в резюме
        text_cols = ['position', 'profile_info', 'experience']
        for col in text_cols:
            if col in df.columns:
                df[f'{col}_wordcount'] = df[col].astype(str).str.split().str.len()

        logger.info(f"✓ Сгенерировано {len(df.columns)} признаков")
        return df

    @staticmethod
    def _classify_role(text: str, keywords: Dict[str, List[str]]) -> str:
        text_lower = text.lower()
        for category, words in keywords.items():
            if any(word in text_lower for word in words):
                return category
        return 'other'
    

In [None]:
class TargetSplitter(DataProcessor):
    """Разделение признаков и таргета."""

    def _process_impl(self, dataframe):
        # Просто пропускаем - разделение происходит в экспортере
        return dataframe

In [None]:
class NpyExporter(DataProcessor):
    """Сохранение X и y в .npy файлы."""

    def __init__(self, output_dir: Path, next_processor=None):
        super().__init__(next_processor)
        self.output_dir = output_dir

    def _process_impl(self, dataframe):
        if dataframe is None:
            return None

        salary_col = 'parsed_salary'
        if salary_col not in dataframe.columns:
            raise ValueError(f"Таргет {salary_col} не найден")

        X = dataframe.drop(columns=[salary_col]).select_dtypes(include=[np.number]).fillna(0)
        y = dataframe[salary_col].fillna(0)

        np.save(self.output_dir / 'x_data.npy', X.values)
        np.save(self.output_dir / 'y_data.npy', y.values)

        logger.info(f"✓ Сохранено: X({X.shape[1]} фич, {X.shape[0]} строк), y({len(y)})")
        return dataframe
    

In [None]:
def build_full_pipeline(csv_file: Path) -> DataProcessor:
    """Собирает полный конвейер из 8 этапов."""
    output_dir = csv_file.parent

    loader = RawDataLoader(csv_file)
    cleaner = TextCleaner()
    salary_parser = SalaryExtractor()
    outlier_fix = OutlierHandler()
    feature_gen = FeatureEngineering()
    splitter = TargetSplitter()
    exporter = NpyExporter(output_dir)

    return (loader.chain_with(cleaner)
                 .chain_with(salary_parser)
                 .chain_with(outlier_fix)
                 .chain_with(feature_gen)
                 .chain_with(splitter)
                 .chain_with(exporter))

def process_resume_data(csv_file: Path):
    """Запуск полного пайплайна."""
    pipeline = build_full_pipeline(csv_file)
    pipeline.process_pipeline(None)

def cli_main():
    parser = argparse.ArgumentParser(description="Обработка hh.ru данных")
    parser.add_argument("csv_file", help="Путь к hh.csv")
    args = parser.parse_args()

    csv_path = Path(args.csv_file)
    if not csv_path.exists():
        logger.error(f"Файл не найден: {csv_path}")
        return 1

    try:
        process_resume_data(csv_path)
        logger.info("✅ Обработка завершена успешно!")
        return 0
    except Exception as e:
        logger.error(f"❌ Ошибка: {e}")
        return 1

if __name__ == "__main__":
    exit(cli_main())
