In [2]:
import findspark
findspark.init()

In [3]:
import pyspark

In [4]:
sc = pyspark.SparkContext(appName = "exercices_spark_ml")

## Grouping mobile devices 

In [4]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from math import sqrt

In [5]:
# Load and parse the data
data = sc.textFile("./datasets/ml/devicestatus.txt")
parsedData = data.map(lambda x: (float(x.split("|")[12]), float(x.split("|")[13]))).filter(lambda x: x != (0,0))

In [6]:
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 5, maxIterations=10, initializationMode="random", seed=42)
clusters.clusterCenters

[array([  35.08592001, -112.57643827]),
 array([  44.2344904 , -121.80042939]),
 array([  37.11982546, -121.02779765]),
 array([  39.03653443, -120.96561253]),
 array([  34.21546118, -117.72242524])]

In [7]:
# Make predictions
new_data = sc.parallelize([[  39.54839049, -120.05409642], [  35.08570018, -112.57633776]])
print(clusters.predict(new_data).collect())
print(clusters.predict([  34.28500015, -117.76854198]))

[3, 0]
4


In [8]:
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    # obtain the centroid of the group of the point
    center = clusters.centers[clusters.predict(point)]
    # return distance to the centroid
    return sqrt(sum([x**2 for x in point-center]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x,y: x+y)

print("Within Set Sum of Squared Errors = ", WSSSE)

Within Set Sum of Squared Errors =  589978.5269258627


## Movie recommender system

In [4]:
import sys
from pyspark.mllib.recommendation import Rating, ALS

In [5]:
def parseRating(rating):
    # (user_id, movie_id, movie_eval)
    rating = str(rating).split('::')
    return (rating[0], rating[1], rating[2])

def parseMovie(movie):
    # (movie_id, movie_name)
    movie = str(movie).split('::')
    return (movie[0], movie[1])

In [6]:
movies = sc.textFile("file:/home/bego/Documents/curso_spark/datasets/ml/als/movies.dat").map(parseMovie)
ratings = sc.textFile("file:/home/bego/Documents/curso_spark/datasets/ml/als/ratings.dat,file:/home/bego/Documents/curso_spark/datasets/ml/als/personalRatings.txt").map(parseRating) # specific structure for recommendation Rating(user, product, rating)
ratingsR = ratings.map(lambda x: Rating(x[0], x[1], x[2])) # convert into Rating object

print(movies.take(1), "\n", ratings.take(1))

[('1', 'Toy Story (1995)')] 
 [('1', '1193', '5')]


In [7]:
recomender_model = ALS.train(ratingsR, rank=2)

In [8]:
def recommend(user, num):
    prediction = recomender_model.recommendProducts(user,num)
    predictionRDD = sc.parallelize(prediction)
    prediction_pairs = predictionRDD.map(lambda x: (str(x[1]), (x[0], x[2])))
    prediction_names = prediction_pairs.join(movies).collect()
    return {e[1][1]:e[1][0][1] for e in prediction_names}

In [9]:
recommend(1, 10)

{'Talk of Angels (1998)': 10.472273155187883,
 "I Can't Sleep (J'ai pas sommeil) (1994)": 8.01706263289887,
 'Zachariah (1971)': 8.015633572187824,
 'Mamma Roma (1962)': 7.056467989560282,
 'Song of Freedom (1936)': 12.499018307161833,
 'Criminal Lovers (Les Amants Criminels) (1999)': 9.745981729136162,
 'Institute Benjamenta, or This Dream People Call Human Life (1995)': 9.139884138149768,
 'Identification of a Woman (Identificazione di una donna) (1982)': 8.857223813556274,
 'Chain of Fools (2000)': 7.499411227311015,
 'First Love, Last Rites (1997)': 6.516970844364067}

In [10]:
# model evaluation
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = recomender_model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 0.0


## Sentiment analysis

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql import SQLContext, Row, DataFrame

In [12]:
sqlContext = SQLContext(sc)

In [30]:
text = sc.textFile("file:/home/bego/Documents/curso_spark/datasets/ml/sentimiento.txt").map(lambda x: x.split(";")).map(lambda x: (x[0], float(x[1].replace('neg', '0.0').replace('pos','1.0'))))
text_df = sqlContext.createDataFrame(text, ["text", "label"])
text_df.show(5)

+-----------+-----+
|       text|label|
+-----------+-----+
|     a�icos|  0.0|
| abandonada|  0.0|
|abandonadas|  0.0|
| abandonado|  0.0|
|abandonados|  0.0|
+-----------+-----+
only showing top 5 rows



In [31]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") # important to have a column named 'features' and a column named 'label', otherwise specify in LogisticRegression
lr = LogisticRegression() # LogisticRegression(featuresCol='features', labelCol='label', predictionCol='prediction')
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(text_df)

In [44]:
test = sqlContext.createDataFrame([
    (4, "En el restaurante Ginos hacen buenos platos"),
    (5, "Pobres indefensos animales"),
    (6, "Me pedi una pizza en el telepizza y estaba fria"),
    (7, "Estoy muy motivado gracias a este curso")], ["id", "text"])

prediction = model.transform(test)

prediction["id", "text", "prediction"].show()

selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)

+---+--------------------+----------+
| id|                text|prediction|
+---+--------------------+----------+
|  4|En el restaurante...|       1.0|
|  5|Pobres indefensos...|       0.0|
|  6|Me pedi una pizza...|       0.0|
|  7|Estoy muy motivad...|       1.0|
+---+--------------------+----------+

Row(id=4, text='En el restaurante Ginos hacen buenos platos', prediction=1.0)
Row(id=5, text='Pobres indefensos animales', prediction=0.0)
Row(id=6, text='Me pedi una pizza en el telepizza y estaba fria', prediction=0.0)
Row(id=7, text='Estoy muy motivado gracias a este curso', prediction=1.0)
