# Что здесь происходит

TL;DR построение эмбеддингов узлов графа, основываясь исключительно на информации о ребрах путем сближения векторов, между которыми есть ребра и отдаления векторов, между которыми ребра нет

## идея реализации

Рассмотрим ненаправленный простой граф (без петель и кратных ребер) с произвольным количеством компонент связности, ребер и вершин. Требуется построить векторные представления вершин, основываясь на представлении о смежности одних вершин с другими. Для этого предлагается следующий несложный алгоритм:

- составим матрицу смежности как пару (u_i, {v_j}), где {v_j} есть список вершин, смежных с u_i
- инициализируем для каждого u_i случайный вектор e_i размера EMB_SIZE
- элементами e_i будут целые числа из диапазона [-EMB_CARD/2; EMB_CARD/2]
- для каждого u_i выберем* множество "негативных" вершин {n_j} |{n_j}| = N_i, где N_i = |{v_j}| таких, что расстояние D|u_i, n_j| минимально и n_j не принадлежит {v_j}
- для каждой** тройки (u_i, v_j, n_j) посчитаем triplet_loss как max(0, D|u_i, v_j| - D|u_i, p_j| + MARGIN) и сдвинем u_i на расстояние ADAM_STEP в сторону получившегося градиента

\* Для того, чтобы не перебирать все возможные вектора в поиске негативов для каждой из вершин за O(N*N), предлагается использовать LSH по евклидову расстоянию с репартиционированием. Так мы получим сложность O(N/PARTITIONS), поскольку для каждой партиции семплирование выполняется параллельно, а внутри одной партиции необходимо лишь перебрать все находящиеся на ней элементы 2 раза. Также 

** На самом деле не всегда для каждой, т.к. в общем случае степень вершины может быть достаточно большой. В этом случае мы берем не больше, чем LIMIT_POSITIVES положительных и LIMIT_NEGATIVES отрицательных примеров. Также для улучшения сходимости алгоритма было решено уменьшить число эпох EPOCHS в пользу увеличения циклов ITERATIONS - перестроение-repartition. У запускающего есть свобода выбора всех перечисленных гиперпараметров.

Предложенный алгоритм был опробован на открытом датасете от FB https://snap.stanford.edu/data/ego-Facebook.html. Любопытно, что не все вершины данного графа, перечисленные в качестве смежных, присутствуют в качестве основных, что, учитывая двунаправленность связи, является противоречием. Данный факт приводит к любопытным спецэффектам: вершина при построении матрицы смежности не попадает в пайплайн и вектор для нее становится пустым и, в свою очередь, "пропадает" из списков смежности других вершин. Для некоторых вершин такие листья составляют весь список смежности и в итоге вершина сама становится листом, приводя к пропаданию новых. Датасет достраивается до полного дополнительно.

В качестве меры качества полученных эмбеддингов мы берем recall@k, где k - это степень вершины, а 1/0 соответствует принадлежности к списку смежных вершин, все полученные результаты в графе *Evaluation*. Уже на 4ой итерации полнота списка по второму кругу близка к 1. Количество итераций в конечном счете зависит от задачи, на данном этапе автор считает полученный показатель приемлемым.

# Train

In [2]:
import pyspark
from datetime import date, datetime
from pyspark.sql import SparkSession

import os

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


spark = SparkSession.builder.appName("emb_via_triplet")\
.config("spark.executor.memory", "4g")\
.config("spark.driver.memory", "10g")\
.config("spark.cores.max", "5")\
.config("spark.yarn.am.cores", "5")\
.getOrCreate()

sc = spark.sparkContext

24/08/29 20:45:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
import random
import pyspark.sql.functions as F
from pyspark.ml.functions import array_to_vector
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, LongType, FloatType, ArrayType, StructType, StructField
import torch
import random
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from pyspark.ml.feature import BucketedRandomProjectionLSH
from collections import namedtuple


In [5]:
EMB_SIZE = 60
EMB_CARD = 256
PARTITIONS = 20

EPOCHS = 2
MARGIN = 10
ADAM_STEP = 2.0
LIMIT_POSITIVES = 300
LIMIT_NEGATIVES = 200

ITERATIONS = 10

In [8]:
def split_and_cast(x):
    arr = x._c0.split()
    return (int(arr[0]), int(arr[1]))

def calc_euclidean(x1, x2):
    x1 = np.array(x1)
    x2 = np.array(x2)
    return float(np.linalg.norm(x1 - x2))
    
def read_fb_dataset():
    raw_csv = spark.read.csv("/facebook_combined.txt")
    return raw_csv.rdd.map(split_and_cast).toDF(["user_id", "friend_id"])

def read_and_fix_fb_dataset():
    fb_ds1 = read_fb_dataset()
    fb_ds2 = read_fb_dataset()
    fb_ds_inv = fb_ds2.withColumnRenamed("friend_id", "f").withColumnRenamed("user_id", "friend_id").withColumnRenamed("f", "user_id")
    return fb_ds1.union(fb_ds_inv.select("user_id", "friend_id")).distinct()

In [None]:
def extract_all_users(df):
    return df.select("user_id").distinct().unionAll(df.select("friend_id").withColumnRenamed("user_id", "friend_id").distinct()).distinct()

In [None]:
def generate_emb(x):
    return (x.user_id, [random.randrange(int(-EMB_CARD/2), int(EMB_CARD/2)) for _ in range(EMB_SIZE)])

def gen_random_embeddings(all_users):
    return all_users.rdd.map(generate_emb).toDF(["oid", "emb"])

In [None]:
def join_emb_and_group_positives(df, emb_table):
    df2 = df.join(emb_table, df.friend_id==emb_table.oid, "inner")
    df3 = df2.drop("oid").groupBy("user_id").agg(F.collect_list(F.struct(["friend_id", "emb"])).alias("positives"))
    df4 = df3.join(emb_table, df3.user_id==emb_table.oid, "inner").withColumnRenamed("emb", "owner_emb").drop("oid")
    
    # df5 = emb_table.join(df, df.user_id==emb_table.oid, "left_anti").withColumnRenamed("emb", "owner_emb")\
    # .withColumnRenamed("oid", "user_id")\
    # .withColumn('positives', F.array()).select("user_id", "positives", "owner_emb")
    # return df4.unionAll(df5)
    return df4

In [None]:
def repartition_by_dist(df):
    df2 = df.withColumn("owner_emb_dense", array_to_vector('owner_emb'))
    brp = BucketedRandomProjectionLSH(inputCol="owner_emb_dense", outputCol="hashes", bucketLength=PARTITIONS, numHashTables=1)
    model = brp.fit(df2)
    df3 = model.transform(df2)
    get_first=udf(lambda v: int(v[0]), IntegerType())
    df4 = df3.withColumn("part_lsh", get_first(F.col("hashes").getItem(0))).drop("hashes").drop("owner_emb_dense")
    return df4.repartition(PARTITIONS, "part_lsh")

In [10]:
def calc_euclidean(x1, x2):
    x1 = np.array(x1)
    x2 = np.array(x2)
    return float(np.linalg.norm(x1 - x2))

def get_n_closest_except(owner_emb, haystack, except_, n):
    map_to_dist = [ (x.oid, x.emb, calc_euclidean(owner_emb, x.emb)) for x in haystack]
    
    map_to_dist_sorted = sorted(map_to_dist, key=lambda x: x[2])
    
    res = []
    for id, emb, _dist in map_to_dist_sorted:
        if id in except_:
            continue
        res.append((id, emb))
        
        if len(res) == n:
            break

    
    return res
        

def sample_negatives(iterator):
    all_part = []
    Triplet = namedtuple('Triplet', 'oid emb friends')
    for it in iterator:
        friends = set(map(lambda x: x[0], it.positives))
        all_part.append(Triplet(it.user_id, it.owner_emb, friends))
    
    res = []
    n = len(all_part)
    for i in range(n):
        except_ = all_part[i].friends
        except_.add(all_part[i].oid)
        negs = get_n_closest_except(all_part[i].emb, all_part, set(except_), len(except_) - 1)
        res.append((all_part[i].oid, negs))

    return res

def gen_negatives(df):
    df2 = df.rdd.mapPartitions(sample_negatives).toDF(["uid", "negatives"])
    return df.join(df2, df2.uid==df.user_id, "left").drop("uid")

# df_w_pos_repart_1 = spark.read.parquet("/df_w_pos_repart_1")
# df_w_pos_repart_1_part5 = df_final_2.filter("partition_id=5").collect()
# sample_negatives(df_w_pos_repart_1_part5)

In [None]:
def cast_arr_float(arr):
    return list(map(lambda x: float(x), arr))

def cast_arr_int(arr):
    return list(map(lambda x: int(x), arr))

def make_new_emb(x):
    owner_emb = x.owner_emb
    positives = random.sample(x.positives, min(LIMIT_POSITIVES, len(x.positives)))
    negatives = random.sample(x.negatives, min(LIMIT_NEGATIVES, len(x.negatives)))
    for _ in range(EPOCHS):
        new_emb = make_new_emb_single_epoch(owner_emb, positives, negatives)
        owner_emb = new_emb
    return (x.user_id, owner_emb)

import numpy as np

def make_new_emb_single_epoch(owner_emb, positives, negatives):

    triplet_loss = nn.TripletMarginLoss(margin=MARGIN, p=2, eps=1e-7)
    anchor = torch.tensor([cast_arr_float(owner_emb)], requires_grad=True)
    
    for p in positives:
        for n in negatives:
            positive = torch.tensor([cast_arr_float(p.emb)], requires_grad=True)
            negative = torch.tensor([cast_arr_float(n._2)], requires_grad=True)
            embedding = nn.Embedding.from_pretrained(anchor, freeze=False)
            e = embedding(torch.tensor([0]))
            optimizer = torch.optim.Adam(embedding.parameters(), ADAM_STEP)
            loss = triplet_loss(e, positive, negative)
            loss.backward()
            optimizer.step()
    
    return cast_arr_int(anchor.tolist()[0])

def gen_new_emb(df):
    return df.rdd.map(make_new_emb).toDF(["oid", "emb"])

In [None]:
fb_ds = read_and_fix_fb_dataset()
all_users = extract_all_users(fb_ds)

last_checkpoint = 0

emb_table = gen_random_embeddings(all_users)
emb_table.write.mode('overwrite').parquet("/emb_table_random")

#emb_table = spark.read.parquet("/emb_table_"+str(last_checkpoint))

for i in range(last_checkpoint+1, last_checkpoint+ITERATIONS+1):
    df_w_pos = join_emb_and_group_positives(fb_ds, emb_table)
    df_w_pos.persist()
    df_w_pos.count()
    
    df_w_pos_repart = repartition_by_dist(df_w_pos)
    df_w_pos_repart.persist()
    df_w_pos_repart.count()

    df_w_pos_repart.write.mode('overwrite').parquet("/df_w_pos_repart_"+str(i))
    
    df_final = gen_negatives(df_w_pos_repart)
    df_final.persist()
    df_final.count()

    
    df_final.withColumn("partition_id", F.spark_partition_id()).write.mode('overwrite').parquet("/df_final_"+str(i))
    
    emb_table = gen_new_emb(df_final)
    emb_table.persist()
    print("iteration: ", i, "count: ", emb_table.count())
    
    emb_table.write.mode('overwrite').parquet("/emb_table_"+str(i))

# Evaluation

In [12]:
def collect_first_circle(len2_path):
    return list(set(map(lambda x: x.first_circle_friend_id, len2_path)))

def collect_second_circle(len2_path):
    first_circle = collect_first_circle(len2_path)
    second_circle = list(map(lambda x: x.second_circle_friend_id, len2_path)) + first_circle
    return list(set(second_circle))

def recall(n_closest, circle, n):
    recall = 0
    circle = set(circle)
    for id in n_closest:
        if id in circle:
            recall+=1
    return recall/n

def calc_recalls_for_emb(emb_t):
    fb_ds = read_and_fix_fb_dataset().withColumnRenamed("friend_id", "first_circle_friend_id").withColumnRenamed("user_id", "uid")
    fb_ds2 = read_and_fix_fb_dataset()

    fb_ds_cross = fb_ds.join(fb_ds2, fb_ds.first_circle_friend_id==fb_ds2.user_id, "inner")\
    .withColumnRenamed("friend_id", "second_circle_friend_id").drop("user_id")\
    .withColumnRenamed("uid", "user_id")

    fb_ds_cross.persist()
    fb_ds_cross.count()

    cols_list = ["first_circle_friend_id", "second_circle_friend_id"]
    fb_ds_grp = fb_ds_cross.groupBy("user_id").agg(F.collect_list(F.struct(cols_list)).alias("len2_path"))\
    .join(emb_t, emb_t.oid==fb_ds_cross.user_id, "inner").drop("oid")\
    .withColumnRenamed("emb", "owner_emb")
    
    fb_ds_grp.persist()
    fb_ds_grp.count()

    collect_first_circle_udf = udf(collect_first_circle, ArrayType(IntegerType()))
    collect_second_circle_udf = udf(collect_second_circle, ArrayType(IntegerType()))

    fb_ds_circles = fb_ds_grp.withColumn("first_circle", collect_first_circle_udf(F.col("len2_path")))\
    .withColumn("second_circle", collect_second_circle_udf(F.col("len2_path")))\
    .drop("len2_path")
    fb_ds_circles.persist()
    fb_ds_circles.count()
    
    emb_table_clct = emb_t.collect()
    emb_table_clct_bc = sc.broadcast(emb_table_clct)

    def get_n_closest_to_owner(owner_id, owner_emb, n):
        emb_table_clct_ = emb_table_clct_bc.value
        id2emb_list = get_n_closest_except(owner_emb, emb_table_clct_, [owner_id], n)
        return list(map(lambda x: x[0], id2emb_list))
        
    get_n_closest_to_owner_udf = udf(get_n_closest_to_owner, ArrayType(IntegerType()))
    fb_ds_closest = fb_ds_circles.withColumn("n_closest",\
                                             get_n_closest_to_owner_udf(F.col("user_id"), \
                                                                        F.col("owner_emb"), \
                                                                        F.size(fb_ds_circles.first_circle)))
    
    fb_ds_closest.persist()
    fb_ds_closest.count()

    recall_udf = udf(recall, FloatType())
    n_fc = F.size(fb_ds_circles.first_circle)
    col_ncl = F.col("n_closest")
    
    recall_df = fb_ds_closest.filter("size(first_circle) > 20")\
    .withColumn("fc_recall", recall_udf(col_ncl, F.col("first_circle"), n_fc))\
    .withColumn("sc_recall", recall_udf(col_ncl, F.col("second_circle"), n_fc))
    
    recall_df.persist()
    recall_df.count()

    recall_df.write.mode('overwrite').parquet("/fb_ds_grp_")
    
    fc_recall = recall_df.agg({"fc_recall": "avg"}).head()["avg(fc_recall)"]
    sc_recall = recall_df.agg({"sc_recall": "avg"}).head()["avg(sc_recall)"]

    return (fc_recall, sc_recall)

In [14]:
calc_recalls_for_emb(spark.read.parquet("/emb_table_1"))

                                                                                

(0.027570640367780393, 0.2205639165663426)

In [18]:
calc_recalls_for_emb(spark.read.parquet("/emb_table_4"))

                                                                                

(0.5392777759359398, 0.9997891434668658)

# TSNE (likely OOM)

In [None]:
sample_uid = 347

In [None]:
import pyspark.sql.functions as F
fb_ds = read_fb_dataset()
# fb_ds.groupBy("user_id").agg(F.count("friend_id").alias("fr_cnt")).show()

In [None]:
fb_ds = read_and_fix_fb_dataset()
sample = fb_ds.filter("user_id="+str(sample_uid)).cache()

sample.count()

In [None]:
sample_friends = list(map(lambda x: x.friend_id, sample.select("friend_id").collect()))
len(sample_friends)

In [None]:
emb_table2 = spark.read.parquet("/emb_table_1").withColumnRenamed("user_id", "oid")
emb_table = spark.read.parquet("/emb_table_random")

all_from_sample = sample.select("user_id")\
.unionAll(sample.select("friend_id").withColumnRenamed("friend_id", "user_id")).distinct().cache()

all_from_sample_pls_emb = all_from_sample.join(emb_table, emb_table.oid==all_from_sample.user_id, "inner")\
.withColumnRenamed("emb", "emb_rnd")\
.join(emb_table2, emb_table2.oid==all_from_sample.user_id, "inner")\
.withColumnRenamed("emb", "emb_1step").drop("user_id").cache()

all_from_sample_pls_emb.persist()
all_from_sample_pls_emb.count()

In [None]:
print(all_from_sample_pls_emb.count())

In [None]:
all_from_sample_pls_emb_clctd = all_from_sample_pls_emb.collect()

In [None]:
X_rnd = list(map(lambda x: x.emb_rnd, all_from_sample_pls_emb_clctd))
X_1step = list(map(lambda x: x.emb_1step, all_from_sample_pls_emb_clctd))

In [None]:
sample_owner_emb_1step = emb_table2.filter("oid="+str(sample_uid)).head().emb
sample_owner_emb_1step_bc = sc.broadcast(sample_owner_emb_1step)

def calc_euclidean_to_owner(x2):
    return calc_euclidean(sample_owner_emb_1step_bc.value, x2)
    
calc_euclidean_to_owner_udf=udf(calc_euclidean_to_owner, FloatType())

all_emb_w_dist_to_owner = emb_table2.withColumn("dist_to_owner", calc_euclidean_to_owner_udf(F.col("emb"))).cache()
all_emb_w_dist_to_owner.count()

In [None]:
sample_closest = get_n_closest_except(sample_owner_emb_1step, all_emb_w_dist_to_owner.collect(), [], len(sample_friends))

sample_closest_emb = list(map(lambda x: x[1], sample_closest))
sample_closest_ids = list(map(lambda x: x[0], sample_closest))

In [None]:
len(sample_closest)

In [None]:
from sklearn.manifold import TSNE
import numpy as np
import matplotlib.pyplot as plt


tsne = TSNE(n_components=2, random_state=42, perplexity=5)
X_rnd_2d = tsne.fit_transform(np.array(X_rnd))
X_1step_2d = tsne.fit_transform(np.array(X_1step))
X_closest = tsne.fit_transform(np.array(sample_closest_emb))

# X_rnd_2d = np.array(X_rnd)
# X_1step_2d = np.array(X_1step)
# X_closest = np.array(sample_closest_emb)

fig = plt.figure()
ax1 = fig.add_subplot(111)

color_rnd = ['grey' for _ in range(len(X_rnd_2d))]
color_1st = ['cyan'] + ['green' for _ in range(len(X_1step_2d) - 1)]
ax1.scatter(X_rnd_2d[:,0], X_rnd_2d[:,1], color=color_rnd, label='rnd')
ax1.scatter(X_1step_2d[:,0], X_1step_2d[:,1], color=color_1st, label='1st')
#ax1.scatter(X_closest[:,0], X_closest[:,1], color='blue', label='closest')

plt.legend()
plt.show()