# Start Spark

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext
sc = SparkContext()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Exercice: Grupos de dispositivos móviles

In [3]:
from pyspark.mllib.clustering import KMeans 
import numpy as np

In [4]:
filename = "file:/home/clement/Desktop/formacion-hadoop/Ejercicios MLLIB - ML-20200114/datasets y soluciones/ml/devicestatus.txt"

In [5]:
data = sc.textFile(filename).map(lambda x: x.split("|"))\
                            .map(lambda x: (float(x[12]), float(x[13])))\
                            .filter(lambda x: x != (0,0))\
                            .cache()

In [6]:
data.take(5)

[(33.6894754264, -117.543308253),
 (37.4321088904, -121.485029632),
 (39.4378908349, -120.938978486),
 (39.3635186767, -119.400334708),
 (33.1913581092, -116.448242643)]

In [7]:
data.count()

431857

In [8]:
model = KMeans.train(data, 5)

In [9]:
model.centers

[array([  34.29049653, -117.77978731]),
 array([  45.33695551, -120.99431457]),
 array([  35.08592001, -112.57643827]),
 array([  37.96974389, -121.20684208]),
 array([  41.97751673, -121.58231811])]

In [10]:
model.predict((33, -116))

0

In [11]:
def error(point): 
    center = model.centers[model.predict(point)] 
    return np.sqrt(sum([x**2 for x in (point - center)]))

In [12]:
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y) 
print("Within Set Sum of Squared Error = " + str(WSSSE))

Within Set Sum of Squared Error = 607334.5450057113


## Exercice: Recomendación de películas

In [13]:
import sys
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.recommendation import ALS

ratings_filename = "file:///home/clement/Desktop/formacion-hadoop/Ejercicios MLLIB - ML-20200114/datasets y soluciones/ml/als/ratings.dat"
personalRatings_filename = "file:///home/clement/Desktop/formacion-hadoop/Ejercicios MLLIB - ML-20200114/datasets y soluciones/ml/als/personalRatings.txt"
movies_filename = "file:///home/clement/Desktop/formacion-hadoop/Ejercicios MLLIB - ML-20200114/datasets y soluciones/ml/als/movies.dat"

def parseRating(line):
    # Realizar el procesamiento de la línea para devolver únicamente los datos necesarios
    fields = line.split("::")
    return int(fields[0]), int(fields[1]), float(fields[2])

def parseMovie(line):
    # Realizar el procesamiento de la línea para devolver únicamente los datos necesarios
    fields = line.split("::")
    return (int(fields[0]), fields[1])

movies = sc.textFile(movies_filename).map(lambda x: parseMovie(x))
#Lee y crea el Pair RDD de películas obteniendo en cada registro únicamente los datos necesarios (idpelícula, nombre)

ratings = sc.textFile(ratings_filename+","+personalRatings_filename).map(lambda x: parseRating(x))
#Lee y crea RDD de valoraciones obteniendo en cada registro únicamente los datos necesarios (idusuario,idpelicula, valoración)

ratingsR = ratings.map(lambda x: Rating(x[0], x[1], x[2])) 
#Crea el RDD de entrada al algoritmo con los registros de tipo Rating
model = ALS.train(ratingsR, rank=2)
#Llama al modelo de entrenamiento ALS
result = model.recommendProducts(0, 5)
#Obtén las 5 recomendaciones para nuestro usuario (id=0)

resultRDD = sc.parallelize(result)

resultJ = resultRDD.map(lambda x: (x[1], x[2]))
#Genera un nuevo Pair RDD con los resultados obtenidos para poder realizar posteriormente un JOIN con el RDD de películas
joinMovies = resultJ.join(movies)
#Realiza el join con el RDD anterior y el Pair RDD de películas creado al inicio

print("Películas recomendadas para tí:")
for i in joinMovies.collect():
    print(i[1][1])

Películas recomendadas para tí:
Song of Freedom (1936)
Neon Bible, The (1995)
Cotton Mary (1999)
Mamma Roma (1962)
Chain of Fools (2000)


In [14]:
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((int(r[0]), int(r[1])), int(r[2]))).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 0.7505228457756725


## Exercice: Análisis de sentimiento

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

In [16]:
valoraciones_filename = "file:///home/clement/Desktop/formacion-hadoop/Ejercicios MLLIB - ML-20200114/datasets y soluciones/ml/sentimiento.txt"
valoraciones = sc.textFile(valoraciones_filename).map(lambda x: x.replace("neg","0").replace("pos","1")).map(lambda x: x.split(";")).map(lambda x: Row(x[0], float(x[1])))

In [17]:
valoraciones.take(5)

[<Row('a�icos', 0.0)>,
 <Row('abandonada', 0.0)>,
 <Row('abandonadas', 0.0)>,
 <Row('abandonado', 0.0)>,
 <Row('abandonados', 0.0)>]

In [18]:
valoracionesDF = valoraciones.toDF(["text", "label"])

In [19]:
valoracionesDF.show()

+-----------+-----+
|       text|label|
+-----------+-----+
|     a�icos|  0.0|
| abandonada|  0.0|
|abandonadas|  0.0|
| abandonado|  0.0|
|abandonados|  0.0|
|  abandonar|  0.0|
|  abandonos|  0.0|
| abarrotada|  0.0|
|abarrotadas|  0.0|
| abarrotado|  0.0|
|abarrotados|  0.0|
|    abatida|  0.0|
|   abatidas|  0.0|
|    abatido|  0.0|
|   abatidos|  0.0|
|abatimiento|  0.0|
|     abatir|  0.0|
| abigarrada|  0.0|
|abigarradas|  0.0|
| abigarrado|  0.0|
+-----------+-----+
only showing top 20 rows



In [20]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized = tokenizer.transform(valoracionesDF)
tokenized.show(5)

+-----------+-----+-------------+
|       text|label|        words|
+-----------+-----+-------------+
|     a�icos|  0.0|     [a�icos]|
| abandonada|  0.0| [abandonada]|
|abandonadas|  0.0|[abandonadas]|
| abandonado|  0.0| [abandonado]|
|abandonados|  0.0|[abandonados]|
+-----------+-----+-------------+
only showing top 5 rows



In [21]:
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
hashedTF = hashingTF.transform(tokenized)
hashedTF.show(5)

+-----------+-----+-------------+--------------------+
|       text|label|        words|            features|
+-----------+-----+-------------+--------------------+
|     a�icos|  0.0|     [a�icos]|(262144,[83924],[...|
| abandonada|  0.0| [abandonada]|(262144,[246055],...|
|abandonadas|  0.0|[abandonadas]|(262144,[204805],...|
| abandonado|  0.0| [abandonado]|(262144,[125409],...|
|abandonados|  0.0|[abandonados]|(262144,[201041],...|
+-----------+-----+-------------+--------------------+
only showing top 5 rows



In [22]:
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Entrenamos el pipeline sobre los datos de entrenamiento
model = pipeline.fit(valoracionesDF)

In [23]:
test = sqlContext.createDataFrame([
(4,"En el restaurante Ginos hacen buenos platos"),
(5,"Pobres indefensos animales"),
(6,"Me pedi una pizza en el telepizza y estaba fria"),
(7,"Estoy muy motivado gracias a este curso")], ["id", "text"])

In [24]:
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row) 

Row(id=4, text='En el restaurante Ginos hacen buenos platos', prediction=1.0)
Row(id=5, text='Pobres indefensos animales', prediction=0.0)
Row(id=6, text='Me pedi una pizza en el telepizza y estaba fria', prediction=0.0)
Row(id=7, text='Estoy muy motivado gracias a este curso', prediction=1.0)
