In [1]:
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, CountVectorizer, CountVectorizerModel, IDF, StringIndexer, ChiSqSelector}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import scala.collection.mutable.ListBuffer
import java.io.{File, PrintWriter}

Intitializing Scala interpreter ...

Spark Web UI available at http://c100.local:8088/proxy/application_1596895008206_25693
SparkContext available as 'sc' (version = 2.4.0-cdh6.3.2, master = yarn, app id = application_1596895008206_25693)
SparkSession available as 'spark'


import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, CountVectorizer, CountVectorizerModel, IDF, StringIndexer, ChiSqSelector}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import scala.collection.mutable.ListBuffer
import java.io.{File, PrintWriter}


# Load Data

In [2]:
// HDFS location of the development subset of the Amazon review corpus (JSON input).
val DEVSET: String = "hdfs:///user/pknees/amazon-reviews/full/reviews_devset.json"

DEVSET: String = hdfs:///user/pknees/amazon-reviews/full/reviews_devset.json


In [3]:
// Read the JSON reviews into a DataFrame and keep only the two columns used below:
// the product category (classification target) and the raw review text.
val reviewsDf: org.apache.spark.sql.DataFrame =
  spark.read
    .json(DEVSET)
    .select("category", "reviewText")

reviewsDf: org.apache.spark.sql.DataFrame = [category: string, reviewText: string]


# Encode Category

In [4]:
// ChiSqSelector needs a numeric label column, so the string-valued "category"
// is encoded as an index and stored in "label".
val indexer: StringIndexer =
  new StringIndexer().setOutputCol("label").setInputCol("category")

indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_ff3bfc3b02db


# Tokenize

In [5]:
// Splits "reviewText" into unigrams ("tokensRaw"). The pattern lists the delimiter
// characters (blank, tab, digits and punctuation) on which the text is split.
// Tokens are additionally lower-cased, and tokens shorter than two characters are dropped.
val tokenizer: RegexTokenizer = {
  val delimiterPattern = "[ \t0123456789.!?,;:()\\[\\]{}\\-_\"'`~#&*%$\\\\/]+"
  val minimumLength = 2

  new RegexTokenizer()
    .setPattern(delimiterPattern)
    .setMinTokenLength(minimumLength)
    .setToLowercase(true)
    .setInputCol("reviewText")
    .setOutputCol("tokensRaw")
}

tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_f039a03b4583


# Stop Words Removal

In [6]:
// Remove stop words that should be ignored for text classification.
// Reads the raw tokens from "tokensRaw" and writes the filtered tokens to "tokens".
// The stop-word list is given explicitly. Besides common English function words it also
// contains a few content words such as "book", "car", "cd", "game" and "novel".
// NOTE(review): "game", "that" and "there" appear twice in the list, and the single-letter
// entries can never match because the tokenizer already drops tokens shorter than two
// characters. Both look harmless, but confirm before cleaning the list up.
val stopWordsRemover = new StopWordsRemover()
    .setInputCol("tokensRaw")
    .setOutputCol("tokens")
    .setStopWords(Array("a", "aa", "able", "about", "above", "according", "accordingly", "across", "actually", "after", "afterwards", "again", "against", "ain", "all", "allow", "allows", "almost", "alone", "along", "already", "also", "although", "always", "am", "among", "amongst", "an", "and", "another", "any", "anybody", "anyhow", "anyone", "anything", "anyway", "anyways", "anywhere", "apart", "appear", "appreciate", "appropriate", "are", "aren", "around", "as", "aside", "ask", "asking", "associated", "at", "available", "away", "awfully", "b", "bb", "be", "became", "because", "become", "becomes", "becoming", "been", "before", "beforehand", "behind", "being", "believe", "below", "beside", "besides", "best", "better", "between", "beyond", "bibs", "book", "both", "brief", "but", "by", "c", "came", "can", "cannot", "cant", "car", "cause", "causes", "cd", "certain", "certainly", "changes", "clearly", "co", "com", "come", "comes", "concerning", "consequently", "consider", "considering", "contain", "containing", "contains", "corresponding", "could", "couldn", "course", "currently", "d", "definitely", "described", "despite", "did", "didn", "different", "do", "does", "doesn", "doing", "don", "done", "down", "downwards", "during", "e", "each", "edu", "eg", "eight", "either", "else", "elsewhere", "enough", "entirely", "especially", "et", "etc", "even", "ever", "every", "everybody", "everyone", "everything", "everywhere", "ex", "exactly", "example", "except", "f", "far", "few", "fifth", "first", "five", "followed", "following", "follows", "for", "former", "formerly", "forth", "four", "from", "further", "furthermore", "g", "game", "game", "get", "gets", "getting", "given", "gives", "go", "goes", "going", "gone", "got", "gotten", "greetings", "h", "had", "hadn", "happens", "hardly", "has", "hasn", "have", "haven", "having", "he", "hello", "help", "hence", "her", "here", "hereafter", "hereby", "herein", "hereupon", "hers", "herself", "hi", "him", "himself", "his", "hither", 
"hopefully", "how", "howbeit", "however", "i", "ie", "if", "ignored", "immediate", "in", "inasmuch", "inc", "indeed", "indicate", "indicated", "indicates", "inner", "insofar", "instead", "into", "inward", "is", "isn", "it", "its", "itself", "j", "just", "k", "keep", "keeps", "kept", "know", "known", "knows", "l", "last", "lately", "later", "latter", "latterly", "least", "less", "lest", "let", "life", "like", "liked", "likely", "little", "ll", "look", "looking", "looks", "ltd", "m", "mainly", "many", "may", "maybe", "me", "mean", "meanwhile", "merely", "might", "mon", "more", "moreover", "most", "mostly", "much", "must", "my", "myself", "n", "name", "namely", "nd", "near", "nearly", "necessary", "need", "needs", "neither", "never", "nevertheless", "new", "next", "nine", "no", "nobody", "non", "none", "noone", "nor", "normally", "not", "nothing", "novel", "now", "nowhere", "o", "obviously", "of", "off", "often", "oh", "ok", "okay", "old", "on", "once", "one", "ones", "only", "onto", "or", "other", "others", "otherwise", "ought", "our", "ours", "ourselves", "out", "outside", "over", "overall", "own", "p", "particular", "particularly", "per", "perhaps", "placed", "please", "plus", "possible", "presumably", "probably", "provides", "q", "que", "quite", "qv", "r", "rather", "rd", "re", "really", "reasonably", "regarding", "regardless", "regards", "relatively", "respectively", "right", "s", "said", "same", "saw", "say", "saying", "says", "second", "secondly", "see", "seeing", "seem", "seemed", "seeming", "seems", "seen", "self", "selves", "sensible", "sent", "serious", "seriously", "seven", "several", "shall", "she", "should", "shouldn", "since", "six", "so", "some", "somebody", "somehow", "someone", "something", "sometime", "sometimes", "somewhat", "somewhere", "soon", "sorry", "specified", "specify", "specifying", "still", "sub", "such", "sup", "sure", "t", "take", "taken", "tell", "tends", "th", "than", "thank", "thanks", "thanx", "that", "that", "thats", "the", 
"their", "theirs", "them", "themselves", "then", "thence", "there", "there", "thereafter", "thereby", "therefore", "therein", "theres", "thereupon", "these", "they", "think", "third", "this", "thorough", "thoroughly", "those", "though", "three", "through", "throughout", "thru", "thus", "to", "together", "too", "took", "toward", "towards", "tried", "tries", "truly", "try", "trying", "twice", "two", "u", "un", "under", "unfortunately", "unless", "unlikely", "until", "unto", "up", "upon", "us", "use", "used", "useful", "uses", "using", "usually", "v", "value", "various", "ve", "very", "via", "viz", "vs", "want", "wants", "was", "wasn", "way", "we", "welcome", "well", "went", "were", "weren", "what", "whatever", "when", "whence", "whenever", "where", "whereafter", "whereas", "whereby", "wherein", "whereupon", "wherever", "whether", "which", "while", "whither", "who", "whoever", "whole", "whom", "whose", "why", "will", "willing", "wish", "with", "within", "without", "won", "wonder", "would", "wouldn", "x", "y", "yes", "yet", "you", "your", "yours", "yourself", "yourselves", "z", "zero"))

stopWordsRemover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_36160cc47a7c


# TF-IDF calculation

## TF with CountVectorizer

In [7]:
// Term frequency is the first half of TF-IDF. CountVectorizer is used instead of
// HashingTF, although it is not as performant, because the vocabulary it builds
// is needed later to map feature indices back to tokens.
val countVectorizer: CountVectorizer =
  new CountVectorizer().setOutputCol("featuresRaw").setInputCol("tokens")

countVectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_7c8dcae98a51


## IDF

In [8]:
// Second half of TF-IDF: the raw term frequencies in "featuresRaw" are multiplied by the
// inverse document frequency, and the weighted vectors are written to "featuresWeighted".
val idf: IDF =
  new IDF().setOutputCol("featuresWeighted").setInputCol("featuresRaw")

idf: org.apache.spark.ml.feature.IDF = idf_ae58982393a2


# ChiSqSelector

In [9]:
// Final step: chi-squared feature selection keeps the top 4000 features overall.
// It scores the weighted features against the indexed category in "label" and
// writes the reduced vectors to "features".
val selector: ChiSqSelector = {
  val topFeatureCount = 4000

  new ChiSqSelector()
    .setFeaturesCol("featuresWeighted")
    .setLabelCol("label")
    .setOutputCol("features")
    .setNumTopFeatures(topFeatureCount)
}

selector: org.apache.spark.ml.feature.ChiSqSelector = chiSqSelector_5fd8fc74f524


# Extract top 4000 features

In [10]:
// Run every feature-creation and selection stage by hand, one after the other.
// Each estimator is fitted on the same DataFrame it subsequently transforms.

// category -> numeric label
val indexDf: org.apache.spark.sql.DataFrame = {
  val indexerModel = indexer.fit(reviewsDf)
  indexerModel.transform(reviewsDf)
}

// review text -> raw tokens -> tokens without stop words
val tokenizedDf: org.apache.spark.sql.DataFrame = tokenizer.transform(indexDf)
val tokenizedFilteredDf: org.apache.spark.sql.DataFrame = stopWordsRemover.transform(tokenizedDf)

// tokens -> term frequencies; the fitted model is kept because it holds the vocabulary
val cvModel: CountVectorizerModel = countVectorizer.fit(tokenizedFilteredDf)
val tfDf: org.apache.spark.sql.DataFrame = cvModel.transform(tokenizedFilteredDf)

// term frequencies -> TF-IDF weights
val tfIdfDf: org.apache.spark.sql.DataFrame = {
  val idfModel = idf.fit(tfDf)
  idfModel.transform(tfDf)
}

// TF-IDF weights -> selected features; the fitted model is kept because it holds the selected indices
val cSqModel: org.apache.spark.ml.feature.ChiSqSelectorModel = selector.fit(tfIdfDf)
val topFeaturesDf: org.apache.spark.sql.DataFrame = cSqModel.transform(tfIdfDf)

indexDf: org.apache.spark.sql.DataFrame = [category: string, reviewText: string ... 1 more field]
tokenizedDf: org.apache.spark.sql.DataFrame = [category: string, reviewText: string ... 2 more fields]
tokenizedFilteredDf: org.apache.spark.sql.DataFrame = [category: string, reviewText: string ... 3 more fields]
cvModel: org.apache.spark.ml.feature.CountVectorizerModel = cntVec_7c8dcae98a51
tfDf: org.apache.spark.sql.DataFrame = [category: string, reviewText: string ... 4 more fields]
tfIdfDf: org.apache.spark.sql.DataFrame = [category: string, reviewText: string ... 5 more fields]
cSqModel: org.apache.spark.ml.feature.ChiSqSelectorModel = chiSqSelector_5fd8fc74f524
topFeaturesDf: org.apache.spark.sql.DataFrame = [category: string, reviewText: string ... 6 more fields]


In [11]:
// The final DataFrame still carries every intermediate column next to the selected
// top-4000 feature vectors. Print the first 20 rows with long cell values truncated
// (the defaults of show(), spelled out here).
topFeaturesDf.show(numRows = 20, truncate = true)

+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|            category|          reviewText|label|           tokensRaw|              tokens|         featuresRaw|    featuresWeighted|            features|
+--------------------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...| 18.0|[this, was, gift,...|[gift, husband, m...|(96489,[2,3,4,10,...|(96489,[2,3,4,10,...|(4000,[2,3,4,10,1...|
|Patio_Lawn_and_Garde|This is a very ni...| 18.0|[this, is, very, ...|[nice, spreader, ...|(96489,[0,1,4,25,...|(96489,[0,1,4,25,...|(4000,[0,1,4,25,4...|
|Patio_Lawn_and_Garde|The metal base wi...| 18.0|[the, metal, base...|[metal, base, hos...|(96489,[7,13,34,2...|(96489,[7,13,34,2...|(4000,[7,13,189,3...|
|Patio_Lawn_and_Garde|For the most part...| 18.0|[for, the, most, ...|

In [12]:
// The feature vectors only store term indices, so the vocabulary of the fitted
// CountVectorizer model is needed to turn an index back into its token string.
val vocabulary: Array[String] = cvModel.vocabulary

vocabulary: Array[String] = Array(great, good, read, love, time, story, product, work, recommend, back, easy, make, bought, made, find, books, buy, price, put, reading, quality, people, works, quot, years, nice, characters, case, long, series, lot, found, author, day, bit, movie, feel, makes, thing, perfect, fit, end, set, loved, things, thought, album, music, small, hard, phone, give, fun, year, world, size, worth, pretty, times, sound, written, light, real, big, amazon, part, bad, highly, money, excellent, purchased, happy, high, enjoyed, problem, family, interesting, wanted, character, job, review, purchase, man, watch, song, days, enjoy, songs, place, home, stars, short, film, writing, play, cover, top, full, fan, fine, color, side, order, wonderful, amazing, point, fact, reviews, o...

In [13]:
// Indices of the top features kept by the fitted ChiSqSelector model
// (4000 were requested when the selector was configured).
val selectedFeatures: Array[Int] = cSqModel.selectedFeatures

selectedFeatures: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 29, 31, 32, 33, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 50, 51, 52, 53, 54, 55, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 73, 74, 75, 76, 78, 79, 80, 81, 82, 83, 84, 86, 87, 88, 89, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 159, 160, 161, 162, 163, 164, 165, 166, 168, 170, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 1...

In [14]:
// Translate every selected feature index into its token via the vocabulary,
// preserving the order of `selectedFeatures`. The result stays a ListBuffer so that
// existing uses of `selectedTokenList` keep working, but it is built in a single
// expression instead of appending inside a loop to a `var`.
val selectedTokenList: ListBuffer[String] =
  ListBuffer(selectedFeatures.map(vocabulary(_)): _*)

// Immutable, alphabetically sorted view of the selected tokens; this is what gets written out.
val selectedTokens: List[String] = selectedTokenList.toList.sorted

selectedTokenList: scala.collection.mutable.ListBuffer[String] = ListBuffer(great, good, read, love, time, story, product, work, recommend, back, easy, make, bought, made, find, books, buy, price, put, reading, quality, people, works, quot, years, nice, characters, case, series, found, author, day, movie, feel, makes, thing, perfect, fit, end, set, loved, things, thought, album, music, small, phone, give, fun, year, world, size, pretty, times, sound, written, light, real, big, amazon, part, bad, highly, money, excellent, purchased, happy, enjoyed, problem, family, interesting, character, job, review, purchase, man, watch, song, enjoy, songs, place, home, short, film, writing, play, cover, top, full, fan, fine, color, side, order, wonderful, amazing, point, fact, ordered, stories, favori...

# Write tokens to File

In [17]:
// Create the output file handle and a writer for it.
// The charset is fixed to UTF-8 so that tokens containing non-ASCII characters are
// written identically on every machine, instead of depending on the JVM default charset.
// The writer must be closed by the code that finishes writing to it.
val myfile = new File("./output_ds.txt")
val pw = new PrintWriter(myfile, "UTF-8")

myfile: java.io.File = ./output_ds.txt
pw: java.io.PrintWriter = java.io.PrintWriter@66385335


In [18]:
// Write all selected tokens to the output file on a single line, each followed by one space.
// The writer is closed in `finally`, so the file handle is released even if a write fails.
try {
  selectedTokens.foreach(token => pw.write(token + " "))
} finally {
  pw.close()
}

# Pipeline

In [30]:
// Running every stage separately is tedious, so all stages are chained in a Pipeline
// that executes them one after the other in the listed order.
val pipeline = new Pipeline()
  .setStages(Array(indexer, tokenizer, stopWordsRemover, countVectorizer, idf, selector))

// Fit on the loaded reviews (`reviewsDf`; the previously referenced `df` is not defined
// anywhere in this notebook). The fitted model is kept in its own value so its stages
// can be inspected or reused later.
val pipelineModel: PipelineModel = pipeline.fit(reviewsDf)
val featureDF = pipelineModel.transform(reviewsDf)

pipeline: org.apache.spark.ml.Pipeline = pipeline_44fcffe7859e
