In [1]:
from abc import ABCMeta, abstractmethod, abstractproperty
import os
import sqlite3
import pandas as pd

import requests
import json
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import time
from dataclasses import dataclass
from urllib.parse import urlparse, parse_qs, urlencode

from tqdm import tqdm

from gensim.models import Word2Vec
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from gensim.models import FastText
from transformers import BertModel, BertTokenizer
import torch
import spacy
import re

import dask.dataframe as dd

  torch.utils._pytree._register_pytree_node(


# Скачивание данных с api hh.ru для обучающей выборки
Ограничение API hh.ru: по одному поисковому запросу можно получить не более 2000 вакансий.
Чтобы получить больше вакансий, были сформированы URL с фильтрами так, чтобы количество записей по каждому из них не превышало 2000. Разбивка в основном делалась по регионам, профессиям и станциям метро.
Скачать удалось не всё, т. к. API возвращает ошибки, если не делать паузы между запросами. Для скачивания всей возможной выборки, возможно, потребуется запускать несколько экземпляров парсера с разных IP-адресов.
Удалось скачать около 150 тысяч описаний вакансий. Резюме через API hh.ru получить не удалось.

In [None]:
@dataclass
class ClusterId:
    """Identifiers of the hh.ru search facets ("clusters") that HhParser splits by.

    Each field is the ``id`` of one entry in the ``clusters`` list returned by
    https://api.hh.ru/vacancies?clusters=true; the defaults are the ids HhParser
    looks up via ``self._cluster_id.<field>``.
    """

    # Region facet id.
    region: str = "area"
    # Metro facet id.
    metro: str = "metro"
    # Professional role facet id.
    professional_role: str = "professional_role"
    # Salary facet id (NOTE(review): not read by HhParser as shown).
    salary: str = "salary"
    # Work experience facet id.
    experience: str = "experience"

class HhParser:
    """Collects hh.ru vacancy-search URLs whose result sets fit the 2000-item API cap.

    A single hh.ru search returns at most 2000 vacancies, so the parser walks the
    ``clusters`` facets of the search response and keeps every cluster URL whose
    vacancy count is non-zero and <= 2000. Clusters that are still too large are
    split by the next facet: region -> metro station / sub-region -> professional
    role -> work experience.

    Collected ``(url, count)`` pairs are stored in ``self.urls`` (deduplicated).
    """

    # hh.ru returns at most this many vacancies for one search query.
    _MAX_ITEMS = 2000

    def __init__(self, cluster_id: 'ClusterId', verify_ssl: bool = True, timeout: float = 30):
        """
        Args:
            cluster_id: ids of the facets used for splitting (see ClusterId).
            verify_ssl: verify TLS certificates; disable only behind a trusted
                intercepting proxy.
            timeout: seconds to wait for each HTTP request.
        """
        self._base_url = 'https://api.hh.ru/vacancies?clusters=true'
        self.urls = []
        self._cluster_id = cluster_id
        self._verify_ssl = verify_ssl
        self._timeout = timeout

    def get_vacancy_cluster_urls(self):
        """Crawl the cluster facets and fill ``self.urls`` with (url, count) pairs."""
        data = self.get_json(self._base_url)
        region_items = self.get_region_clusters(data)
        self._collect(region_items)

        # The whole-country cluster is not split further.
        too_big = [i for i in self.filter_more_than_2000(region_items) if str(i.get('name')) != 'Россия']

        # Split large regions by metro station, or by sub-region when there is no metro.
        extend_region_items = self.get_extend_region_clusters(too_big)
        self._collect(extend_region_items)
        too_big = self.filter_more_than_2000(extend_region_items)
        # Clusters reporting a zero count are also split by professional role.
        # NOTE(review): unclear why zero-count clusters are re-queried — confirm.
        too_big.extend(i for i in extend_region_items if self._count(i) == 0)

        role_items = self.get_items_by_prof_role(too_big)
        self._collect(role_items)
        too_big = self.filter_more_than_2000(role_items)

        experience_items = self.get_items_by_experience(too_big)
        self._collect(experience_items)
        remaining = self.filter_more_than_2000(experience_items)
        if remaining:
            print(f'{len(remaining)} clusters still exceed 2000 vacancies and were skipped')

        self.urls = list(set(self.urls))

        print(f'The total number of {len(self.urls)} URLs collected')

    def _collect(self, items):
        """Append (url, count) of every item with a non-zero count <= 2000 to ``self.urls``."""
        self.urls.extend((i.get('url'), self._count(i)) for i in self.filter_less_than_2000(items))

    @staticmethod
    def _count(item):
        """Vacancy count of a cluster item; a missing count is treated as 0."""
        return int(item.get('count') or 0)

    def get_extend_region_clusters(self, region_items):
        """Split each region cluster by metro station, or by sub-region when it has no metro stations."""
        items = []

        for item in tqdm(region_items, 'Get extend region clusters'):
            url = item.get('url')

            if url is None:
                continue

            # Pause between requests: the API starts returning errors without it.
            time.sleep(2)
            data = self.get_json(url)

            metro_items = self.filter_metro_station_items(self.get_metro_clusters(data))
            if metro_items:
                items.extend(metro_items)
                continue

            sub_regions = self.get_region_clusters(data)
            # NOTE(review): with more than one sub-region the last one is dropped — confirm why.
            items.extend(sub_regions[:-1] if len(sub_regions) > 1 else sub_regions)
        return items

    def _split_items(self, items, get_clusters, delay, description):
        """Request every item's URL (after a ``delay``-second pause) and collect the clusters get_clusters extracts."""
        result = []

        for item in tqdm(items, description):
            url = item.get('url')

            if url is None:
                continue

            time.sleep(delay)
            result.extend(get_clusters(self.get_json(url)))

        return result

    def get_items_by_prof_role(self, extend_items):
        """Split each cluster by professional role."""
        return self._split_items(extend_items, self.get_role_clusters, 1, 'Get items by prof role')

    def get_items_by_experience(self, role_items):
        """Split each cluster by required work experience."""
        return self._split_items(role_items, self.get_experience_clusters, 0.5, 'Get items by experience')

    def filter_less_than_2000(self, items):
        """Items whose count is non-zero and <= 2000."""
        return [i for i in items if 0 != self._count(i) <= self._MAX_ITEMS]

    def filter_more_than_2000(self, items):
        """Items whose count exceeds 2000."""
        return [i for i in items if self._count(i) > self._MAX_ITEMS]

    def get_cluster_items(self, json, cluster_id):
        """Return the ``items`` of the cluster with id ``cluster_id`` in a search response, or []."""
        clusters = json.get('clusters')

        if not clusters:
            print('key error: clusters')
            return []

        cluster = self.get_cluster_by_id(clusters, cluster_id)

        if not cluster:
            return []

        items = cluster.get('items')

        if not items:
            print('key error: items')
            return []

        return items

    def get_cluster_by_id(self, clusters, cluster_id):
        """Return the cluster whose ``id`` equals cluster_id; [] (falsy) when absent."""
        for cluster in clusters:
            if str(cluster.get('id')) == cluster_id:
                return cluster

        return []

    def _get_clusters(self, json_data, cluster_id, method_name):
        """Shared implementation of the get_*_clusters methods; logs under the caller's name."""
        if not json_data:
            print(f'{method_name}: Not found json in get response!')
            return []
        return self.get_cluster_items(json_data, cluster_id)

    def get_region_clusters(self, json_data):
        return self._get_clusters(json_data, self._cluster_id.region, 'get_region_clusters')

    def get_metro_clusters(self, json_data):
        return self._get_clusters(json_data, self._cluster_id.metro, 'get_metro_clusters')

    def get_role_clusters(self, json_data):
        return self._get_clusters(json_data, self._cluster_id.professional_role, 'get_role_clusters')

    def get_experience_clusters(self, json_data):
        return self._get_clusters(json_data, self._cluster_id.experience, 'get_experience_clusters')

    def filter_metro_station_items(self, metro_items):
        """Keep only metro-station entries of the metro facet."""
        return [i for i in metro_items if str(i.get('type')) == 'metro_station']

    def get_json(self, url):
        """GET url and return the decoded JSON object, or None on any failure or an empty body."""
        try:
            response = requests.get(url, verify=self._verify_ssl, timeout=self._timeout)
        except requests.RequestException as ex:
            print(f"Failed to get json data by url {url}: {ex}")
            return None

        if response.status_code != 200:
            print(f"Failed to get json data by url {url}, status {response.status_code}, text - {response.text}")
            return None

        try:
            data = response.json()
        except ValueError:
            print(f"Failed to decode json data by url {url}")
            return None

        # An empty JSON body is reported as "no data".
        return data or None


In [None]:
# Crawl the hh.ru cluster facets, then order the collected (url, count) pairs by count, smallest first.
cluster_id = ClusterId()
parser = HhParser(cluster_id)
parser.get_vacancy_cluster_urls()

sorted_list = sorted(parser.urls, key=lambda url_and_count: url_and_count[1])

In [None]:
# Merge URLs so that each merged URL returns as many records as possible, but no more than the limit (2000).
def parse_and_modify_urls(url_tuples, limit=2000):
    """Merge (url, count) pairs into fewer URLs whose summed count stays <= limit.

    URLs are grouped by endpoint and by the set of query-parameter names (the
    ``clusters`` parameter is ignored). Within a group, URLs are packed greedily
    in input order (the caller passes them sorted by count): the parameters of the
    packed URLs are concatenated into one query string.

    NOTE(review): this assumes the API treats repeated parameters as alternatives
    (OR); the summed count is exact only when the merged URLs differ in a single
    parameter — confirm for groups with several filters.

    Args:
        url_tuples: iterable of (url, count) pairs.
        limit: maximum summed count per merged URL.

    Returns:
        List of (merged_url, summed_count). A URL whose own count already exceeds
        ``limit`` is returned on its own rather than dropped.
    """
    groups = {}
    for url, count in url_tuples:
        base, pairs = _split_url(url)
        names = ','.join(sorted({name for name, _ in pairs}))
        groups.setdefault((base, names), []).append((pairs, count))

    modified_urls = []
    for (base, _names), entries in groups.items():
        batch, batch_total = [], 0
        for pairs, count in entries:
            # Close the batch before it would exceed the limit; an empty batch always
            # accepts the URL so nothing is dropped and no empty URL is produced.
            if batch and batch_total + count > limit:
                modified_urls.append((_join_url(base, batch), batch_total))
                batch, batch_total = [], 0
            batch.append(pairs)
            batch_total += count
        if batch:
            modified_urls.append((_join_url(base, batch), batch_total))

    return modified_urls


def _split_url(url):
    """Return ('scheme://netloc/path', [(name, value), ...]) without 'clusters', keeping every value of repeated parameters."""
    parsed = urlparse(url)
    base = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    pairs = [
        (name, value)
        for name, values in parse_qs(parsed.query).items()
        if name != 'clusters'
        for value in values
    ]
    return base, pairs


def _join_url(base, batch):
    """Build ``base?query`` from a list of parameter-pair lists, dropping exact duplicate pairs (first occurrence kept)."""
    unique_pairs = list(dict.fromkeys(pair for pairs in batch for pair in pairs))
    return f"{base}?{urlencode(unique_pairs)}"


modified_urls = parse_and_modify_urls(url_tuples=sorted_list)  # merged (url, count) pairs, each count <= 2000

In [2]:

class DataSourcePdAdapter(metaclass=ABCMeta):
    """Base class for adapters that persist pandas DataFrames to ``<directory>/<file_name>.<extension>``.

    Subclasses set ``self._extension`` after calling ``super().__init__`` and must
    implement ``write`` and ``read`` (enforced by ABCMeta at instantiation time).
    """

    def __init__(self, directory_path, file_name):
        """
        Args:
            directory_path: target directory; '' means the current directory.
            file_name: file name without extension; must not be empty.

        Raises:
            Exception: if file_name is ''.
        """
        self._directory_path = directory_path
        self._file_name = file_name
        self._extension = ''
        self._check_filename()

    def _get_full_path(self):
        """Return ``<directory_path>/<file_name>.<extension>``."""
        return os.path.join(self._directory_path, f'{self._file_name}.{self._extension}')

    def _check_directory_path(self):
        """Create the target directory, including missing parents; no-op for ''."""
        if self._directory_path == '':
            return
        os.makedirs(self._directory_path, exist_ok=True)

    def _check_filename(self):
        if self._file_name == '':
            raise Exception('file_name не может быть пустой строкой!')

    @abstractmethod
    def write(self, df):
        """Persist df."""

    @abstractmethod
    def read(self, *args, **kwargs):
        """Load data back as a DataFrame (or an iterator of DataFrames)."""

class CsvPdAdapter(DataSourcePdAdapter):
    """Reads and writes a DataFrame as ``<directory_path>/<file_name>.csv``."""

    def __init__(self, directory_path, file_name, sep=','):
        super().__init__(directory_path, file_name)
        self._sep = sep
        self._extension = 'csv'

    def write(self, df):
        """Write df without its index, creating the target directory first if needed."""
        self._check_directory_path()
        df.to_csv(self._get_full_path(), index=False, sep=self._sep)

    def read(self, n_rows=None, chunksize=None):
        """Read the CSV file.

        Args:
            n_rows: read at most this many rows (None = all).
            chunksize: if set, return an iterator of DataFrames of this size.

        Returns:
            A DataFrame, or a chunk iterator when chunksize is given.
        """
        path = self._get_full_path()
        print(path)
        # The slower python engine is kept as-is; presumably chosen for irregular dumps — TODO confirm.
        return pd.read_csv(path, sep=self._sep, chunksize=chunksize, nrows=n_rows, engine='python')

class SqlLitePdAdapter(DataSourcePdAdapter):
    """Stores vacancies in table ``table_name`` of the SQLite file ``<directory_path>/<file_name>.db``.

    NOTE: the table name and ``query_params_str`` are interpolated into SQL text,
    so they must come from trusted code, never from user input.
    """

    def __init__(self, directory_path, file_name, table_name):
        super().__init__(directory_path, file_name)
        self._table = table_name
        # NOTE(review): not read by any method; write() always replaces the table.
        self._operation_type = 'append'
        self._extension = 'db'

        self._create_table()

    def _connect(self):
        """Return a connection wrapped so that ``with`` closes it.

        ``sqlite3.Connection`` used as a context manager only commits or rolls back
        and leaves the connection open, hence the ``closing`` wrapper.
        """
        from contextlib import closing
        return closing(sqlite3.connect(self._get_full_path()))

    @staticmethod
    def _decode_text(raw):
        # Decode TEXT values as UTF-8, dropping undecodable bytes instead of raising.
        return raw.decode(errors='ignore')

    def _create_table(self):
        """Create the vacancies table if it does not exist (creating the directory first)."""
        self._check_directory_path()
        with self._connect() as con, con:
            con.execute(f"""
                CREATE TABLE if not exists {self._table} (
                    id INTEGER PRIMARY KEY,
                    name VARCHAR(100),
                    description TEXT,
                    branded_description TEXT,
                    key_skills VARCHAR(200),
                    professional_roles VARCHAR(200)

                );
            """)

    def write(self, df):
        """Replace the whole table with df (pandas infers the new schema)."""
        self._check_directory_path()
        with self._connect() as con, con:
            df.to_sql(self._table, con, index=False, if_exists="replace")

    def write_row(self, vacancy):
        """Insert one vacancy; on failure print the row and the error, then re-raise."""
        self._check_directory_path()

        with self._connect() as con, con:
            try:
                con.execute(
                    f"""
                    INSERT INTO {self._table}
                    (
                      id, name, description, branded_description, key_skills, professional_roles
                    )
                    VALUES (?, ?, ?, ?, ?, ?)
                    """, (vacancy.id, vacancy.name, vacancy.description, vacancy.branded_description,
                          vacancy.key_skills, vacancy.professional_roles)
                )
            except Exception as ex:
                print(f'vacancy - {str(vacancy)}')
                print(f'ошибка при записи в БД - {ex}')

                raise

    def read_all(self, chunksize=None):
        """Read the whole table.

        Returns a DataFrame, or, when chunksize is given, an iterator of DataFrames;
        in that case the connection stays open until the iterator is exhausted.
        """
        query = f'select * from {self._table}'
        if chunksize is None:
            with self._connect() as con:
                con.text_factory = self._decode_text
                return pd.read_sql(query, con)
        return self._iter_chunks(query, chunksize)

    def _iter_chunks(self, query, chunksize):
        """Yield query results chunk by chunk and close the connection afterwards."""
        with self._connect() as con:
            con.text_factory = self._decode_text
            yield from pd.read_sql(query, con, chunksize=chunksize)

    def read(self, query_params_str='', target_columns=None):
        """Run ``select <columns> from <table> <query_params_str>`` and return a DataFrame."""
        columns = ', '.join(target_columns) if target_columns else '*'
        query = f'select {columns} from {self._table} {query_params_str}'

        with self._connect() as con:
            return pd.read_sql(query, con)


In [203]:
sqlite3_adapter = SqlLitePdAdapter(directory_path='', file_name='vacancies', table_name='vacancies')  # ./vacancies.db

In [None]:
@dataclass
class Vacancy:
    """One hh.ru vacancy as inserted into the SQLite table by SqlLitePdAdapter.write_row."""

    # hh.ru vacancy id (the table's primary key).
    id: int
    # Vacancy title.
    name: str
    # Description text as returned by the API.
    description: str
    # Employer-branded description as returned by the API.
    branded_description: str
    # Key skill names joined with spaces.
    key_skills: str
    # Professional role names joined with spaces.
    professional_roles: str

In [None]:
# Fetch detailed data for every vacancy of a search URL; each record is written to the DB immediately, per-vacancy errors are skipped.
def fetch_vacancies(url, pages=1, per_page=100):
    """Download the vacancies listed by an hh.ru search URL and store each one in SQLite.

    Args:
        url: hh.ru ``/vacancies`` search URL (it may already contain filters).
        pages: number of result pages to request; only the first page is fetched
            by default because fetching all pages from one IP was not feasible.
        per_page: page size requested from the API.

    Returns:
        List of vacancy ids (``item['id']`` from the search results) whose detail
        request returned an error such as captcha_required, for a later retry.
    """
    extracted_vacancies = 0
    continues_vac = 0
    captcha_vacancies = []

    # Ids already in the DB plus ids saved during this call. API ids are converted
    # with int() before comparison; a set gives O(1) membership checks.
    stored_ids = sqlite3_adapter.read(target_columns=['id'])['id'].dropna()
    known_ids = {int(vacancy_id) for vacancy_id in stored_ids}

    for page in range(pages):
        params = {'per_page': per_page, 'page': page}

        try:
            response = requests.get(url, params=params, timeout=30)
        except Exception as ex:
            print(f'ошибка при получении списка вакансий {ex}')
            continue

        if response.status_code == 200:
            time.sleep(0.2)
            vacancies = response.json()

            for item in tqdm(vacancies.get('items', [])):
                if int(item['id']) in known_ids:
                    continues_vac += 1
                    continue

                time.sleep(0.3)

                try:
                    result = get_vacancy(item['id'])
                except Exception as ex:
                    print(f'ошибка при получении вакансии - {ex}')
                    continue

                # Inspect only the 'errors' field: searching the whole response text would
                # also reject vacancies whose description merely contains the word "errors".
                errors = result.get('errors')
                if errors:
                    # Back off before the next request; captcha errors get the shorter pause.
                    time.sleep(1 if 'captcha_required' in str(errors) else 2)
                    captcha_vacancies.append(item['id'])
                    continue

                vacancy = Vacancy(
                    int(result.get('id')),
                    result.get('name'),
                    result.get('description'),
                    result.get('branded_description'),
                    ' '.join(i.get('name') for i in result.get('key_skills') or []),
                    ' '.join(i.get('name') for i in result.get('professional_roles') or []),
                )

                try:
                    sqlite3_adapter.write_row(vacancy)
                except Exception as e:
                    print(e)
                    continue

                known_ids.add(vacancy.id)
                extracted_vacancies += 1

        elif response.status_code == 400:
            print("Reached the limit of 2000 items. Stopping further fetching.")

        else:
            print(response.text)
            print(f"Error fetching vacancies. Status code: {response.status_code}")

    print(f'вакансий получено - {extracted_vacancies}')
    print(f'вакансий пропущено - {continues_vac}')
    print(f'вакансий с ошибкой - {len(captcha_vacancies)}')

    return captcha_vacancies

def get_vacancy(id_vacancy, timeout=30):
    """Return the parsed JSON of ``GET https://api.hh.ru/vacancies/<id_vacancy>``.

    The body is parsed regardless of the HTTP status, so API error payloads are
    returned as well.
    """
    target_url = f'https://api.hh.ru/vacancies/{id_vacancy}'
    return requests.get(target_url, timeout=timeout).json()

# Download vacancy details for every merged (url, count) pair, including the last one.
for url, _count in modified_urls:
    fetch_vacancies(url)

In [67]:
# Load the whole vacancies table in chunks and concatenate once (concat inside the loop re-copies the growing frame).
chunk_size = 10000
vacancy_chunks = list(sqlite3_adapter.read_all(chunksize=chunk_size))
full_df = pd.concat(vacancy_chunks, ignore_index=True) if vacancy_chunks else pd.DataFrame()


In [5]:
len(full_df)  # number of downloaded vacancies

149988

In [None]:
# Merge each vacancy into one text: title, description and key skills. Missing parts
# become '' so a single absent field does not turn the whole text into NaN.
full_df['description_all'] = (
    full_df['name'].fillna('')
    + ' ' + full_df['description'].fillna('')
    + ' ' + full_df['key_skills'].fillna('')
)
sqlite3_adapter.write(full_df)

## Получение тестовой выборки

Тестовая выборка построена на основе баз данных с сайта https://data.rcsi.science/data-catalog/datasets/186/#dataset-codebook 
Были получены описания вакансий и резюме, а также отклики кандидатов на вакансии. По откликам сопоставил вакансии и резюме.
Для уменьшения размера выборки выбрал резюме, по которым было не менее 3 принятых откликов (статус «Принятие»).
По полученным id резюме выбрал описания опыта работы из резюме, а также id вакансий.
Для каждой должности из полученной выборки выбрал одно резюме и по названию должности подобрал 10 вакансий.
Результат сохранил в CSV-файл resultResponsesTest.
Также в CSV-файл вручную добавлены id вакансий и некоторые тест-кейсы для получения более разнообразной выборки.
Итоговая тестовая выборка собрана в файле resultResponsesTest.

В итоге собрано 325 резюме, и к каждому по наименованию должности подобрано 10 описаний вакансий.


In [3]:
# Folder with the CSV dumps (responses, vacancies, workexp) from data.rcsi.science.
# Set the VACANCIES_PROJECT_DIR environment variable to use another location without editing the notebook.
trud_vsem_path = os.environ.get('VACANCIES_PROJECT_DIR', r'D:\vacancies_project')

trud_response_csv_adapter = CsvPdAdapter(trud_vsem_path, 'responses', sep=';')
trud_vacancies_csv_adapter = CsvPdAdapter(trud_vsem_path, 'vacancies', sep=';')
trud_resume_csv_adapter = CsvPdAdapter(trud_vsem_path, 'workexp', sep=';')

In [5]:
# Read responses.csv in chunks and concatenate once at the end (concat inside the loop is quadratic).
chunk_size = 10000
response_chunks = list(trud_response_csv_adapter.read(chunksize=chunk_size))
responses_df = pd.concat(response_chunks, ignore_index=True) if response_chunks else pd.DataFrame()

D:\vacancies_project\responses.csv


In [12]:
# Build the response sample: accepted responses only, one row per (resume, vacancy) pair,
# then keep resumes that have at least 3 such pairs.
responses_df = responses_df.query("response_type == 'Принятие'")

response_unique = responses_df.drop_duplicates(subset=['id_cv', 'id_vacancy'])

group_count_df = (
    response_unique
    .groupby('id_cv', as_index=False)
    .count()
    .sort_values(by='activity_flag_candidate', ascending=False)
)

filtered_response_df = group_count_df[group_count_df['activity_flag_candidate'] >= 3]

In [13]:
filtered_response_df.iloc[:2]  # preview the two resumes with the most responses

Unnamed: 0,id_cv,activity_flag_candidate,activity_flag_manager,date_creation,date_creation_mistake,date_last_updated,date_modify,date_modify_mistake,id_candidate,id_hiring_organization,is_new,id_reply,id_response,id_vacancy,region_code,response_type
120559,ca9afaf0-2478-11e8-ac03-037acc02728d,430,430,430,430,430,430,0,430,430,430,0,430,430,430,430
4331,0771d920-73d8-11e5-b17c-239645b044d5,90,90,90,90,90,90,0,90,90,90,0,90,90,90,90


In [27]:
# Ids of the selected resumes and their (resume, vacancy) response pairs.
id_cv_response = set(filtered_response_df['id_cv'])

result_response_df = response_unique.loc[
    response_unique['id_cv'].isin(id_cv_response),
    ['id_cv', 'id_vacancy'],
]
result_response_df

Unnamed: 0,id_cv,id_vacancy
7542,4eaa0347-bc4c-11e6-94b6-0f468c90bfa7,9b1ac386-bbc6-11e6-9b24-736ab11edb0c
7543,91c6a1c8-071d-11e8-91b8-ef76bd2a03c1,0cec9c46-778c-11e8-bb58-736ab11edb0c
586279,06d34560-bf66-11ea-bd72-e37b4be0b9ed,ad13c4f6-7a6d-11eb-af9c-b905beff6f7a
586280,0c103c08-9c40-11e7-b7a1-ef76bd2a03c1,d70e3206-3b00-11e7-adfe-037acc02728d
586285,28ce6a54-dc0d-11e5-9c7c-037acc02728d,fd9a99b6-3748-11e7-a8f8-4376a32b3f45
...,...,...
1654591,faf1ae9a-f1e0-11e6-a868-736ab11edb0c,f0f7a2e6-07b7-11e7-8c0a-ef76bd2a03c1
1654592,91f292c0-119e-11ea-8d31-e37b4be0b9ed,68fd1ff2-9847-11e9-8ba1-bf2cfe8c828d
1654594,0ad22ad8-ccc8-11e6-b9e5-ef76bd2a03c1,32b96f46-91f4-11e6-b83e-4376a32b3f45
1654596,069687d0-3e25-11e7-a296-736ab11edb0c,e7651437-0741-11e8-8956-037acc02728d


In [29]:
# Ids of the vacancies to read from the vacancies file: the file is huge and only these vacancies are needed.
filtered_vacancies_response_df = responses_df.loc[
    responses_df['id_cv'].isin(id_cv_response) & responses_df['response_type'].eq('Принятие')
]
id_vacancies_response = set(filtered_vacancies_response_df['id_vacancy'])

In [30]:
# Read the resume work-experience CSV in chunks, keep rows of the selected resumes,
# and concatenate once at the end (concat inside the loop is quadratic).
chunk_size = 10000
resume_chunks = [
    chunk[chunk['id_cv'].isin(id_cv_response)]
    for chunk in tqdm(trud_resume_csv_adapter.read(chunksize=chunk_size))
]
result_resume_chunk_df = pd.concat(resume_chunks, ignore_index=True) if resume_chunks else pd.DataFrame()

D:\vacancies_project\workexp.csv


1217it [14:31,  1.40it/s]


In [31]:
# Read the vacancies CSV in chunks, keeping only the vacancies the selected resumes responded to
# (the file is too large to load whole); concatenate once at the end.
chunk_size = 30000
vacancy_chunks = [
    chunk[chunk['identifier'].isin(id_vacancies_response)]
    for chunk in tqdm(trud_vacancies_csv_adapter.read(chunksize=chunk_size))
]
result_vacancies_chunk_df = pd.concat(vacancy_chunks, ignore_index=True) if vacancy_chunks else pd.DataFrame()

D:\vacancies_project\vacancies.csv


443it [20:03,  2.72s/it]


In [None]:
# Build the vacancy-description frame.
vacancy_columns = [
    'identifier', 'responsibilities', 'title', 'requirements_qualifications',
    'employment_type', 'education_requirements_speciality', 'education_requirements_education_type',
]
# fillna('') returns a new frame, so the column assignments below do not write into a
# slice of result_vacancies_chunk_df (no SettingWithCopyWarning).
result_vacancies_df = result_vacancies_chunk_df[vacancy_columns].fillna('')
result_vacancies_df['education_requirements_education_type'] = result_vacancies_df['education_requirements_education_type'].apply(
    lambda x: f'образование {x}' if x != '' else x
)

# NOTE(review): requirements_qualifications is selected but not part of the description — confirm intended.
description_columns = [
    'title', 'responsibilities', 'education_requirements_education_type',
    'education_requirements_speciality', 'employment_type',
]
# Parts are joined with four spaces.
result_vacancies_df['vacancy_description'] = (
    result_vacancies_df[description_columns].astype(str).apply('    '.join, axis=1)
)

In [183]:
# Vacancies whose title matches a job title from the selected resumes, unique by description.
professions = set(result_resume_chunk_df['job_title'])
prof_vacancies = (
    result_vacancies_df
    .loc[result_vacancies_df['title'].isin(professions)]
    .drop_duplicates(subset='vacancy_description')
)

In [207]:
# Keep exactly 10 vacancies per profession title: titles with fewer than 10 are dropped,
# the rest are cut to their first 10 rows.
filtered_prof_df = (
    prof_vacancies
    .groupby('title')
    .filter(lambda group: len(group) >= 10)
    .groupby('title')
    .head(10)
)

In [45]:
# Build resume descriptions: one text per work-experience record (job title, dates, company, duties).
result_resume_chunk_df = result_resume_chunk_df.fillna('')
resume_columns = ['job_title', 'date_from', 'date_to', 'company_name', 'demands']

# .copy() makes an independent frame, so adding a column does not write into a slice.
result_resume_df_unique = result_resume_chunk_df.drop_duplicates().copy()
# Parts are joined with four spaces.
result_resume_df_unique['resume_description'] = (
    result_resume_df_unique[resume_columns].astype(str).apply('    '.join, axis=1)
)

result_resume_df_unique = result_resume_df_unique[['id_cv', 'resume_description']].drop_duplicates()
result_resume_df_unique.head()

Unnamed: 0,id_cv,resume_description
0,38ea68a0-26e6-11e8-82b5-e37b4be0b9ed,"Нач участка,мастер 2016-09-01 ООО ""Т..."
1,38ea68a0-26e6-11e8-82b5-e37b4be0b9ed,Управляющий директор 2015-03-01 2016-08-...
2,38ea68a0-26e6-11e8-82b5-e37b4be0b9ed,"Начальник участка,мастер 2014-04-01 2015..."
3,38ea68a0-26e6-11e8-82b5-e37b4be0b9ed,Начальник участка 2014-03-01 2015-09-01 ...
4,38ea68a0-26e6-11e8-82b5-e37b4be0b9ed,Начальник участка 2013-07-01 2014-03-01 ...


In [None]:
# Build the test set: for each profession title, take the first resume whose description
# starts with that title and pair it with each of the title's vacancies. Titles are
# iterated in first-appearance order (not a set), so the generated ids are reproducible.
test_rows = []
n = 0
for title in tqdm(filtered_prof_df['title'].unique()):
    matching_resumes = result_resume_df_unique.loc[
        result_resume_df_unique['resume_description'].str.startswith(title), 'resume_description'
    ]
    if matching_resumes.empty:
        continue
    resume_description = matching_resumes.iloc[0]

    for vacancy_description in filtered_prof_df.loc[filtered_prof_df['title'] == title, 'vacancy_description']:
        test_rows.append({
            'id_cv': n,
            'resume_description': resume_description,
            'id_vacancy': '',  # filled in manually in the CSV file
            'vacancy_description': vacancy_description,
        })
    n += 1

prof_df = pd.DataFrame(test_rows)

In [5]:
# Save the test set to resultResponsesTest.csv next to the source dumps.
trud_result_csv_adapter = CsvPdAdapter(directory_path=trud_vsem_path, file_name='resultResponsesTest', sep=';')
trud_result_csv_adapter.write(df=prof_df)

In [45]:
test_vacancies_df = trud_result_csv_adapter.read(n_rows=None, chunksize=None)  # full test set as one DataFrame

D:\vacancies_project\resultResponsesTest.csv


# Получение и обработка обучающей выборки

In [6]:
sqlite3_adapter = SqlLitePdAdapter('', 'vacancies', 'vacancies')

# Load the training vacancies in chunks and concatenate once (concat inside the loop is quadratic).
chunk_size = 10000
train_chunks = list(sqlite3_adapter.read_all(chunksize=chunk_size))
train_vacancies_df = pd.concat(train_chunks, ignore_index=True) if train_chunks else pd.DataFrame()

In [7]:
train_data = train_vacancies_df.copy(deep=True)  # working copy; the loaded frame stays untouched

In [8]:
train_data.iloc[:2]  # preview the first two training vacancies

Unnamed: 0,id,name,description,key_skills,professional_role_id,description_all,vacancy_description_proc
0,18330133,Агент по недвижимости (м. Пролетарская),Компания ИНКОМ- Недвижимость уже 30 лет успешн...,Поиск и привлечение клиентов Активные продажи ...,1,Агент по недвижимости (м. Пролетарская) Компан...,недвижимость лет успешно работает рынке мы при...
1,20155035,Руководитель отдела продаж (Private Banking),АТОН - старейшая инвестиционная компания Росси...,,2,Руководитель отдела продаж (Private Banking) А...,руководитель отдела продаж private banking ато...


In [9]:
class TextProcessor:
    """spaCy-based cleaning and tokenization of Russian vacancy texts (model ``ru_core_news_sm``)."""

    def __init__(self):
        # spaCy is imported locally, presumably so the methods stay self-contained when
        # pandarallel ships them to worker processes — TODO confirm.
        import spacy

        self.nlp = spacy.load("ru_core_news_sm")
        self.stop_words = self.nlp.Defaults.stop_words

    def spacy_tokenize(self, text):
        """Return the lemmas of all non-whitespace tokens of text, skipping empty lemmas."""
        doc = self.nlp(text)

        tokens = []

        for token in doc:
            if token.is_space:
                continue

            if token.lemma_ != '':
                tokens.append(token.lemma_)

        return tokens

    def preprocess_text(self, text, is_stop_words=True):
        """Normalize raw text into lowercase space-separated words.

        Steps: insert a space between a legal-form abbreviation (ООО, ПАО, ЗАО, ОАО, АО)
        and an opening quote; remove every character that is not a Cyrillic/Latin letter
        or whitespace; drop punctuation/space tokens, stop words (when is_stop_words)
        and tokens that are part of ORG/LOC entities.

        Args:
            text: raw text; non-string input (None, NaN) yields ''.
            is_stop_words: remove stop words when True.
        """
        import re

        if not isinstance(text, str):
            return ''

        text = re.sub(r'(ООО|ПАО|ЗАО|ОАО|АО)([\"«])', r'\1 \2', text)
        text = re.sub(r'[^А-ЯЁа-яёA-Za-z\s]', '', text)

        doc = self.nlp(text)
        # Token positions inside organization/location entities. Positions (not words)
        # are used so a word is dropped only where it belongs to an entity.
        ignored_positions = {
            position
            for ent in doc.ents
            if ent.label_ in ("ORG", "LOC")
            for position in range(ent.start, ent.end)
        }

        words = []
        for word in doc:
            if word.is_punct or word.is_space:
                continue

            lowered = word.text.lower()
            # Compare in lowercase so capitalized stop words ("Мы") are removed too.
            if is_stop_words and lowered in self.stop_words:
                continue

            if word.i in ignored_positions:
                continue

            words.append(lowered)

        return ' '.join(words)

    def clean_html_text(self, text):
        """Strip HTML markup (including script/style contents) and return the visible text."""
        if not text:
            return ''

        # BeautifulSoup is not imported at the top of the notebook (package beautifulsoup4).
        from bs4 import BeautifulSoup

        soup = BeautifulSoup(text, 'html.parser')

        for tag in soup(["script", "style"]):
            tag.extract()

        return soup.get_text(separator=' ', strip=True)
    
# Shared TextProcessor instance (loads the spaCy model once), used by the preprocessing cells below.
processor = TextProcessor()

In [27]:
# Preprocess the training vacancy descriptions in parallel, then persist the table.
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=True)

train_data['vacancy_description_proc'] = (
    train_data['description_all']
    .parallel_apply(processor.preprocess_text)
)

sqlite3_adapter.write(train_data)

INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

https://nalepae.github.io/pandarallel/troubleshooting/


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=37497), Label(value='0 / 37497')))…

In [None]:
# Lemmatize the cleaned texts in parallel. The token lists stay as lists in memory
# (presumably for the token-based vectorizers below); for SQLite they are stored as
# space-joined strings because sqlite3 cannot bind Python lists.
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=True)

train_data['vacancy_description_toc'] = train_data['vacancy_description_proc'].parallel_apply(processor.spacy_tokenize)

sqlite3_adapter.write(
    train_data.assign(vacancy_description_toc=train_data['vacancy_description_toc'].str.join(' '))
)

INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.

https://nalepae.github.io/pandarallel/troubleshooting/


VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=37497), Label(value='0 / 37497')))…

In [8]:
class Vectorizer(metaclass=ABCMeta):
    """Common interface for text vectorizers: fit on a corpus, then vectorize documents.

    Subclasses must implement ``fit`` and ``vectorize`` (enforced by ABCMeta).
    """

    def __init__(self, processor):
        # Text processor (e.g. TextProcessor) that subclasses may use to tokenize raw documents.
        self._processor = processor

    @abstractmethod
    def fit(self, docs):
        """Train on docs; the subclasses in this notebook return the elapsed seconds."""

    @abstractmethod
    def vectorize(self, docs):
        """Return one vector per document."""

    def process(self, docs):
        """Optional hook converting raw docs into the subclass's input format; the base version returns None."""
        return None


class TFIDFVectorizer(Vectorizer):
    """TF-IDF over the lemmas produced by the processor's ``spacy_tokenize``."""

    def __init__(self, processor):
        super().__init__(processor)
        self.vectorizer = TfidfVectorizer()

    def fit(self, docs):
        """Learn the vocabulary and IDF weights from raw docs; returns the elapsed seconds."""
        start = time.time()
        # fit() is enough: the document-term matrix itself is not needed here.
        self.vectorizer.fit(self.process(docs))
        return time.time() - start

    def vectorize(self, docs):
        """Return the sparse TF-IDF matrix of docs (one row per document)."""
        return self.vectorizer.transform(self.process(docs))

    def process(self, docs):
        """Lemmatize each document and join the lemmas with spaces."""
        return [' '.join(self._processor.spacy_tokenize(doc)) for doc in docs]
         

class Word2VecVectorizer(Vectorizer):
    """Document vectors as the sum of gensim Word2Vec word vectors.

    ``docs`` for fit/vectorize are token lists (one list of words per document).
    """

    def __init__(self, processor, size=2, window=5, min_count=1, workers=2):
        super().__init__(processor)
        # NOTE(review): a default dimensionality of 2 is unusually small (FastTextVectorizer uses 100) — confirm.
        self.size = size
        self.window = window
        self.min_count = min_count
        self.workers = workers
        self.model = None

    def fit(self, docs):
        """Train Word2Vec on tokenized docs; returns the elapsed seconds."""
        start = time.time()
        self.model = Word2Vec(
            docs,
            vector_size=self.size,
            window=self.window,
            min_count=self.min_count,
            workers=self.workers,
        )
        return time.time() - start

    def vectorize(self, docs):
        """Return an array of shape (len(docs), size): per document, the sum of its known word vectors."""
        vectors = []
        for words in docs:
            word_vectors = [self.model.wv[word] for word in words if word in self.model.wv]

            if not word_vectors:
                # A document with no known words maps to the zero vector (not a scalar 0).
                word_vectors = [np.zeros(self.size)]

            vectors.append(np.sum(word_vectors, axis=0))

        return np.array(vectors)
    
        
class FastTextVectorizer(Vectorizer):
    """Document vectorizer that averages gensim FastText embeddings of tokens.

    Args:
        processor: shared text processor (kept for interface parity; unused here).
        size: embedding dimension passed to gensim as ``vector_size``.
        window, min_count, workers: forwarded to gensim ``FastText``.
    """

    def __init__(self, processor, size=100, window=5, min_count=1, workers=2):
        super().__init__(processor)
        self.size, self.window = size, window
        self.min_count, self.workers = min_count, workers
        self.model = None

    def fit(self, docs):
        """Train FastText on ``docs`` (token lists) and return elapsed seconds."""
        began = time.time()
        params = dict(
            vector_size=self.size,
            window=self.window,
            min_count=self.min_count,
            workers=self.workers,
        )
        self.model = FastText(docs, **params)
        return time.time() - began

    def vectorize(self, docs):
        """Return one mean-embedding row per document; all-zero row when no token is known."""
        keyed = self.model.wv
        rows = []
        for tokens in docs:
            known = [keyed[tok] for tok in tokens if tok in keyed]
            if not known:
                known = [np.zeros(self.size)]
            rows.append(np.mean(known, axis=0))
        return np.array(rows)



class BERTVectorizer:
    """Text embeddings from a pretrained BERT model (output index 1 of the model).

    Unlike the gensim / TF-IDF vectorizers it needs no fitting and works on
    raw strings rather than token lists.

    Args:
        model_name: Hugging Face model identifier to load.
        batch_size: number of texts encoded per forward pass.
    """

    def __init__(self, model_name='bert-base-multilingual-cased', batch_size=32):
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertModel.from_pretrained(model_name)
        self.batch_size = batch_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    def transform(self, texts):
        """Embed ``texts`` in batches of ``batch_size``.

        Args:
            texts: a single string or any iterable of strings (list, pandas Series, ...).

        Returns:
            np.ndarray with one row per text (empty, correctly shaped, for no texts).
        """
        if isinstance(texts, str):
            # A bare string would otherwise be sliced into substrings.
            texts = [texts]
        # Materialise as a plain list: slicing a pandas Series yields a Series,
        # which the tokenizer does not accept as a batch.
        texts = list(texts)
        if not texts:
            # np.vstack([]) raises; return an empty matrix with the right width.
            return np.empty((0, self.model.config.hidden_size), dtype=np.float32)

        self.model.eval()
        embeddings = []
        with torch.no_grad():
            for i in range(0, len(texts), self.batch_size):
                batch = texts[i:i + self.batch_size]
                encoded_input = self.tokenizer(batch, return_tensors='pt', padding=True,
                                               truncation=True).to(self.device)
                outputs = self.model(**encoded_input)
                # Index 1 of BertModel's output is the pooled per-text vector.
                embeddings.append(outputs[1].cpu().numpy())
        return np.vstack(embeddings)

    def vectorize(self, docs):
        """Alias of :meth:`transform` matching the ``vectorize`` API used by ``compute_test``."""
        return self.transform(docs)

# Использование класса
# bert_vectorizer = BERTVectorizer(batch_size=16)  # Можете настроить размер батча в зависимости от доступной памяти GPU
# vectors = bert_vectorizer.transform(["Пример текста", "Еще один текст"])

    
# class BERTVectorizer(Vectorizer):
#     def __init__(self, processor, model_name='bert-base-multilingual-cased'):
#         super().__init__(processor)
#         self.tokenizer = BertTokenizer.from_pretrained(model_name)
#         self.model = BertModel.from_pretrained(model_name)

#     def vectorize(self, docs):
#         with torch.no_grad():
#             return np.array(
#                 [self.model(**self.tokenizer(text, return_tensors='pt', padding=True, truncation=True))[1].numpy() for text in docs]
#             )
    
#     def process(self, docs):
#         # нужен список строк, без лемматизации и токенизации, удаление стоп-слов и цифр
#         pass


# Подготовка тестовой выборки

In [9]:
# Directory holding the processed test CSV. Override it with the VACANCIES_DIR
# environment variable instead of editing the hard-coded local Windows path.
trud_vsem_path = os.environ.get('VACANCIES_DIR', r'D:\vacancies_project')
trud_result_csv_adapter = CsvPdAdapter(trud_vsem_path, 'resultResponsesTestProcessed2', sep=';')

test_vacancies_df = trud_result_csv_adapter.read()
# Work on a copy so later column additions do not mutate the loaded frame.
data = test_vacancies_df.copy()

D:\vacancies_project\resultResponsesTestProcessed2.csv


In [97]:
%%time
data['resume_description_proc'] = data['resume_description'].apply(processor.preprocess_text)
data['vacancy_description_proc'] = data['vacancy_description'].apply(processor.preprocess_text)

CPU times: total: 3min 29s
Wall time: 3min 29s


In [10]:
# Peek at the first two resume/vacancy pairs.
data.head(n=2)

Unnamed: 0,id_cv,resume_description,id_vacancy,vacancy_description,resume_description_proc,vacancy_description_proc,resume_description_token,vacancy_description_token
0,0023b346-73b3-11e8-a184-9122a281f90e,Водитель погрузчика 2007-11-01 2008-10-0...,1,водитель погрузчика производить погрузку на...,водитель погрузчика готовой продук...,водитель погрузчика производить погрузку о...,"['водитель', 'погрузчик', ' ', 'гот...","['водитель', 'погрузчик', ' ', 'производить..."
1,0023b346-73b3-11e8-a184-9122a281f90e,Водитель погрузчика 2007-11-01 2008-10-0...,2,водитель погрузчика СРОЧНО! производить пог...,водитель погрузчика готовой продук...,водитель погрузчика производить погрузку о...,"['водитель', 'погрузчик', ' ', 'гот...","['водитель', 'погрузчик', ' ', 'производить..."


In [11]:
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import time

# Set-based retrieval metrics for a single resume.
def calculate_metrics(predicted, actual):
    """Precision, recall and F1 of ``predicted`` ids against ``actual`` ids.

    Both inputs are treated as sets, so duplicates are ignored. Each ratio
    falls back to 0 when its denominator is 0.

    Returns:
        ``(precision, recall, f1_score)`` tuple.
    """
    predicted_set, actual_set = set(predicted), set(actual)
    hits = len(predicted_set & actual_set)

    # tp + fp == |predicted| and tp + fn == |actual|.
    precision = hits / len(predicted_set) if predicted_set else 0
    recall = hits / len(actual_set) if actual_set else 0

    if precision + recall > 0:
        f1_score = 2 * precision * recall / (precision + recall)
    else:
        f1_score = 0

    return precision, recall, f1_score

def compute_test(name, vectorizer, results, data, top_k=10):
    """Evaluate ``vectorizer`` on resume -> vacancy retrieval and store the metrics.

    Every unique resume is compared with every unique vacancy by cosine
    similarity. The ``top_k`` most similar vacancies are scored against the
    vacancies actually paired with that resume in ``data``.

    Args:
        name: label for the results entry. ``'BERT'`` selects the ``*_proc``
            text columns; anything else selects the ``*_token`` columns.
        vectorizer: object exposing ``vectorize(docs)``.
        results: dict updated in place with ``results[name]``.
        data: DataFrame of (resume, vacancy) pairs with columns ``id_cv``,
            ``id_vacancy`` and the ``*_proc`` / ``*_token`` description columns.
        top_k: number of vacancies retrieved per resume (default 10).

    Returns:
        The metrics dict stored in ``results[name]``:
        - mean Precision / Recall / F1 over resumes;
        - vectorization + similarity time in seconds;
        - mean cosine similarity of the retrieved vacancies.

    Raises:
        AssertionError: if the similarity matrix does not have one row per unique resume id.
    """
    print(name)
    start = time.time()
    data_resume = data.drop_duplicates(['id_cv', 'resume_description_token'])
    # One row per vacancy, so a retrieved top-k list cannot repeat a vacancy id.
    data_vacancy = data.drop_duplicates('id_vacancy')

    if name == 'BERT':
        resume_vecs = vectorizer.vectorize(data_resume['resume_description_proc'])
        vacancy_vecs = vectorizer.vectorize(data_vacancy['vacancy_description_proc'])
    else:
        resume_vecs = vectorizer.vectorize(data_resume['resume_description_token'])
        vacancy_vecs = vectorizer.vectorize(data_vacancy['vacancy_description_token'])

    # (n_resumes x n_vacancies) similarities and the top_k best vacancies per resume.
    cos_sim = cosine_similarity(resume_vecs, vacancy_vecs)
    top_preds = np.argsort(-cos_sim, axis=1)[:, :top_k]

    end_time = time.time() - start

    n_resumes = len(set(data_resume['id_cv']))
    # Previously `assert(cond, msg)`: that asserts a non-empty tuple and can never fail.
    assert len(cos_sim) == n_resumes, f'cos_sim {len(cos_sim)} id_cv {n_resumes}'

    vacancy_ids = data_vacancy['id_vacancy'].to_numpy()
    # Ground-truth vacancies per resume, built once instead of filtering `data` per resume.
    actual_by_cv = data.groupby('id_cv')['id_vacancy'].agg(list)

    precision_scores, recall_scores, f1_scores = [], [], []
    for cv_id, predicted_indices in zip(data_resume['id_cv'], top_preds):
        predicted_ids = vacancy_ids[predicted_indices].tolist()
        actual_ids = actual_by_cv.get(cv_id, [])
        prec, rec, f1 = calculate_metrics(predicted_ids, actual_ids)
        precision_scores.append(prec)
        recall_scores.append(rec)
        f1_scores.append(f1)

    top_sims = np.take_along_axis(cos_sim, top_preds, axis=1)
    metrics = {
        'Precision': np.mean(precision_scores),
        'Recall': np.mean(recall_scores),
        'F1 Score': np.mean(f1_scores),
        'time': end_time,
        # Mean similarity of the retrieved vacancies (previously the mean of their indices).
        'cos_sim': float(np.mean(top_sims)),
    }
    results[name] = metrics
    return metrics


  assert(len(cos_sim) == len(set(data['id_cv'].drop_duplicates())), f'cos_sim {len(cos_sim)} id_cv {0}')


In [12]:
# Re-check the first two pairs after preprocessing.
data.iloc[:2]

Unnamed: 0,id_cv,resume_description,id_vacancy,vacancy_description,resume_description_proc,vacancy_description_proc,resume_description_token,vacancy_description_token
0,0023b346-73b3-11e8-a184-9122a281f90e,Водитель погрузчика 2007-11-01 2008-10-0...,1,водитель погрузчика производить погрузку на...,водитель погрузчика готовой продук...,водитель погрузчика производить погрузку о...,"['водитель', 'погрузчик', ' ', 'гот...","['водитель', 'погрузчик', ' ', 'производить..."
1,0023b346-73b3-11e8-a184-9122a281f90e,Водитель погрузчика 2007-11-01 2008-10-0...,2,водитель погрузчика СРОЧНО! производить пог...,водитель погрузчика готовой продук...,водитель погрузчика производить погрузку о...,"['водитель', 'погрузчик', ' ', 'гот...","['водитель', 'погрузчик', ' ', 'производить..."


In [None]:
# from joblib import Parallel, delayed
# from tqdm import tqdm

# # Определяем функцию, которая будет вызываться параллельно для каждого документа
# def tokenize_document(doc):
#     return processor.spacy_tokenize(doc)

# # Распараллеливаем выполнение функции для каждого документа
# train_tokens_parallel = Parallel(n_jobs=-1)(delayed(tokenize_document)(doc) for doc in tqdm(train_data['vacancy_description_proc'].tolist()))


In [None]:
fasttext_vectorizer = FastTextVectorizer(processor)
tfidf_vectorizer = TFIDFVectorizer(processor)
# Was `Word2Vec(processor)`: that handed the text processor to gensim's
# Word2Vec as its training corpus instead of using the notebook's wrapper.
word2vec_vectorizer = Word2VecVectorizer(processor)
bert_vectorizer = BERTVectorizer()

# Shared training corpus: test vacancies + train vacancies + unique test resumes.
# NOTE(review): train_data uses 'vacancy_description_toc' while test_data uses
# '..._token' — confirm this column name is intentional.
total_data = (
    test_data['vacancy_description_token'].tolist()
    + train_data['vacancy_description_toc'].tolist()
    + test_data['resume_description_token'].drop_duplicates().tolist()
)

In [None]:
%%time
learning_time_tfidf = tfidf_vectorizer.fit(total_data)

In [None]:
%%time
learning_time_fasttext = fasttext_vectorizer.fit()

In [None]:
%%time
learning_time_word2vec = word2vec_vectorizer.fit()

In [None]:
# Collect evaluation metrics for every vectorizer into one dict.
results = {}

vectorizers = dict(
    tfidf=tfidf_vectorizer,
    fastText=fasttext_vectorizer,
    word2vec=word2vec_vectorizer,
)

for vec_name, vec in vectorizers.items():
    compute_test(vec_name, vec, results, test_data)

# Print a per-method summary.
for method, metrics in results.items():
    print(f"Results for {method}:")
    for metric, value in metrics.items():
        print(f"{metric}: {value:.2f}")

In [61]:
import joblib

# The `vectorizers` dict is keyed 'tfidf'; the old 'TFIDF' lookup raised KeyError.
tfidf_vectorizer = vectorizers['tfidf']
# NOTE(review): this pickles the whole wrapper, including its text processor;
# only load the resulting file from trusted sources.
joblib.dump(tfidf_vectorizer, 'tfidf_vectorizer.joblib', protocol=4)

['tfidf_vectorizer.joblib']

In [83]:
# Keys must match the `vectorizers` dict ('word2vec', 'fastText');
# the previous 'Word2Vec' / 'FastText' lookups raised KeyError.
word2vec_vectorizer = vectorizers['word2vec']
word2vec_vectorizer.model.save('word2vec_model')

fasttext_vectorizer = vectorizers['fastText']
fasttext_vectorizer.model.save('fasttext_model')