In [17]:
from pyspark.ml.linalg import Vectors, SparseVector, DenseVector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from collections import defaultdict
from pyspark import SparkContext
from pyspark.sql import SparkSession
import os
import json
import cPickle as pickle
import math
from pyspark.sql import functions as F


def tuple2sparse(tp, size=43, begin=19, end=42):
    dic = {}
    for i in xrange(end-begin):
        if (tp[i] - 0) > 10e-4:
            dic[i+begin] = tp[i]
    v = Vectors.sparse(size, dic)
    return v


def add(v1, v2):
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector), 'One of them is not SparseVector!'
    assert v1.size == v2.size, 'Size not equal!'
    values = defaultdict(float) # Dictionary with default value 0.0
    # Add values from v1
    for i in range(v1.indices.size):
        values[v1.indices[i]] += v1.values[i]
    # Add values from v2
    for i in range(v2.indices.size):
        values[v2.indices[i]] += v2.values[i]
    return Vectors.sparse(v1.size, dict(values))

In [31]:
def loadDataJson(business_path='', user_path='', star_path=''):
    bDF = spark.read.json(business_path)
    uDF = spark.read.json(user_path)
    sDF = spark.read.json(star_path)

    businessDF = bDF.rdd.map(lambda x: (x['b_id'], tuple2sparse(
                         tuple(x['loc']) + tuple(x['votes']) + (x['avg_star'], ) +
                         tuple(x['cates']) + (x['rev_num'], ) + tuple(x['ckins']),
                         begin=19, end=42))).toDF(['b_id', 'b_features'])

    userDF = uDF.rdd.map(lambda x: (x['u_id'], tuple2sparse(
                     tuple(x['loc']) + tuple(x['votes']) +
                     (x['loc_num'], x['avg_star'], x['rev_num']) + tuple(x['cates']),
                     begin=0, end=19))).toDF(['u_id', 'u_features'])

    starDF = sDF.select((sDF.business_id).alias('b_id'), (sDF.user_id).alias('u_id'), 
                        (sDF.stars).alias('label'), (sDF.review_id).alias('rev_id'))
    return businessDF, userDF, starDF


def transData4GBT(businessDF, userDF, starDF):
    alldata = starDF.select(starDF.b_id, starDF.u_id, starDF.label) \
                    .join(businessDF, starDF.b_id == businessDF.b_id).drop(businessDF.b_id) \
                    .join(userDF, starDF.u_id == userDF.u_id).drop(userDF.u_id)\
                    .select('label', 'b_features', 'u_features', 'u_id', 'b_id')
    assembler = VectorAssembler(
                    inputCols=["b_features", "u_features"],
                    outputCol="features")

    data = assembler.transform(alldata).drop('b_features', 'u_features')
    return data


def traingbt(datafrom='json', business_path='', user_path='', star_path=''):
    gbt = GBTRegressor(maxIter=5, maxDepth=2, seed=42)
    if datafrom == 'json':
        businessDF, userDF, starDF = loadDataJson(business_path=business_path,
                                                  user_path=user_path,
                                                  star_path=star_path)
    elif datafrom == 'mongodb':
        businessDF, userDF, starDF = loadDataMongo()
    data = transData4GBT(businessDF, userDF, starDF)
    model = gbt.fit(data)
    return model

In [19]:
def recommendation(businessDF, userDF, testDF, model):
    CartesianDF = testDF.crossJoin(businessDF.select('b_id')).drop(testStarDF.b_id).drop('rev_id')
    recDF = transData4GBT(businessDF, userDF, CartesianDF)
    predDF = model.transform(recDF)
    
    temp = predDF.groupby('u_id').agg(F.max(predDF.prediction)) \
                 .withColumnRenamed('max(prediction)', 'prediction')
    pred = temp.join(predDF, ['prediction', 'u_id'], 'outer').drop(predDF.u_id).drop(predDF.prediction)
    pred = pred.select('u_id', 'b_id')

    return pred

In [34]:
business_path = 'businesses.json'
user_path = 'users.json'
star_path = 'yelp_academic_dataset_review.json'


gbt = GBTRegressor(maxIter=50, maxDepth=6, seed=42)
businessDF, userDF, starDF = loadDataJson(business_path=business_path,
                                          user_path=user_path,
                                          star_path=star_path)
# split starDF to training data and test data
trainStarDF, testStarDF = starDF.randomSplit([0.7, 0.3])

trainDF = transData4GBT(businessDF, userDF, trainStarDF)

model = gbt.fit(trainDF)

testDF = transData4GBT(businessDF, userDF, testStarDF)
predDF = model.transform(testDF)

predDF.show()
errors = predDF.rdd.map(lambda x: (x.label - x.prediction)**2).collect()
RMSE = math.sqrt(sum(errors)/len(errors))
print 'RMSE: %.8f' % RMSE

# recDF = recommendation(businessDF, testStarDF, model)
# recDF.printSchema()
# recDF.show()


+-----+--------------------+--------------------+--------------------+------------------+
|label|                u_id|                b_id|            features|        prediction|
+-----+--------------------+--------------------+--------------------+------------------+
|    5|-1zQA2f_syMAdA04P...|aNe8ofTYrealxqv7V...|(86,[19,21,22,23,...| 2.624166121228215|
|    2|-3i9bhfvrM3F1wsC9...|5iSmZO0SrKU6EoXK_...|(86,[19,21,22,23,...|  3.63287926235794|
|    4|-3i9bhfvrM3F1wsC9...|ghpFh6XpH1TYZhjAG...|(86,[19,21,22,23,...| 4.274533711932253|
|    4|-3i9bhfvrM3F1wsC9...|GtHu9uGXpn7Jg_Z7v...|(86,[19,21,22,23,...|3.8912370468409585|
|    4|-4Anvj46CWf57KWI9...|vKA9sIqBcW0UlTKGh...|(86,[19,21,22,23,...|3.3628979446389167|
|    4|-55DgUo52I3zW9Rxk...|3qlqzQrwh8hjBltlg...|(86,[19,21,22,23,...|  4.57561898326436|
|    5|-55DgUo52I3zW9Rxk...|CYWRPE-1IHPBb-zfF...|(86,[19,21,22,23,...|4.5735588804823175|
|    5|-55DgUo52I3zW9Rxk...|3awTUGMdUVrwEBkFF...|(86,[19,21,22,23,...| 4.580856617019137|
|    4|-55

In [32]:
# recDF = recommendation(businessDF, userDF, testStarDF, model)
# recDF.printSchema()
# recDF.show()

CartesianDF = testStarDF.crossJoin(businessDF.select('b_id')).drop(testStarDF.b_id).drop('rev_id')
CartesianDF.printSchema()



root
 |-- u_id: string (nullable = true)
 |-- label: long (nullable = true)
 |-- b_id: string (nullable = true)

In [33]:
recDF = transData4GBT(businessDF, userDF, CartesianDF)
# predDF = model.transform(recDF)
    
# temp = predDF.groupby('u_id').agg(F.max(predDF.prediction)) \
#                  .withColumnRenamed('max(prediction)', 'prediction')
# pred = temp.join(predDF, ['prediction', 'u_id'], 'outer').drop(predDF.u_id).drop(predDF.prediction)
# pred = pred.select('u_id', 'b_id')

recDF.printSchema()

An error occurred while calling o1090.transform.
: org.apache.spark.SparkException: Job 46 cancelled because Stage 214 was cancelled
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1375)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1364)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1363)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1363)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
	at org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1363)
	at org.apache.spark.scheduler.DAGSchedulerEventPr

In [30]:
predDF = model.transform(recDF)
predDF.printSchema()

root
 |-- label: long (nullable = true)
 |-- b_features: vector (nullable = true)
 |-- u_features: vector (nullable = true)
 |-- u_id: string (nullable = true)
 |-- b_id: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)

In [39]:
from pyspark.sql import functions as F
temp = prediction.groupby('u_id').agg(F.max(prediction.prediction)) \
                 .withColumnRenamed('max(prediction)', 'prediction')
pred = temp.join(prediction, ['prediction', 'u_id'], 'outer').drop(prediction.u_id).drop(prediction.prediction)

pred.printSchema()
pred.show()


root
 |-- prediction: double (nullable = true)
 |-- u_id: string (nullable = true)
 |-- label: double (nullable = true)
 |-- b_id: string (nullable = true)
 |-- features: vector (nullable = true)

+------------------+--------------------+-----+--------------------+--------------------+
|        prediction|                u_id|label|                b_id|            features|
+------------------+--------------------+-----+--------------------+--------------------+
|1.5334101408098666|09D5rRgsFmHRJKVBm...|  2.0|EtLmuDIMsBCmqdgpy...|(43,[0,5,6,7,8,11...|
|1.5334101408098666|1-VO40rPQDC4Q9Spt...|  1.0|K0pN6x7fzmsO8h36N...|(43,[0,5,6,7,9,10...|
|1.5334101408098666|2ezaoRp1PzHaMgIrz...|  1.0|sPd3E7lFzd_yooiq-...|(43,[0,5,6,7,8,18...|
|1.5334101408098666|4-ElUwzF5CgbEy0ay...|  1.0|FR7dh1_TnWNyGbhhq...|(43,[0,5,6,7,12,1...|
|1.5334101408098666|4c-dhmNntBrpUHOCc...|  1.0|vGLl5xum2u2Qf8_Av...|(43,[0,5,6,7,8,9,...|
|1.5334101408098666|4c-dhmNntBrpUHOCc...|  1.0|MTtI2bqoNHN_0m2cH...|(43,[0,5,6,7,8,

In [34]:
businessDF.printSchema()
userDF.printSchema()
starDF.printSchema()
trainStarDF.printSchema()
prediction.printSchema()

root
 |-- b_id: string (nullable = true)
 |-- b_features: vector (nullable = true)

root
 |-- u_id: string (nullable = true)
 |-- u_features: vector (nullable = true)

root
 |-- b_id: string (nullable = true)
 |-- u_id: string (nullable = true)
 |-- label: long (nullable = true)
 |-- rev_id: string (nullable = true)

root
 |-- b_id: string (nullable = true)
 |-- u_id: string (nullable = true)
 |-- label: long (nullable = true)
 |-- rev_id: string (nullable = true)

root
 |-- label: double (nullable = true)
 |-- u_id: string (nullable = true)
 |-- b_id: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)

In [50]:
CartesianDF = testStarDF.crossJoin(businessDF.select('b_id')).drop(testStarDF.b_id).drop('rev_id')
CartesianDF.show()

+--------------------+-----+--------------------+
|                u_id|label|                b_id|
+--------------------+-----+--------------------+
|dVy5EAV9YZIl9Xl-X...|    3|QgNYM-ccNhJ8eGsQP...|
|dVy5EAV9YZIl9Xl-X...|    3|BqsIt1BQKzS-hEKLY...|
|dVy5EAV9YZIl9Xl-X...|    3|MH0oOCJ7DKnIJWwUQ...|
|dVy5EAV9YZIl9Xl-X...|    3|s9ZY6ESOJF0mABkGr...|
|dVy5EAV9YZIl9Xl-X...|    3|ln8nvcRttTQTZeDjc...|
|dVy5EAV9YZIl9Xl-X...|    3|f8e1MH4YvIY1Km7W2...|
|dVy5EAV9YZIl9Xl-X...|    3|llifBVCFAnr124WdK...|
|dVy5EAV9YZIl9Xl-X...|    3|FiB1rfmgaED4mmHpO...|
|dVy5EAV9YZIl9Xl-X...|    3|lZaBsXK-vhxL1Ck8E...|
|dVy5EAV9YZIl9Xl-X...|    3|krBpN5vbCQrB54QvT...|
|dVy5EAV9YZIl9Xl-X...|    3|xxjxUM-VK4N33LN8N...|
|dVy5EAV9YZIl9Xl-X...|    3|qkjOhzGvUPSdKX7ss...|
|dVy5EAV9YZIl9Xl-X...|    3|2G_6PBM-klbh1u2v2...|
|dVy5EAV9YZIl9Xl-X...|    3|GD9mTnCht2bog2yb0...|
|dVy5EAV9YZIl9Xl-X...|    3|E0T-xQJXpM6Hsm-Ee...|
|dVy5EAV9YZIl9Xl-X...|    3|sPvjzXjzvGFwBLwtn...|
|dVy5EAV9YZIl9Xl-X...|    3|8u6NUtxSPH3CbLKTQ...|
