## Imports

In [1]:
import org.apache.spark.sql.types._
import spark.implicits._ 
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vectors, Vector}

Intitializing Scala interpreter ...

Spark Web UI available at http://DESKTOP-LBH4NFU:4040
SparkContext available as 'sc' (version = 2.3.0, master = local[*], app id = local-1613698216372)
SparkSession available as 'spark'


import org.apache.spark.sql.types._
import spark.implicits._
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vectors, Vector}


## Esquema de la base

In [2]:
val schema = StructType(
  StructField("Platform", StringType, nullable = true) ::
    StructField("Genre", StringType, nullable = true) ::
    StructField("Publisher", StringType, nullable = true) ::
    StructField("NA_Sales", DoubleType, nullable = true) ::
    StructField("EU_Sales", DoubleType, nullable = true) ::
    StructField("JP_Sales", DoubleType, nullable = true) ::
    StructField("Other_Sales", DoubleType, nullable = true) ::
    StructField("Global_Sales", DoubleType, nullable = true) ::
    StructField("Rating", StringType, nullable = true) ::    
    StructField("Critic_Score_Class", StringType, nullable = true) ::
    Nil
)

schema: org.apache.spark.sql.types.StructType = StructType(StructField(Platform,StringType,true), StructField(Genre,StringType,true), StructField(Publisher,StringType,true), StructField(NA_Sales,DoubleType,true), StructField(EU_Sales,DoubleType,true), StructField(JP_Sales,DoubleType,true), StructField(Other_Sales,DoubleType,true), StructField(Global_Sales,DoubleType,true), StructField(Rating,StringType,true), StructField(Critic_Score_Class,StringType,true))


## Leer la data y mostrar los primeros 10 elementos

In [3]:
// read to DataFrame
val videogameDf = spark.read.format("csv").option("header", value = true).option("delimiter", ",").option("mode", "DROPMALFORMED").schema(schema).load("dato.csv").cache()

videogameDf.printSchema()
videogameDf.show(10)

root
 |-- Platform: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Critic_Score_Class: string (nullable = true)

+--------+--------+--------------------+--------+--------+--------+-----------+------------+------+------------------+
|Platform|   Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Rating|Critic_Score_Class|
+--------+--------+--------------------+--------+--------+--------+-----------+------------+------+------------------+
|     Wii|  Sports|            Nintendo|   41.36|   28.96|    3.77|       8.45|       82.54|     E|             Bueno|
|     Wii|  Racing|            Nintendo|   15.68|    12.8|    3.79|       3.29|       35.57|     E|      

videogameDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Platform: string, Genre: string ... 8 more fields]


## Agrupamos la data númerica en una sola columna llamada "features" o caracteristicas

In [4]:
//add feature col
val cols = Array("NA_Sales","EU_Sales","JP_Sales","Other_Sales","Global_Sales")
val assembler = new VectorAssembler().setInputCols(cols).setOutputCol("features")
val featureDf = assembler.transform(videogameDf)

featureDf.printSchema()
featureDf.show(10)

root
 |-- Platform: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Critic_Score_Class: string (nullable = true)
 |-- features: vector (nullable = true)

+--------+--------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+
|Platform|   Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Rating|Critic_Score_Class|            features|
+--------+--------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+
|     Wii|  Sports|            Nintendo|   41.36|   28.96|    3.77|       8.45|       82.54|     E|             Bueno|[41

cols: Array[String] = Array(NA_Sales, EU_Sales, JP_Sales, Other_Sales, Global_Sales)
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_e46338e18423
featureDf: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 9 more fields]


## Separamos la data en entrenamiento y validación

In [5]:
//dividir data en 70/30
val seed = 1984
val Array(trainingData, testData) = featureDf.randomSplit(Array(0.7, 0.3), seed)

seed: Int = 1984
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Platform: string, Genre: string ... 9 more fields]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Platform: string, Genre: string ... 9 more fields]


## Creamos un k-means 8-cluster

In [6]:
//build kmeans 8 cluster

val kmeans = new KMeans().setK(8).setFeaturesCol("features").setPredictionCol("KMEANS-prediction")

val kmeansModel = kmeans.fit(trainingData)


2021-02-18 20:30:33 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2021-02-18 20:30:33 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_7dc66b0d1685
kmeansModel: org.apache.spark.ml.clustering.KMeansModel = kmeans_7dc66b0d1685


## Mostramos el resultado con los centros de cada cluster

In [7]:
println()
println("|-------------|OUTPUT|-------------|")
println()
kmeansModel.clusterCenters.foreach(println)


|-------------|OUTPUT|-------------|

[0.450556079170593,0.2610179076343072,0.06666352497643743,0.09262959472196029,0.8706032045240346]
[12.805714285714288,9.22,4.132857142857143,2.6085714285714285,28.768571428571427]
[0.10258953168044095,0.05160391796755466,0.016623813896541176,0.017594123048668475,0.18880624426078993]
[1.8592638036809817,1.2741104294478531,0.32331288343558273,0.4439263803680982,3.9008588957055195]
[41.36,28.96,3.77,8.45,82.54]
[1.0717277486910994,0.5596858638743459,0.1575130890052356,0.21138743455497389,2.000549738219895]
[5.861739130434783,3.87608695652174,0.9717391304347823,1.5504347826086957,12.259565217391303]
[3.188085106382979,2.5721276595744675,0.7993617021276599,0.687872340425532,7.24723404255319]


## Testeamos la data sobre los datos de validación

In [8]:
val predictDf = kmeansModel.transform(testData)
println()
println("|-------------|PREDICCIONES|-------------|")
println()

predictDf.show(20)
predictDf.groupBy("KMEANS-prediction").count().show()


|-------------|PREDICCIONES|-------------|

+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+-----------------+
|Platform|    Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Rating|Critic_Score_Class|            features|KMEANS-prediction|
+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+-----------------+
|     3DS|   Action|          Activision|    0.08|    0.03|     0.0|       0.01|        0.12|     T|              Malo|[0.08,0.03,0.0,0....|                2|
|     3DS|   Action|Disney Interactiv...|    0.29|    0.28|     0.0|       0.06|        0.62|  E10+|             Bueno|[0.29,0.28,0.0,0....|                0|
|     3DS|   Action|     Electronic Arts|    0.09|    0.09|     0.0|       0.02|        0.19|  E10+|         Aceptable|[0.09,0.09,0.0,0....|                2|
|

predictDf: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 10 more fields]


## Medimos distancia al centro a partir de la data predicha

In [9]:
// calculate distance from center
val distFromCenter = udf((features: Vector, c: Int) => Vectors.sqdist(features, kmeansModel.clusterCenters(c)))
val distanceDf = predictDf.withColumn("distance", distFromCenter($"features", $"KMEANS-prediction"))
distanceDf.show(20)

+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+-----------------+--------------------+
|Platform|    Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Rating|Critic_Score_Class|            features|KMEANS-prediction|            distance|
+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+-----------------+--------------------+
|     3DS|   Action|          Activision|    0.08|    0.03|     0.0|       0.01|        0.12|     T|              Malo|[0.08,0.03,0.0,0....|                2|0.006045337355711117|
|     3DS|   Action|Disney Interactiv...|    0.29|    0.28|     0.0|       0.06|        0.62|  E10+|             Bueno|[0.29,0.28,0.0,0....|                0| 0.09444925652093203|
|     3DS|   Action|     Electronic Arts|    0.09|    0.09|     0.0|       0.02|        0.19|  E10+|

distFromCenter: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7, IntegerType)))
distanceDf: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 11 more fields]


## Contamos cuantas filas pertenecen a cada columna

In [10]:
// no of categories
predictDf.groupBy("KMEANS-prediction").count().show()

// save model
kmeansModel.write.overwrite().save("videogame-model")

+-----------------+-----+
|KMEANS-prediction|count|
+-----------------+-----+
|                1|    3|
|                6|   10|
|                3|   68|
|                5|  150|
|                7|   24|
|                2| 1440|
|                0|  463|
+-----------------+-----+



## Creamos un Cluster con algoritmo "Gaussian Mixture Model (GMM)"

Gaussian Mixture Model (GMM) representa una distribución compuesta en la que los puntos se extraen de una de k subdistribuciones gaussianas, cada una con su propia probabilidad. La implementación de spark.ml utiliza el algoritmo de **maximización de expectativas** para inducir el modelo de máxima verosimilitud dado un conjunto de muestras.

In [11]:
import org.apache.spark.ml.clustering.GaussianMixture

// Trains Gaussian Mixture Model
val gmm = new GaussianMixture().setK(8).setFeaturesCol("features").setPredictionCol("GMM-prediction").setProbabilityCol("GMM-probability")
val gmmModel = gmm.fit(trainingData)



// output parameters of mixture model model
for (i <- 0 until gmmModel.getK) {
  println(s"Gaussian $i:\nweight=${gmmModel.weights(i)}\n" +
      s"mu=${gmmModel.gaussians(i).mean}\nsigma=\n${gmmModel.gaussians(i).cov}\n")
}

2021-02-18 20:30:43 WARN  LAPACK:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2021-02-18 20:30:43 WARN  LAPACK:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
Gaussian 0:
weight=0.2665765844659274
mu=[0.3671170883040322,0.19021997032977006,2.8371640876758776E-7,0.0607513467666437,0.618213677966869]
sigma=
0.061518499304870404  0.008297798794410855   ... (5 total)
0.008297798794410855  0.02301222096338634    ...
8.443119173136265E-7  2.0275440844544556E-8  ...
0.00522276935543689   0.004610581404417511   ...
0.0749576965395531    0.03589057465253671    ...

Gaussian 1:
weight=0.11822469977197922
mu=[0.10339137633101876,0.04317418513605366,0.07719373421284209,0.018384559293535913,0.24209448088475716]
sigma=
0.006988765458454253   0.002051933350157219   ... (5 total)
0.002051933350157219   0.002249607464470165   ...
1.8625398794278581E-4  2.6486045176877713E-4  ...
8.996398099066409E-4   6.01979284955809E-4    ...
0

import org.apache.spark.ml.clustering.GaussianMixture
gmm: org.apache.spark.ml.clustering.GaussianMixture = GaussianMixture_e8db142681f6
gmmModel: org.apache.spark.ml.clustering.GaussianMixtureModel = GaussianMixture_e8db142681f6


In [12]:
val gmmpredictDf = gmmModel.transform(testData)
println()
println("|-------------|PREDICCIONES|-------------|")
println()

gmmpredictDf.show(20)
gmmpredictDf.groupBy("GMM-prediction").count().show()


|-------------|PREDICCIONES|-------------|

+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+--------------+--------------------+
|Platform|    Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Rating|Critic_Score_Class|            features|GMM-prediction|     GMM-probability|
+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+--------------+--------------------+
|     3DS|   Action|          Activision|    0.08|    0.03|     0.0|       0.01|        0.12|     T|              Malo|[0.08,0.03,0.0,0....|             3|[0.00160767721472...|
|     3DS|   Action|Disney Interactiv...|    0.29|    0.28|     0.0|       0.06|        0.62|  E10+|             Bueno|[0.29,0.28,0.0,0....|             0|[0.99935971567547...|
|     3DS|   Action|     Electronic Arts|    0.09|    0.09|     0.0|  

gmmpredictDf: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 11 more fields]


## Creamos un Cluster con algoritmo "Bisecting K-Means"

In [13]:
import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator


// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(8).setFeaturesCol("features").setPredictionCol("BisKMeans-prediction")
val bkmModel = bkm.fit(trainingData)

// Make predictions
val bkmPredictions = bkmModel.transform(trainingData)

// Shows the result.
println("Cluster Centers: ")
val centers = bkmModel.clusterCenters
centers.foreach(println)


2021-02-18 20:30:48 WARN  BisectingKMeans:66 - The input RDD 262 is not directly cached, which may hurt performance if its parent RDDs are also not cached.
Cluster Centers: 
[0.1205288192501398,0.06166200335758273,0.0198153329602687,0.02097649692221623,0.2233967543368772]
[0.575803667745415,0.3287162891046382,0.08372168284789662,0.11562028047464931,1.1035167206040999]
[1.3690031152647977,0.8425233644859814,0.1870093457943925,0.319968847352025,2.718566978193146]
[2.686931818181819,1.7962500000000006,0.5962499999999998,0.5495454545454546,5.628295454545456]
[4.686000000000001,3.2276,1.0268000000000002,1.2620000000000002,10.202800000000002]
[7.136000000000002,5.825,1.8960000000000001,1.614,16.47]
[14.201999999999998,9.806000000000001,4.24,2.8420000000000005,31.092000000000002]
[41.36,28.96,3.77,8.45,82.54]


import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
bkm: org.apache.spark.ml.clustering.BisectingKMeans = bisecting-kmeans_31c156ca7146
bkmModel: org.apache.spark.ml.clustering.BisectingKMeansModel = bisecting-kmeans_31c156ca7146
bkmPredictions: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 10 more fields]
centers: Array[org.apache.spark.ml.linalg.Vector] = Array([0.1205288192501398,0.06166200335758273,0.0198153329602687,0.02097649692221623,0.2233967543368772], [0.575803667745415,0.3287162891046382,0.08372168284789662,0.11562028047464931,1.1035167206040999], [1.3690031152647977,0.8425233644859814,0.1870093457943925,0.319968847352025,2.718566978193146], [2.686931818181819,1.7962500000000006,0.5962499...

In [14]:
val BKMpredictDf = bkmModel.transform(testData)
println()
println("|-------------|PREDICCIONES|-------------|")
println()

BKMpredictDf.show(20)
BKMpredictDf.groupBy("BisKMeans-prediction").count().show()


|-------------|PREDICCIONES|-------------|

+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+--------------------+
|Platform|    Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Rating|Critic_Score_Class|            features|BisKMeans-prediction|
+--------+---------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+--------------------+
|     3DS|   Action|          Activision|    0.08|    0.03|     0.0|       0.01|        0.12|     T|              Malo|[0.08,0.03,0.0,0....|                   0|
|     3DS|   Action|Disney Interactiv...|    0.29|    0.28|     0.0|       0.06|        0.62|  E10+|             Bueno|[0.29,0.28,0.0,0....|                   0|
|     3DS|   Action|     Electronic Arts|    0.09|    0.09|     0.0|       0.02|        0.19|  E10+|         Aceptable|[0.09,0.09,0.0,0....|     

BKMpredictDf: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 10 more fields]


In [15]:
val finalDF = kmeansModel.transform(trainingData)
val finalDF1 = gmmModel.transform(finalDF)
val finalDF2 = bkmModel.transform(finalDF1)

finalDF2.show(20)

+--------+------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+-----------------+--------------+--------------------+--------------------+
|Platform| Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Rating|Critic_Score_Class|            features|KMEANS-prediction|GMM-prediction|     GMM-probability|BisKMeans-prediction|
+--------+------+--------------------+--------+--------+--------+-----------+------------+------+------------------+--------------------+-----------------+--------------+--------------------+--------------------+
|     3DS|Action|           505 Games|     0.0|    0.05|    0.05|        0.0|         0.1|     T|             Bueno|[0.0,0.05,0.05,0....|                2|             1|[2.02987982666683...|                   0|
|     3DS|Action|          Activision|    0.13|    0.01|     0.0|       0.02|        0.15|  E10+|              Malo|[0.13,0.01,0.0,0....|           

finalDF: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 10 more fields]
finalDF1: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 12 more fields]
finalDF2: org.apache.spark.sql.DataFrame = [Platform: string, Genre: string ... 13 more fields]


In [39]:
finalDF2.map({
    case(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o) =>
  var line = a.toString + "," + b.toString + "," + c.toString + "," + d.toString + "," + e.toString + "," + f.toString + "," + g.toString + "," + h.toString + "," + i.toString + "," + j.toString + "," + k.toString + "," + l.toString + "," + m.toString + "," + n.toString + "," + o.toString 
  line
}).saveAsTextFile("FinalDF")

<console>: 54: error: constructor cannot be instantiated to expected type;

In [42]:
finalDF2.write.format("csv").option("header",true).mode("overwrite").option("sep",",").save("./FinalDF.csv")

java.lang.UnsupportedOperationException:  CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.

In [51]:
finalDF2.select("Platform","Genre","Publisher","NA_Sales","EU_Sales","JP_Sales","Other_Sales","Global_Sales","Rating","Critic_Score_Class","KMEANS-prediction","GMM-prediction","BisKMeans-prediction").write.format("csv").option("header",true).mode("overwrite").option("sep",",").save("./FinalDF")

In [48]:
finalDF2.write.csv("./ssssssssssssssssssssssss.csv")
userRecs1.select('ID_CTE', 'recommendations.*').write.csv('/user-home/libraries/Sampled_data/datasets/rec_per_user.csv')

java.lang.UnsupportedOperationException:  CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type.