From 84cc1beb5852ceab635fbf95f64b17a1c7b4046e Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Tue, 1 Sep 2015 17:32:28 +0800
Subject: [PATCH 1/3] update lda example

---
 .../spark/examples/mllib/LDAExample.scala | 149 +++++-------------
 1 file changed, 40 insertions(+), 109 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index 75b0f69cf91aa..6094b723fefa4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -18,9 +18,9 @@
 // scalastyle:off println
 package org.apache.spark.examples.mllib
 
-import java.text.BreakIterator
-
-import scala.collection.mutable
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.feature.{CountVectorizerModel, CountVectorizer, StopWordsRemover, RegexTokenizer}
+import org.apache.spark.sql.{Row, SQLContext}
 
 import scopt.OptionParser
 
@@ -118,7 +118,7 @@ object LDAExample {
     // Load documents, and prepare them for LDA.
     val preprocessStart = System.nanoTime()
     val (corpus, vocabArray, actualNumTokens) =
-      preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
+      preProcess(sc, params.input, params.vocabSize, params.stopwordFile)
     corpus.cache()
     val actualCorpusSize = corpus.count()
     val actualVocabSize = vocabArray.size
@@ -186,121 +186,52 @@ object LDAExample {
    * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
    * @return (corpus, vocabulary as array, total token count in corpus)
    */
-  private def preprocess(
+  private def preProcess(
       sc: SparkContext,
       paths: Seq[String],
       vocabSize: Int,
-      stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
+      stopWordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
 
     // Get dataset of document texts
     // One document per line in each text file. If the input consists of many small files,
     // this can result in a large number of small partitions, which can degrade performance.
     // In this case, consider using coalesce() to create fewer, larger partitions.
     val textRDD: RDD[String] = sc.textFile(paths.mkString(","))
-
-    // Split text into words
-    val tokenizer = new SimpleTokenizer(sc, stopwordFile)
-    val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) =>
-      id -> tokenizer.getWords(text)
-    }
-    tokenized.cache()
-
-    // Counts words: RDD[(word, wordCount)]
-    val wordCounts: RDD[(String, Long)] = tokenized
-      .flatMap { case (_, tokens) => tokens.map(_ -> 1L) }
-      .reduceByKey(_ + _)
-    wordCounts.cache()
-    val fullVocabSize = wordCounts.count()
-    // Select vocab
-    //  (vocab: Map[word -> id], total tokens after selecting vocab)
-    val (vocab: Map[String, Int], selectedTokenCount: Long) = {
-      val tmpSortedWC: Array[(String, Long)] = if (vocabSize == -1 || fullVocabSize <= vocabSize) {
-        // Use all terms
-        wordCounts.collect().sortBy(-_._2)
-      } else {
-        // Sort terms to select vocab
-        wordCounts.sortBy(_._2, ascending = false).take(vocabSize)
-      }
-      (tmpSortedWC.map(_._1).zipWithIndex.toMap, tmpSortedWC.map(_._2).sum)
+    val sqlContext = new SQLContext(sc)
+    import sqlContext.implicits._
+
+    val df = textRDD.toDF("texts")
+    val customizedStopWords: Array[String] = if (stopWordFile.isEmpty) {
+      Array.empty[String]
+    } else {
+      val stopWordText = sc.textFile(stopWordFile).collect()
+      stopWordText.flatMap(_.stripMargin.split("\\s+"))
     }
-
-    val documents = tokenized.map { case (id, tokens) =>
-      // Filter tokens by vocabulary, and create word count vector representation of document.
-      val wc = new mutable.HashMap[Int, Int]()
-      tokens.foreach { term =>
-        if (vocab.contains(term)) {
-          val termIndex = vocab(term)
-          wc(termIndex) = wc.getOrElse(termIndex, 0) + 1
-        }
-      }
-      val indices = wc.keys.toArray.sorted
-      val values = indices.map(i => wc(i).toDouble)
-
-      val sb = Vectors.sparse(vocab.size, indices, values)
-      (id, sb)
-    }
-
-    val vocabArray = new Array[String](vocab.size)
-    vocab.foreach { case (term, i) => vocabArray(i) = term }
-
-    (documents, vocabArray, selectedTokenCount)
+    val tokenizer = new RegexTokenizer()
+      .setInputCol("texts")
+      .setOutputCol("rawTokens")
+    val stopWordsRemover = new StopWordsRemover()
+      .setInputCol("rawTokens")
+      .setOutputCol("tokens")
+    stopWordsRemover.setStopWords(stopWordsRemover.getStopWords ++ customizedStopWords)
+    val countVectorizer = new CountVectorizer()
+      .setVocabSize(vocabSize)
+      .setInputCol("tokens")
+      .setOutputCol("vectors")
+
+    val pipeline = new Pipeline()
+      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))
+
+    val model = pipeline.fit(df)
+    val documents = model.transform(df)
+      .select("vectors")
+      .map { case Row(features: Vector) => features }
+      .zipWithIndex()
+      .map(_.swap)
+
+    (documents,
+      model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary, // vocabulary
+      documents.map(_._2.numActives).sum().toLong) // total token count
   }
 }
 
-/**
- * Simple Tokenizer.
- *
- * TODO: Formalize the interface, and make this a public class in mllib.feature
- */
-private class SimpleTokenizer(sc: SparkContext, stopwordFile: String) extends Serializable {
-
-  private val stopwords: Set[String] = if (stopwordFile.isEmpty) {
-    Set.empty[String]
-  } else {
-    val stopwordText = sc.textFile(stopwordFile).collect()
-    stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
-  }
-
-  // Matches sequences of Unicode letters
-  private val allWordRegex = "^(\\p{L}*)$".r
-
-  // Ignore words shorter than this length.
-  private val minWordLength = 3
-
-  def getWords(text: String): IndexedSeq[String] = {
-
-    val words = new mutable.ArrayBuffer[String]()
-
-    // Use Java BreakIterator to tokenize text into words.
-    val wb = BreakIterator.getWordInstance
-    wb.setText(text)
-
-    // current,end index start,end of each word
-    var current = wb.first()
-    var end = wb.next()
-    while (end != BreakIterator.DONE) {
-      // Convert to lowercase
-      val word: String = text.substring(current, end).toLowerCase
-      // Remove short words and strings that aren't only letters
-      word match {
-        case allWordRegex(w) if w.length >= minWordLength && !stopwords.contains(w) =>
-          words += w
-        case _ =>
-      }
-
-      current = end
-      try {
-        end = wb.next()
-      } catch {
-        case e: Exception =>
-          // Ignore remaining text in line.
-          // This is a known bug in BreakIterator (for some Java versions),
-          // which fails when it sees certain characters.
-          end = BreakIterator.DONE
-      }
-    }
-    words
-  }
-
-}
-// scalastyle:on println

From ee4832d512fdeb57773f51846c9e75ecd8774c01 Mon Sep 17 00:00:00 2001
From: yuhaoyang
Date: Sun, 11 Oct 2015 23:59:51 -0700
Subject: [PATCH 2/3] respond to comment

---
 .../spark/examples/mllib/LDAExample.scala | 21 ++++++++++----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index 6094b723fefa4..2a553c8cb53ba 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -118,7 +118,7 @@ object LDAExample {
     // Load documents, and prepare them for LDA.
     val preprocessStart = System.nanoTime()
     val (corpus, vocabArray, actualNumTokens) =
-      preProcess(sc, params.input, params.vocabSize, params.stopwordFile)
+      preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
     corpus.cache()
     val actualCorpusSize = corpus.count()
     val actualVocabSize = vocabArray.size
@@ -186,29 +186,28 @@ object LDAExample {
    * Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
    * @return (corpus, vocabulary as array, total token count in corpus)
    */
-  private def preProcess(
+  private def preprocess(
       sc: SparkContext,
       paths: Seq[String],
       vocabSize: Int,
-      stopWordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
+      stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
+
+    val sqlContext = SQLContext.getOrCreate(sc)
+    import sqlContext.implicits._
 
     // Get dataset of document texts
     // One document per line in each text file. If the input consists of many small files,
     // this can result in a large number of small partitions, which can degrade performance.
     // In this case, consider using coalesce() to create fewer, larger partitions.
-    val textRDD: RDD[String] = sc.textFile(paths.mkString(","))
-    val sqlContext = new SQLContext(sc)
-    import sqlContext.implicits._
-
-    val df = textRDD.toDF("texts")
-    val customizedStopWords: Array[String] = if (stopWordFile.isEmpty) {
+    val df = sc.textFile(paths.mkString(",")).toDF("docs")
+    val customizedStopWords: Array[String] = if (stopwordFile.isEmpty) {
       Array.empty[String]
     } else {
-      val stopWordText = sc.textFile(stopWordFile).collect()
+      val stopWordText = sc.textFile(stopwordFile).collect()
       stopWordText.flatMap(_.stripMargin.split("\\s+"))
     }
     val tokenizer = new RegexTokenizer()
-      .setInputCol("texts")
+      .setInputCol("docs")
       .setOutputCol("rawTokens")
     val stopWordsRemover = new StopWordsRemover()
       .setInputCol("rawTokens")
       .setOutputCol("tokens")

From 672ce97892c974994630527e0e724090b3ac52e5 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Tue, 8 Dec 2015 16:26:37 +0800
Subject: [PATCH 3/3] fix imports

---
 .../spark/examples/mllib/LDAExample.scala | 21 +++++++++-----------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index 2a553c8cb53ba..70010b05e4345 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -18,19 +18,16 @@
 // scalastyle:off println
 package org.apache.spark.examples.mllib
 
-import org.apache.spark.ml.Pipeline
-import org.apache.spark.ml.feature.{CountVectorizerModel, CountVectorizer, StopWordsRemover, RegexTokenizer}
-import org.apache.spark.sql.{Row, SQLContext}
-
 import scopt.OptionParser
 
 import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.mllib.clustering.{EMLDAOptimizer, OnlineLDAOptimizer, DistributedLDAModel, LDA}
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, RegexTokenizer, StopWordsRemover}
+import org.apache.spark.mllib.clustering.{DistributedLDAModel, EMLDAOptimizer, LDA, OnlineLDAOptimizer}
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
-
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
 
 /**
  * An example Latent Dirichlet Allocation (LDA) app. Run with
@@ -216,14 +213,14 @@
     val countVectorizer = new CountVectorizer()
       .setVocabSize(vocabSize)
       .setInputCol("tokens")
-      .setOutputCol("vectors")
+      .setOutputCol("features")
 
     val pipeline = new Pipeline()
      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))
 
     val model = pipeline.fit(df)
     val documents = model.transform(df)
-      .select("vectors")
+      .select("features")
       .map { case Row(features: Vector) => features }
       .zipWithIndex()
       .map(_.swap)
@@ -233,4 +230,4 @@ object LDAExample {
       documents.map(_._2.numActives).sum().toLong) // total token count
   }
 }
-
+// scalastyle:on println
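
Note (not part of the patches): after PATCH 3/3 is applied, the example's preprocessing is expressed entirely with spark.ml feature transformers. The sketch below is a hypothetical standalone program built on the same RegexTokenizer -> StopWordsRemover -> CountVectorizer pipeline the patches assemble, assuming the Spark 1.6-era APIs they target; the object name, app name, input path, and vocabulary size are illustrative placeholders, not values taken from the patches.

// Hypothetical standalone sketch: build the pipeline and turn its output into
// the (docId, termCountVector) corpus that mllib's LDA expects.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object LDAPreprocessSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LDAPreprocessSketch"))
    val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._

    // One document per line; "docs.txt" is a placeholder path.
    val df = sc.textFile("docs.txt").toDF("docs")

    val tokenizer = new RegexTokenizer()
      .setInputCol("docs")
      .setOutputCol("rawTokens")
    val stopWordsRemover = new StopWordsRemover()
      .setInputCol("rawTokens")
      .setOutputCol("tokens")
    val countVectorizer = new CountVectorizer()
      .setVocabSize(10000) // illustrative vocabulary size
      .setInputCol("tokens")
      .setOutputCol("features")

    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, stopWordsRemover, countVectorizer))
    val model = pipeline.fit(df)

    // (docId, termCountVector) pairs, the corpus format LDA.run takes.
    val corpus: RDD[(Long, Vector)] = model.transform(df)
      .select("features")
      .map { case Row(features: Vector) => features }
      .zipWithIndex()
      .map(_.swap)

    // Vocabulary comes from the fitted CountVectorizerModel, as in the patch.
    val vocab = model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary
    println(s"corpus size: ${corpus.count()}, vocabulary size: ${vocab.length}")
    sc.stop()
  }
}

This mirrors what the final preprocess method does once the series is applied; the example's own version additionally merges a user-supplied stop-word file into StopWordsRemover's default list, which the sketch omits for brevity.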