In [None]:
import math

#movie profile part
def getMovieTag(row):
	"""Map one tags row (uIdx, mIdx, tag, time) to ((mIdx, lower-cased tag), 1).

	The user id and timestamp are unpacked but not used.
	"""
	_user, movie, rawTag, _stamp = row
	key = (movie, rawTag.lower())
	return (key, 1)

def count(x, y):
	"""Reducer: merge two partial counts by adding them."""
	total = x + y
	return total

def movieTag2Tag(line):
	"""Map ((mIdx, tag), count) to (tag, 1).

	The per-movie count is discarded: every (movie, tag) pair contributes
	exactly one to the tag's total.
	"""
	(_movie, tagName), _frq = line
	return (tagName, 1)

def movieTag2Movie(line):
	"""Re-key ((mIdx, tag), count) by movie: (mIdx, (tag, count))."""
	key, frq = line
	movie, tagName = key
	return (movie, (tagName, frq))

class IDFScore(object):
	"""Build dense TF-IDF vectors over a fixed, alphabetically sorted tag vocabulary.

	tagCount: iterable of (tag, count) pairs; count is the IDF denominator
	          (the driver passes the number of movies carrying the tag).
	N:        IDF numerator (the driver passes the number of tagged movies).
	"""
	def __init__(self, tagCount, N):
		self.tagCount = dict(tagCount)
		self.N = N
		self.tagList = sorted(self.tagCount) #the col index

	def getTFIDF(self, line):
		"""Turn (mIdx, [(tag, count)]) into [mIdx, vector].

		The vector has one entry per tag in self.tagList (same order):
		(count / max count of this movie) * log2(N / tagCount[tag]) for tags
		the movie carries, 0.0 otherwise. Tags not present in the vocabulary
		are ignored. An empty tag list yields an all-zero vector.
		"""
		mIdx, tagFrq = line
		tagFrq = list(tagFrq)
		if not tagFrq:
			#nothing to weight; avoid max() on an empty sequence
			return [mIdx, [0. for tag in self.tagList]]

		#float() forces true division even when counts are ints on Python 2
		maxFrq = float(max(frq for tag, frq in tagFrq))
		docCount = float(self.N)

		tagFrq = dict(tagFrq)
		TFIDF = []
		for tag in self.tagList:
			if tag in tagFrq:
				IDF = math.log(docCount / self.tagCount[tag], 2)
				TF = tagFrq[tag] / maxFrq
				TFIDF.append(TF * IDF)
			else:
				TFIDF.append(0.)
		return [mIdx, TFIDF]


#user profile part
def weightRate(rate, time):
	"""Return the rating converted to float; `time` is accepted but not used yet."""
	#TODO: a better approach to weight rates using time
	return float(rate)

def weightProfi(rate, profi):
	"""Scale every dimension of a profile by `rate`.

	A non-positive rate contributes nothing: the result is then a list of
	integer zeros with the same length as `profi`.
	"""
	#TODO: a better approach to weight profiles using rate
	if not rate <= 0:
		return [rate * dim for dim in profi]
	return [0 for _ in profi]

def getRateLine(row):
	"""Map a ratings row (uIdx, mIdx, rate, time) to (uIdx, (mIdx, weighted rate)).

	The weighting itself is delegated to weightRate.
	"""
	user, movie, rawRate, stamp = row
	return (user, (movie, weightRate(rawRate, stamp)))

def getUserRate(line):
	"""Map (uIdx, (mIdx, rate)) to (uIdx, (1, rate, rate squared)).

	The triple is the per-rating contribution to a (count, sum, sum of
	squares) accumulator.
	"""
	user, (_movie, score) = line
	stats = (1, score, score ** 2)
	return (user, stats)

def sumUserRate(x, y):
	"""Reducer: add two (count, sum, sum of squares) triples element-wise."""
	(nLeft, sLeft, qLeft), (nRight, sRight, qRight) = x, y
	return (nLeft + nRight, sLeft + sRight, qLeft + qRight)

class rateNormalizer(object):
	"""Standardise ratings per user from pre-aggregated rating statistics.

	userRateCount: iterable of (uIdx, (count, sum of rates, sum of squared rates)).

	Attributes:
	meanRate        -- uIdx => mean rating of that user
	varRate         -- uIdx => variance of that user's ratings (never negative)
	sqrtMeanVarRate -- square root of the mean of all per-user variances;
	                   0.0 when there are no users
	"""
	def __init__(self, userRateCount):
		self.meanRate = dict()
		self.varRate = dict()
		for uIdx, rateCount in userRateCount:
			count, rate, rate2 = rateCount
			count = float(count)
			mean = rate / count
			self.meanRate[uIdx] = mean
			#E[x^2] - E[x]^2 can round to a tiny negative number; clamp it so
			#the square roots taken later stay defined
			self.varRate[uIdx] = max((rate2 / count) - mean ** 2, 0.)
		if self.varRate:
			self.sqrtMeanVarRate = math.sqrt(sum(self.varRate.values()) / len(self.varRate))
		else:
			#no users at all: nothing to average
			self.sqrtMeanVarRate = 0.

	def normalize(self, line):
		"""Map (uIdx, (mIdx, rate)) to (mIdx, (uIdx, normalized rate)).

		normalized rate = 0.1 + (rate - user mean) / (user std + sqrtMeanVarRate).
		Adding sqrtMeanVarRate keeps the denominator positive for users whose
		own variance is 0. If the denominator is still 0 (every user has zero
		variance) the deviation term is treated as 0 and 0.1 is returned.
		Raises KeyError for a user that was not in userRateCount.
		"""
		uIdx, mRate = line
		mIdx, rate = mRate
		#assume the rate is normal dist., normalize to N(0, 1)
		spread = math.sqrt(self.varRate[uIdx]) + self.sqrtMeanVarRate
		deviation = rate - self.meanRate[uIdx]
		if spread > 0:
			nRate = 0.1 + deviation / spread
		else:
			nRate = 0.1
		return (mIdx, (uIdx, nRate))

def getUserComponent(data):
	"""Map (mIdx, (mProfi, (uIdx, rate))) to (uIdx, rate-weighted movie profile).

	The weighting is delegated to weightProfi; the movie id is dropped.
	"""
	_movie, (movieProfile, (user, weight)) = data
	return (user, weightProfi(weight, movieProfile))

def sumUserProfi(x, y):
	"""Reducer: element-wise sum of two profiles, over the length of `x`."""
	return [xDim + y[i] for i, xDim in enumerate(x)]


#similarity part
def getCos(m, u):
	"""Dot product of `m` and `u`, taken over the length of `m`.

	This equals the cosine similarity only when both inputs already have
	unit length.
	"""
	return sum([mDim * u[i] for i, mDim in enumerate(m)])

def normalize(profi):
	"""Scale a profile to unit Euclidean length.

	Returns a new list; the input is not modified. A profile whose norm is 0
	(including an empty one) cannot be scaled and comes back as all zeros
	instead of raising ZeroDivisionError.
	"""
	norm2 = sum([dim ** 2 for dim in profi])
	if norm2 == 0:
		return [0. for dim in profi]
	#divide by the norm itself, not its square, so that norm(result) == 1
	norm = math.sqrt(norm2)
	return [dim / norm for dim in profi]

def getProfi(row):
	"""Parse a row (idx, dim0, dim1, ...) into (idx, normalized float profile).

	Every field after the first is converted with float(); scaling is
	delegated to normalize.
	"""
	key = row[0]
	dims = [float(rawDim) for rawDim in row[1:]]
	return (key, normalize(dims))

def getSim(MU):
	"""Map ((mIdx, mProfi), (uIdx, uProfi)) to (mIdx, uIdx, similarity).

	The similarity is whatever getCos returns for the two profiles.
	"""
	moviePair, userPair = MU
	movie, movieVec = moviePair
	user, userVec = userPair
	return (movie, user, getCos(movieVec, userVec))

In [None]:
#load the tag assignments; the first line of the file is a header row
#rows are later unpacked as (uIdx, mIdx, tag, time) by getMovieTag
tagsData = spark.read.csv('/user/hz333/data/project/tags.csv', header = True)

In [None]:
#build one TF-IDF profile per movie from the tag assignments

#(uIdx, mIdx, tag, time) => ((mIdx, tag), 1)
movieTag = tagsData.rdd.map(getMovieTag)
#((mIdx, tag), 1) => ((mIdx, tag), count)
movieTagCount = movieTag.reduceByKey(count)

#((mIdx, tag), count) => (tag, 1)
tagCount = movieTagCount.map(movieTag2Tag)
#(tag, 1) => (tag, count), count = number of movies carrying the tag
tagCount = tagCount.reduceByKey(count)
#bring the (tag, count) pairs to the driver; IDFScore turns them into a dict
tagCount = tagCount.collect()

#((mIdx, tag), count) => (mIdx, (tag, count))
movieCount = movieTagCount.map(movieTag2Movie)
#(mIdx, (tag, count)) => (mIdx, [(tag, count)])
movieCount = movieCount.groupByKey()
movieCount = movieCount.mapValues(list)

#N for the IDF is the number of movies that have at least one tag
IDF = IDFScore(tagCount, movieCount.count())

#(mIdx, [(tag, count)]) => (mIdx, [mProfi])
mProfi = movieCount.map(IDF.getTFIDF)

In [None]:
#load the training ratings; this file is read without a header row
#rows are later unpacked as (uIdx, mIdx, rate, time) by getRateLine
rateData = spark.read.csv('/user/hz333/data/project/sTrain.csv', header = False)

In [None]:
#build one profile per user as the rate-weighted sum of the profiles of the movies they rated

#(uIdx, mIdx, rate, time) => (uIdx, (mIdx, wRate))
rate = rateData.rdd.map(getRateLine)

#(uIdx, (mIdx, rate)) => (uIdx, (1, rate, rate2))
userRate = rate.map(getUserRate)
#(uIdx, (1, rate, rate2)) => (uIdx, (count, sumRate, sumRate2))
userRate = userRate.reduceByKey(sumUserRate)

#bring the per-user statistics to the driver to build the normalizer
userRate = userRate.collect()
rateNorm = rateNormalizer(userRate)

#(uIdx, (mIdx, rate)) => (mIdx, (uIdx, nRate))
MURate = rate.map(rateNorm.normalize)
#(mIdx, mProfi), (mIdx, (uIdx, rate)) => (mIdx, (mProfi, (uIdx, rate)))
#inner join: ratings of movies without a tag profile are dropped here
MUM = mProfi.join(MURate)

#(mIdx, (mProfi, (uIdx, rate))) => (uIdx, [uProfiCompo])
uProfiCompo = MUM.map(getUserComponent)
#(uIdx, [uProfiCompo]) => (uIdx, [uProfi])
uProfi = uProfiCompo.reduceByKey(sumUserProfi)
#drop users whose profile components sum to 0
uProfi = uProfi.filter(lambda x: sum(x[1]) != 0)

In [None]:
#normalize both profile sets, then score every (movie, user) pair

#(mIdx, [mProfi]) => (mIdx, [nMProfi])
mProfi = mProfi.mapValues(normalize)
#(uIdx, [uProfi]) => (uIdx, [nUProfi])
uProfi = uProfi.mapValues(normalize)

#(mIdx, [mProfi]), (uIdx, [uProfi]) => ((mIdx, [mProfi]), (uIdx, [uProfi]))
#full cross product: one element per (movie, user) pair
MU = mProfi.cartesian(uProfi)
#((mIdx, [mProfi]), (uIdx, [uProfi])) => (mIdx, uIdx, sim)
simMat = MU.map(getSim)

In [None]:
#(mIdx, uIdx, sim) tuples => DataFrame, then written out as CSV without a header
simCSV = spark.createDataFrame(simMat, samplingRatio = 1)
#repartition(1) collapses the data into a single partition before writing
simCSV.repartition(1).write.option('header', 'false').csv('/user/hz333/data/project/tagsSim.csv')

In [None]:
#peek at one (mIdx, uIdx, sim) row as a sanity check
simMat.take(1)