In [1]:
import math

def weightRate(rate, time): #TODO: a better approach to weight rates using time
	"""Return the weight used downstream for a single raw rating.

	For now the weight is simply ``rate`` converted to ``float``; ``time`` is
	accepted for a future time-based weighting but is currently ignored.
	"""
	return float(rate)

def weightProfi(rate, profi): #TODO: a better approach to weight profiles using rate
	"""Scale a movie profile vector by a (normalized) rating.

	Args:
		rate: numeric rating weight for this (user, movie) pair.
		profi: iterable of numeric profile dimensions for the movie.

	Returns:
		A new list with one entry per dimension of ``profi``: ``rate * dim``
		when ``rate`` is positive, otherwise ``0.0`` everywhere (non-positive
		ratings contribute nothing).
	"""
	if rate <= 0:
		# Use float 0.0 rather than int 0 so every returned vector has a uniform
		# float type; an all-int vector can clash with float vectors when a
		# schema is inferred from the summed profiles (spark.createDataFrame).
		return [0.0 for _ in profi]
	return [rate * profiDim for profiDim in profi]

def getMovieLine(row):
	"""Parse one movie-profile row into ``(mIdx, [float, ...])``.

	Column 0 is kept unchanged as the movie key; every remaining column is
	converted to ``float`` and returned as a list.
	"""
	movieKey = row[0]
	dims = list(map(float, row[1:]))
	return (movieKey, dims)

def getRateLine(row):
	"""Re-key a ratings row by user.

	``(uIdx, mIdx, rate, time)`` -> ``(uIdx, (mIdx, weightRate(rate, time)))``.
	"""
	userKey, movieKey, rawRate, rateTime = row
	return (userKey, (movieKey, weightRate(rawRate, rateTime)))

def getUserRate(line):
	"""Emit the per-rating contribution to a user's rating statistics.

	``(uIdx, (mIdx, rate))`` -> ``(uIdx, (1, rate, rate ** 2))`` so that summing
	by key yields ``(count, sumRate, sumRate2)``.
	"""
	userKey, (_movieKey, r) = line
	return (userKey, (1, r, r ** 2))

def sumUserRate(x, y):
	"""Combine two ``(count, sumRate, sumRate2)`` triples component-wise."""
	(cntA, sumA, sqA), (cntB, sumB, sqB) = x, y
	return (cntA + cntB, sumA + sumB, sqA + sqB)

class rateNormalizer(object):
	"""Per-user rating normalizer built from aggregated rating statistics.

	Constructed from an iterable of ``(uIdx, (count, sumRate, sumRate2))``
	pairs. For every user it stores the mean (``meanRate``) and population
	variance (``varRate``) of that user's ratings, plus ``sqrtMeanVarRate``,
	the square root of the average per-user variance.
	"""

	def __init__(self, userRateCount):
		self.meanRate = dict()
		self.varRate = dict()
		for uIdx, rateCount in userRateCount:
			count, rate, rate2 = rateCount
			mean = rate / count
			self.meanRate[uIdx] = mean
			# E[x^2] - E[x]^2 can come out slightly negative through floating-point
			# cancellation (e.g. all of a user's ratings are equal); clamp at 0 so
			# the sqrt calls below never receive a negative argument.
			self.varRate[uIdx] = max(0.0, (rate2 / count) - mean ** 2)
		# NOTE: raises ZeroDivisionError when userRateCount is empty.
		self.sqrtMeanVarRate = math.sqrt(sum(self.varRate.values()) / len(self.varRate))

	def normalize(self, line):
		"""Normalize one rating and re-key it by movie.

		Maps ``(uIdx, (mIdx, rate))`` to ``(mIdx, (uIdx, nRate))`` with
		``nRate = 0.1 + (rate - mean_u) / (std_u + sqrtMeanVarRate)``.

		This is a standard-score-like value, not an exact N(0, 1) transform:
		the pooled ``sqrtMeanVarRate`` added to the denominator keeps it
		non-zero for users with zero variance (unless every user has zero
		variance), and the +0.1 offset presumably makes a rating equal to the
		user's mean count as positive. Raises KeyError for an unknown uIdx.
		"""
		uIdx, mRate = line
		mIdx, rate = mRate
		nRate = 0.1 + (rate - self.meanRate[uIdx]) / (math.sqrt(self.varRate[uIdx]) + self.sqrtMeanVarRate) #TODO: a better approach to avoid dividing by 0
		return (mIdx, (uIdx, nRate))

def getUserComponent(data):
	"""Turn one joined record into a user's profile contribution.

	``(mIdx, (mProfi, (uIdx, rate)))`` -> ``(uIdx, weightProfi(rate, mProfi))``.
	"""
	_movieKey, (movieProfile, (userKey, userRate)) = data
	return (userKey, weightProfi(userRate, movieProfile))

def sumUserProfi(x, y):
	"""Element-wise sum of two profile vectors, returned as a new list.

	Walks the positions of ``x`` and adds ``y`` at the same index.
	"""
	return [xVal + y[pos] for pos, xVal in enumerate(x)]

def formatCSV(line):
	"""Flatten ``(uIdx, [d0, d1, ...])`` into one row ``[uIdx, d0, d1, ...]``."""
	uIdx, uProfi = line
	return [uIdx] + list(uProfi)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
940,application_1551670200070_0558,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Project data directory on HDFS; change this one constant to relocate the inputs.
DATA_DIR = '/user/hz333/data/project'
# Movie profiles have no header row: column 0 is the movie id, the remaining
# columns are profile dimensions (parsed by getMovieLine).
movieData = spark.read.csv(DATA_DIR + '/movieMetaProfile.csv', header=False)
# Ratings have a header row; each row is unpacked as (uIdx, mIdx, rate, time) by getRateLine.
rateData = spark.read.csv(DATA_DIR + '/ratings.csv', header=True)

In [3]:
#(mIdx, mProfi) => (mIdx, [mProfi])
movie = movieData.rdd.map(getMovieLine)
#(uIdx, mIdx, rate, time) => (uIdx, (mIdx, wRate))
# Cached: this RDD feeds two lineages (the per-user statistics collected below
# and the normalize/join further down); without caching, the ratings CSV would
# be re-read and re-parsed for every action that depends on it.
rate = rateData.rdd.map(getRateLine).cache()

#(uIdx, (mIdx, rate)) => (uIdx, (1, rate, rate2)) => (uIdx, (count, sumRate, sumRate2))
userRateStats = rate.map(getUserRate).reduceByKey(sumUserRate)
# Pull the per-user statistics (one entry per user) to the driver to build the normalizer.
userRate = userRateStats.collect()
rateNorm = rateNormalizer(userRate)

#(uIdx, (mIdx, rate)) => (mIdx, (uIdx, nRate))
MURate = rate.map(rateNorm.normalize)
#(mIdx, mProfi), (mIdx, (uIdx, nRate)) => (mIdx, (mProfi, (uIdx, nRate)))
# Inner join: ratings for movies absent from movieData are dropped.
MUM = movie.join(MURate)

#(mIdx, (mProfi, (uIdx, nRate))) => (uIdx, [uProfiCompo])
uProfiCompo = MUM.map(getUserComponent)
#(uIdx, [uProfiCompo]) => (uIdx, [uProfi]), components summed element-wise per user
uProfi = uProfiCompo.reduceByKey(sumUserProfi)

In [4]:
# Spot-check the profile computed for user '1' (ids are kept as strings from the CSV).
uProfi.filter(lambda pair: pair[0] == '1').collect()

[('1', [0.0, 80.47164437342879, 94.51310751218831, 57.932530343214594, 72.02264824491319, 49.4118315957792, 51.32307933598421, 0.0, 50.00890242593793, 54.00985944005231, 4.064035812205046, 6.864016828072797, 0.0, 52.30238975995697, 29.48471919760489, 19.285233305525182, 38.870603420170355, 45.07166220323994, 38.71461823793791, 10.756180244835514])]

In [5]:
# Number of distinct users with at least one rating (one collected entry per uIdx).
len(userRate)

610

In [6]:
#(uIdx, [uProfi]) => [uIdx, uProfi_0, uProfi_1, ...] (one flat row per user)
uProfiRows = uProfi.map(formatCSV)
# samplingRatio=1 infers column types from every row, not just a sample.
uProfiCSV = spark.createDataFrame(uProfiRows, samplingRatio = 1)
# mode('overwrite') keeps this cell re-runnable: the default save mode raises
# an error when the output directory already exists (e.g. on Restart & Run All).
uProfiCSV.write.mode('overwrite').option('header', 'false').csv('/user/hz333/data/project/uMetaProfi.csv')