# Big Data Project - Job 2 Optimized

In [1]:
import org.apache.spark

Intitializing Scala interpreter ...

Spark Web UI available at http://host.docker.internal:4045
SparkContext available as 'sc' (version = 3.5.1, master = local[*], app id = local-1742551388696)
SparkSession available as 'spark'


import org.apache.spark


In [2]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Returns the already-active SparkSession if there is one (the kernel exposes it as `spark`),
// otherwise builds a new one with default settings.
val sparkSession = SparkSession.builder().getOrCreate()

/**
 * Reads a review CSV file into an RDD of `(asin, reviewText, overall, category, summary)` tuples.
 *
 * The file is read with a header row. Fields may be quoted, span several lines, and use `"` both
 * as quote and as escape character.
 *
 * @param path  location of the CSV file
 * @param spark session used to read the file
 * @return one tuple per CSV row. String fields are passed through exactly as read (they may be
 *         null for missing cells). `overall` is `0.0` when the cell is missing or is not a
 *         valid number.
 */
def create(path: String, spark: SparkSession): RDD[(String, String, Double, String, String)] = {
  spark.read
    .option("header", "true")
    .option("quote", "\"")
    .option("delimiter", ",")
    .option("multiline", "true")
    .option("escape", "\"")
    .csv(path)
    .rdd
    .map { row =>
      val asin = row.getAs[String]("asin")
      val reviewText = row.getAs[String]("reviewText")
      // Only the *parsing* is guarded. A missing "overall" column fails loudly, like the other
      // columns, instead of silently turning every rating into 0.0.
      // NOTE(review): 0.0 is a sentinel for "no valid rating", but downstream it is averaged
      // like a real rating. Consider dropping such rows instead.
      val overall = Option(row.getAs[String]("overall"))
        .flatMap(raw => scala.util.Try(raw.toDouble).toOption)
        .getOrElse(0.0)
      val category = row.getAs[String]("category")
      val summary = row.getAs[String]("summary")

      (asin, reviewText, overall, category, summary)
    }
}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
sparkSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@253cb773
create: (path: String, spark: org.apache.spark.sql.SparkSession)org.apache.spark.rdd.RDD[(String, String, Double, String, String)]


In [3]:
/**
 * Normalises a review summary for word counting.
 *
 * Steps:
 *  - lower-case the text using `Locale.ROOT`, so the JVM's default locale cannot change the
 *    result (e.g. the Turkish dotless i);
 *  - replace every character other than a-z, 0-9, a space or an apostrophe (') with a space;
 *  - collapse runs of whitespace into one space;
 *  - trim the result.
 *
 * @param s text to clean; may be null (Spark yields null for missing CSV cells)
 * @return the cleaned text, or "" when `s` is null
 */
def cleanString(s: String): String =
  Option(s).fold("") { text =>
    text
      .toLowerCase(java.util.Locale.ROOT)
      // The class must be `a-z`. The previous `A-z` range also covered [ \ ] ^ _ ` and let them through.
      .replaceAll("[^a-z0-9 ']", " ")
      .replaceAll("\\s+", " ")
      .trim()
  }

cleanString: (s: String)String


In [4]:
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

val rddReviewAppliances = create("../../../../dataset/Appliances_5_part0.csv", sparkSession)
val rddReviewSoftware = create("../../../../dataset/Software_5_part0.csv", sparkSession)

// 36-partition hash partitioner, shared by the later cells so that keyed RDDs can be co-partitioned
val p = new HashPartitioner(36)

// Union of the two RDDs, reduced to (asin, (rating, summaryWords))
val rddUnion = rddReviewAppliances
  .union(rddReviewSoftware)
  // drop the review text and category, clean the summary (a missing/null summary becomes "")
  .map { case (id, _, rating, _, summary) =>
    (id, rating, Option(summary).fold("")(cleanString))
  }
  // remove rows where the cleaned summary is empty
  .filter { case (_, _, cleaned) => cleaned.nonEmpty }
  // key by product id and split the cleaned summary into its words
  .map { case (id, rating, cleaned) => (id, (rating, cleaned.split(" "))) }

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
rddReviewAppliances: org.apache.spark.rdd.RDD[(String, String, Double, String, String)] = MapPartitionsRDD[9] at map at <console>:41
rddReviewSoftware: org.apache.spark.rdd.RDD[(String, String, Double, String, String)] = MapPartitionsRDD[19] at map at <console>:41
p: org.apache.spark.HashPartitioner = org.apache.spark.HashPartitioner@24
rddUnion: org.apache.spark.rdd.RDD[(String, (Double, Array[String]))] = MapPartitionsRDD[23] at map at <console>:45


In [5]:
// Products selected for analysis: average rating >= 4 and fewer than 10 reviews.
// Result: (asin, (averageRating, numberOfReviews)). The review count is kept as a Double.
val filter = rddUnion
    // keep only the rating
    .mapValues { case (rating, _) => rating }
    // per product: (sum of ratings, number of reviews). Aggregating with the shared partitioner
    // `p` makes this RDD co-partitioned with the `partitionBy(p)` side of the later join.
    .aggregateByKey((0.0, 0.0), p)(
      (acc, rating) => (acc._1 + rating, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
    // average rating, keep the review count. mapValues (unlike map) preserves the partitioner.
    .mapValues { case (ratingSum, ratingCount) => (ratingSum / ratingCount, ratingCount) }
    // high average rating but few reviews
    .filter { case (_, (avgRating, ratingCount)) => avgRating >= 4 && ratingCount < 10 }

filter: org.apache.spark.rdd.RDD[(String, (Double, Double))] = MapPartitionsRDD[27] at filter at <console>:38


In [6]:
 // Summary words of the products selected by `filter`, one row per review:
 // (asin, (summaryWords, (averageRating, numberOfReviews))).
 // Persisted because both wordFreq and wordFreqPerReview read it.
 val filteredRdd = {
  // drop the rating and partition with the shared partitioner before joining
  val wordsById = rddUnion
    .map { case (id, (_, words)) => (id, words) }
    .partitionBy(p)
  // the inner join keeps only ids that survived `filter`
  wordsById.join(filter).persist(MEMORY_AND_DISK)
 }

filteredRdd: org.apache.spark.rdd.RDD[(String, (Array[String], (Double, Double)))] = MapPartitionsRDD[32] at join at <console>:37


In [7]:
// Total number of occurrences of every word across all summaries of the selected products.
// Result: (word, occurrences), with the count stored as a Double.
val wordFreq = filteredRdd
    // one (word, 1.0) pair per word occurrence
    .flatMap { case (_, (words, _)) => words.map(word => (word, 1.0)) }
    // add up the occurrences of each word
    .reduceByKey((left, right) => left + right)

wordFreq: org.apache.spark.rdd.RDD[(String, Double)] = ShuffledRDD[34] at reduceByKey at <console>:36


In [10]:
// For every word in the selected products' summaries: the products it appears in. Each product is
// paired with the share of the word's total occurrences that fall in that product's summaries.
// Result rows: (numberOfProducts, word, "List((asin,ratio), ...)"), sorted by numberOfProducts
// in descending order and restricted to words found in more than 10 products.
// NOTE(review): despite the name, counts are per product (asin), not per review.
val wordFreqPerReview = filteredRdd
    // one ((product, word), 1.0) pair per word occurrence
    .flatMap { case (id, (words, _)) => words.map(word => ((id, word), 1.0)) }
    // occurrences of each word within each product's summaries
    .reduceByKey(_ + _)
    // re-key by word for the join with the global counts
    .map { case ((id, word), count) => (word, (id, count)) }
    // attach the total number of occurrences of the word
    .join(wordFreq)
    // ratio between the word's frequency in this product and its total frequency
    .map { case (word, ((id, count), total)) => (word, (id, count / total)) }
    // Gather all (product, ratio) pairs of a word. Prepending each pair in O(1) avoids the
    // quadratic cost of repeatedly concatenating lists (`List(x) ++ acc`). The order inside
    // the list carries no meaning; it already depended on shuffle order.
    .aggregateByKey(List.empty[(String, Double)])(
      (products, entry) => entry :: products,
      (left, right) => left ::: right)
    // key by the number of products the word appears in
    .map { case (word, products) => (products.size, (word, products)) }
    // Keep only words found in more than 10 products. Filtering before sortByKey avoids
    // sampling and sorting rows that would be discarded anyway.
    .filter { case (numProducts, _) => numProducts > 10 }
    .sortByKey(ascending = false)
    // flatten the row and render the product list as text (for writing as a DataFrame)
    .map { case (numProducts, (word, products)) => (numProducts, word, products.toString()) }

wordFreqPerReview: org.apache.spark.rdd.RDD[(Int, String, String)] = MapPartitionsRDD[64] at map at <console>:53


In [11]:
// Materializes every row of wordFreqPerReview as an Array in the driver.
// NOTE(review): for larger inputs prefer take(n) or writing the result to storage.
wordFreqPerReview.collect()

res1: Array[(Int, String, String)] = Array((89,stars,List((B0056I99WG,0.020202020202020204), (B00MGMWTQS,0.020202020202020204), (B00NG7K2RA,0.015151515151515152), (B01637RISK,0.015151515151515152), (B001GL6QDM,0.010101010101010102), (B001DPFP88,0.020202020202020204), (B00GRFIIHO,0.015151515151515152), (B005FDK7J6,0.005050505050505051), (B002U0L1BU,0.005050505050505051), (B00W4YKCGC,0.020202020202020204), (B015IHWAZW,0.025252525252525252), (B00FFINRG6,0.005050505050505051), (B008SCNCTI,0.020202020202020204), (B00PG8FWS6,0.015151515151515152), (B00132DENO,0.005050505050505051), (B004E564PW,0.005050505050505051), (B004I49NJ8,0.005050505050505051), (B0043T8K8I,0.010101010101010102), (B00DM8KQ2Y,0.015151515151515152), (B00E6LI5NI,0.010101010101010102), (B00ENFYLOO,0.005050505050505051), (B00...
