In [1]:
!pip install python-docx
!pip install striprtf

Collecting python-docx
  Downloading python_docx-1.2.0-py3-none-any.whl.metadata (2.0 kB)
Downloading python_docx-1.2.0-py3-none-any.whl (252 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m253.0/253.0 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: python-docx
Successfully installed python-docx-1.2.0
Collecting striprtf
  Downloading striprtf-0.0.29-py3-none-any.whl.metadata (2.3 kB)
Downloading striprtf-0.0.29-py3-none-any.whl (7.9 kB)
Installing collected packages: striprtf
Successfully installed striprtf-0.0.29


In [15]:
import re
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
import docx
from google.colab import files
import io
from striprtf.striprtf import rtf_to_text

@dataclass
class Vacancy:
    title: Optional[str] = None
    city: Optional[str] = None
    employment_type: Optional[str] = None
    work_schedule: Optional[str] = None
    responsibilities: List[str] = None
    requirements: List[str] = None
    advantages: List[str] = None  # Новое поле для "Будет преимуществом"
    education_level: Optional[str] = None
    experience_required: Optional[str] = None

    def __post_init__(self):
        if self.responsibilities is None:
            self.responsibilities = []
        if self.requirements is None:
            self.requirements = []
        if self.advantages is None:
            self.advantages = []

class AdvancedVacancyParser:
    """Улучшенный парсер вакансий с обработкой сложных структур"""

    def parse_file(self, file_content: bytes, filename: str) -> Vacancy:
        """Парсинг файла вакансии"""
        try:
            if filename.lower().endswith('.docx'):
                return self._parse_docx_vacancy(file_content)
            elif filename.lower().endswith('.rtf'):
                return self._parse_rtf_vacancy(file_content)
            else:
                raise ValueError(f"Неподдерживаемый формат: {filename}")

        except Exception as e:
            print(f"Ошибка при чтении вакансии: {e}")
            return Vacancy()

    def _parse_docx_vacancy(self, file_content: bytes) -> Vacancy:
        """Парсинг DOCX вакансии"""
        doc = docx.Document(io.BytesIO(file_content))
        vacancy_data = {}

        # Обрабатываем таблицы
        for table in doc.tables:
            for row in table.rows:
                if len(row.cells) >= 2:
                    field_name = row.cells[0].text.strip()
                    field_value = row.cells[1].text.strip()
                    if field_name and field_value:
                        vacancy_data[field_name] = field_value

        # Получаем полный текст для сложного парсинга
        full_text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()])

        return self._parse_vacancy_data(vacancy_data, full_text)

    def _parse_rtf_vacancy(self, file_content: bytes) -> Vacancy:
        """Парсинг RTF вакансии"""
        rtf_text = file_content.decode('utf-8', errors='ignore')
        text = rtf_to_text(rtf_text)
        vacancy_data = {}

        # Парсим RTF текст
        lines = text.split('\n')
        for line in lines:
            line = line.strip()
            if '|' in line:  # Табличный формат
                parts = [p.strip() for p in line.split('|')]
                if len(parts) >= 2:
                    vacancy_data[parts[0]] = '|'.join(parts[1:])
            elif ':' in line and len(line.split(':')) == 2:  # Ключ: значение
                key, value = [p.strip() for p in line.split(':', 1)]
                vacancy_data[key] = value

        return self._parse_vacancy_data(vacancy_data, text)

    def _parse_vacancy_data(self, vacancy_data: Dict[str, str], full_text: str) -> Vacancy:
        """Парсинг данных вакансии с улучшенной обработкой"""
        vacancy = Vacancy()

        # Основные поля
        vacancy.title = self._get_field_value(vacancy_data, 'Название')
        vacancy.city = self._get_field_value(vacancy_data, 'Город')
        vacancy.employment_type = self._get_field_value(vacancy_data, 'Тип занятости')
        vacancy.work_schedule = self._get_field_value(vacancy_data, 'Текст график работы')
        vacancy.education_level = self._get_field_value(vacancy_data, 'Уровень образования')
        vacancy.experience_required = self._get_field_value(vacancy_data, 'Требуемый опыт работы')

        # Обрабатываем обязанности и требования с улучшенным парсингом
        responsibilities_text = self._get_field_value(vacancy_data, 'Обязанности (для публикации)')
        requirements_text = self._get_field_value(vacancy_data, 'Требования (для публикации)')

        # Улучшенный парсинг с разделением на требования и преимущества
        if requirements_text:
            requirements, advantages = self._parse_requirements_with_advantages(requirements_text)
            vacancy.requirements = requirements
            vacancy.advantages = advantages
        else:
            # Ищем в полном тексте
            req_text = self._find_section(full_text, ['Требования'])
            if req_text:
                requirements, advantages = self._parse_requirements_with_advantages(req_text)
                vacancy.requirements = requirements
                vacancy.advantages = advantages

        # Обрабатываем обязанности
        if responsibilities_text:
            vacancy.responsibilities = self._parse_advanced_list(responsibilities_text)
        else:
            resp_text = self._find_section(full_text, ['Обязанности'])
            if resp_text:
                vacancy.responsibilities = self._parse_advanced_list(resp_text)

        return vacancy

    def _parse_requirements_with_advantages(self, text: str) -> Tuple[List[str], List[str]]:
        """Разделяет требования на основные и преимущества"""
        requirements = []
        advantages = []

        # Разделяем текст на части до и после "Будет преимуществом"
        advantage_keywords = ['будет преимуществом', 'преимуществом будет', 'considered an advantage', 'будет плюсом', 'плюсом будет']

        main_text = text
        advantage_text = ""

        for keyword in advantage_keywords:
            if keyword in text.lower():
                parts = re.split(keyword, text, flags=re.IGNORECASE)
                if len(parts) >= 2:
                    main_text = parts[0]
                    advantage_text = parts[1]
                    break

        # Парсим основные требования
        requirements = self._parse_advanced_list(main_text)

        # Парсим преимущества
        if advantage_text:
            advantages = self._parse_advanced_list(advantage_text)

        return requirements, advantages

    def _parse_advanced_list(self, text: str) -> List[str]:
        """Улучшенный парсинг списков с различными форматами"""
        if not text:
            return []

        items = []
        current_item = ""

        # Разделяем текст на строки
        lines = text.split('\n')

        for line in lines:
            line = line.strip()
            if not line:
                continue

            # Определяем начало нового пункта
            is_new_item = any(line.startswith(prefix) for prefix in ['-', '•', '—', '*', '✓', '→']) or \
                         re.match(r'^\d+[\.\)]', line) or \
                         re.match(r'^[a-z][\)\.]', line, re.IGNORECASE)

            if is_new_item and current_item:
                # Сохраняем предыдущий пункт
                cleaned_item = self._clean_list_item(current_item)
                if cleaned_item:
                    items.append(cleaned_item)
                current_item = line
            else:
                # Продолжаем текущий пункт
                if current_item:
                    current_item += " " + line
                else:
                    current_item = line

        # Добавляем последний пункт
        if current_item:
            cleaned_item = self._clean_list_item(current_item)
            if cleaned_item:
                items.append(cleaned_item)

        # Дополнительная обработка: разделяем пункты, которые могли слиться
        final_items = []
        for item in items:
            # Проверяем, не содержит ли пункт несколько подпунктов
            if ';' in item and len(item) > 50:  # Длинный пункт с точками с запятой
                sub_items = item.split(';')
                for sub_item in sub_items:
                    cleaned_sub = sub_item.strip()
                    if cleaned_sub and len(cleaned_sub) > 3:
                        final_items.append(cleaned_sub)
            else:
                final_items.append(item)

        return [item for item in final_items if item and len(item) > 3]

    def _clean_list_item(self, item: str) -> str:
        """Очистка пункта списка от маркеров"""
        # Убираем различные маркеры списка
        patterns = [
            r'^[•\-—*\s]+',
            r'^\d+[\.\)]\s*',
            r'^[a-z][\)\.]\s*',
            r'^[✓→▶]\s*'
        ]

        for pattern in patterns:
            item = re.sub(pattern, '', item, flags=re.IGNORECASE)

        item = item.strip()

        # Убираем лишние дефисы в начале после очистки
        item = re.sub(r'^-\s*', '', item)

        return item

    def _get_field_value(self, data: Dict[str, str], field_name: str) -> Optional[str]:
        """Получение значения поля"""
        if field_name in data:
            return data[field_name]

        # Поиск по частичному совпадению
        for key in data.keys():
            if field_name.lower() in key.lower():
                return data[key]

        return None

    def _find_section(self, text: str, section_names: List[str]) -> Optional[str]:
        """Поиск секции по названию"""
        for name in section_names:
            pattern = rf'{name}.*?(?=\n\s*[А-ЯA-Z]|\n\n|$)'
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                # Убираем название секции из текста
                section_text = match.group(0)
                return re.sub(rf'^{name}[\s:\-]*', '', section_text, flags=re.IGNORECASE).strip()
        return None

# Обновленный DocumentProcessor
class DocumentProcessor:
    def __init__(self):
        self.vacancy_parser = AdvancedVacancyParser()
        self.uploaded_files = {}

    def upload_files(self):
        print("Загрузите файлы вакансий (DOCX или RTF):")
        uploaded = files.upload()
        self.uploaded_files = uploaded
        return uploaded
        
    def process_vacancy(self, file_name: str) -> Vacancy:
        if file_name in self.uploaded_files:
            return self.vacancy_parser.parse_file(self.uploaded_files[file_name], file_name)
        else:
            raise ValueError(f"Файл {file_name} не найден")

# Инициализация процессора
processor = DocumentProcessor()

In [8]:
pip install docx2txt

Collecting docx2txt
  Downloading docx2txt-0.9-py3-none-any.whl.metadata (529 bytes)
Downloading docx2txt-0.9-py3-none-any.whl (4.0 kB)
Installing collected packages: docx2txt
Successfully installed docx2txt-0.9
Note: you may need to restart the kernel to use updated packages.


In [23]:
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import docx
from google.colab import files
import io
import os
from docx2txt import process as rtf_process

@dataclass
class Candidate:
    experience_total: Optional[str] = None
    work_experience: List[Dict[str, Any]] = None
    education: List[Dict[str, Any]] = None
    skills: List[str] = None
    languages: List[Dict[str, Any]] = None
    positions: List[str] = None

    def __post_init__(self):
        if self.work_experience is None:
            self.work_experience = []
        if self.education is None:
            self.education = []
        if self.skills is None:
            self.skills = []
        if self.languages is None:
            self.languages = []
        if self.positions is None:
            self.positions = []

class ResumeParser:
    """Парсер для резюме"""

    def parse_file(self, file_content: bytes, filename: str) -> Candidate:
        """Парсинг файла резюме"""
        try:
            if filename.lower().endswith('.docx'):
                text = self._extract_text_from_docx(file_content)
            elif filename.lower().endswith('.rtf'):
                text = self._extract_text_from_rtf(file_content)
            else:
                raise ValueError(f"Неподдерживаемый формат: {filename}")

            return self.parse_text(text)

        except Exception as e:
            print(f"Ошибка при чтении резюме: {e}")
            return Candidate()

    def _extract_text_from_docx(self, file_content: bytes) -> str:
        """Извлечение текста из DOCX"""
        doc = docx.Document(io.BytesIO(file_content))
        text_parts = []

        for para in doc.paragraphs:
            if para.text.strip():
                text_parts.append(para.text)

        for table in doc.tables:
            for row in table.rows:
                row_text = []
                for cell in row.cells:
                    if cell.text.strip():
                        row_text.append(cell.text.strip())
                if row_text:
                    text_parts.append(" | ".join(row_text))

        return "\n".join(text_parts)

    def _extract_text_from_rtf(self, file_content: bytes) -> str:
        """Извлечение текста из RTF с улучшенной обработкой"""
        try:
            # Декодируем байты в строку
            rtf_text = file_content.decode('utf-8', errors='ignore')

            # Упрощенная обработка RTF
            text = self._simple_rtf_to_text(rtf_text)
            return text

        except Exception as e:
            print(f"Ошибка при обработке RTF: {e}")
            # Альтернативный метод: попробуем просто удалить RTF теги
            return self._fallback_rtf_processing(file_content)

    def _simple_rtf_to_text(self, rtf_text: str) -> str:
        """Упрощенный конвертер RTF в текст"""
        # Удаляем RTF заголовок
        text = re.sub(r'\\[a-zA-Z]+\d*', ' ', rtf_text)
        text = re.sub(r'\{.*?\}', ' ', text)
        text = re.sub(r'\\[{}]', '', text)

        # Заменяем специальные последовательности
        text = text.replace(r'\par', '\n')
        text = text.replace(r'\line', '\n')
        text = text.replace(r'\tab', '\t')
        text = text.replace(r'\emdash', '—')
        text = text.replace(r'\endash', '–')

        # Удаляем лишние пробелы и переносы
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'\n\s+', '\n', text)

        return text.strip()

    def _fallback_rtf_processing(self, file_content: bytes) -> str:
        """Аварийный метод обработки RTF"""
        try:
            # Просто пытаемся декодировать как текст, игнорируя RTF разметку
            text = file_content.decode('utf-8', errors='ignore')
            # Удаляем явные RTF теги
            text = re.sub(r'\\[a-zA-Z]+\d*', ' ', text)
            text = re.sub(r'[{}]', ' ', text)
            text = re.sub(r'\s+', ' ', text)
            return text
        except:
            return "Не удалось извлечь текст из RTF файла"

    def parse_text(self, text: str) -> Candidate:
        """Парсинг текста резюме"""
        candidate = Candidate()
        text = self._normalize_text(text)

        candidate.experience_total = self._extract_experience(text)
        candidate.skills = self._extract_skills(text)
        candidate.languages = self._extract_languages(text)
        candidate.positions = self._extract_positions(text)

        return candidate

    def _normalize_text(self, text: str) -> str:
        """Нормализация текста"""
        text = re.sub(r'[–—−‐]', '-', text)
        text = re.sub(r'\s+', ' ', text)
        return text

    def _extract_experience(self, text: str) -> Optional[str]:
        """Извлечение опыта работы"""
        patterns = [
            r'Опыт работы[\s:\-]*([^\n]+?)(?=\n|$)',
            r'Стаж[\s:\-]*([^\n]+?)(?=\n|$)',
            r'Experience[\s:\-]*([^\n]+?)(?=\n|$)',
        ]

        for pattern in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                experience = match.group(1).strip()
                experience = re.sub(r'[|\-]\s*', '', experience)
                return experience
        return None

    def _extract_skills(self, text: str) -> List[str]:
        """Извлечение навыков"""
        skills_section = self._find_section(text, ['Навыки', 'Skills'])
        if not skills_section:
            return []

        skills_section = re.sub(r'^(Навыки|Skills)[\s:\-]*', '', skills_section, flags=re.IGNORECASE)
        skills = re.findall(r'\b[A-ZА-Я][A-Za-zА-Яа-я0-9\s\/\+\.\-]+\b', skills_section)

        filtered_skills = []
        for skill in skills:
            skill = skill.strip()
            if (2 < len(skill) < 50 and
                not any(x in skill.lower() for x in ['навыки', 'skills', 'языки', 'languages']) and
                not re.match(r'^[0-9\s\-]+$', skill)):
                filtered_skills.append(skill)

        return list(set(filtered_skills))

    def _extract_languages(self, text: str) -> List[Dict[str, Any]]:
        """Извлечение языков"""
        languages_section = self._find_section(text, ['Языки', 'Languages'])
        if not languages_section:
            return []

        languages = []
        patterns = [
            r'([А-Яа-яA-Za-z]+)[\s\-:]+([А-Яа-яA-Za-z0-9\s\-]+)',
            r'([А-Яа-яA-Za-z]+)[\s\-]+уровень[\s\-]+([А-Яа-яA-Za-z0-9\s\-]+)',
        ]

        for pattern in patterns:
            matches = re.findall(pattern, languages_section, re.IGNORECASE)
            for lang, level in matches:
                lang = lang.strip()
                level = level.strip()
                if lang and level and len(lang) > 2:
                    languages.append({'language': lang, 'level': level})

        return languages

    def _extract_positions(self, text: str) -> List[str]:
        """Извлечение должностей"""
        positions = set()
        position_patterns = [
            r'(Ведущий специалист|Старший специалист|Специалист|Инженер|Разработчик|Аналитик|Менеджер|Администратор)',
            r'(Senior|Junior|Lead|Principal)\s+[A-Za-zА-Яа-я]+',
        ]

        for pattern in position_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            for match in matches:
                if isinstance(match, tuple):
                    match = match[0]
                positions.add(match.strip())

        return list(positions)

    def _find_section(self, text: str, section_names: List[str]) -> Optional[str]:
        """Поиск секции по названию"""
        for name in section_names:
            pattern = rf'{name}.*?(?=\n\s*[А-ЯA-Z]|\n\n|$)'
            match = re.search(pattern, text, re.DOTALL | re.IGNORECASE)
            if match:
                return match.group(0)
        return None

# Добавляем парсер резюме в DocumentProcessor
class DocumentProcessor:
    def __init__(self):
        self.resume_parser = ResumeParser()
        self.uploaded_files = {}

    def upload_files(self):
        print("Загрузите файлы (DOCX или RTF):")
        uploaded = files.upload()
        self.uploaded_files = uploaded
        return uploaded

    def upload_files(self, file_paths=None):
        """
        Загружает файлы по заданным путям.
        :param file_paths: Список путей к файлам (строки) или None (если загружать из текущей директории)
        :return: Словарь с именами файлов и их содержимым
        """
        if file_paths is None:
            # Если не указаны пути — ищем все .docx и .rtf в текущей папке
            file_paths = []
            for ext in ['docx', 'rtf']:
                file_paths.extend([f for f in os.listdir('.') if f.lower().endswith(ext)])

        uploaded = {}
        for path in file_paths:
            if not os.path.exists(path):
                print(f"Файл не найден: {path}")
                continue

            try:
                filename = os.path.basename(path)
                print(f"Обрабатываю файл: {filename}")

                if path.lower().endswith('.docx'):
                    doc = Document(path)
                    text = ' '.join([para.text for para in doc.paragraphs])
                elif path.lower().endswith('.rtf'):
                    text = rtf_process(path)
                else:
                    print(f"Неподдерживаемый формат: {path}")
                    continue

                uploaded[filename] = {
                    'path': path,
                    'text': text
                }
                print(f"Загружен: {filename}")

            except Exception as e:
                print(f"Ошибка при чтении {path}: {e}")

        self.uploaded_files = uploaded
        return uploaded

    def process_resume(self, file_name: str) -> Candidate:
        if file_name in self.uploaded_files:
            return self.resume_parser.parse_file(self.uploaded_files[file_name], file_name)
        else:
            raise ValueError(f"Файл {file_name} не найден")

# Инициализация процессора
processor = DocumentProcessor()

In [25]:
# Загружаем файлы
print("Загрузите ваши файлы (DOCX):")
uploaded = processor.upload_files(['/kaggle/input/111111111111111111/1  .rtf'])
print("Загружены файлы:", list(uploaded.keys()))

Загрузите ваши файлы (DOCX):
Обрабатываю файл: 1  .rtf
Ошибка при чтении /kaggle/input/111111111111111111/1  .rtf: File is not a zip file
Загружены файлы: []


In [None]:
# Обрабатываем вакансии с улучшенным выводом
print("Обработка вакансий...")
vacancy_files = [f for f in uploaded.keys() if any(kw in f.lower() for kw in ['ваканс', 'описан', 'vacanc', 'job'])]

for vacancy_file in vacancy_files:
    try:
        print(f"\n{'='*80}")
        print(f"🔍 ОБРАБОТКА ВАКАНСИИ: {vacancy_file}")
        print(f"{'='*80}")

        vacancy = processor.process_vacancy(vacancy_file)

        print("✅ ОСНОВНАЯ ИНФОРМАЦИЯ:")
        print(f"   📋 Должность: {vacancy.title or 'Не указана'}")
        print(f"   🏙️ Город: {vacancy.city or 'Не указан'}")
        print(f"   💼 Тип занятости: {vacancy.employment_type or 'Не указан'}")
        print(f"   📅 График работы: {vacancy.work_schedule or 'Не указан'}")
        print(f"   🎓 Уровень образования: {vacancy.education_level or 'Не указан'}")
        print(f"   ⏳ Требуемый опыт: {vacancy.experience_required or 'Не указан'}")

        print(f"\n📋 ОСНОВНЫЕ ТРЕБОВАНИЯ ({len(vacancy.requirements)}):")
        for i, req in enumerate(vacancy.requirements, 1):
            print(f"   {i:2d}. {req}")

        if vacancy.advantages:
            print(f"\n⭐ БУДЕТ ПРЕИМУЩЕСТВОМ ({len(vacancy.advantages)}):")
            for i, advantage in enumerate(vacancy.advantages, 1):
                print(f"   {i:2d}. {advantage}")

        print(f"\n📝 ОБЯЗАННОСТИ ({len(vacancy.responsibilities)}):")
        for i, resp in enumerate(vacancy.responsibilities, 1):
            print(f"   {i:2d}. {resp}")

    except Exception as e:
        print(f"❌ Ошибка при обработке {vacancy_file}: {e}")
        import traceback
        traceback.print_exc()

In [None]:
# Обрабатываем резюме
print("\nОбработка резюме...")
resume_files = [f for f in uploaded.keys() if any(kw in f.lower() for kw in ['резюме', 'cv', 'resume', 'образец'])]

for resume_file in resume_files:
    try:
        print(f"\n🔍 Обрабатываю резюме: {resume_file}")
        candidate = processor.process_resume(resume_file)

        print("✅ ДАННЫЕ КАНДИДАТА:")
        print(f"📅 Общий опыт: {candidate.experience_total or 'Не указан'}")

        print(f"\n💼 Должности ({len(candidate.positions)}):")
        for i, pos in enumerate(candidate.positions[:5], 1):
            print(f"   {i}. {pos}")

        print(f"\n🛠️  Навыки ({len(candidate.skills)}):")
        for i, skill in enumerate(candidate.skills[:15], 1):
            print(f"   {i}. {skill}")

        print(f"\n🌐 Языки ({len(candidate.languages)}):")
        for i, lang in enumerate(candidate.languages, 1):
            print(f"   {i}. {lang['language']} - {lang['level']}")

    except Exception as e:
        print(f"❌ Ошибка при обработке {resume_file}: {e}")