From 69b5dd855edd3e5523c8431c9f40bad237739d9e Mon Sep 17 00:00:00 2001
From: Ganesh Krishnan
Date: Fri, 20 Jan 2017 00:32:31 +1100
Subject: [PATCH] Update LDAExample with Spark 2.1.0

---
 .../spark/examples/mllib/LDAExample.scala | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
index b923e627f2095..d6dcc951a428b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala
@@ -106,15 +106,18 @@ object LDAExample {
   }
 
   private def run(params: Params): Unit = {
-    val conf = new SparkConf().setAppName(s"LDAExample with $params")
-    val sc = new SparkContext(conf)
+    Logger.getRootLogger.setLevel(Level.WARN)
 
-
+    val spark = SparkSession
+      .builder
+      .appName("SparkLDAExample")
+      .getOrCreate()
+
     // Load documents, and prepare them for LDA.
     val preprocessStart = System.nanoTime()
     val (corpus, vocabArray, actualNumTokens) =
-      preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
+      preprocess(spark, params.input, params.vocabSize, params.stopwordFile)
     corpus.cache()
     val actualCorpusSize = corpus.count()
     val actualVocabSize = vocabArray.length
@@ -175,7 +178,7 @@ object LDAExample {
       }
       println()
     }
-    sc.stop()
+    spark.stop()
   }
 
   /**
@@ -183,22 +186,19 @@ object LDAExample {
    * @return (corpus, vocabulary as array, total token count in corpus)
    */
   private def preprocess(
-      sc: SparkContext,
+      spark: SparkSession,
       paths: Seq[String],
       vocabSize: Int,
       stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
 
-    val spark = SparkSession
-      .builder
-      .sparkContext(sc)
-      .getOrCreate()
+    import spark.implicits._
 
     // Get dataset of document texts
    // One document per line in each text file. If the input consists of many small files,
    // this can result in a large number of small partitions, which can degrade performance.
    // In this case, consider using coalesce() to create fewer, larger partitions.
-    val df = sc.textFile(paths.mkString(",")).toDF("docs")
+    val df = spark.read.text(paths.mkString(",")).toDF("docs")
 
     val customizedStopWords: Array[String] = if (stopwordFile.isEmpty) {
       Array.empty[String]
     } else {