In [1]:
import builtins

import pandas as pd
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
# NOTE: this star import shadows Python built-ins such as `sum`; use `builtins.sum` for plain Python sums.
from pyspark.sql.functions import *

In [2]:
# Create (or reuse) the Spark session that every later cell reads from.
spark = (
    SparkSession.builder
    .appName("concre-reall")
    .getOrCreate()
)

In [3]:
# Load the raw ratings table and let Spark infer the column types.
rating_df = spark.read.csv(
    'E:/Recomendation_sys_project/archive/rating.csv',
    header=True,
    inferSchema=True,
)
rating_df.printSchema()
# Only ratings strictly greater than 7 are treated as "the user recommends this title".
rating_df = rating_df.filter('rating>7')

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



# 1. Deep walk 构建动漫序列

### 1.1 每个用户的anime观看序列

In [4]:
# One row per user: the list of anime ids (as strings) that survived the rating filter.
watch_seq_df = (
    rating_df
    .groupBy('user_id')
    .agg(collect_list(col('anime_id').cast('string')).alias("anime_ids"))
)

In [5]:
# Preview the first 10 users together with their collected anime id lists.
watch_seq_df.show(10)

+-------+--------------------+
|user_id|           anime_ids|
+-------+--------------------+
|     31|[2581, 3784, 3785...|
|     34|[20, 30, 32, 147,...|
|     53|[101, 849, 1195, ...|
|     65|[687, 853, 1221, ...|
|     78|       [4224, 18153]|
|     85|[223, 356, 481, 5...|
|    108|[57, 61, 132, 147...|
|    137|         [121, 5114]|
|    148|[20, 81, 170, 263...|
|    155|[164, 199, 226, 3...|
+-------+--------------------+
only showing top 10 rows



### 1.2 构建邻接矩阵

这里的数据是评分大于7分（即8分及以上）的动漫，因此有一个假设：所有保留下来的评分行为都表示用户喜欢并推荐该动漫。
构建邻接矩阵就是为 graph embedding 构建物品图：每个动漫是一个节点（node），如果两部动漫同时出现在某个用户的评分列表里，它们之间就有一条边，边的权重是同时喜欢这两部动漫的用户数。两部动漫被越多的用户同时评价（喜爱），说明它们之间存在某种关系（或相似度）。在这个图上随机游走可以生成动漫序列，再把这些序列放进 word2vec 算法，训练出每一个动漫的 embedding，也就是用用户行为来反映两部动漫之间的相似度。
 

In [6]:
# Bring every per-user row back to the driver as a Python list of Row objects.
watch_seq = watch_seq_df.collect()

In [7]:
# Inspect the first collected Row (user_id plus its anime_ids list).
watch_seq[0]

Row(user_id=31, anime_ids=['2581', '3784', '3785', '5680', '6347', '6634', '7791', '8516', '8769', '8841', '9471', '9617', '9760', '9919', '9938', '10087', '10408', '10793', '10897', '11499', '11737', '11741', '11757', '11843', '12291', '13055', '13759', '14467', '14741', '14811', '14813', '15225', '15315', '15687', '15699', '15809', '15879', '16005', '16417', '16524'])

In [8]:
# Keep only the anime id lists, giving a 2-D list: one inner list per user.
watch_seq1 = [user_row['anime_ids'] for user_row in watch_seq]

In [9]:
from collections import defaultdict

# Nested counter: matrix[a][b] is the co-occurrence count of anime a and b; missing pairs read as 0.
matrix = defaultdict(lambda: defaultdict(int))
matrix

defaultdict(<function __main__.<lambda>()>, {})

In [10]:
# Build the symmetric co-occurrence counts: every unordered pair of distinct anime ids
# found in the same user's list adds 1 to matrix[a][b] and matrix[b][a].
n = 100  # number of users to use; the pair loop is quadratic in each list's length
matrix.clear()  # start from empty so re-running this cell never double-counts
for seq in watch_seq1[:n]:  # slicing (not indexing) tolerates fewer than n users
    for j, a in enumerate(seq):
        for b in seq[j + 1:]:
            if a == b:
                # The same id can appear twice in one list; never create self-loops.
                continue
            matrix[a][b] += 1
            matrix[b][a] += 1

### 1.3 概率转移矩阵

这里的数据是评分大于7分的动漫，所以有一个假设：所有用户的评分行为都表示用户喜欢并推荐该动漫。把邻接矩阵中每个动漫对应的共现次数除以该动漫的共现总次数，就得到从该动漫转移到各个邻居的概率。

In [11]:
# Per-node container for the transition table; unknown keys default to empty lists.
trans_probs = defaultdict(lambda: defaultdict(list))

In [12]:
# Display the (still empty) transition table.
trans_probs

defaultdict(<function __main__.<lambda>()>, {})

In [13]:
# Reset the transition table so this cell can be re-run safely.
trans_probs = defaultdict(lambda: defaultdict(list))


def get_transfer_probs(vs):
    """Turn one node's neighbour weights into a transition distribution.

    Args:
        vs: mapping of neighbour id -> co-occurrence weight for a single node.

    Returns:
        A ``(neighbours, probs)`` tuple of two parallel lists, where
        ``probs[i]`` is ``weight_i / sum(weights)``. An empty mapping yields
        ``([], [])``.

    Raises:
        ZeroDivisionError: if ``vs`` is non-empty but its weights sum to 0.
    """
    neighbours = list(vs.keys())
    weights = list(vs.values())
    # `sum` is shadowed by the pyspark.sql.functions star import, so call the
    # real built-in through the `builtins` module (works outside IPython too,
    # unlike the IPython-injected `__builtin__` name).
    total_weight = builtins.sum(weights)
    probs = [weight / total_weight for weight in weights]
    return neighbours, probs

In [14]:
# For every node in the co-occurrence graph, store its neighbour list and the
# matching transition probabilities side by side.
for anime_id, neighbour_weights in matrix.items():
    entry = trans_probs[anime_id]
    entry['neighbours'], entry["probs"] = get_transfer_probs(neighbour_weights)

### 1.4 随机选取入口node

In [15]:
# Candidate starting nodes for a walk: every node that has a transition entry.
entrence_items = list(trans_probs)

In [16]:
# Total co-occurrence weight of each node. `builtins.sum` is used because the
# pyspark.sql.functions star import shadows `sum`, and the IPython-only
# `__builtin__` name does not exist in plain Python.
nodes_sum = {item: builtins.sum(matrix[item].values()) for item in entrence_items}

In [17]:
# Grand total of all node weights (normaliser for the entry distribution).
# Uses the real built-in `sum`, which the pyspark star import shadows.
total_sum = builtins.sum(nodes_sum.values())

In [18]:
# Probability of starting a walk at each node, proportional to its total weight.
# Order matches entrence_items because nodes_sum was built by iterating over it.
entren_probs = [node_weight / total_sum for node_weight in nodes_sum.values()]

### 1.5 Deep walk

In [19]:
import numpy as np

In [20]:
# Seeded so that "Restart & Run All" reproduces the same walks; change WALK_SEED to resample.
WALK_SEED = 42
rng = np.random.default_rng(WALK_SEED)


def one_walk(length, entrence_items, entrence_probs, transfer_probs, generator=None):
    """Sample one weighted random walk over the item graph.

    Args:
        length: maximum number of steps to take after the start node.
        entrence_items: sequence of candidate start nodes.
        entrence_probs: start probabilities, parallel to ``entrence_items``.
        transfer_probs: mapping ``node -> {'neighbours': [...], 'probs': [...]}``
            giving each node's outgoing transition distribution.
        generator: optional ``numpy.random.Generator``; defaults to the
            module-level ``rng``.

    Returns:
        The visited nodes as a list of ``str``: at most ``length + 1`` entries,
        fewer if the walk reaches a node with no recorded neighbours.
    """
    gen = rng if generator is None else generator

    # Sample positions rather than the items themselves: this avoids rebuilding
    # a NumPy string array on every step and keeps nodes as their original type.
    start_idx = gen.choice(len(entrence_items), p=entrence_probs)
    current_point = entrence_items[start_idx]
    path = [str(current_point)]

    for _ in range(length):
        # `.get` avoids inserting empty entries when transfer_probs is a defaultdict.
        node = transfer_probs.get(current_point)
        neighbours = node.get('neighbours') if node else None
        if not neighbours:
            # Dead end: stop early instead of raising on an empty population.
            break
        next_idx = gen.choice(len(neighbours), p=node['probs'])
        current_point = neighbours[next_idx]
        path.append(str(current_point))
    return path

In [21]:
# Number of walks to sample. In real work this should be much larger;
# even 500 samples is not enough.
n = 100
# Each walk takes 19 steps after its start node.
deepwalk_sample = [
    one_walk(19, entrence_items, entren_probs, trans_probs)
    for _ in range(n)
]

# 2. 训练每个动漫的Embedding

### item2vec

In [26]:
# Spark's Word2Vec consumes a DataFrame, so wrap each walk (a plain Python list)
# as a single-column row.
walk_rows = [[walk] for walk in deepwalk_sample]
deepwalk_df = spark.createDataFrame(walk_rows, ['anime_ids'])
deepwalk_df.show(10)

+--------------------+
|           anime_ids|
+--------------------+
|[9367, 558, 31043...|
|[2923, 2034, 2089...|
|[20031, 14813, 47...|
|[9041, 8675, 2251...|
|[6880, 19769, 72,...|
|[23847, 28677, 28...|
|[15809, 15613, 11...|
|[4715, 22359, 221...|
|[17895, 23673, 14...|
|[513, 31098, 8740...|
+--------------------+
only showing top 10 rows



In [23]:
from pyspark.ml.feature import Word2Vec

In [24]:
# Configure the item2vec estimator: walks are read from 'anime_ids' and the
# resulting vectors are written to 'anime_ids_vec'.
item2vec = Word2Vec(
    vectorSize=5,
    maxIter=2,
    windowSize=15,
    inputCol='anime_ids',
    outputCol='anime_ids_vec',
)
item2vec

Word2Vec_31a6b85fdce1

In [25]:
# Fit the Word2Vec estimator on the sampled walks.
model = item2vec.fit(deepwalk_df)

# 3. LSH查找最近邻动漫的Embedding