In [1]:
import findspark
findspark.init()
from typing import *
from pyspark.sql import *
from pyspark.sql.types import  IntegerType,ArrayType,StringType,FloatType
from pyspark.ml import Pipeline
from pyspark.ml.feature import Word2Vec,Word2VecModel
from pyspark.sql.functions import *
import os
from pyspark.ml.linalg import VectorUDT,Vectors
from pyspark.ml.feature import MinMaxScaler

In [2]:
spark = SparkSession.builder.appName('enbbeding').master('local[*]').getOrCreate()

In [3]:
def tmpDouble2vec(x):
    return Vectors.dense(x)
double2vec = udf(f=tmpDouble2vec, returnType=VectorUDT())
age=[(1,),(2,),(3,),(4,)]
df = spark.createDataFrame(age,schema=["age"])
df=df.withColumn('ageVec',double2vec(col('age')))
df.show()

+---+------+
|age|ageVec|
+---+------+
|  1| [1.0]|
|  2| [2.0]|
|  3| [3.0]|
|  4| [4.0]|
+---+------+



In [4]:
minMax = MinMaxScaler(inputCol='ageVec',outputCol='ageVec').fit(df)
df =minMax.transform(df)
# df.printSchema()
df.show()

IllegalArgumentException: 'requirement failed: Output column ageVec already exists.'

In [27]:
to_array = udf(lambda x: x.toArray().tolist(), ArrayType(FloatType()))
df=df.withColumn('ageitem',to_array(col('ageScaler')).getItem(0))

In [24]:
df.show()

+---+------+--------------------+----------+
|age|ageVec|           ageScaler|   ageitem|
+---+------+--------------------+----------+
|  1| [1.0]|               [0.0]|       0.0|
|  2| [2.0]|[0.3333333333333333]|0.33333334|
|  3| [3.0]|[0.6666666666666666]| 0.6666667|
|  4| [4.0]|               [1.0]|       1.0|
+---+------+--------------------+----------+



In [3]:
sent = ("a b " * 100 + "a c " * 10).split(" ")
doc = spark.createDataFrame([(sent,), (sent,)], ["sentence"])
doc

DataFrame[sentence: array<string>]

In [5]:
doc.show(10)

+--------------------+
|            sentence|
+--------------------+
|[a, b, a, b, a, b...|
|[a, b, a, b, a, b...|
+--------------------+



In [24]:
word2Vec = Word2Vec(vectorSize=5, seed=42, inputCol="sentence")
model = word2Vec.fit(doc)

In [25]:
model.getVectors().show()

+----+--------------------+
|word|              vector|
+----+--------------------+
|   a|[0.09461779892444...|
|   b|[1.15474212169647...|
|   c|[-0.3794820010662...|
+----+--------------------+



In [26]:
doc.printSchema()

root
 |-- sentence: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [28]:
type(model) 

pyspark.ml.feature.Word2VecModel

In [49]:
df=model.getVectors()

In [57]:
# for i,j in df.
df.collect()[1]

Row(word='b', vector=DenseVector([1.1547, -0.5933, -0.8722, 0.4669, 0.5515]))

In [58]:
for row in df.collect():
    print(row['word'],row['vector'])

a [0.0946177989244461,-0.4951631426811218,0.06406556069850922,-0.37930983304977417,0.21593928337097168]
b [1.1547421216964722,-0.593326210975647,-0.8721810579299927,0.4669361710548401,0.551497220993042]
c [-0.3794820010662079,0.34077689051628113,0.06388652324676514,0.0352821946144104,-0.24136029183864594]


In [1]:
import findspark
findspark.init()
from typing import *
from pyspark.sql import *
from pyspark.sql.types import  IntegerType,ArrayType,StringType
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import *
import os,random
from collections import defaultdict
import redis

HOST ='localhost'
PORT = 6379
def sortByTime(movieid_time_list:List):
    '''
    按照时间戳排序，返回movieids
    :param movieid_time_list:
    :return:
    '''
    movieid_time_list.sort(key=lambda x:x[1])
    mids = [i[0] for i in movieid_time_list]
    return mids

def processItemSequence(spark:SparkSession):
    '''
    处理评分数据，筛选评分大于3.5的，按照用户id分组获取评分电影序列
    :param spark:
    :return:
    '''

    df = spark.read.format('csv').option('header', 'true').load('./data/ratings.csv')
    df.printSchema()

    sortUdf = udf(f=sortByTime,returnType=ArrayType(StringType()))
    userSeq=df.where(df['rating'] >= 3.5).groupby('userId').agg(sortUdf(collect_list(struct('movieId','timestamp'))).alias('movieIds'))\
        .withColumn('movieIdStr',array_join('movieIds',' '))
    userSeq.show(5)

    #不使用udf，速度慢一点
    # userSeq=df.where(df['rating'] >= 3.5).sort('timestamp').groupby('userId').agg(collect_list('movieId').alias('movieIds')).withColumn('movieIdStr',array_join('movieIds',' ')).show(10)

    userSeq.printSchema()
    dataset = userSeq.select('movieIds')
    moviesCount = dataset.select(explode(col('movieIds'))).alias('tmp').distinct().count()
    print('unique high rating movies:{}'.format(moviesCount))
    # print(dataset.count())
    return dataset

def trainItem2vec(dataset,filename,saveToRedis=False,redisKeyPrefix=None):
    '''
    训练产生embedding,inputCol需要是 array（string）类型
    训练好后写入 filename
    :param dataset:
    :return:
    '''
    word2vec = Word2Vec(vectorSize=10,windowSize=5,maxIter=10,inputCol='movieIds')
    model = word2vec.fit(dataset)
    print('model fitted')
    # 打印相似电影，基于点积运算
    synonyms = model.findSynonymsArray('158',20)
    for moveid,similarity in synonyms:
        print('{}:{}'.format(moveid,similarity))

    with open('./modeldata/{}'.format(filename),'w') as f:
        for row in model.getVectors().collect():
            tmp=','.join([str(vector) for vector in row['vector']])
            f.write('{}:{}\n'.format(row['word'],tmp))

    # redis-cli eval "redis.call('del', unpack(redis.call('keys','*')))" 0 windows批量删除key
    if saveToRedis:
        pool = redis.ConnectionPool(host=HOST,port=PORT)
        # key的存活时间 秒
        ex = 60 * 10
        r = redis.Redis(connection_pool=pool)
        for i,row in enumerate(model.getVectors().collect()):
            tmp = ','.join([str(vector) for vector in row['vector']])
            if i == 1:
                print(type(row['vector']))
            r.set('{}:{}'.format(redisKeyPrefix,row['word']),tmp,ex)
    return model 


def dealPairMovie(movies:Row)->List:
    '''
    udf
    :param movies:
    :return:
    '''
    newl=[]
    movies = movies['movieIds']
    for i in range(len(movies)-1):
        newl.append((movies[i],movies[i+1]))
    return newl


def generateTransitionMatrix(dataset:DataFrame):
    '''
    生成状态转移矩阵
    :param dataset:
    :return:
    '''
    pairSamples=dataset.rdd.flatMap(dealPairMovie)
    pairSamples.cache()
    print(pairSamples.take(10))
    print('pairSamples over')
    # {(mid,mid2):count,...}

    pairCountMap = pairSamples.countByValue()

    print('pairCountMap_{}'.format(len(pairCountMap)))
    # 计数状态矩阵
    transitionCountMatrix = defaultdict(dict)
    itemCountMap = defaultdict(int)
    all_count=0
    for k,count in pairCountMap.items():
        transitionCountMatrix[k[0]][k[1]] = count
        itemCountMap[k[0]] +=count
        all_count+=count
    print('transitionCountMatrix over')
    #概率状态矩阵
    transitionMatrix = defaultdict(dict)
    itemDistribution = defaultdict(int)
    for a,cmap in transitionCountMatrix.items():
        for b,count in cmap.items():
            transitionMatrix[a][b] = float(count /itemCountMap[a])

    for k,count in itemCountMap.items():
        itemDistribution[k] = float(count / all_count)

    print('transitionMatrix_{}'.format(len(transitionMatrix)))
    print(transitionMatrix['858'])
    print('itemDistribution_{}'.format(len(itemDistribution)))
    print(itemDistribution['858'])
    return transitionMatrix,itemDistribution

def oneRandomWalk(transitionMatrix, itemDistribution, sampleLength):
    '''
    单次随机游走
    :param transitionMatrix:
    :param itemDistribution:
    :param sampleLength:
    :return:
    '''
    sample = []
    randomValue = random.random()
    firstItem=''
    accumulateProb=0

    # 按照电影分布，取第一部电影
    for k,v in itemDistribution.items():
        accumulateProb+=v
        if accumulateProb >= randomValue:
            firstItem=k
            break
    sample.append(firstItem)
    curItem = firstItem

    # 按照状态转移，取后面9部电影
    for i in range(1,sampleLength):
        if not transitionMatrix[curItem] or not itemDistribution[curItem]:
            break
        # 随机游走的策略
        randomProb = random.random()
        for k, prob in transitionMatrix[curItem].items():
            if randomProb >= prob:
                curItem = k
                break

        sample.append(curItem)
    return sample

def randomWalk(transitionMatrix,itemDistribution,sampleCount,sampleLength):
    '''
    随机游走
    :param transitionMatrix:
    :param itemDistribution:
    :param sampleCount:
    :param sampleLength:
    :return:
    '''
    samples = []
    for i in range(sampleCount):
        samples.append(oneRandomWalk(transitionMatrix, itemDistribution, sampleLength))
    return samples

def oneNode2vec(transitionMatrix, itemDistribution, sampleLength):

    p , q  = 0.1, 0.2
    sample = []
    randomValue = random.random()
    firstItem = ''
    accumulateProb = 0

    # 按照电影分布，取第一部电影
    for k, v in itemDistribution.items():
        accumulateProb += v
        if accumulateProb >= randomValue:
            firstItem = k
            break

    sample.append(firstItem)
    curItem = firstItem
    #nodeT始终是curElement的前一个值
    nodeT = curItem
    # 按照状态转移，取后面9部电影
    for i in range(1, sampleLength):
        if not transitionMatrix[curItem] or not itemDistribution[curItem]:
            break
        randomProb = random.random()
        # 第一步时，curItem和nodeT是同一个点，所以要保持nodeT不动，curIte前进一步
        if i == 1:
            for item, prob in transitionMatrix[curItem].items():
                if randomProb >= prob:
                    curItem = item
                    break
        else:
            for item, prob in transitionMatrix[curItem].items():
                # 跳回前一节点
                if item == nodeT:
                    prob = prob * 1 / p
                #distince =1
                elif item in transitionMatrix[nodeT]:
                    prob = prob
                #distince =2
                else:
                    prob = prob * 1/q

                if randomProb >= prob:
                    nodeT = curItem
                    curItem = item
                    break
        sample.append(curItem)

    return sample


def node2vec(transitionMatrix,itemDistribution,sampleCount,sampleLength):
    samples = []
    for i in range(sampleCount):
        samples.append(oneNode2vec(transitionMatrix, itemDistribution, sampleLength))
    return samples

def graphEmb(dataset:DataFrame,spark:SparkSession,embOutputFilename,saveToRedis=False,redisKeyPrefix=None):
    '''
    图enbding
    :param dataset:
    :param spark:
    :param embOutputFilename:
    :return:
    '''
    transitionMatrix, itemDistribution=generateTransitionMatrix(dataset)
    sampleCount = 20000
    sampleLength = 10

    # newSamples=randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength)
    newSamples = node2vec(transitionMatrix, itemDistribution, sampleCount, sampleLength)

    # 转为rdd
    rddSamples=spark.sparkContext.parallelize([Row(movieIds=i) for i in newSamples])
    print(newSamples[:10])
    print(rddSamples.take(10))
    # 转为DataFrame
    dataFrameSamples = spark.createDataFrame(rddSamples)
    print(type(dataFrameSamples))
    print(dataFrameSamples.take(10))
    # trainItem2vec(dataFrameSamples,embOutputFilename,saveToRedis,redisKeyPrefix)

In [2]:
spark = SparkSession.builder.appName('enbbeding').master('local[*]').getOrCreate()
df = spark.read.format('csv').option('header', 'true').load('./data/ratings.csv')
dataset=processItemSequence(spark)
model=trainItem2vec(dataset,'item2vecEmb1.txt',saveToRedis=True,redisKeyPrefix='i2vEmb')
rows =model.getVectors().collect()
movdict={}
for row in rows:
    movdict[row['word']] = list(row['vector'])


In [39]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def dealuseremb(movieids:List):
    useremb=[0] * 10
    for movieid in movieids:
        movEmb = movdict.get(movieid)
        if movEmb:
            useremb=[useremb[i]+movEmb[i] for i in range(10)]
    return ','.join([str(ue) for ue in useremb])
dealUserEmb = udf(f=dealuseremb,returnType=StringType())

In [40]:
um=df.groupBy('userId').agg(collect_list(col('movieId')).alias('movieIds')).withColumn('userEmb',dealUserEmb(col('movieIds')))
um.show(5)

+------+--------------------+--------------------+
|userId|            movieIds|             userEmb|
+------+--------------------+--------------------+
| 10096|[25, 50, 457, 514...|-2.32336444780230...|
| 10351|[1, 6, 25, 26, 30...|-16.0140708163380...|
| 10436|[1, 2, 34, 39, 47...|-3.49004807882010...|
|  1090|[151, 172, 236, 2...|-2.12114070728421...|
| 11078|[1, 10, 20, 47, 5...|-2.46599732246249...|
+------+--------------------+--------------------+
only showing top 5 rows



In [46]:
um.select('userId','userEmb').collect()

[Row(userId='10096', userEmb='-2.323364447802305,-0.35313513688743114,0.9455482312478125,0.7810606732964516,-1.4330109022557735,-0.5514448419853579,0.001416034996509552,2.6387525014579296,-1.8949083760380745,2.6085873562842607'),
 Row(userId='10351', userEmb='-16.014070816338062,1.3154994202777743,-9.14203093526885,-11.175116918981075,-1.8746127239137422,-2.6251393270213157,-2.3533106229733676,5.1870934292674065,-2.5153188295662403,1.8868449674919248'),
 Row(userId='10436', userEmb='-3.4900480788201094,-1.2193054631352425,-1.7461700636195019,2.9194485172629356,3.138754840940237,10.339212758030044,5.824913382530212,1.2715864013880491,3.3181863594800234,6.51296978443861'),
 Row(userId='1090', userEmb='-2.121140707284212,-1.2063480094075203,-2.6908475208329037,2.1919729709625244,0.14747827593237162,8.45528069138527,-0.9212313019670546,-2.1806666534394026,0.9904647767543793,-1.3890195731073618'),
 Row(userId='11078', userEmb='-2.465997322462499,-7.463959708809853,0.951576616615057,1.584101

In [22]:
from pyspark.sql.f

0

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def dealuseremb(movdict,movieids:Row):
    useremb=[0] *10
    movies = movies['movieIds']
    for movieid in movies:
        movEmb = movdict.get('movieid')
        if movEmb:
            useremb=[useremb[i]+movEmb[i] for i in range(10)]
    return ','.join(useremb)
dealUserEmb = udf(f=dealuseremb,returnType=)

29776