Часть 1: Обработка результатов, поиск возможных совпадений.

In [1]:
import glob
import pandas as pd
import numpy as np
from tqdm import tqdm

ENGLISH_LEVELS = ['элементарный', 'ниже среднего', 'средний', 'выше среднего', 'продвинутый']
EDUCATION_LEVELS = ['Среднее', 'Техникум / Колледж', 'Еще студент', 'Незаконченное высшее', 'Высшее', 'Два высших', 'Кандидат']


С декабря 2015, данные содержат поля, позволяющие осуществить первоначальный поиск. Данные за декабрь 2019 не содержат версии final, но остается возможным получить набор данных в необходимом формате, дополнив весию mini датой заполнения и определив "класс" заполнителя.

In [2]:
df_2015_dec_final = pd.read_csv('source_csv/2015_dec_final.csv')

# Compose dataframe with structure similar to "final"
df_2019_dec_final = pd.read_csv('source_csv/2019_dec_mini.csv')
df_2019_dec_raw = pd.read_csv('source_csv/2019_dec_raw.csv')
df_2019_dec_final['Дата.заполнения'] = df_2019_dec_raw['Отметка времени']

parsed_dfs = [df_2015_dec_final, df_2019_dec_final] + list(map(pd.read_csv, glob.glob('source_csv/*[6-9]*_final.csv')))


И, раз уж мы занялись написанием велосипеда, сделаем классы более детальными. Предварительно уберем разницу в названии позиции, вызванную несовпадением регистра.

In [3]:
def fix_lowercase(value):
    if isinstance(value, str):
        result = value.replace('engineer', 'Engineer')
        result = result.replace('lead', 'Lead')
        result = result.replace('analyst', 'Analyst')
        result = result.replace('manager', 'Manager')
        result = result.replace('coder', 'Coder')

        return result

def cls_by_position(df):
    if df['Язык.программирования'].str != '':
        return 'developer'
    elif df['Должность'].str.contains('QA'):
        return 'QA'
    elif df['Должность'].str.contains('esign'):
        return 'designer'
    elif df['Должность'].isin(['HR', 'Talent Researcher']):
        return 'HR'
    elif df['Должность'].str.contains('Data'):
        return 'data'
    elif df['Должность'].isin(['HTML Coder', 'DevOps', 'Sysadmin', 'DBA', 'Security Specialist', 'Research Engineer']):
        return 'tech'
    elif df['Должность'].isin(['Senior Project Manager / Program Manager', 'Director of Engineering / Program Director',
                                  'Business analyst', 'Project manager', 'Team lead', 'Scrum Master', 'Product Manager', 'Sales manager',
                                  'Technical writer']):
        return 'PM'
    else:
        return 'other'


Преобразуем значения для даты заполнения, опыта, возраста, уровня английского и образования.

In [4]:

for df in parsed_dfs:
  df['Должность'] = df['Должность'].apply(fix_lowercase)
  df['cls'] = cls_by_position(df)
  df['timestamp'] = pd.to_datetime(df['Дата.заполнения'], utc=True)
  df['english_level'] = df['Уровень.английского'].apply(lambda x: ENGLISH_LEVELS.index(x))
  df['education_level'] = df['Образование'].apply(lambda x: EDUCATION_LEVELS.index(x))

  df['current_job_exp'] = df['current_job_exp'].astype(float)
  df['exp'] = df['exp'].astype(float)
  df['salary'] = df['Зарплата.в.месяц'].astype(float)
  
  df['age'] = df['Возраст'].astype(float)

print(f'Columns: {df.columns}')
                          
                          

Columns: Index(['N', 'Должность', 'Язык.программирования', 'Специализация',
       'Общий.опыт.работы', 'Опыт.работы.на.текущем.месте', 'Зарплата.в.месяц',
       'Изменение.зарплаты.за.12.месяцев', 'Город', 'Размер.компании',
       'Тип.компании', 'Пол', 'Возраст', 'Образование', 'Университет',
       'Еще.студент', 'Уровень.английского', 'Предметная.область',
       'Дата.заполнения', 'User.Agent', 'exp', 'current_job_exp', 'loc',
       'salary', 'Валюта', 'cls', 'timestamp', 'english_level',
       'education_level', 'age'],
      dtype='object')


После заполнения, уберем неиспользующиеся или продублированные столбцы. Отсортируем результаты по дате заполнения. Еще, взгляд на названия столбцов не в английской кодировке вызывает приступы боли, поэтому переиенуем-переведем оставшиеся столбцы.

In [5]:
CLEANUP_COLUMNS = set(['User.Agent', 'Дата.заполнения', 'Зарплата.в.месяц', 'Уровень.английского', 'Образование',
                   'Возраст', 'Общий.опыт.работы', 'Опыт.работы.на.текущем.месте', 'Еще.студент'])

for df in parsed_dfs:
    columns_to_drop = CLEANUP_COLUMNS.intersection(df.columns)
    df.drop(columns_to_drop, inplace=True, axis=1)
    df.rename(columns={
        'Должность': 'position',
        'Язык.программирования': 'programming_language',
        'Специализация': 'speciality',
        'Изменение.зарплаты.за.12.месяцев': 'salary_delta_1y',
        'Город': 'city',
        'Размер.компании': 'company_size',
        'Тип.компании': 'company_type',
        'Пол': 'gender',
        'Университет': 'university',
        'Предметная.область': 'subject_area',
        'Валюта': 'currency'
    }, inplace=True)

parsed_dfs.sort(key=lambda x: x['timestamp'].iloc[0])

source_df = pd.concat(parsed_dfs)
source_df.to_csv('result_csv/concatenated_source.csv')



Данные и работу с ними реально перенести в базу, но читаемость и поддериваемость у кода на Python будут выше. Поэтому, разобьем объединение данных на несколько этапов и выполним их для результатов каждого опроса в отдельности.

<ul>
    <li> Объединяем набор данных по полям "Университет" и "Пол", т.к. эти поля будут изменятся для одной карьеры между опросами c наименьшей вероятностью. </li>
    <li> Убираем заведомо неверные записи, чтобы уменьшить результат выборки </li>
</ul>

Предполагается, что DataFrame с результатами нового опроса присоединяется к начальным данным с суффиксом. Сделаем несколько предположений (пусть несколько натянутых и условных) для объединенного набора данных, помня, что имеем дело с реальным человеком.
Добавим несколько методов для валидации строки, содержащей данные из 2х опросов.

1) Уровень образования не ухудшается. Во всяком случае, никто не признается.

In [6]:
def education_level_not_dropped(df):
    return df['education_level' + JOINED_SUFFIX] >= df['education_level']

2) Данные не относятся к одному и тому же опросу и соединены в правильной последовательности.

In [7]:
def are_different_surveys(df):
    return np.where(df['time_delta'] > pd.Timedelta(90, 'D'), True, False)


3) Уровень английского не ухудшается с течением времени.  Специфика отрасли.

In [8]:
def english_level_check(df):
    return df['english_level' + JOINED_SUFFIX] >= df['english_level']

4) Возраст изменяется с течением времени. Допустим погрешность в 1 год.

In [9]:
def age_check(df):
    age_diff = df['age' + JOINED_SUFFIX] - df['age']
    survey_age_diff = (age_diff - df['years_delta'])

    return np.where((survey_age_diff <= 1) & (survey_age_diff >= 0), True, False)

5) Опыт работы на позиции обнуляется при смене компании. Будем считать, что компания изменяется, когда изменяется ее тип.

In [10]:
def current_place_experience_check(df):
    company_type_same = df['company_type'] != df['company_type' + JOINED_SUFFIX]
    experience_diff = df['current_job_exp' + JOINED_SUFFIX] - df['current_job_exp']

    return company_type_same | df['years_delta'] > experience_diff


6) Общий опыт работы изменяется с течением времени. Допустим погрешность в 1 год.

In [11]:
def total_experience_check(df):
    experience_diff = df['exp' + JOINED_SUFFIX] - df['exp']
    survey_experience_diff = df['years_delta'] - experience_diff

    return (survey_experience_diff <= 1) & (survey_experience_diff >= 0) & (experience_diff >= 0)


7) Если опросы проведены с интервалом в 1 год, разница в з/п соответствует указанному изменению з/п за 12 месяцев. Допустим временную погрешность в ~2 месяца

Спойлер:

Заметно, что датасеты, имеющие разницу в ~год, и отфильтрованные с помощью этого метода, имеют меньше общих строк. Причем, возможность произвести эту проверку уменьшает размер результирующей выборки в ~4 раза по сравнению с соседними и более чем в 2 раза - по сравнению с исходным датасетом. Это может значить, что неверен либо способ проверки, либо более половины тестового набора заполняется с ошибками. Есть над чем подумать, а пока вызов закомментирован.

In [12]:
def salary_delta_check(df):
    passed_not_year = (df['years_delta'] - 1).abs() > 0.16 # 2 months = ~0.16 of a year
    salary_diff = df['salary' + JOINED_SUFFIX] - df['salary']

    return passed_not_year | (salary_diff == df['salary_delta_1y' + JOINED_SUFFIX])
    

8) Уровень программиста не падает при условии постоянного стека. Деградация на легаси? Не, не слышал.

In [13]:
PROGRAMMER_LEVELS = ['Junior Software Engineer', 'Software Engineer', 'Senior Software Engineer']
PROGRAMMER_TOP_LEVELS= ['Senior Software Engineer', 'Technical Lead', 'System Architect']

def dev_level_not_dropped(df):
    different_languages = df['programming_language'] != df['programming_language' + JOINED_SUFFIX]
    source_level = df['position'].apply(lambda x: PROGRAMMER_LEVELS.index(x) if x in PROGRAMMER_LEVELS else 0)
    joined_level = df['position' + JOINED_SUFFIX].apply(lambda x: PROGRAMMER_LEVELS.index(x) if x in PROGRAMMER_LEVELS else 0)
    joined_top_level = df['position' + JOINED_SUFFIX].isin(PROGRAMMER_TOP_LEVELS)

    return different_languages | joined_top_level | (source_level <= joined_level)
    

9) Уровень QA не падает.

In [14]:
QA_LEVELS = ['Junior QA engineer', 'QA engineer', 'Senior QA engineer', 'QA Tech Lead']

def qa_level_not_dropped(df):
    not_qa = result['cls'] != 'QA'
    source_level = df['position'].apply(lambda x: QA_LEVELS.index(x) if x in QA_LEVELS else 0)
    joined_level = df['position' + JOINED_SUFFIX].apply(lambda x: QA_LEVELS.index(x) if x in QA_LEVELS else 0)

    return not_qa | (source_level <= joined_level)

10) Спорная проверка: программисты и PM не переквалифицируются в QA и HR. Дайте знать, если известны опровергающие случаи.

In [15]:
def cls_check(df):
    return df['cls'].isin(['developer', 'PM']) & ~df['cls' + JOINED_SUFFIX].isin(['QA', 'HR'])


In [16]:
# First, merge DataFrames by pairs
JOINED_SUFFIX = '_joined'

result_dfs = []
candidate_dfs = parsed_dfs.copy()
for parsed_index, parsed_df in enumerate(parsed_dfs):
    candidate_dfs.remove(parsed_df)
    print(f'Обработка DataFrame {parsed_index + 1} из {len(parsed_dfs)}')

    for candidate_index, candidate_df in enumerate(candidate_dfs):
        print(f'Обработка входящего DataFrame {candidate_index + 1} из {len(candidate_dfs)}')
        result = parsed_df.merge(candidate_df, on=['university', 'gender'], suffixes=('', JOINED_SUFFIX))
        result['time_delta'] = result['timestamp' + JOINED_SUFFIX] - result['timestamp']
        result['years_delta'] = result['time_delta']/np.timedelta64(1, 'Y')
        print(f'Размер входящего датасета: {len(candidate_df)}')
        print(f'Результатов после left join: {len(result)}')
        
        # Apply validations
        result = result[
            # salary_delta_check(result) &
            # are_different_surveys(result) &
            english_level_check(result) &
            age_check(result) &
            total_experience_check(result) &
            education_level_not_dropped(result) &
            current_place_experience_check(result) &
            dev_level_not_dropped(result) &
            qa_level_not_dropped(result) &
            cls_check(result)
        ]

        print(f'Результатов после валидации: {len(result)}')
        result_dfs.append(result)

print(f'Всего {len(result_dfs)} пар датасетов.')


Processing DataFrame 1 of 9
Merging with candidate DataFrame 1 of 8
Incoming DataFrame size: 7340
Lines in merge result: 4236361
Lines after filtering: 14295
Merging with candidate DataFrame 2 of 8
Incoming DataFrame size: 8868
Lines in merge result: 5185086
Lines after filtering: 20592
Merging with candidate DataFrame 3 of 8
Incoming DataFrame size: 8704
Lines in merge result: 5015266
Lines after filtering: 17072
Merging with candidate DataFrame 4 of 8
Incoming DataFrame size: 8355
Lines in merge result: 4880894
Lines after filtering: 14538
Merging with candidate DataFrame 5 of 8
Incoming DataFrame size: 9610
Lines in merge result: 5497033
Lines after filtering: 13512
Merging with candidate DataFrame 6 of 8
Incoming DataFrame size: 10379
Lines in merge result: 5757114
Lines after filtering: 13813
Merging with candidate DataFrame 7 of 8
Incoming DataFrame size: 11439
Lines in merge result: 6196657
Lines after filtering: 12286
Merging with candidate DataFrame 8 of 8
Incoming DataFrame s

Добавим веса для отфильтрованных записей. Вес - число совпадений в столбцах. Значения столбца могут измениться для одного и того же анонимуса и разных опросов, но тем не менее, их совпадение - хороший знак. А ты веришь в знаки при анализе данных, %username%?

In [17]:

# Number of other coincidences between dataframes
WEIGHT_COLUMNS = ['programming_language', 'position', 'speciality', 'city',  'company_size',
                  'company_type', 'subject_area']

def weights(row):
    result = 0
    for original_column in WEIGHT_COLUMNS:
        if row[original_column] == row[original_column + JOINED_SUFFIX]:
            result += 1

    return result

for df in tqdm(result_dfs):
    df["weight"] = df.apply(weights, axis=1)



100%|██████████| 36/36 [02:27<00:00,  4.09s/it]


Каждый результат можно уникально идентифицировать по комбинации значения N и timestamp. Вынесем соcтавные первичные ключи в отдельный датасет и сохранимся.

In [18]:
print("Сохранение результатов...")

full_df = pd.concat(result_dfs)
full_df.to_csv('result_csv/full.csv')

join_df = full_df[['N', 'timestamp', 'N' + JOINED_SUFFIX, 'timestamp' + JOINED_SUFFIX, 'weight']]
join_df.to_csv('result_csv/join.csv')


print("Завершено.")

Сохраняемся..
Сохранен
