# Importation de SPARK

In [1]:
# import findspark
# findspark.init()
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Spark").getOrCreate()
sc = spark.sparkContext

sc.setSystemProperty('spark.executor.memory', '8g')
sc.setSystemProperty('spark.driver.memory', '45G')

# Sujet
On souhaite faire de la segmentation d'utilisateurs. Pour cela, nous allons procéder en deux étapes :

* Construction d'un profil utilisateur
* Utilisation d'un algorithme de clustering afin de calculer les groupes

Le profil d'un utilisateur est composé de 3 dimensions :
* le nombre de films d'action
* le nombre de films d'aventure
* et le nombre de films d'animation

Une fois ces quantités calculées, proposez un pipeline qui devra notamment :
* normaliser les vecteurs,
* et calculer les groupes (clustering)

Enfin, les utilisateurs 23 et 49 sont-ils dans le meme groupe ? La réponse doit être la requête que vous proposez ainsi que son résultat.

In [2]:
df_ratings = spark.read\
    .option("delimiter", "\t")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv('data/u.data')

In [3]:
df_ratings.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)



In [4]:
df_items = spark.read\
    .option("delimiter", "|")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .csv('data/u.item')

In [5]:
df_items.printSchema()

root
 |-- movie_id: integer (nullable = true)
 |-- movie_title: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- video_release_date: string (nullable = true)
 |-- IMDb_url: string (nullable = true)
 |-- unknown: integer (nullable = true)
 |-- action: integer (nullable = true)
 |-- adventure: integer (nullable = true)
 |-- animation: integer (nullable = true)
 |-- children\s: integer (nullable = true)
 |-- comedy: integer (nullable = true)
 |-- crime: integer (nullable = true)
 |-- documentary: integer (nullable = true)
 |-- drama: integer (nullable = true)
 |-- fantasy: integer (nullable = true)
 |-- film-noir: integer (nullable = true)
 |-- horror: integer (nullable = true)
 |-- musical: integer (nullable = true)
 |-- mystery: integer (nullable = true)
 |-- romance: integer (nullable = true)
 |-- sci-fi: integer (nullable = true)
 |-- thriller: integer (nullable = true)
 |-- war: integer (nullable = true)
 |-- western: integer (nullable = true)



In [6]:
import pyspark.sql.functions as func


In [7]:
df_final = df_ratings.join(df_items, df_items["movie_id"] == df_ratings["item_id"]).groupby(df_ratings["user_id"]).agg(func.sum(func.when(df_items["action"] == 1, df_ratings["user_id"])).alias("action"),
                                        func.sum(func.when(df_items["adventure"] == 1, df_ratings["user_id"])).alias("adv"),
                                        func.sum(func.when(df_items["animation"] == 1, df_ratings["user_id"])).alias("anim")
                                        )

In [8]:
df_final.show()

+-------+------+-----+----+
|user_id|action|  adv|anim|
+-------+------+-----+----+
|    148|  1776| 2220|2072|
|    463|  9723| 6945|2315|
|    471|  1413| 3297|7536|
|    496| 15872|11904|3968|
|    833| 59976|24990|2499|
|    243|  1458|  972| 243|
|    392|  7840| 4704|1960|
|    540| 10260| 4320|2700|
|    623|  8722| 3738|null|
|    737|  5896| 2211|1474|
|    858|  5148| 1716|null|
|    897| 51129|31395|9867|
|     31|   155|   93|null|
|    516|  2580| 1548| 516|
|    251|  7781| 4267| 753|
|     85|  3910| 2720| 850|
|    137|  3836| 1918| 137|
|    451|  7667| 3157| 451|
|    580| 12760| 6380| 580|
|    808|  4848|  808|null|
+-------+------+-----+----+
only showing top 20 rows



In [9]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [10]:
assembler = VectorAssembler(
    inputCols=["action", "adv", "anim"],
    outputCol="features")

In [11]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)


In [12]:
from pyspark.ml import Pipeline
#union a mettre dans la pipeline puis fit le kmeans
pipeline = Pipeline(stages=[assembler.setHandleInvalid("skip"), kmeans])

In [13]:
model = pipeline.fit(df_final)

In [14]:
predictions = model.transform(df_final)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("euclidean distance = " + str(silhouette))

euclidean distance = 0.8330579029847824


In [15]:
predictions.select(predictions["user_id"], predictions["prediction"]).where(predictions["user_id"] == 23).show()

+-------+----------+
|user_id|prediction|
+-------+----------+
|     23|         0|
+-------+----------+



In [16]:
predictions.select(predictions["user_id"], predictions["prediction"]).where(predictions["user_id"] == 49).show()

+-------+----------+
|user_id|prediction|
+-------+----------+
|     49|         0|
+-------+----------+

