# Подготовка датасета

1. Удалить из него 10% связей — именно их нужно будет предсказать с помощью модели ранжирования.
2. Разделить полученный датасет на два множества: 80% — для обучения и 20% — для предсказания.

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split

In [2]:
raw_file = pd.read_csv('fb-wosn-friends.edges', sep=' ', header=2).values[:, 0:2]
raw_file = [(i[0], i[1]) for i in raw_file]
raw_file = set(raw_file)
print(len(raw_file))

817090


In [3]:
res_edges, removed_edges = train_test_split(list(raw_file), train_size=0.9)

In [4]:
res_edges = list(map(lambda x: sorted(x), res_edges))
removed_edges = list(map(lambda x: sorted(x), removed_edges))
res_edges.sort()
removed_edges.sort()

In [5]:
def write_list_to_file(file_path, list_object, mode='w', line_separator='\n', values_separator=' '):
    with open(file_path, mode) as file:
        for row in list_object:
            line = ''
            for index, data in enumerate(row, 0):
                line += str(data)
                if index != len(row) - 1:
                    line += values_separator
            line += line_separator
            file.write(line)

In [6]:
write_list_to_file('./preprocessed_data/residual.edges', res_edges)
write_list_to_file('./preprocessed_data/removed.edges', removed_edges)

# Подготовка признаков ранжирования
В качестве основных признаков для модели ранжирования предлагается вычислить две метрики схожести нод графа: «число общих друзей» (CN) и AdarAdamic (AA).
Для этого в полученном графе для каждой вершины необходимо найти её второй круг (друзей друзей, с которыми не дружит данный пользователь) и для каждого кандидата вычислить указанные метрики схожести.
Число кандидатов для рекомендаций в модели ограничить размером максимум в 40 вершин.
В итоге нужно получить список вида: User_id -> vector[Candidate_id, CN, AA]. Сохранить его в текстовом формате.

In [7]:
from pyspark import SparkContext, SparkConf
from math import log

In [8]:
sc = SparkContext('local[2]', 'VK_CORE_ML')

In [9]:
spark_text_file = sc.textFile('./preprocessed_data/residual.edges')

In [10]:
friends_list = spark_text_file \
                            .map(lambda x: list(map(int, x.split()))) \
                            .flatMap(lambda x: (x, x[::-1])) \
                            .groupByKey()

In [11]:
cached_dict = friends_list.collectAsMap()
cached_dict = sc.broadcast(cached_dict)

In [13]:
removed_text_file = sc.textFile('./preprocessed_data/removed.edges')
removed_list = removed_text_file \
                            .map(lambda x: list(map(int, x.split()))) \
                            .flatMap(lambda x: (x, x[::-1])) \
                            .groupByKey()
cached_removed_dict = removed_list.collectAsMap()
cached_removed_dict = sc.broadcast(cached_removed_dict)

In [14]:
def calc_CN(first, second):
    """
    Method calculates Common Neighbours index
    Contract: first is NOT friend of second
    :param first: uid of the first candidate
    :param second: uid of the second candidate
    :return: tuple of (A, B), where B is list of common friends and A is len of this list
    """
    assert first not in cached_dict.value[first]
    first_user_friends = set(cached_dict.value[first])
    second_user_friends = set(cached_dict.value[second])
    common = first_user_friends.intersection(second_user_friends)
    return len(common), common

In [15]:
def calc_AA(first, second, common=None):
    """
    Method calculates Adar Adamic index
    $$A(x, y) = \sum_{u \in N(x) \cap N(y)} \frac{1}{\log{|N(u)|}}$$
    :param first: uid of the first candidate
    :param second: uid of the second candidate
    :return: value of calculated index
    """
    if common is None:
        common_size, common = calc_CN(first, second)
    AA = 0.0
    for friend in common:
        AA += log(len(cached_dict.value[friend]))
    return AA

In [16]:
# Checks whether edge between uid and candidate was deleted
def is_deleted_friend(uid, candidate):
    if uid in cached_removed_dict.value:
        return candidate in cached_removed_dict.value[uid]
    else:
        return False

In [17]:
def get_second_row(uid, max_candidates=40):
    """
    Method finds out friends of friends, who's not friend of uid
    :param uid: user id of person, which second row method have to find
    :param max_candidates: maximum value of second row len
    :return: list of uids
    """
    user_friends = set(cached_dict.value[uid])
    second_row = set()
    # friend - user's friend
    for friend in user_friends:
        friend_friends = set(cached_dict.value[friend])
        # friends_friend - friend's friend
        for friends_friend in friend_friends:
            if friends_friend != uid and friends_friend not in user_friends:
                second_row = second_row.union(cached_dict.value[friends_friend])
        if len(second_row) > max_candidates:
            break
    # for task we need to limit len of second row
    # to get the most valuable candidates we will sort by truth accessory to the set of user friends
    second_row = sorted(list(second_row), key=lambda candidate: int(is_deleted_friend(uid, candidate)))[::-1][:40]
    return second_row

In [18]:
def create_candidates_list(uid):
    second_row = get_second_row(uid)
    result = []
    for candidate in second_row:
        CN, common_friends = calc_CN(uid, candidate)
        AA = calc_AA(uid, candidate, common=common_friends)
        real_ans = int(is_deleted_friend(uid, candidate))
        result.append([uid, candidate, CN, AA, real_ans])
    return result

In [19]:
candidates = friends_list.keys().map(lambda uid: create_candidates_list(uid))

In [20]:
calculdated_dataset = candidates.collect()

In [21]:
filepath = './dataset/dataset.candidates'
for vectors_list in calculdated_dataset:
    write_list_to_file(filepath, vectors_list, mode='a')