# <center>Анализ закрытых вакансий сайта Headhunter</center>
##### 1. Получить список закрытых вакансий IT в Петербурге
##### 2. Подготовить базу для анализа

In [28]:
import http.client
import json
import time
import numpy
import re 
import datetime as dt
from datetime import timedelta as td
import pandas as pd
import csv
from pymongo import MongoClient
import threading

In [29]:
# Подключаю базу данных
import sqlite3
conn_db = sqlite3.connect('hr.db', timeout=10)
c = conn_db.cursor()

In [30]:
# Создание таблицы историй изменения статуса вакансий 
c.execute("PRAGMA synchronous = OFF")

c.execute('''             
            create table if not exists vacancy_history
                 (
                     id_vacancy integer, 
                     date_load text, 
                     date_from text,
                     date_to text
                 )
             ''')

<sqlite3.Cursor at 0x11ceebfc490>

In [31]:
headers = {"User-Agent": "hh-recommender"}

В общеем, идея в следующем: 
1. собирая каждый день данные за последние 30 дней, мы получаем опубликованные вакансии за каждый день, которые на текущий момент не в архиве.
2. Вакансия автоматом переходимт в архив, если с момента публикации прошло 30 дней
3. Вакансия переходит в архив в **любое** время до 30 дней с момента публикации
4. Частный случай: вакансия закрылась по истечению срока, затем была опубликована заново
3. Делаем вывод по закрытым вакансиям за период

### На ежедневной основе получаем массив вакансий за последние 30 дней
1. Позволю себе немного хардкода и упрощения: сразу фильтрую только по Санкт-Петербургу (area = 2) и по Специализации - IT (specialization=1)
2. За один запрос можно получить не более 2000 коллекций, в течение одного дня может быть больше опубликованных вакансий, поэтому запускаем за пол-дня 

In [32]:
def get_vacancy_history():
    conn = http.client.HTTPSConnection("api.hh.ru")
    per_page = 100
    page = 0
    count = per_page
    
    count_days = 30
    total = 0
    cur_date = dt.datetime.now()
    hours = 0
    collection_for_ins = []
        
    while count_days >= 0:
        
        while hours < 24:
            date_from = (cur_date.replace(hour=hours, minute=0, second=0) - 
                             td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S')

            date_to = (cur_date.replace(hour=hours + 11, minute=59, second=59) - 
                           td(days=count_days)).strftime('%Y-%m-%dT%H:%M:%S')

            while count == per_page:
                path = ("/vacancies?area=2&specialization=1&page={}&per_page={}&date_from={}&date_to={}"
                        .format(page, per_page, date_from, date_to))

                conn.request("GET", path, headers=headers)
                response = conn.getresponse()
                vacancies = response.read()
                conn.close()

                count = len(json.loads(vacancies)['items'])
                
                for item in json.loads(vacancies)['items']:
                    collection_for_ins.append(
                                                (
                                                    item['id'],
                                                    cur_date.strftime('%Y-%m-%dT%H:%M:%S'),
                                                    date_from,
                                                    date_to
                                                )
                                            )
                   
                # Вставка значений в БД
                try:
                    c.executemany('INSERT INTO vacancy_history VALUES (?,?,?,?)', collection_for_ins)
                except sqlite3.DatabaseError as err:       
                    print("Error: ", err)
                else:
                    conn_db.commit()

                if collection_for_ins:
                    page = page + 1
                    total = total + count
                    # обнуление массива
                    del(collection_for_ins[:])

            print(date_from, date_to, total)
    
            hours = hours + 12
            total = 0
            page = 0
            count = per_page
        
        count_days = count_days - 1
        hours = 0

Данная процедура может быть поставлена на crontab

In [33]:
get_vacancy_history()

2018-07-10T00:00:00 2018-07-10T11:59:59 36
2018-07-10T12:00:00 2018-07-10T23:59:59 70
2018-07-11T00:00:00 2018-07-11T11:59:59 32
2018-07-11T12:00:00 2018-07-11T23:59:59 64
2018-07-12T00:00:00 2018-07-12T11:59:59 25
2018-07-12T12:00:00 2018-07-12T23:59:59 77
2018-07-13T00:00:00 2018-07-13T11:59:59 22
2018-07-13T12:00:00 2018-07-13T23:59:59 61
2018-07-14T00:00:00 2018-07-14T11:59:59 9
2018-07-14T12:00:00 2018-07-14T23:59:59 5
2018-07-15T00:00:00 2018-07-15T11:59:59 2
2018-07-15T12:00:00 2018-07-15T23:59:59 8
2018-07-16T00:00:00 2018-07-16T11:59:59 100
2018-07-16T12:00:00 2018-07-16T23:59:59 239
2018-07-17T00:00:00 2018-07-17T11:59:59 48
2018-07-17T12:00:00 2018-07-17T23:59:59 117
2018-07-18T00:00:00 2018-07-18T11:59:59 47
2018-07-18T12:00:00 2018-07-18T23:59:59 117
2018-07-19T00:00:00 2018-07-19T11:59:59 44
2018-07-19T12:00:00 2018-07-19T23:59:59 98
2018-07-20T00:00:00 2018-07-20T11:59:59 37
2018-07-20T12:00:00 2018-07-20T23:59:59 109
2018-07-21T00:00:00 2018-07-21T11:59:59 3
2018-07-21T

### Получение закрытых вакансий за период:

В выборку попадают те вакансии, у которых прерывается история выгрузок

Дата, слeдующая за датой последней выгрузки - это и есть дата закрытия вакансии

In [34]:
# получение последней заты загрузки
def get_last_day_load():
    
    c.execute("""
        select date(max(date_load))
        from vacancy_history
            """)
    
    item = c.fetchone() 
   
    if item[0] != None:
        return dt.datetime.strptime(item[0], '%Y-%m-%d') 
    else:
        return None

In [35]:
# Вакансии, закрытые за период
def get_closed_by_period(date_in, date_out):
    
    last_day_out = get_last_day_load()
   
    if last_day_out != None:
        if last_day_out <= date_out:
            date_out = last_day_out - td(days=1) 
    
    date_in = date_in.strftime('%Y-%m-%d')
    date_out = date_out.strftime('%Y-%m-%d')
    
    c.execute("""
            select 
                    a.id_vacancy,
                    date(a.date_load) as date_last_load,
                    date(a.date_from) as date_publish,
                    ifnull(a.date_next, date(a.date_load, '+1 day')) as date_close
            from (
                select 
                    vh1.id_vacancy,
                    vh1.date_load,
                    vh1.date_from,
                    min(vh2.date_load) as date_next
                from vacancy_history vh1
                left join vacancy_history vh2
                    on vh1.id_vacancy = vh2.id_vacancy
                    and vh1.date_load < vh2.date_load
                where date(vh1.date_load) between :date_in and :date_out
                group by 
                    vh1.id_vacancy,
                    vh1.date_load,
                    vh1.date_from
                ) as a
            where a.date_next is null
            """, 
              {"date_in" : date_in, "date_out" : date_out})
    return c.fetchall()  

Запускаем выборку по закрытым вакансиям за март 2018

In [39]:
date_in = dt.datetime(2018, 6, 1)
date_out = dt.datetime(2018, 7, 31)

closed_vacancies = get_closed_by_period(date_in, date_out)

df = pd.DataFrame(closed_vacancies, columns = ['id_vacancy', 'date_last_load', 'date_publish', 'date_close'])
df.head()

Unnamed: 0,id_vacancy,date_last_load,date_publish,date_close
0,18126697,2018-07-09,2018-07-09,2018-07-10
1,18146667,2018-06-28,2018-06-06,2018-06-29
2,18155121,2018-07-09,2018-06-19,2018-07-10
3,18280951,2018-06-28,2018-06-01,2018-06-29
4,18881605,2018-07-09,2018-07-02,2018-07-10


In [40]:
# Количество закрытых вакансия за каждый день
df.groupby('date_close')['id_vacancy'].count()

date_close
2018-06-28     197
2018-06-29    1495
2018-07-10    3902
Name: id_vacancy, dtype: int64

In [15]:
# Экспорт полной таблицы из БД в CSV
data = c.execute('select * from vacancy_history')

with open('vacancy_history.csv','w', newline='') as out_csv_file:
    csv_out = csv.writer(out_csv_file)                       
    csv_out.writerow(d[0] for d in data.description)
    csv_out.writerows(data.fetchall())

conn_db.close()

### Хранение полной информации о вакансии полученной через API
Буду использовать свою любимую NoSQL базу MongoDB

Ps. Вообще, при большом желании можно использовать ту же базу sqlite, но для этого придется написать больше обработок

In [16]:
# Подключаем облачную базу Mongo
from pymongo import MongoClient
from pymongo import ASCENDING
from pymongo import errors
client = MongoClient('mongodb://hr_user:hr@ds115219.mlab.com:15219/hr_db')
db = client.hr_db
VacancyMongo = db.Vacancy

In [17]:
# result = VacancyMongo.create_index([('id', ASCENDING)], unique=True)
# sorted(list(VacancyMongo.index_information()))
# result = VacancyMongo.create_index([('date_load', ASCENDING)], unique=False)
# sorted(list(VacancyMongo.index_information()))

In [18]:
# Получение справочника
def get_dictionaries():
    conn = http.client.HTTPSConnection("api.hh.ru")
    conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers)
    response = conn.getresponse()
    if response.status != 200:
        conn.close()
        conn = http.client.HTTPSConnection("api.hh.ru")
        conn.request("GET", "https://api.hh.ru/dictionaries", headers=headers)
        response = conn.getresponse()
    dictionaries = response.read()
    dictionaries_json = json.loads(dictionaries)

    return dictionaries_json

Для оптимизации и уменьшения количества обрабатываемых вакансий, будем инсертить только новые

In [19]:
# В зависимости от требований, можно брать разные массивы вакансий
# в нашем примере будем обрабатывать вакансии за последние 5 дней
def get_list_of_vacancies_sql():
    
    conn_db = sqlite3.connect('hr.db', timeout=10)
    conn_db.row_factory = lambda cursor, row: row[0]
    c = conn_db.cursor()
    items = c.execute("""
                select 
                    distinct id_vacancy
                from vacancy_history
                where date(date_load) >= date('now', '-5 day')
            """).fetchall()
    
    #items = c.fetchall()    
    conn_db.close()
    return items

In [20]:
def get_list_of_vacancies_nosql():
    
    date_load = (dt.datetime.now() - td(days=5)).strftime('%Y-%m-%d')
    vacancies_from_mongo = []

    for item in VacancyMongo.find({"date_load" : {"$gte" : date_load}}, {"id" : 1, "_id" : 0}):
        vacancies_from_mongo.append(int(item['id']))
   
    return vacancies_from_mongo

In [21]:
from nltk.stem.snowball import SnowballStemmer
stemmer = SnowballStemmer("russian") 

In [22]:
def vacancies_processing(vacancies_list):
    
    cur_date = dt.datetime.now().strftime('%Y-%m-%d')

    for vacancy_id in vacancies_list:
        conn = http.client.HTTPSConnection("api.hh.ru")
        conn.request("GET", "/vacancies/{}".format(vacancy_id), headers=headers)
        response = conn.getresponse()
        if response.status != 404:
            vacancy_txt = response.read()
            conn.close()
            vacancy = json.loads(vacancy_txt)

            # salary
            salary = None
            if 'salary' in vacancy:
                if vacancy['salary'] != None:
                    if vacancy['salary']['from'] == None and vacancy['salary']['to'] != None:
                        salary = vacancy['salary']['to'] / currency_rates[vacancy['salary']['currency']]
                    elif vacancy['salary']['to'] == None and vacancy['salary']['from'] != None:
                        salary = vacancy['salary']['from'] / currency_rates[vacancy['salary']['currency']]
                    elif vacancy['salary']['to'] != None and vacancy['salary']['from'] != None:
                        salary = ((vacancy['salary']['from'] + vacancy['salary']['to']) / 2) \
                            / currency_rates[vacancy['salary']['currency']]

                max_salary = 500000
                if salary is not None:
                    salary = int(salary)
                    if salary >= max_salary:
                        salary = max_salary

            # grade
            grade = None
            if 'name' in vacancy:
                p_grade = ''
                title = re.sub(u'[^a-zа-я]+', ' ', vacancy['name'].lower(), re.UNICODE)
                words = re.split(r'\s{1,}', title.strip())
                for title_word in words:
                    title_word = stemmer.stem(title_word)
                    if len(title_word.strip()) > 1:
                        p_grade = p_grade + " " + title_word.strip()

                if re.search('(главн)|(princip)', p_grade):
                    grade = 'principal'    
                elif re.search('(ведущ)|(senior)|(lead)|(expert)|([f|F]ull)', p_grade):
                    grade = 'senior'
                elif re.search('(middl)|(инженер)|(программист)|(специалист)|(разработчик)|(engineer)|(developer)', p_grade):
                    grade = 'middle'
                elif re.search('(помощник)|(junior)|(начин)|(младш)', p_grade):
                    grade = 'junior'
                elif re.search('(руководител)|(менеджер)|(начальник)|(manager)|(head)', p_grade):
                    grade = 'manager'
                else:
                    grade = 'not specify'


            vacancy['salary_processed'] = salary
            vacancy['date_load'] = cur_date
            vacancy['grade'] = grade
            vacancy.pop('branded_description', None)

            try:
                post_id = VacancyMongo.insert_one(vacancy)
            except errors.DuplicateKeyError:
                print ('Cant insert the duplicate vacancy_id:', vacancy['id'])

Нахожу разницу между двумя массивами, обрабатываю только те вакансии, которые еще не были загружены

Для оптимизации - разбиваю массив на 500 элементов

In [23]:
hh_dictionary = get_dictionaries()
currencies = hh_dictionary['currency']
currency_rates = {}
for currency in currencies:
    currency_rates[currency['code']] = currency['rate']   

In [41]:
sql_list = get_list_of_vacancies_sql()
mongo_list = get_list_of_vacancies_nosql()
vac_for_proс = []

s = set(mongo_list)
vac_for_proс = [x for x in sql_list if x not in s]

vac_id_chunks = [vac_for_proс[x: x + 500] for x in range(0, len(vac_for_proс), 500)]
t_num = 1
threads = []

print('sql_list:', len(sql_list))
print('mongo_list:', len(mongo_list))
print('vac_for_proс:', len(vac_for_proс))
print('vac_id_chunks:', len(vac_id_chunks))

sql_list: 7661
mongo_list: 7630
vac_for_proс: 31
vac_id_chunks: 1


In [25]:
for vac_id_chunk in vac_id_chunks:
    print('starting', t_num)
    t_num = t_num + 1
    t = threading.Thread(target=vacancies_processing, kwargs={'vacancies_list': vac_id_chunk})
    threads.append(t)
    t.start()
    
for t in threads:
    t.join()

starting 1
starting 2


Имея в распоряжении собранную базу данных, можем выполнять различные аналитические выборки

Выведу ТОП-10 вакансий Python - разработчиков с самой высокой зарплатой

In [43]:
cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[pP]ython*"}})

df_mongo = pd.DataFrame(list(cursor_mongo))
del df_mongo['_id']

pd.concat([df_mongo.drop(['employer'], axis=1), 
           df_mongo['employer'].apply(pd.Series)['name']], axis=1)[['grade',
                                                                    'name', 
                                                                    'salary_processed'
                                                                   ]].sort_values('salary_processed',
                                                                                  ascending=False)[:10]

Unnamed: 0,grade,name,name.1,salary_processed
286,senior,Web Team Lead / Архитектор (Python/Django/React),Investex Ltd,293901.0
245,senior,Senior Python разработчик в Черногорию,Betmaster,277141.0
164,senior,Senior Python разработчик в Черногорию,Betmaster,275289.0
180,middle,Back-End Web Developer (Python),Soshace,250000.0
224,middle,Back-End Web Developer (Python),Soshace,250000.0
295,senior,Lead Python Engineer for a Swiss Startup,Assaia International AG,250000.0
74,middle,Back-End Web Developer (Python),Soshace,250000.0
142,middle,Back-End Web Developer (Python),Soshace,250000.0
115,senior,Python teamlead,DigitalHR,230000.0
94,senior,"Ведущий разработчик (Python, PHP, Javascript)",IK GROUP,220231.0


In [173]:
cursor_mongo = VacancyMongo.find({"name" : {"$regex" : ".*[jJ]ava[^sS]"}, "address" : {"$ne" : None}})
df_mongo = pd.DataFrame(list(cursor_mongo))


In [174]:
df_mongo['metro'] = df_mongo.apply(lambda x: x['address']['metro']['station_name']
                                   if x['address']['metro'] is not None 
                                   else None, axis = 1)

df_mongo.groupby('metro')['_id'] \
                                .count() \
                                .reset_index(name='count') \
                                .sort_values(['count'], ascending=False) \
                                [:10]


Unnamed: 0,metro,count
2,Василеостровская,87
20,Петроградская,68
5,Выборгская,46
24,Площадь Ленина,45
6,Горьковская,45
34,Чкаловская,43
15,Нарвская,32
23,Площадь Восстания,29
30,Старая Деревня,29
7,Елизаровская,27
