# Part 1) RDDs

## Data Loading

In [1]:
val Path= "hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json"
val data = spark.read.json(Path)
// Making the RDD 
// Category is the Key 
// reviewText is the value
val pairs = data.rdd.map{row=>
    val category = row.getAs[String] ("category")
    val text = row.getAs[String] ("reviewText")
    (category,text)}
// Reading stopWord into RDD string and convert it to an array
val stopwordsPath = "Exercise2/stopwords.txt"
val stopwords = sc.textFile(stopwordsPath).collect()

Intitializing Scala interpreter ...

Spark Web UI available at http://captain01.os.hpc.tuwien.ac.at:9999/proxy/application_1715326141961_2687
SparkContext available as 'sc' (version = 3.2.3, master = yarn, app id = application_1715326141961_2687)
SparkSession available as 'spark'


Path: String = hdfs:///user/dic24_shared/amazon-reviews/full/reviews_devset.json
data: org.apache.spark.sql.DataFrame = [asin: string, category: string ... 8 more fields]
pairs: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[9] at map at <console>:29
stopwordsPath: String = Exercise2/stopwords.txt
stopwords: Array[String] = Array(a, aa, able, about, above, absorbs, accord, according, accordingly, across, actually, after, afterwards, again, against, ain, album, album, all, allow, allows, almost, alone, along, already, also, although, always, am, among, amongst, an, and, another, any, anybody, anyhow, anyone, anything, anyway, anyways, anywhere, apart, app, appear, appreciate, appropriate, are, aren, around, as, aside, ask, asking, associated, at, available, away, awfully, ...


## Data Cleaning

In [2]:
// converting the values(reviewText) to lowercase
//spliting the reviewTexts into words and removing any non-alphabetic characters
//removing stopWords from ReviewTexts
val clean_reviews = pairs.mapValues(value=>value.toLowerCase
                                  .split("[^a-zA-Z]+")
                                  .filterNot(x => stopwords.contains(x.toLowerCase)).mkString(" ")).cache()
// the number of partition is the defualt number 
//we tried different partitions and incsreasing the number of partitions resulted in increasing the time duration 
clean_reviews.getNumPartitions

clean_reviews: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[12] at mapValues at <console>:28
res0: Int = 2


## Chi_2 Value Calculating

In [3]:
//counting number of each words per category
val categoryTermCount = clean_reviews.flatMapValues(x=>x.split(" ")).map(word => (word, 1)).reduceByKey ((x,y)=>x+y)
// making a tuple with key value of category and value of (word,number of that word in the respective category)
val result_A = categoryTermCount.map{case(k,v) => ( (k._2),( k._1, v))}
// Calculate a number of unique words in the whole reviews. The result is a tuple (word, frequency of word )
val countWords = clean_reviews.map{ case (key, value) =>  value }.flatMap (x=>x.split(" ")).map(word => (word, 1)).reduceByKey ((x,y)=>x+y)

categoryTermCount: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[15] at reduceByKey at <console>:25
result_A: org.apache.spark.rdd.RDD[(String, (String, Int))] = MapPartitionsRDD[16] at map at <console>:27
countWords: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:29


In [4]:
// Do the join datasets per word column
val rddJoin = result_A.join(countWords)
// Prepare data for the following join per category. The data now represents a tuple (category (word, A,B))
val result_A_B = rddJoin.map{case (k,v)=> (v._1._1, (k, v._1._2, v._2-v._1._2)) }
// Calculate a frequency of each category
val countCategory = clean_reviews.map{ case (key, value) =>  (key,1)}.reduceByKey ((x,y)=>x+y)


rddJoin: org.apache.spark.rdd.RDD[(String, ((String, Int), Int))] = MapPartitionsRDD[23] at join at <console>:27
result_A_B: org.apache.spark.rdd.RDD[(String, (String, Int, Int))] = MapPartitionsRDD[24] at map at <console>:29
countCategory: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[26] at reduceByKey at <console>:31


In [5]:
// Join per category
val rddJoin2 = result_A_B.join(countCategory).persist
// Calculate the number of lines in the whole document

// Calculate A,B,C,D
// val A_B_C_D = rddJoin2.map{case (k,v)=> (k, (v._1._1, v._1._2, v._1._3, v._2-v._1._2,N - v._1._3 - v._2 )) }

rddJoin2: org.apache.spark.rdd.RDD[(String, ((String, Int, Int), Int))] = MapPartitionsRDD[29] at join at <console>:26


In [6]:
// Calculate chi_2 value using A,B,C,D,N
val N = clean_reviews.count().toInt
val chi2 = rddJoin2.map{case (k,v)=> {
    val A = v._1._2.toFloat
    val B = v._1._3.toFloat
    val C = v._2-v._1._2.toFloat
    val D = N - v._1._3 - v._2.toFloat
    val result = (N*(A*D-B*C)*(A*D-B*C))/((A+B)*(A+C)*(B+D)*(C+D))
    (k, (result,v._1._1))
}}
// Group the lines according to the key (=category) and sort according to the value of chi_2
val grouped = chi2.groupByKey().mapValues(tuple => tuple.toList.sortBy(-_._1))
// Extract the first 75 values in each category
val grouped_75 = grouped.mapValues(line=>line.take(75)).sortByKey()

N: Int = 78829
chi2: org.apache.spark.rdd.RDD[(String, (Float, String))] = MapPartitionsRDD[30] at map at <console>:27
grouped: org.apache.spark.rdd.RDD[(String, List[(Float, String)])] = MapPartitionsRDD[32] at mapValues at <console>:36
grouped_75: org.apache.spark.rdd.RDD[(String, List[(Float, String)])] = ShuffledRDD[36] at sortByKey at <console>:38


## Export

In [8]:
import scala.reflect.io.File

// Transforming and saving the output like the previous exercise
val output = grouped_75.map { case (category, terms) =>
  val formattedTerms = terms.map { case (term, chi2) =>
    s"$chi2:$term"
  }.mkString(" ")

  s"<$category> $formattedTerms"
}

// Save the output
val file = File("output_rdd.txt")
file.writeAll(output.collect().mkString("\n"))

// Extract terms and sort them alphabetically
val terms = grouped_75.flatMap { case (category, terms) =>
  terms.map { case (term, chi2) => chi2 }
}.distinct().collect().sorted

// Append the sorted terms to the file
file.appendAll("\n" + terms.mkString(" "))


import scala.reflect.io.File
output: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at map at <console>:32
file: scala.reflect.io.File = output_rdd.txt
terms: Array[String] = Array(abq, acdelco, acne, acoustic, acre, acting, action, actor, actors, actress, acura, adapter, addario, addicted, addicting, addictive, ads, ai, aic, aidan, aimard, aired, airsoft, albums, alex, almonds, alpha, alternator, altima, ameda, amel, amino, ammo, amp, android, animated, animation, anime, answering, antenna, ants, apos, appetite, apple, applecare, apply, apps, aquarium, ar, arai, arch, arrangements, articulation, artisan, artist, artists, atdi, atv, audio, author, authors, avent, awesome, babies, babyface, back, backpacking, bag, bags, bait, baits, baking, ball, ballad, ballads, ballmount, ball...
