In [1]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

In [2]:
#import data 
data = sc.textFile("file:///home/albatrosx/Downloads/ml-100k/u.data")

In [3]:
#split the data 
data= data.map(lambda x:x.split("\t"))

In [4]:
#display first 6 rows from the data 
data.take(6)

[['196', '242', '3', '881250949'],
 ['186', '302', '3', '891717742'],
 ['22', '377', '1', '878887116'],
 ['244', '51', '2', '880606923'],
 ['166', '346', '1', '886397596'],
 ['298', '474', '4', '884182806']]

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)

In [6]:
hasattr(data, "toDF")

True

In [7]:
#header list
header_Rdd=['user_id','movies_id','rating','timsemp']

In [8]:
#creat spark dataframe 
df = data.toDF(header_Rdd)

In [9]:
#change colomn type from String to int
from pyspark.sql.types import IntegerType,TimestampType
from pyspark.sql.functions import col
df = df.withColumn("movies_id",col("movies_id").cast("integer"))
df = df.withColumn("user_id",col("user_id").cast("integer"))
df = df.withColumn("rating",col("rating").cast("integer"))
df = df.withColumn("timsemp",col("timsemp").cast("integer"))

In [10]:
#take the movies who have a good raiting from users
df_f=df.filter((df.rating>=3))

In [11]:
#create items data 
from pyspark.sql.functions import collect_list
items=df_f.groupBy(df_f.user_id).agg(collect_list('movies_id'))

In [12]:
#import the association rules algorithm 
from pyspark.ml.fpm import FPGrowth

In [13]:
#build the model
fpGrowth = FPGrowth(itemsCol="collect_list(movies_id)", minSupport=0.2, minConfidence=0.6)


In [14]:
#fit the model 
model = fpGrowth.fit(items)

In [15]:
#display generated rules
model.associationRules.show()

+-------------+----------+------------------+------------------+
|   antecedent|consequent|        confidence|              lift|
+-------------+----------+------------------+------------------+
|        [121]|     [181]|0.7731343283582089|1.5316505706760315|
|        [121]|     [174]| 0.608955223880597|1.4249250027776748|
|        [121]|       [1]|0.7223880597014926| 1.633601775296181|
|        [121]|     [100]|0.6388059701492538|  1.26553367615703|
|        [121]|      [50]|0.8149253731343283| 1.377194671802279|
|        [121]|     [117]|0.6865671641791045|1.9500989030749865|
|        [121]|     [222]|0.6119402985074627|1.7593283582089552|
|        [121]|     [405]|0.6388059701492538| 2.174707688991864|
|        [405]|     [117]|0.7111913357400722|2.0200404506111087|
|        [405]|     [181]| 0.740072202166065|1.4661514425264688|
|        [405]|       [1]|0.7436823104693141|1.6817563999342042|
|        [405]|     [121]|0.7725631768953068|2.1747076889918637|
|        [405]|      [50]

In [16]:
model.associationRules.count()

1298

In [17]:
#impork kmeans model 
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [18]:
#import the useer data 
user = sc.textFile("file:///home/albatrosx/Downloads/ml-100k/u.user")
user= user.map(lambda x:x.split("|"))

In [19]:
user.take(5)

[['1', '24', 'M', 'technician', '85711'],
 ['2', '53', 'F', 'other', '94043'],
 ['3', '23', 'M', 'writer', '32067'],
 ['4', '24', 'M', 'technician', '43537'],
 ['5', '33', 'F', 'other', '15213']]

In [20]:
header=['user_id','age','gender','occupation','zip code']

In [21]:
user=user.toDF(header)

In [22]:
user

DataFrame[user_id: string, age: string, gender: string, occupation: string, zip code: string]

In [23]:
user = user.withColumn("user_id",col("user_id").cast("integer"))
user = user.withColumn("age",col("age").cast("integer"))
user = user.withColumn("zip code",col("zip code").cast("integer"))

In [24]:
user

DataFrame[user_id: int, age: int, gender: string, occupation: string, zip code: int]

In [25]:
import pyspark.sql.functions as F

In [26]:
types = user.select("gender").distinct().rdd.flatMap(lambda x: x).collect()
codes = user.select("occupation").distinct().rdd.flatMap(lambda x: x).collect()
types_expr = [F.when(F.col("gender") == ty, 1).otherwise(0).alias("gender_" + ty) for ty in types]
codes_expr = [F.when(F.col("occupation") == code, 1).otherwise(0).alias("occupation_" + code) for code in codes]
user = user.select('user_id','age','gender','occupation','zip code', *types_expr+codes_expr)


In [27]:
f=user.schema.names.copy()

In [28]:
f.remove('zip code')
f.remove('user_id')
f.remove('gender')
f.remove('occupation')

In [29]:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=f, outputCol="features")
new_df = vecAssembler.transform(user)
new_df.show()

+-------+---+------+-------------+--------+--------+--------+--------------------+------------------+-----------------+---------------+-----------------+---------------------+--------------------+----------------+--------------------+--------------------+------------------+-------------------+-----------------+---------------------+------------------------+-------------------+---------------------+-------------------+------------------------+--------------------+-----------------+--------------------+
|user_id|age|gender|   occupation|zip code|gender_F|gender_M|occupation_librarian|occupation_retired|occupation_lawyer|occupation_none|occupation_writer|occupation_programmer|occupation_marketing|occupation_other|occupation_executive|occupation_scientist|occupation_student|occupation_salesman|occupation_artist|occupation_technician|occupation_administrator|occupation_engineer|occupation_healthcare|occupation_educator|occupation_entertainment|occupation_homemaker|occupation_doctor|        

In [30]:
kmeans = KMeans().setK(30).setSeed(1)

In [42]:
#kmeans = KMeans().setK(42).setSeed(1)
kmeans=KMeans(
    featuresCol=vecAssembler.getOutputCol(), 
    predictionCol="cluster", k=5)
model = kmeans.fit(new_df.select("features"))
prediction = model.transform(new_df.select("features"))
evaluator = ClusteringEvaluator(predictionCol="cluster",featuresCol="features")
Silhouette = evaluator.evaluate(prediction)
print("Silhouette with squared euclidean distance = " + str(Silhouette))

Silhouette with squared euclidean distance = 0.6705895730157284


In [46]:
centers = model.clusterCenters()#centers
print(centers)

[array([3.38537415e+01, 2.89115646e-01, 7.10884354e-01, 7.48299320e-02,
       0.00000000e+00, 1.70068027e-02, 1.02040816e-02, 4.42176871e-02,
       8.84353741e-02, 4.42176871e-02, 1.22448980e-01, 4.42176871e-02,
       5.78231293e-02, 6.80272109e-02, 2.04081633e-02, 3.74149660e-02,
       2.72108844e-02, 9.52380952e-02, 9.18367347e-02, 1.02040816e-02,
       1.19047619e-01, 1.70068027e-02, 6.80272109e-03, 3.40136054e-03]), array([2.26737401e+01, 2.78514589e-01, 7.21485411e-01, 1.85676393e-02,
       0.00000000e+00, 7.95755968e-03, 1.32625995e-02, 3.97877984e-02,
       7.16180371e-02, 1.32625995e-02, 9.81432361e-02, 1.59151194e-02,
       1.85676393e-02, 4.64190981e-01, 7.95755968e-03, 3.18302387e-02,
       3.18302387e-02, 4.50928382e-02, 5.03978780e-02, 7.95755968e-03,
       2.65251989e-02, 2.65251989e-02, 7.95755968e-03, 2.65251989e-03]), array([6.62631579e+01, 1.05263158e-01, 8.94736842e-01, 5.26315789e-02,
       3.68421053e-01, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
 

In [44]:
prediction = model.transform(new_df)

In [45]:
prediction.groupBy("cluster").count().orderBy("cluster").show()#count members in each cluster
prediction.select('user_id', 'cluster').show(5)#show several clustered data

+-------+-----+
|cluster|count|
+-------+-----+
|      0|  294|
|      1|  377|
|      2|   19|
|      3|  187|
|      4|   66|
+-------+-----+

+-------+-------+
|user_id|cluster|
+-------+-------+
|      1|      1|
|      2|      4|
|      3|      1|
|      4|      1|
|      5|      0|
+-------+-------+
only showing top 5 rows



In [47]:
#import training and test data 
trainset_movies=sc.textFile("file:///home/albatrosx/Downloads/ml-100k/u1.base")
testset_movies=sc.textFile("file:///home/albatrosx/Downloads/ml-100k/u1.test")
trainset_movies= trainset_movies.map(lambda x:x.split("\t"))
testset_movies=testset_movies.map(lambda x:x.split("\t"))

In [51]:
#dispaly the first 6 rows 
trainset_movies.take(6)

[['1', '1', '5', '874965758'],
 ['1', '2', '3', '876893171'],
 ['1', '3', '4', '878542960'],
 ['1', '4', '3', '876893119'],
 ['1', '5', '3', '889751712'],
 ['1', '7', '4', '875071561']]

In [49]:
testset_movies.take(6)

[['1', '6', '5', '887431973'],
 ['1', '10', '3', '875693118'],
 ['1', '12', '5', '878542960'],
 ['1', '14', '5', '874965706'],
 ['1', '17', '3', '875073198'],
 ['1', '20', '4', '887431883']]

In [52]:
header_Rdd=['user_id','movies_id','rating','timsemp']

In [53]:
training_df = trainset_movies.toDF(header_Rdd)

In [54]:
training_df.head(5)

[Row(user_id='1', movies_id='1', rating='5', timsemp='874965758'),
 Row(user_id='1', movies_id='2', rating='3', timsemp='876893171'),
 Row(user_id='1', movies_id='3', rating='4', timsemp='878542960'),
 Row(user_id='1', movies_id='4', rating='3', timsemp='876893119'),
 Row(user_id='1', movies_id='5', rating='3', timsemp='889751712')]

In [55]:
test_df = testset_movies.toDF(header_Rdd)

In [56]:
test_df

DataFrame[user_id: string, movies_id: string, rating: string, timsemp: string]

In [57]:
#change column  type from string to int 
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
training_df = training_df.withColumn("movies_id",col("movies_id").cast("integer"))
training_df = training_df.withColumn("user_id",col("user_id").cast("integer"))
training_df = training_df.withColumn("rating",col("rating").cast("integer"))
training_df = training_df.withColumn("timsemp",col("timsemp").cast("integer"))

In [58]:
test_df = test_df.withColumn("movies_id",col("movies_id").cast("integer"))
test_df = test_df.withColumn("user_id",col("user_id").cast("integer"))
test_df = test_df.withColumn("rating",col("rating").cast("integer"))
test_df = test_df.withColumn("timsemp",col("timsemp").cast("integer"))

In [59]:
from pyspark.ml.recommendation import ALS

In [60]:
#build the als model 
als = ALS(maxIter=20, regParam=0.2, userCol="user_id", 
          itemCol="movies_id", ratingCol="rating")

In [66]:
#drop missing values 
training_df=training_df.na.drop()
test_df=test_df.na.drop()

In [67]:
model = als.fit(training_df)

In [68]:
prediction = model.transform(test_df)

In [69]:
prediction.take(1)

[Row(user_id=251, movies_id=148, rating=2, timsemp=886272547, prediction=3.178914785385132)]

In [71]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="rmse")
prediction=prediction.na.drop()
rmse = evaluator.evaluate(prediction)
rmse

0.9444044474429493