In [1]:
%%init_spark
# Spark launcher settings for the notebook kernel (Python syntax; presumably spylon-kernel's
# `%%init_spark` magic — confirm). Pulls in the Vegas plotting library and its Spark
# integration, both Scala 2.11 builds, version 0.3.11.
launcher.packages = ["org.vegas-viz:vegas_2.11:0.3.11", "org.vegas-viz:vegas-spark_2.11:0.3.11"]
# Run Spark locally with 4 worker threads.
launcher.master = "local[4]"

In [2]:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.sql._
import org.apache.spark.sql.types._

Intitializing Scala interpreter ...

Spark Web UI available at http://jeslava-pc:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[4], app id = local-1585077084153)
SparkSession available as 'spark'


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.sql._
import org.apache.spark.sql.types._


In [3]:
// Create the StreamingContext: the stream is cut into 10-second "slices" (batches),
// each slice being processed as one RDD.
val ssc = new StreamingContext(sc, Seconds(10))

// Input stream: the new files dropped into the TP6_dataflow/stream directory are gathered
// into one RDD every 10 s and processed; each line is parsed as a vector, e.g. "[x,y]".
// Blank lines are skipped because Vectors.parse throws on them, which would fail the
// whole batch (and with it training, prediction and the saved outputs).
val trainingData = ssc.textFileStream("file:///home/hadoop/msata/cnam_RCP216/data_mining/TP6_dataflow/stream")
  .filter(_.trim.nonEmpty)
  .map(Vectors.parse)

ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@fe8ded9
trainingData: org.apache.spark.streaming.dstream.DStream[org.apache.spark.mllib.linalg.Vector] = org.apache.spark.streaming.dstream.MappedDStream@5d56ff94


In [4]:
// Streaming k-means settings.
val model = new StreamingKMeans  // model instance; its parameters are set afterwards
val numDimensions: Int = 2       // the data points are 2-D
val numClusters: Int = 5         // look for 5 groups

numDimensions: Int = 2
numClusters: Int = 5
model: org.apache.spark.mllib.clustering.StreamingKMeans = org.apache.spark.mllib.clustering.StreamingKMeans@3302292a


In [5]:
// Model parameters: number of clusters, decay factor 0.5 (the "alpha" forgetting factor
// of the update rule), and random initial centers in `numDimensions` dimensions, each
// center starting with weight 1.0. Every setter returns the model itself, so they chain.
model.setK(numClusters)
  .setDecayFactor(0.5)
  .setRandomCenters(numDimensions, 1.0)

res0: model.type = org.apache.spark.mllib.clustering.StreamingKMeans@3302292a


In [6]:
// Register the stream the model is trained on (the stream in which the clusters are found).
model.trainOn(data = trainingData)

In [7]:
// Register the stream the model makes predictions on: one cluster index (Int) per point.
val resultats: org.apache.spark.streaming.dstream.DStream[Int] = model.predictOn(trainingData)

resultats: org.apache.spark.streaming.dstream.DStream[Int] = org.apache.spark.streaming.dstream.MappedDStream@1434d8cb


In [8]:
// For teaching purposes (to visualize the results), every batch's processed points and the
// cluster predicted for each of them are saved to disk.
// NOTE(review): every batch writes into the same two directories. This presumably relies on
// Spark Streaming disabling the "output directory already exists" check for its own jobs
// (SPARK-4835), so each batch rewrites the part files in place; part files left by an
// earlier, larger batch are not removed — confirm this is acceptable for the plots.
trainingData.foreachRDD(_.saveAsTextFile("file:///home/hadoop/msata/cnam_RCP216/data_mining/TP6_dataflow/points"))
resultats.foreachRDD(_.saveAsTextFile("file:///home/hadoop/msata/cnam_RCP216/data_mining/TP6_dataflow/resultats"))

In [9]:
// Start the streaming computation: from now on, a batch is formed and processed every 10 s.
ssc.start()

In [11]:
// Run the following command from a terminal, in the TP6_dataflow directory:
// cp data/full/{aaa,aab,aac} stream/
// Then check the results in the resultats folder.

<console>: 4: error: ';' expected but ',' found.

In [17]:
// Read back the RDDs saved by the streaming job and build a two-column DataFrame
// (cluster, coords).
//
// The two directories are NOT combined with RDD.zip: zip requires both RDDs to have the same
// number of partitions AND the same number of elements in every partition, but sc.textFile may
// cut a part file into byte-range splits, and a `points` line ("[x,y]") is much longer than a
// `resultats` line (a cluster number), so the cuts fall after different numbers of lines.
// Instead every line is keyed by (part-file name, line number): `resultats` is a map over the
// same batch RDD as `points`, so line i of resultats/part-k matches line i of points/part-k.
val outputDir = "file:///home/hadoop/msata/cnam_RCP216/data_mining/TP6_dataflow"

// Keys each non-empty line of every part file in `dir` by (file name, line index).
def linesByPosition(dir: String): org.apache.spark.rdd.RDD[((String, Int), String)] =
  sc.wholeTextFiles(dir).flatMap { case (path, content) =>
    val partName = path.substring(path.lastIndexOf('/') + 1)
    content.split("\n").filter(_.nonEmpty).zipWithIndex.map { case (line, i) => ((partName, i), line) }
  }

// (cluster, coordinates) for every saved point; the inner join drops unmatched lines.
val paired = linesByPosition(s"$outputDir/resultats")
  .join(linesByPosition(s"$outputDir/points"))
  .values
  .map { case (c, p) => (c.trim.toInt, Vectors.parse(p).toArray) }

// Kept under their original names so they remain available for inspection.
val clusters = paired.map(_._1)
val points = paired.map(_._2)

val predictions = paired.toDF("cluster", "coords")
predictions.printSchema()

root
 |-- cluster: integer (nullable = false)
 |-- coords: array (nullable = true)
 |    |-- element: double (containsNull = false)



clusters: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[608] at map at <console>:67
points: org.apache.spark.rdd.RDD[Array[Double]] = MapPartitionsRDD[611] at map at <console>:68
predictions: org.apache.spark.sql.DataFrame = [cluster: int, coords: array<double>]


In [19]:
// Vegas is used to plot the clusters obtained above.
// Renderer: each plot is printed as HTML prefixed with the notebook's `%html` display magic.
implicit val render: vegas.render.ShowHTML = vegas.render.ShowHTML(html => print(s"%html $html"))

// Vegas libraries.
import vegas._
import vegas.data.External._
import vegas.sparkExt._

// Scatter-plot data: the `coords` array is split into separate `x` and `y` columns.
val scatter = predictions.withColumn("x", $"coords"(0)).withColumn("y", $"coords"(1))

render: vegas.render.ShowHTML = <function1>
import vegas._
import vegas.data.External._
import vegas.sparkExt._
scatter: org.apache.spark.sql.DataFrame = [cluster: int, coords: array<double> ... 2 more fields]


In [20]:
// Scatter plot of the points, colored by predicted cluster.
Vegas("Données initiales")
  .withDataFrame(scatter)
  .mark(Point)
  .encodeX("x", Quant)
  .encodeY("y", Quant)
  .encodeColor("cluster", Nom)
  .show

In [21]:
// The current cluster centers are available from latestModel(), which returns the most
// recent KMeans model computed from the stream data.
{
  val latest = model.latestModel()
  latest.clusterCenters
}

res12: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.22567614752825793,0.5023218135489114], [0.22567614752827792,0.5023218135489314], [-0.44362107723763694,-0.6120968293161826], [-0.18663023693410216,0.7351128192338361], [-0.4436210772376569,-0.6120968293162026])


In [22]:
// Now copy all the data files into the stream directory (from a terminal, in TP6_dataflow):
// cp data/full/* stream/.

In [23]:
// Visualize again.
// `predictions` is not reused as-is: the RDDs behind it listed their input part files and
// computed their byte-range splits the first time the DataFrame was evaluated, and an RDD
// keeps those partitions, so part files (re)written by later batches would be missed or
// only partially read. Both output directories are therefore read again here.
val scatter = {
  val outputDir = "file:///home/hadoop/msata/cnam_RCP216/data_mining/TP6_dataflow"

  // Keys each non-empty line of every part file in `dir` by (file name, line index), so that
  // line i of resultats/part-k can be paired with line i of points/part-k without RDD.zip's
  // requirement of identical partitioning.
  def linesByPosition(dir: String): org.apache.spark.rdd.RDD[((String, Int), String)] =
    sc.wholeTextFiles(dir).flatMap { case (path, content) =>
      val partName = path.substring(path.lastIndexOf('/') + 1)
      content.split("\n").filter(_.nonEmpty).zipWithIndex.map { case (line, i) => ((partName, i), line) }
    }

  val current = linesByPosition(s"$outputDir/resultats")
    .join(linesByPosition(s"$outputDir/points"))
    .values
    .map { case (c, p) => (c.trim.toInt, Vectors.parse(p).toArray) }
    .toDF("cluster", "coords")

  // Scatter-plot columns: split the coordinate array into `x` and `y`.
  current.withColumn("x", $"coords".getItem(0)).withColumn("y", $"coords".getItem(1))
}

scatter: org.apache.spark.sql.DataFrame = [cluster: int, coords: array<double> ... 2 more fields]


In [24]:
// Scatter plot of the points read back after all the files were copied, colored by
// predicted cluster. The title says so explicitly: "Données initiales" (initial data)
// would mislabel this plot as the first, partial one.
Vegas("Toutes les données")
  .withDataFrame(scatter)
  .mark(Point)
  .encodeX("x", Quant)
  .encodeY("y", Quant)
  .encodeColor("cluster", Nom)
  .show

In [25]:
// The updated cluster centers, taken from the most recent model computed from the stream.
{
  val latest = model.latestModel()
  latest.clusterCenters
}

res15: Array[org.apache.spark.mllib.linalg.Vector] = Array([0.14152378740134425,0.34215244153016555], [0.5722041751206708,0.4592565578824435], [-0.4293069658063142,-0.5686628757709866], [-0.0941069953719659,0.7432846122300463], [-0.44941142835276404,-0.6783933093799789])
