In [1]:
import $ivy.`org.apache.spark::spark-sql:3.1.1` 
import $ivy.`org.apache.spark::spark-mllib:3.1.1` 
import org.apache.log4j.{Level, Logger}
// Turn off log4j output for every logger under the "org" namespace.
Logger.getLogger("org").setLevel(Level.OFF)
import org.apache.spark.sql._

// Notebook-wide Spark session, running locally with master "local[*]".
val spark = AmmoniteSparkSession.builder()
  .master("local[*]")
  .getOrCreate()

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                                     
[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[32mimport [39m[36morg.apache.spark.sql._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@71f507a7

In [8]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{HashingTF, OneHotEncoder, QuantileDiscretizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.{FMRegressionModel, FMRegressor}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{avg, col, collect_list, dayofmonth, lit, month, struct, to_timestamp, udf, year}
import org.apache.spark.sql.types.IntegerType

/** Factorization-machine recommender experiment over item, user and rating files.
  *
  * The vals below are initialised in declaration order the first time the object is
  * referenced: item/user/rating DataFrames are defined, a one-hot-encoding pipeline is
  * fitted, and the ranking evaluation sets are prepared. `main` trains and scores one
  * model; `iter_factor_iter` runs `main` over a grid of hyper-parameters.
  *
  * Relies on `spark`, `readTrainRatings` and `readTestRatings` from the enclosing scope.
  */
object Movies {

  /** Column names of the `u.item` file, in file order (split on `|` and trimmed). */
  val inputItemFeatureNames = """item_id | movie_title | release_date |
                                video release date | IMDb URL | unknown |
                                Action | Adventure | Animation | Children's
                                | Comedy | Crime | Documentary | Drama | Fantasy
                                | Film-Noir | Horror | Musical | Mystery | Romance
                                | Sci-Fi | Thriller | War | Western""".split("\\|").map(_.trim)

  /** Item columns kept as features: the raw title/date/URL columns are dropped and the
    * release date is replaced by its year, month and day components. */
  val itemFeatureNames = inputItemFeatureNames
    .filterNot(Array("release_date", "movie_title",
                     "video release date", "IMDb URL").contains(_)) ++
    Seq("release_year", "release_month", "release_day")
  val items = readItems(inputItemFeatureNames).select(itemFeatureNames.map(col): _*).cache()

  val userFeatureNames = "user_id | age | gender | occupation | zip code".split("\\|").map(_.trim)
  val users = readUsers(userFeatureNames).cache()

  /** User columns after string indexing / age discretisation (see `getUserPipeline`). */
  val usersTransformedFeatureNames =
    Array("user_id", "ageDiscretized", "genderIndex", "occupationIndex", "zip codeIndex")
  val usersTransformed = getUserPipeline
    .fit(users)
    .transform(users)
    .select(usersTransformedFeatureNames.map(col): _*)

  val train_ratings = readTrainRatings()
  val test_ratings = readTestRatings()
  val (train_upsampled, test_upsampled) = addSomeZeroRatings(train_ratings, test_ratings)

  val ratingsUpsampled = train_upsampled.union(test_upsampled)

  // The encoder is fitted on train + test (with the added zero ratings) so that every
  // user/item id present in either split is known to the one-hot encoder.
  val data = joinFeatures(ratingsUpsampled)
  val featureNames =
    Array("user_id", "item_id") ++ itemFeatureNames.drop(1) ++ usersTransformedFeatureNames.drop(1)
  val pipeline = getPipeline(featureNames).fit(data)

  val train = pipeline.transform(joinFeatures(train_upsampled))
  // The test set uses the raw test ratings (no added zeros). `test_ratings` is reused
  // here instead of reading the CSV a second time.
  val test = pipeline.transform(joinFeatures(test_ratings))

  // NOTE(review): getTrainTestSets caches only the test ranking set; the train ranking
  // set is recomputed on every rankingMetrics call. Consider caching it if memory allows.
  val (rankTrain, rankTest) = getTrainTestSets(train, test, ratingsUpsampled, usersTransformed, items, pipeline)

  /** Runs `main` for every combination of iteration count and factor size.
    *
    * @param factorizations factor sizes to try
    * @param iterations     maximum iteration counts to try
    * @return one result tuple per combination (see `main`), ordered with `iterations`
    *         as the outer loop and `factorizations` as the inner loop
    */
  def iter_factor_iter(factorizations: List[Int],
                       iterations: List[Int]): List[(Int, Int, Double, Double, Double, Double, Double)] =
    for {
      iteration <- iterations
      factorization <- factorizations
    } yield main(factorization, iteration, 20)

  /** Trains one FM model on `train` and prints/returns ranking metrics for both splits.
    *
    * @param factorSize factor size passed to the FM regressor
    * @param maxIter    maximum number of training iterations
    * @param metricFor  cut-off k used for precision@k / recall@k
    * @return (factorSize, maxIter, training seconds, train precision, train recall,
    *         test precision, test recall)
    */
  def main(factorSize: Int = 30, maxIter: Int = 100, metricFor: Int = 20): (Int, Int, Double, Double, Double, Double, Double) = {
    val t1 = System.nanoTime
    val model = trainModel(train, factorSize, maxIter)
    val duration = (System.nanoTime - t1) / 1e9d
    println(s"time of training $duration\n")

    println("Training set:")
    val (train_precision, train_recall) = rankingMetrics(rankTrain, train, model, spark, metricFor)

    println("Testing set:")
    val (test_precision, test_recall) = rankingMetrics(rankTest, test, model, spark, metricFor)

    (factorSize, maxIter, duration, train_precision, train_recall, test_precision, test_recall)
  }

  /** Attaches item features and transformed user features to a ratings DataFrame. */
  private def joinFeatures(ratings: DataFrame): DataFrame =
    ratings.join(items, "item_id").join(usersTransformed, "user_id")

  /** Splits `data` so that the ids of the chosen column are randomly divided between the
    * two outputs. Not referenced anywhere else in this object.
    *
    * @param testSize      weight of the test part in the random split of distinct ids
    * @param userColdStart split on "user_id" when true, on "item_id" otherwise
    */
  private def coldStartSplit(data: DataFrame, testSize: Double = 0.2, userColdStart: Boolean = true): (DataFrame, DataFrame) = {
    val column = if (userColdStart) "user_id" else "item_id"

    val unique = data.select(column).distinct()
    val Array(trainUniqueIds, testUniqueIds) = unique.randomSplit(Array(1 - testSize, testSize))

    (data.join(trainUniqueIds, column), data.join(testUniqueIds, column))
  }

  /** Adds negative samples: user/item pairs with no rating get rating 0.
    *
    * All user x item pairs absent from the ratings are sampled with fraction `frac`,
    * split 80/20 at random, and appended to the train and test ratings respectively.
    * The original note says the result has approximately as many zero rows as there
    * are ratings; that depends on the data and is not verifiable from this file.
    * The unions are positional, so the rating DataFrames are assumed to have exactly
    * the columns (user_id, item_id, rating) in that order — TODO confirm.
    *
    * @return (train with zeros, test with zeros), both cached
    */
  private def addSomeZeroRatings(train_ratings: DataFrame, test_ratings: DataFrame, frac: Double = 0.08): (DataFrame, DataFrame) = {
    val ratings = train_ratings.union(test_ratings)
    val userIds = ratings.select("user_id").distinct()
    val itemIds = ratings.select("item_id").distinct()

    val zeroRatings = userIds
      .join(itemIds) // no join condition: every user paired with every item
      .join(ratings.select("user_id", "item_id"), Seq("user_id", "item_id"), "left_anti")
      .sample(frac)
      .withColumn("rating", lit(0))

    val Array(train_zeros, test_zeros) = zeroRatings.randomSplit(Array(0.8, 0.2))
    (train_ratings.union(train_zeros).cache(), test_ratings.union(test_zeros).cache())
  }

  /** Returns 2 DFs with cols: "user_id", "item_id", "features", "rating".
    * Train set - each user from train DF paired with each item.
    * Test set - each user from test DF paired with each item, excluding the
    * (user, item) pairs that occur in train. This one is cached.
    * Pairs without a known rating get rating 0.
    */
  private def getTrainTestSets(train: DataFrame,
                               test: DataFrame,
                               ratings: DataFrame,
                               users: DataFrame,
                               items: DataFrame,
                               pipeline: PipelineModel): (DataFrame, DataFrame) = {

    val trainUserIds = train.select("user_id").distinct()
    val testUserIds = test.select("user_id").distinct()

    val itemIds = ratings.select("item_id").distinct()

    val trainUsersItems = trainUserIds.join(itemIds)
    val testUsersItems = testUserIds
      .join(itemIds)
      .join(train, Seq("user_id", "item_id"), "left_anti")

    // Attach user/item features and the known rating; nulls in numeric columns
    // (unrated pairs) are filled with 0.
    def getRankData(data: DataFrame): DataFrame = data
      .join(users, "user_id")
      .join(items, "item_id")
      .join(ratings, Seq("user_id", "item_id"), "left")
      .na.fill(0)

    def transform(data: DataFrame): DataFrame =
      pipeline.transform(data).select("user_id", "item_id", "features", "rating")

    (transform(getRankData(trainUsersItems)), transform(getRankData(testUsersItems)).cache())
  }

  /** Pipeline that one-hot encodes every column in `featureNames` (outputs suffixed
    * with "OHE") and assembles the encoded columns into a single "features" vector. */
  private def getPipeline(featureNames: Array[String]): Pipeline = {
    val encodedNames = featureNames.map(_ + "OHE")

    val oneHotEncoder = new OneHotEncoder()
      .setInputCols(featureNames)
      .setOutputCols(encodedNames)

    val assembler = new VectorAssembler()
      .setInputCols(encodedNames)
      .setOutputCol("features")

    new Pipeline().setStages(Array(oneHotEncoder, assembler))
  }

  /** Pipeline that string-indexes gender, occupation and zip code (outputs suffixed
    * with "Index") and discretises age into 10 quantile buckets ("ageDiscretized"). */
  private def getUserPipeline = {
    val columnNames = Array("gender", "occupation", "zip code")
    val stringIndexer = new StringIndexer()
      .setInputCols(columnNames)
      .setOutputCols(columnNames.map(_ + "Index"))

    val discretizer = new QuantileDiscretizer()
      .setInputCol("age")
      .setOutputCol("ageDiscretized")
      .setNumBuckets(10)

    new Pipeline().setStages(Array(stringIndexer, discretizer))
  }

  /** Scores `data` with `model`, ranks items per user and prints precision, recall and
    * mean average precision at `metricFor`, plus the elapsed time.
    *
    * @param data      user/item pairs with a "features" column to score
    * @param truthData ratings used as ground truth (only rows with rating != 0 count)
    * @param metricFor cut-off k for the ranking metrics
    * @return (precision at k, recall at k)
    */
  private def rankingMetrics(data: DataFrame,
                             truthData: DataFrame,
                             model: FMRegressionModel,
                             spark: SparkSession, metricFor: Int = 20): (Double, Double) = {
    val t1 = System.nanoTime

    import spark.implicits._

    val predictions = model.transform(data).select("user_id", "item_id", "prediction")

    val recommendations = getRecommendations(predictions)
    val truth = getUserWithItemsAsVector(truthData)

    // Inner join: users with no non-zero rating in truthData are not evaluated.
    val evalData = recommendations.join(truth, "user_id")

    val metrics = new RankingMetrics(
      evalData.select("prediction_items", "truth_items").as[(Array[Int], Array[Int])].rdd)

    val precision = metrics.precisionAt(metricFor)
    val recall = metrics.recallAt(metricFor)
    val MAP = metrics.meanAveragePrecisionAt(metricFor)

    println(s"precisionAt($metricFor) $precision")
    println(s"recallAt($metricFor) $recall")
    println(s"meanAveragePrecisionAt($metricFor) $MAP")

    val duration = (System.nanoTime - t1) / 1e9d
    println(s"time of predictions $duration\n")

    (precision, recall)
  }

  /** DF: user_id, prediction_items (all scored item ids, highest prediction first).
    *
    * @param predictions DF: user_id, item_id, prediction */
  private def getRecommendations(predictions: DataFrame) = {
    val groupedPredictions = predictions.groupBy("user_id")
      .agg(collect_list(struct("prediction", "item_id")) as "rec_scores")

    // Sort each user's (score, item) pairs by descending score and keep only the ids.
    // The full list is kept; the original left a `.take(20)` truncation commented out.
    val sortUdf = udf((l: Seq[Row]) => {
      l.map({ case Row(p: Double, s: Integer) => (p, s) })
        .sortBy(_._1)(Ordering[Double].reverse)
        .map(_._2)
    })

    groupedPredictions.select(col("user_id"), sortUdf(col("rec_scores")) as "prediction_items")
  }

  /** DF: user_id, truth_items (ids of the items the user rated with a non-zero rating). */
  private def getUserWithItemsAsVector(truthData: DataFrame) = {
    truthData
      .where(col("rating") =!= lit(0))
      .groupBy("user_id").agg(collect_list("item_id") as "truth_items")
  }

  /** Fits an FM regressor predicting "rating" from "features".
    * Regularisation (0.005) and step size (0.005) are fixed. */
  private def trainModel(dataAssembled: DataFrame, factorSize: Int = 30, maxIter: Int = 100): FMRegressionModel = {
    println(s"FactorSize = $factorSize, maxIter = $maxIter")

    val fm = new FMRegressor()
      .setLabelCol("rating")
      .setFeaturesCol("features")
      .setFactorSize(factorSize)
      .setMaxIter(maxIter)
      .setRegParam(0.005)
      .setStepSize(0.005)

    fm.fit(dataAssembled)
  }

  /** Reads the header-less, `|`-delimited `./u.user` file and names its columns. */
  private def readUsers(userFeatureNames: Array[String]) = {
    spark.read
      .option("header", "false")
      .option("delimiter", "|")
      .option("inferSchema", "true")
      .csv("./u.user")
      .toDF(userFeatureNames: _*)
  }

  /** Reads the header-less, `|`-delimited `./u.item` file, names its columns and
    * derives release_year / release_month / release_day from release_date. */
  private def readItems(itemFeatureNames: Array[String]) = {
    spark.read
      .option("header", "false")
      .option("delimiter", "|")
      .option("inferSchema", "true")
      .csv("./u.item")
      .toDF(itemFeatureNames: _*)
      .na.fill("01-Jan-1995", Seq("release_date")) // Fill one missing with most frequent
      .withColumn("release_date", to_timestamp(col("release_date"), "d-MMM-yyyy"))
      .withColumn("release_year", year(col("release_date")))
      .withColumn("release_month", month(col("release_date")))
      .withColumn("release_day", dayofmonth(col("release_date")))
  }

  /** Reads the header-less, tab-delimited `./u.data` ratings file.
    * Not referenced anywhere else in this object. */
  private def readRatings() = {
    spark.read
      .option("header", "false")
      .option("delimiter", "\t")
      .option("inferSchema", "true")
      .csv("./u.data")
      .toDF("user_id", "item_id", "rating", "timestamp")
  }
}
    
  /** Loads `./train_ratings.csv` (tab-delimited, with a header row) using schema inference. */
  private def readTrainRatings() = {
      val csvOptions = Map(
          "header" -> "true",
          "delimiter" -> "\t",
          "inferSchema" -> "true"
      )
      spark.read.options(csvOptions).csv("./train_ratings.csv")
  }

  /** Loads `./test_ratings.csv` (tab-delimited, with a header row) using schema inference. */
  private def readTestRatings() = {
      val csvOptions = Map(
          "header" -> "true",
          "delimiter" -> "\t",
          "inferSchema" -> "true"
      )
      spark.read.options(csvOptions).csv("./test_ratings.csv")
  }



[32mimport [39m[36morg.apache.spark.ml.{Pipeline, PipelineModel}
[39m
[32mimport [39m[36morg.apache.spark.ml.evaluation.RegressionEvaluator
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.{HashingTF, OneHotEncoder, QuantileDiscretizer, StringIndexer, VectorAssembler}
[39m
[32mimport [39m[36morg.apache.spark.ml.regression.{FMRegressionModel, FMRegressor}
[39m
[32mimport [39m[36morg.apache.spark.mllib.evaluation.RankingMetrics
[39m
[32mimport [39m[36morg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
[39m
[32mimport [39m[36morg.apache.spark.sql.functions.{avg, col, collect_list, dayofmonth, lit, month, struct, to_timestamp, udf, year}
[39m
[32mimport [39m[36morg.apache.spark.sql.types.IntegerType

[39m
defined [32mobject[39m [36mMovies[39m

In [9]:
// Grid search: one training/evaluation run per (factor size, iteration count) pair.
val my_results = {
  val factorSizes = List(10, 20, 30, 40, 50, 100)
  val iterationCounts = List(50, 80, 100)
  Movies.iter_factor_iter(factorSizes, iterationCounts)
}

FactorSize = 10, maxIter = 50
time of training 102.2901883

Training set:
precisionAt(20) 0.3097560975609755
recallAt(20) 0.09924636029606941
meanAveragePrecisionAt(20) 0.18715874805414864
time of predictions 218.1152454

Testing set:
precisionAt(20) 0.1513797634691196
recallAt(20) 0.13180389343084778
meanAveragePrecisionAt(20) 0.09270380613137973
time of predictions 198.2461726

FactorSize = 20, maxIter = 50
time of training 125.0533345

Training set:
precisionAt(20) 0.31521739130434784
recallAt(20) 0.10522695989101864
meanAveragePrecisionAt(20) 0.19241865015101803
time of predictions 207.0225407

Testing set:
precisionAt(20) 0.15006570302233904
recallAt(20) 0.13997009622779832
meanAveragePrecisionAt(20) 0.09277988359273334
time of predictions 9.0094201

FactorSize = 30, maxIter = 50
time of training 166.0967571

Training set:
precisionAt(20) 0.3586956521739132
recallAt(20) 0.1281836452004867
meanAveragePrecisionAt(20) 0.22035938138218672
time of predictions 207.87276

Testing set:
pr

[36mmy_results[39m: [32mList[39m[([32mInt[39m, [32mInt[39m, [32mDouble[39m, [32mDouble[39m, [32mDouble[39m, [32mDouble[39m, [32mDouble[39m)] = [33mList[39m(
  (
    [32m10[39m,
    [32m50[39m,
    [32m102.2901883[39m,
    [32m0.3097560975609755[39m,
    [32m0.09924636029606941[39m,
    [32m0.1513797634691196[39m,
    [32m0.13180389343084778[39m
  ),
  (
    [32m20[39m,
    [32m50[39m,
    [32m125.0533345[39m,
    [32m0.31521739130434784[39m,
    [32m0.10522695989101864[39m,
    [32m0.15006570302233904[39m,
    [32m0.13997009622779832[39m
  ),
  (
    [32m30[39m,
    [32m50[39m,
    [32m166.0967571[39m,
    [32m0.3586956521739132[39m,
    [32m0.1281836452004867[39m,
    [32m0.17588699080157696[39m,
    [32m0.17165916455568409[39m
  ),
  (
    [32m40[39m,
    [32m50[39m,
    [32m213.4030248[39m,
    [32m0.33080593849416756[39m,
    [32m0.115419734691425[39m,
    [32m0.16215505913272008[39m,
    [32m0.1572660749792

In [10]:
// Turn the list of result tuples into a DataFrame. The columns keep the default
// tuple names (_1 ... _7), in the order returned by Movies.main: factor size,
// max iterations, training time, train precision/recall, test precision/recall.
val data = spark.createDataFrame(my_results)

[36mdata[39m: [32mDataFrame[39m = [_1: int, _2: int ... 5 more fields]

In [11]:
// Collapse to a single partition before writing the results as CSV to "test_sparkFM.csv".
// NOTE(review): no header option or save mode is set, so the column names are presumably
// not written and a re-run is expected to fail if the path already exists. Confirm.
data.coalesce(1).write.csv("test_sparkFM.csv")