In [1]:
import pandas as pd
from sklearn.metrics import ndcg_score
import numpy as np
from collections import defaultdict
import pandas as pnd
import tqdm
import networkx as nx
import numpy as np

In [2]:
labels_path = "final/data/label.csv"
train_path = "final/data/ego_net_tr.csv"
test_path = "final/data/ego_net_te.csv"

In [3]:
def read_label(label_path):
    with open(label_path, 'r') as label_f:
        label_f.readline()
        cur_ego_id = -1
        cur_label = None
        for line in label_f:
            ego_id, u, v = list(map(int, line.split(",")))
            if ego_id != cur_ego_id:
                if cur_ego_id != -1:
                    yield cur_ego_id, cur_label
                cur_ego_id, cur_label = ego_id, []
            cur_label.append((u, v))
        if cur_ego_id != -1 and len(cur_label) > 0:
            yield cur_ego_id, cur_label

In [4]:
def read_ego_net(ego_net_path):
    cur_ego_id = -1
    cur_ego_net = None
    with open(ego_net_path, 'r') as ego_net_f:
        ego_net_f.readline()
        for ego_line in ego_net_f:
            ego_line = ego_line.split(',')
            ego_id, u, v, t, x1, x2, x3 = int(ego_line[0]), int(ego_line[1]), int(ego_line[2]), int(ego_line[3]), float(ego_line[4]), float(ego_line[5]), float(ego_line[6])
            if ego_id != cur_ego_id:
                if cur_ego_id != -1:
                    yield cur_ego_id, cur_ego_net
                assert cur_ego_id <= ego_id
                cur_ego_id = ego_id
                cur_ego_net = nx.DiGraph()
            if u not in cur_ego_net:
                cur_ego_net.add_node(u)
            if v not in cur_ego_net:
                cur_ego_net.add_node(v)
            cur_ego_net.add_edge(u, v, t=t, x1=x1, x2=x2, x3=x3)
        if cur_ego_net.size() > 0 and cur_ego_id != -1:
            yield cur_ego_id, cur_ego_net

In [5]:
def recommend(ego_net):
    E = ego_net.edges
    mtr = nx.to_numpy_array(ego_net)
    mtr[0, :] = 0
    mtr[:, 0] = 0
    mtr1 = (mtr + mtr.T)/2
    mtr2 = (mtr + mtr.T)/2
#     time_connects = mtr
    mtr_x1 = np.zeros_like(mtr1)
    mtr_x2 = np.zeros_like(mtr1)
    mtr_x3 = np.zeros_like(mtr1)

    for x in E:
        mtr_x1[x[0], x[1]] = E[x]["x1"]
        mtr_x2[x[0], x[1]] = E[x]["x2"]
        mtr_x3[x[0], x[1]] = E[x]["x3"]
        if E[x]["t"] == -1:
            t = 10000000
        else:
            t = E[x]["t"]
        mtr1[x[0], x[1]] *= 1/np.log(((t/25)**2)+2)  + np.log(E[x]["x1"] +1)*0.2 + np.log(E[x]["x2"] +1)*0.2+ np.log(E[x]["x3"] +1)*0.5
        mtr2[x[0], x[1]] *= 1/np.log(((t/18)**2)+2)  + np.log(E[x]["x1"] +1)*0.2 + np.log(E[x]["x2"] +1)*0.2+ np.log(E[x]["x3"] +1)*0.5
    out_degree = mtr.sum(axis=1).reshape((-1, 1))

    mtr_x1_weight = mtr_x1.T.dot(mtr1) * (1 - 100 * np.eye(len(mtr2)))  * (1 - 100 * mtr1)
    mtr_x2_weight = mtr_x2.T.dot(mtr1) * (1 - 100 * np.eye(len(mtr2)))  * (1 - 100 * mtr1)
    mtr_x3_weight = mtr_x3.T.dot(mtr1) * (1 - 100 * np.eye(len(mtr2)))  * (1 - 100 * mtr1)


    mtr_norm = (mtr / (1 + np.log(1 + out_degree)))
    mtr1_norm = (mtr1 / (1 + np.log(1 + out_degree)))
    mtr2_norm = (mtr2 / (1 + np.log(1 + out_degree)))

    aa = mtr_norm.T.dot(mtr1_norm) * (1 - 100 * np.eye(len(mtr1))) * (1 - 100 * mtr1)
    aa += mtr_norm.T.dot(mtr2_norm) * (1 - 100 * np.eye(len(mtr1))) * (1 - 100 * mtr1)
    
    mtr_x1_weight = np.mean([mtr_x1_weight, mtr_x1_weight.T], axis=0)    
    mtr_x2_weight = np.mean([mtr_x2_weight, mtr_x2_weight.T], axis=0)    
    mtr_x3_weight = np.mean([mtr_x3_weight, mtr_x3_weight.T], axis=0)    
    

    aa += mtr_x1_weight*0.05
    aa += mtr_x2_weight*0.01
    aa += mtr_x3_weight*0.05
    
    aa = np.mean([aa, aa.T], axis=0)    

    recs = list()
    for i in aa.flatten().argsort()[::-1]:
        u, v = min(i // len(mtr), i % len(mtr)), max(i // len(mtr), i % len(mtr))
        
        if u < v and u != 0 and (u,v) not in recs:
            recs.append((u, v))
        if len(recs) == 1000:
            break
    return recs

In [6]:
def main():
    n = 0
    ndcgs = []
    recalls = []
    with open('val.csv', 'w') as out:
        out.write('ego_id,u,v,label\n')
        recalls = []
        for (ego_id1, ego_net), (ego_id2, label) in tqdm.tqdm(zip(read_ego_net(train_path), read_label(labels_path))):
            assert ego_id1 == ego_id2
            recs = recommend(ego_net)
            if len([x for x in label if x in recs]) == 0:
                recalls.append(0)
            else:
                for u, v in recs:
                    l = "1" if (u,v) in label else "0"
                    out.write('{},{},{},{}\n'.format(ego_id1, u, v, l))
                recalls.append(1)
            if len(recalls) % 1000  == 0:
                print(np.mean(recalls))
    with open('test.csv', 'w') as out:
        out.write('ego_id,u,v,label\n')
        for ego_id1, ego_net in tqdm.tqdm(read_ego_net(test_path)):
            recs = recommend(ego_net)
            for u, v in recs:
                out.write('{},{},{},{}\n'.format(ego_id1, u, v, -1))
main()

999it [01:19, 12.57it/s]


0.672


142it [00:12, 11.19it/s]


In [7]:
test_preds1000 = pd.read_csv('test.csv')
test_preds1000['i'] = test_preds1000.index
test_preds1000["rank"] = test_preds1000.groupby("ego_id")["i"].rank(method="dense")
test_preds1000["rank"] = test_preds1000["rank"].apply(lambda x: int(x))
test_preds1000.to_csv('test_rank.csv', index=False)

In [9]:
test_preds1000 = pd.read_csv('val.csv')
test_preds1000['i'] = test_preds1000.index
test_preds1000["rank"] = test_preds1000.groupby("ego_id")["i"].rank(method="dense")
test_preds1000["rank"] = test_preds1000["rank"].apply(lambda x: int(x))
test_preds1000.to_csv('val_rank.csv', index=False)

In [15]:
import pandas as pd
from sklearn.metrics import ndcg_score
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()
print(spark.sparkContext)
print("Spark App Name : "+ spark.sparkContext.appName)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/12 19:57:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/12 19:57:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
<SparkContext master=local[1] appName=SparkByExamples.com>
Spark App Name : SparkByExamples.com


In [16]:
from pyspark.sql.types import IntegerType,BooleanType,DoubleType,LongType

for table in ['test_rank', 'val_rank']:
    train_pairs_recall = spark.read.csv(f"{table}.csv", header=True)
    train_pairs_recall = train_pairs_recall.withColumn("ego_id",F.col("ego_id").cast(LongType()))\
    .drop('i')\
    .withColumn("u",F.col("u").cast(IntegerType()))\
    .withColumn("v",F.col("v").cast(IntegerType()))\
    .withColumn("rank",F.col("rank").cast(IntegerType()))\
    .withColumn("label",F.col("label").cast(IntegerType())).write.parquet(table)

                                                                                