In [0]:
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}

In [1]:
// Load the Yelp reviews from the Parquet dataset at path "yelp".
val df = spark.read.parquet("yelp")

In [2]:
// Preview the loaded reviews (default: first 20 rows, truncated cells).
df.show()

In [3]:
// Project only the star-rating column and preview it.
val starDf = df.select(df("stars"))
starDf.show()

In [4]:
// Make jl11998 the session's current database; unqualified table names below resolve into it.
spark.sql("USE jl11998")
// IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE raises an AnalysisException
// (table already exists) on the second execution of the notebook.
// NOTE(review): nothing visible writes into `yelp_review`; the following cell saves `df` as
// `yelp_review_tbl` instead — confirm which table is actually intended.
spark.sql("CREATE TABLE IF NOT EXISTS yelp_review (user_id STRING, business_id STRING, review_id STRING, word2vec array<FLOAT>, stars FLOAT, date STRING, timestamp INT)")

In [5]:
// Persist the full review DataFrame as table yelp_review_tbl in the session's current
// database, replacing any existing contents.
df.write
        .mode("overwrite")
        .saveAsTable("yelp_review_tbl")

In [6]:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

In [7]:
// Keep only the columns needed for collaborative filtering and preview them.
val ratingCols = Seq("user_id", "business_id", "stars", "timestamp")
val ratings = df.select(ratingCols.head, ratingCols.tail: _*)
ratings.show()

In [8]:
// Number of distinct user_id values among the ratings.
ratings.select("user_id").dropDuplicates().count()

In [9]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, LongType}
import org.apache.spark.sql.Row

/**
 * Assigns a zero-based, contiguous Long index to every distinct value of `col` in `df`.
 *
 * The distinct values are globally sorted before `zipWithIndex`, so the value-to-index
 * mapping is reproducible. `distinct()` alone gives no ordering guarantee within a
 * partition, and per the RDD.zipWithIndex docs the assigned indices may then change
 * whenever the RDD is re-evaluated. That would let the same id receive different indices
 * in different actions, for example in the two `randomSplit` branches.
 *
 * @param df           input DataFrame containing `col`
 * @param col          name of the column whose distinct values are indexed
 * @param new_col_name name of the appended non-nullable LongType index column
 * @return DataFrame with columns (`col`, `new_col_name`), one row per distinct value
 */
def generate_index(df: DataFrame, col: String, new_col_name: String): DataFrame = {
    // Total order over the distinct values; zipWithIndex then numbers rows in that order.
    val distinctValues = df.select(col).distinct().orderBy(col)
    val indexedSchema = StructType(
        distinctValues.schema.fields :+ StructField(new_col_name, LongType, nullable = false))
    val indexedRows = distinctValues.rdd.zipWithIndex().map { case (row, index) =>
        Row.fromSeq(row.toSeq :+ index)
    }
    // NOTE(review): `spark` is the notebook's SparkSession, not a parameter of this function.
    spark.createDataFrame(indexedRows, indexedSchema)
}

In [10]:
// Give every distinct user_id a numeric user_index (used as the ALS user column later on).
val ratings_user = ratings.select("user_id").distinct()
val ratings_user_with_index =
    generate_index(df = ratings_user, col = "user_id", new_col_name = "user_index")
ratings_user_with_index.show()

In [11]:
// Attach user_index to every rating row via an inner equi-join on user_id.
val r1 = ratings.join(ratings_user_with_index, Seq("user_id"))
r1.show()

In [12]:
// Number of rating rows after attaching user_index (inner join on user_id).
r1.count()

In [13]:
// Number of distinct user_index values present after the join.
r1.select("user_index").dropDuplicates().count()

In [14]:
// Give every distinct business_id a numeric business_index (used as the ALS item column later on).
val ratings_item = ratings.select("business_id").distinct()
val ratings_item_with_index =
    generate_index(df = ratings_item, col = "business_id", new_col_name = "business_index")
ratings_item_with_index.show()

In [15]:
// Attach business_index to the user-indexed ratings via an inner equi-join on business_id.
val ratings_review = r1.join(ratings_item_with_index, Seq("business_id"))
ratings_review.show()

In [16]:
// Snapshot the indexed ratings as snappy-compressed Parquet, replacing any previous output.
ratings_review.write
        .mode("overwrite")
        .option("compression", "snappy")
        .parquet("yelp_preprocess/yelp_review_with_ub_index")

In [17]:
// 80/20 train/test split. A fixed seed makes the split, and therefore the RMSE reported
// below, reproducible across notebook runs; randomSplit without a seed draws a new random
// seed every time. Change splitSeed to sample a different split.
val splitSeed = 42L
val Array(training, test) = ratings_review.randomSplit(Array(0.8, 0.2), splitSeed)

// Matrix-factorisation recommender over (user_index, business_index) -> stars.
val als = new ALS()
        .setMaxIter(5)
        .setRegParam(0.01)
        .setUserCol("user_index")
        .setItemCol("business_index")
        .setRatingCol("stars")

val model = als.fit(training)

In [18]:
// Drop test rows whose user or business was not seen in training, so predictions contain
// no NaN values; then score the test set.
val pred = model.setColdStartStrategy("drop").transform(test)

In [19]:
// Preview the first predictions next to the true star ratings.
pred.show(numRows = 20)

In [20]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

// Root-mean-squared error between the true `stars` and the ALS `prediction` column.
val evaluator = new RegressionEvaluator()
        .setLabelCol("stars")
        .setPredictionCol("prediction")
        .setMetricName("rmse")
val rmse = evaluator.evaluate(pred)
println(s"RMSE = $rmse")

In [21]:
// Top-10 user recommendations for every item (business_index), then preview them.
val itemRec = model.recommendForAllItems(numUsers = 10)
itemRec.show()