# Clustering
Nous voulons connaître les sujets les plus abordés dans les news.

Décompressez l'archive bbcsport.zip sur votre machine et utilisez le bon chemin (path) vers le dossier.

In [None]:
// Read every file matching /path/bbcsport/*/*.txt. Only the second element of each pair
// returned by wholeTextFiles is kept (the file content; the first element is discarded),
// and it is converted to lower case.
val corpus = sc
  .wholeTextFiles("/path/bbcsport/*/*.txt")
  .map { case (_, content) => content.toLowerCase() }

Préparer le RDD et ajouter un ID

In [None]:
// For each document: split on blank lines (two consecutive newlines), discard the first
// block (presumably the headline -- confirm against the data), and join the remaining
// blocks into one string separated by single spaces.
val corpus_body = corpus.map { doc =>
  val blocks = doc.split("\\n\\n")
  blocks.drop(1).mkString(" ")
}

In [None]:
// Pair every document body with its zipWithIndex position and expose the pairs as a
// two-column DataFrame: "corpus" holds the text and "id" holds the index.
val corpus_df = {
  val indexedBodies = corpus_body.zipWithIndex
  indexedBodies.toDF("corpus", "id")
}
// Print the resulting schema as a quick sanity check.
corpus_df.printSchema()

# Les transformateurs

In [None]:
import org.apache.spark.ml.feature._

// Tokenizer stage: reads the "corpus" column and writes its tokens to "tokens".
val tokenizer = new RegexTokenizer()
  .setInputCol("corpus")
  .setOutputCol("tokens")
  // The pattern matches runs of non-word characters and underscores; with
  // RegexTokenizer's default gap-based mode these runs act as token separators.
  .setPattern("[\\W_]+")
  // Discard tokens shorter than 4 characters.
  .setMinTokenLength(4)

### StopWordsRemover supprime les mots non pertinents

Utilisez le bon path pour le fichier

In [None]:
// Load the custom stop-word list onto the driver; each line of the file becomes one entry.
// Entries are trimmed and blank lines are dropped so that stray whitespace in the file
// cannot produce stop words that silently never match a token.
val stopwords = sc.textFile("/path/stop_words")
  .map(_.trim)
  .filter(_.nonEmpty)
  .collect()

In [None]:

// Stop-word filtering stage: reads the "tokens" column and writes the surviving tokens
// to "filtered".
val remover = new StopWordsRemover()
  .setInputCol("tokens")
  .setOutputCol("filtered")
  // Supplying a custom list is optional.
  .setStopWords(stopwords)

## Vectorizer

In [None]:

// Term-count stage: turns the "filtered" token arrays into count vectors in "features".
val vectorizer = new CountVectorizer()
  // Vocabulary size is set to 10000.
  .setVocabSize(10000)
  // minDF is set to 5 (minimum document frequency for a term to enter the vocabulary).
  .setMinDF(5)
  .setInputCol("filtered")
  .setOutputCol("features")

## LDA Clustering

In [None]:
// Number of topics (k) handed to the LDA stage.
val numTopics: Int = 20

In [None]:
import org.apache.spark.ml.clustering.LDA

// LDA estimator: explicitly selects the "em" optimizer, runs at most 50 iterations,
// and learns numTopics topics.
val lda = new LDA()
  .setOptimizer("em")
  .setMaxIter(50)
  .setK(numTopics)

  

## Créer et ajuster le Pipeline


In [None]:
import org.apache.spark.ml.Pipeline

// Chain the four stages in execution order: tokenize -> remove stop words ->
// count-vectorize -> LDA.
val pipeline = {
  val orderedStages = Array[org.apache.spark.ml.PipelineStage](tokenizer, remover, vectorizer, lda)
  new Pipeline().setStages(orderedStages)
}

In [None]:
// Save the pipeline definition to /tmp/ldaDemo/pipeline, overwriting anything already
// stored at that path.
pipeline.write.overwrite().save("/tmp/ldaDemo/pipeline")


In [None]:
// Fit all pipeline stages on the document DataFrame; the result holds the fitted stages.
val pipelineModel = pipeline.fit(corpus_df)


Nous pouvons extraire des étapes particulières si nécessaire.



In [None]:
// Stage index 2 corresponds to the CountVectorizer, the third entry of the stages array
// given to the pipeline. The cast exposes the fitted model's API (e.g. its vocabulary).
val vectorizerModel = pipelineModel.stages(2).asInstanceOf[CountVectorizerModel]


In [None]:
import org.apache.spark.ml.clustering.DistributedLDAModel

// The LDA stage (index 3 of the fitted pipeline) was configured with the "em" optimizer,
// which yields a DistributedLDAModel. EM is NOT spark.ml's default optimizer (the default
// is "online", which yields a different model type), so the stage is matched explicitly
// and a descriptive error is raised instead of an opaque ClassCastException if the
// optimizer setting is ever changed.
val ldaModel: DistributedLDAModel = pipelineModel.stages(3) match {
  case distributed: DistributedLDAModel => distributed
  case other =>
    throw new IllegalStateException(
      s"Expected a DistributedLDAModel at pipeline stage 3 but found ${other.getClass.getName}; " +
        "was the LDA optimizer changed from \"em\"?")
}


In [None]:
// Save the LDA model to /tmp/ldaDemo/model for later reuse, overwriting anything already
// stored at that path.
ldaModel.write.overwrite().save("/tmp/ldaDemo/model")


In [None]:
// The log-likelihood of the training data gives us a statistic for evaluation.
// This statistic is always negative; values closer to 0 are better.
ldaModel.trainingLogLikelihood

# Affichez les sujets

In [None]:
import org.apache.spark.sql.functions._

// Vocabulary of the fitted CountVectorizer; it is indexed by term index below.
val vocabList = vectorizerModel.vocabulary
// UDF translating a sequence of term indices into the corresponding vocabulary strings.
val termsIdx2Str = udf((termIndices: Seq[Int]) => for (idx <- termIndices) yield vocabList(idx))

// Describe each topic of the LDA model (trained with the "em" optimizer) by its 5 top
// terms, and add a "terms" column with the human-readable term strings.
val topics = {
  val described = ldaModel.describeTopics(maxTermsPerTopic = 5)
  described.withColumn("terms", termsIdx2Str(col("termIndices")))
}
topics.select("topic", "terms", "termWeights").show()

In [None]:
// Build a flat DataFrame with one row per (topic, term) and readable column names.

// UDF pairing each term with the weight found at the same position.
val zipUDF = udf((terms: Seq[String], probabilities: Seq[Double]) => terms.zip(probabilities))

// explode produces one row per (term, weight) pair, stored in "termWithProb".
val topicsTmp = topics.withColumn(
  "termWithProb",
  explode(zipUDF(col("terms"), col("termWeights"))))

// Project the pair's two fields into named columns next to the topic id.
val termDF = topicsTmp.select(
  col("topic").alias("topicId"),
  col("termWithProb._1").alias("term"),
  col("termWithProb._2").alias("probability"))
termDF.show()