In [27]:
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import scala.util.matching.Regex
import scala.util.parsing.json._

import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast
import scala.util.matching.Regex
import scala.util.parsing.json._


In [28]:
%%init_spark --stderr
# Requests "DIC" as the Spark application name through the spylon launcher configuration.
# NOTE(review): the configuration printout in this notebook reports
# spark.app.name as "spylon-kernel", so this setting apparently did not take
# effect - presumably because the Spark session had already been started by an
# earlier cell. Confirm, and run this cell before any Scala cell if so.
launcher.conf.spark.app.name = "DIC"

In [29]:
// Print the session settings that matter for this job, one "key: value" line each.
Seq("spark.app.name", "spark.master", "spark.submit.deployMode")
  .foreach(key => println(s"$key: ${spark.conf.get(key)}"))
// The default parallelism lives on the SparkContext rather than in the runtime conf.
println(s"sc.defaultParallelism: ${sc.defaultParallelism}")

spark.app.name: spylon-kernel
spark.master: yarn
spark.submit.deployMode: client
sc.defaultParallelism: 2


In [30]:
// Stop-word list: read from HDFS, collected on the driver into a Set and
// broadcast so that every task can consult it locally.
val stopwords = {
  val words = sc.textFile("hdfs:///user/e12439016/stopwords.txt").collect()
  sc.broadcast(words.toSet)
}

// Regular expression matching one or more separator characters: whitespace,
// brackets, punctuation, the listed symbols and digits. Used with String.split.
val DELIMS = "[\\s()\\[\\]{}.!?,;:+=\\-_\"'`~#@&*%€$§\\/\\d]+"
/** Splits `text` into its set of distinct, lower-cased terms.
  *
  * The text is lower-cased and split on `DELIMS`; fragments of length 0 or 1
  * and fragments contained in the broadcast `stopwords` set are discarded.
  *
  * @param text raw text to tokenize
  * @return the distinct remaining terms
  */
def tokenize(text: String): Set[String] = {
  val fragments = text.toLowerCase.split(DELIMS)
  fragments.filterNot { fragment =>
    fragment.length <= 1 || stopwords.value.contains(fragment)
  }.toSet
}

stopwords: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Set[String]] = Broadcast(37)
DELIMS: String = [\s()\[\]{}.!?,;:+=\-_"'`~#@&*%€$§\/\d]+
tokenize: (text: String)Set[String]


In [31]:
// Load the review dataset as a DataFrame (schema inferred from the JSON records).
val df = spark.read.json("hdfs:///user/dic25_shared/amazon-reviews/full/reviews_devset.json")
// Work on the underlying RDD[Row], keeping only rows that carry a review text.
val reviews = df.rdd.filter { row =>
  Option(row.getAs[String]("reviewText")).isDefined
}

df: org.apache.spark.sql.DataFrame = [asin: string, category: string ... 8 more fields]
reviews: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[65] at filter at <console>:51


In [32]:

// (1) Reduce every review row to a (category, reviewText) pair.
val catText = reviews.map { row =>
  (row.getAs[String]("category"), row.getAs[String]("reviewText"))
}

// (2) ((term, category), 1): one record for every distinct term of every review.
// Tokenization is delegated to `tokenize` so that the splitting and filtering
// rules exist in exactly one place; it already returns each term of a review
// only once, so every record stands for "one review of this category
// contains this term".
val termCatPairs = catText.flatMap { case (cat, text) =>
  tokenize(text).iterator.map(token => ((token, cat), 1))
}
// (3) ((term, category), count): number of reviews of the category that contain the term.
val termCatCounts = termCatPairs.reduceByKey((left, right) => left + right)

// (4) term -> all of its (category, count) entries, gathered under one key.
val termGrouped = termCatCounts
  .map { entry =>
    val ((term, cat), count) = entry
    term -> (cat -> count)
  }
  .groupByKey()

// (5) (category, count): number of reviews per category.
val docsPerCat = catText.map { case (cat, _) => (cat, 1) }
  .reduceByKey(_ + _)

// (6) Broadcast the per-category counts. The RDD is collected exactly once;
// the overall review count is then summed from that local map (as a Long)
// instead of launching a second job over `docsPerCat` and converting the
// result from a Double.
val bcDocsPerCat = sc.broadcast(docsPerCat.collectAsMap())
val totalDocs = bcDocsPerCat.value.values.foldLeft(0L)(_ + _)
val bcTotalDocs = sc.broadcast(totalDocs)

// (7) (term, count): number of reviews, over all categories, that contain the term.
// Summing the per-(term, category) counts gives the same totals as summing the
// raw per-review pairs, but aggregates one record per (term, category) key
// instead of one record per term occurrence in a review.
val globalTermCounts = termCatCounts
  .map { case ((term, _), count) => (term, count) }
  .reduceByKey(_ + _)
// The whole term -> count map is collected on the driver and shipped to the executors.
val bcGlobalTermCounts = sc.broadcast(globalTermCounts.collectAsMap())

catText: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[66] at map at <console>:60
termCatPairs: org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[67] at flatMap at <console>:67
termCatCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[68] at reduceByKey at <console>:74
termGrouped: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[70] at groupByKey at <console>:78
docsPerCat: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[72] at reduceByKey at <console>:83
totalDocs: Long = 78829
bcDocsPerCat: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,Int]] = Broadcast(44)
bcTotalDocs: org.apache.spark.broadcast.Broadcast[Long] = Broadcast(45)
globalTermCounts: org.apache.spark.rdd.RDD[(String, ...


In [33]:
// Number of distinct (term, category) combinations. `termCatCounts` is the
// result of a reduceByKey over exactly those keys, so it holds one record per
// combination; counting it avoids a separate distinct() shuffle over the raw pairs.
val termCatCount = termCatCounts.count()
println(s"(term, category): $termCatCount")


(term, category): 272573


termCatCount: Long = 272573


In [34]:
// Size of the vocabulary. `globalTermCounts` is keyed by term after a
// reduceByKey, so it holds one record per distinct term; counting it avoids a
// separate distinct() shuffle over the raw pairs.
val uniqueTermsCount = globalTermCounts.count()
println(s"$uniqueTermsCount")


96136


uniqueTermsCount: Long = 96136


In [35]:
// Sanity check: print ten ((term, category), count) records.
for (sample <- termCatCounts.take(10)) println(sample)

((mezzos,Movies_and_TV),1)
((huge,Cell_Phones_and_Accessorie),35)
((goodwill,Electronic),2)
((lit,Tools_and_Home_Improvement),13)
((alittle,Health_and_Personal_Care),2)
((aspca,Pet_Supplie),1)
((alwasy,Book),1)
((instructor,Movies_and_TV),11)
((scales,Digital_Music),1)
((upright,Automotive),2)


In [36]:
// Sanity check: print ten (category, review count) records.
for (sample <- docsPerCat.take(10)) println(sample)


(Pet_Supplie,1235)
(Movies_and_TV,4607)
(Electronic,7825)
(Office_Product,1243)
(Musical_Instrument,500)
(Cell_Phones_and_Accessorie,3447)
(Clothing_Shoes_and_Jewelry,5749)
(Tools_and_Home_Improvement,1926)
(Baby,916)
(Patio_Lawn_and_Garde,994)


In [37]:
// Sanity check: print ten (term, review count) records.
for (sample <- globalTermCounts.take(10)) println(sample)

(vecindad,1)
(bone,92)
(glorifying,5)
(rollon,1)
(folan,1)
(nothin,11)
(hascreated,1)
(festivites,1)
(hem,25)
(chipset,10)


In [38]:
// Chi-square statistic for every term against every category.
//
// For a term t, a category c and N reviews in total:
//   A = reviews in c that contain t        B = reviews outside c that contain t
//   C = reviews in c that lack t           D = reviews outside c that lack t
//   chi2 = N * (A*D - B*C)^2 / ((A+C) * (B+D) * (A+B) * (C+D))
// The result is ((term, category), chi2); it is 0.0 when the denominator is not positive.
val chiSquares = termGrouped.flatMap { case (term, catCountsIterable) =>
  val catCounts = catCountsIterable.toMap
  // Reviews containing the term over all categories. `termGrouped` carries the
  // term's count for every category it occurs in, so their sum is the global
  // count; no lookup in a broadcast copy of the whole vocabulary is needed.
  val termDocs = catCounts.values.sum
  val N = bcTotalDocs.value
  // Dereference the broadcast once per term rather than once per category.
  val docsPerCategory = bcDocsPerCat.value

  // Every known category is scored, including those in which the term never occurs (A = 0).
  docsPerCategory.iterator.map { case (cat, catDocs) =>
    val A = catCounts.getOrElse(cat, 0).toDouble
    val B = termDocs - A
    val C = catDocs - A
    val D = N - A - B - C

    val numerator = math.pow((A * D - B * C), 2)
    val denominator = (A + C) * (B + D) * (A + B) * (C + D)

    // Guard against division by zero for degenerate tables.
    val chi2 = if (denominator > 0) N * numerator / denominator else 0.0
    ((term, cat), chi2)
  }
}

chiSquares.take(10).foreach(println)


((vecindad,Office_Product),0.016021134850471113)
((vecindad,Electronic),0.11020645691451192)
((vecindad,Cell_Phones_and_Accessorie),0.045727676741356194)
((vecindad,Tools_and_Home_Improvement),0.025044854335235418)
((vecindad,Clothing_Shoes_and_Jewelry),0.07866821197233505)
((vecindad,Apps_for_Android),0.034623951191953765)
((vecindad,Patio_Lawn_and_Garde),0.012770766489787144)
((vecindad,Beauty),0.0263394222257354)
((vecindad,Movies_and_TV),16.110905484911143)
((vecindad,Grocery_and_Gourmet_Food),0.01672878880390582)


chiSquares: org.apache.spark.rdd.RDD[((String, String), Double)] = MapPartitionsRDD[84] at flatMap at <console>:54


In [39]:
// Re-key the scores by category: (category, (chi2, term)).
val byCategory = chiSquares.map { scored =>
  val (term, category) = scored._1
  category -> (scored._2 -> term)
}

// The 75 highest-scoring terms of every category, categories in alphabetical order.
//
// The selection is done with aggregateByKey and a bounded, sorted buffer, so a
// task never holds more than 75 entries per category (groupByKey would first
// materialise every term of a category just to throw most of them away).
// Equal scores are ordered alphabetically by term to make the output reproducible.
val top75PerCategory: List[(String, Seq[(Double, String)])] = {
  val limit = 75

  // true when `a` must be listed before `b`: higher chi2 first, then term ascending.
  val ranksBefore: ((Double, String), (Double, String)) => Boolean =
    (a, b) => a._1 > b._1 || (a._1 == b._1 && a._2 < b._2)

  // Sort best-first and cut back to the limit.
  val keepBest: Vector[(Double, String)] => Vector[(Double, String)] =
    entries => entries.sortWith(ranksBefore).take(limit)

  byCategory
    .aggregateByKey(Vector.empty[(Double, String)])(
      // The buffer is always sorted, so `best.last` is its weakest entry; a new
      // entry is only inserted while there is room or when it beats that entry.
      (best, entry) =>
        if (best.size < limit || ranksBefore(entry, best.last)) keepBest(best :+ entry)
        else best,
      (left, right) => keepBest(left ++ right)
    )
    .collect()
    .toList
    .sortBy(_._1)
}

// One output line per category: "<category> term:chi2 term:chi2 ...".
val formatted = top75PerCategory.map { case (category, topTerms) =>
  topTerms
    .map(scored => s"${scored._2}:${scored._1}")
    .mkString(s"<$category> ", " ", "")
}

// Preview the first ten lines.
for (line <- formatted.take(10)) println(line)

// Merged dictionary: every selected term once, in alphabetical order.
val allTerms = top75PerCategory
  .flatMap(_._2.map(_._2))
  .distinct
  .sorted


<Apps_for_Android> games:3081.1493374842926 play:2158.369406820129 graphics:1505.51089773515 kindle:1470.820942569012 addictive:1311.9055627277771 challenging:1038.1284558527927 coins:1002.6647889526382 addicting:990.8441134974867 fire:956.1470053110604 levels:825.3813282736016 playing:692.9340396014182 ads:642.3969794099202 puzzles:596.7716753070063 apps:548.7810653104153 free:500.9884786241356 bingo:409.2358492981346 mahjong:322.00891943980963 download:303.8649278202287 faotd:288.8577201586641 facebook:282.5170543702901 downloaded:262.77022492215735 hints:242.61029019440056 solitaire:211.6429957838186 android:211.58105849598613 puzzle:198.85582217352504 gameplay:198.5123356770461 freezes:189.67737127837006 unlock:185.7521008338788 played:180.39650447458513 deleted:179.2243589462116 bought:174.4587211734982 flappy:173.30583696524425 upgrades:168.99856742047186 awesome:155.2100816653206 tablet:155.13822220891723 price:149.5959088208227 calculator:148.95756302858823 developer:148.377465

byCategory: org.apache.spark.rdd.RDD[(String, (Double, String))] = MapPartitionsRDD[85] at map at <console>:53
top75PerCategory: List[(String, Seq[(Double, String)])] = List((Apps_for_Android,List((3081.1493374842926,games), (2158.369406820129,play), (1505.51089773515,graphics), (1470.820942569012,kindle), (1311.9055627277771,addictive), (1038.1284558527927,challenging), (1002.6647889526382,coins), (990.8441134974867,addicting), (956.1470053110604,fire), (825.3813282736016,levels), (692.9340396014182,playing), (642.3969794099202,ads), (596.7716753070063,puzzles), (548.7810653104153,apps), (500.9884786241356,free), (409.2358492981346,bingo), (322.00891943980963,mahjong), (303.8649278202287,download), (288.8577201586641,faotd), (282.5170543702901,facebook), (262.77022492215735,downloaded)...


In [40]:
import java.io._
// Write the result file: one line per category, followed by the merged
// dictionary on a final line (space separated, no trailing newline).
val file = new File("output_rdd.txt")
// Encode explicitly as UTF-8 instead of relying on the platform default charset.
val bw = new BufferedWriter(
  new OutputStreamWriter(new FileOutputStream(file), java.nio.charset.StandardCharsets.UTF_8))

// Close the writer even when a write fails, so the file handle is never leaked.
try {
  formatted.foreach { line =>
    bw.write(line + "\n")
  }

  bw.write(allTerms.mkString(" "))
} finally {
  bw.close()
}


import java.io._
file: java.io.File = output_rdd.txt
bw: java.io.BufferedWriter = java.io.BufferedWriter@518eae80
