In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import random
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession 
from pyspark.ml  import Pipeline     
from pyspark.sql import SQLContext  
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [2]:
url = 'https://raw.githubusercontent.com/alextanhongpin/machine-learning-with-pyspark/master/07_recommender_system/movie_ratings_df.csv'
csv = pd.read_csv(url, encoding="ISO-8859-1")

In [4]:
csv.head(3)

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5


In [5]:
csv.to_csv(r"C:\Users\gonza\Repositorios\TestGit\movie_ratings_df.csv",index=False)

### Movie Recommendation with Pyspark 

In [42]:
# create sparksession
spark = SparkSession.builder.appName('recomendation').master("local[5]").getOrCreate()

In [7]:
url = r"C:\Users\gonza\Repositorios\TestGit\movie_ratings_df.csv"

df = spark.read.csv(url,inferSchema=True,header=True)
df.limit(3).toPandas()

Unnamed: 0,userId,title,rating
0,196,Kolya (1996),3
1,63,Kolya (1996),3
2,226,Kolya (1996),5


In [8]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- rating: integer (nullable = true)



As we can see, the title column is stored as string type. To work with pyspark Mlib library, we need to convert string type to numeric values

In [9]:
from pyspark.ml.feature import StringIndexer,IndexToString

stringIndexer = StringIndexer(inputCol='title',outputCol='title_new')
# Applying stringindexer object on dataframe movie title column
model = stringIndexer.fit(df)
#creating new dataframe with transformed values
indexed = model.transform(df)
indexed.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new
0,196,Kolya (1996),3,287.0
1,63,Kolya (1996),3,287.0
2,226,Kolya (1996),5,287.0
3,154,Kolya (1996),3,287.0
4,306,Kolya (1996),5,287.0


We use Alternating least squares (ALS) algorithm in Pyspark Ml library for recommendation.

In [10]:
# split the data
train, test = indexed.randomSplit([0.75,0.25])

# training the model using train dataset
from pyspark.ml.recommendation import ALS
rec = ALS(maxIter=10,
            regParam=0.01,
            userCol='userId',
            itemCol='title_new',
            ratingCol='rating',
            nonnegative=True,
            coldStartStrategy='drop')
# fit the model on training set
rec_model = rec.fit(train)
# making prediction on test set
predicted_ratings = rec_model.transform(test)
predicted_ratings.limit(5).toPandas()

Unnamed: 0,userId,title,rating,title_new,prediction
0,85,Much Ado About Nothing (1993),4,148.0,3.674069
1,588,Much Ado About Nothing (1993),5,148.0,3.894752
2,916,Much Ado About Nothing (1993),4,148.0,3.853631
3,253,Much Ado About Nothing (1993),4,148.0,3.733635
4,409,Much Ado About Nothing (1993),3,148.0,3.455377


In [12]:
# importing regression evaluator to measure rmse
from pyspark.ml.evaluation import RegressionEvaluator
# creating regresion evaluator object for measuring accuracy
evaluator = RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
# apply the RE on prediction dataframe to calculate RMSE
rmse = evaluator.evaluate(predicted_ratings)
print(rmse)

1.0221254544133624


### After training, now is the time to recommend top movies which user might like

In [15]:
# first create datase with all unique movies
unique_movies = indexed.select('title_new').distinct()

# create a function to recomended top "n" movies to any particular users
def top_movies(user_id,n):
    # assing alias name "a" to unique movies
    a = unique_movies.alias('a')
    # creating another dataframe which contains already watched movies by active user
    watched_movies = indexed.filter(indexed['userId'] == user_id).select('title_new')
    # assigning alias 
    b = watched_movies.alias('b')
    # joining both tables on left join
    total_movies = a.join(b,a.title_new==b.title_new,how='left')
    # selecting movies which active user is yet to rate o watch
    remaining_movies = total_movies.where(col('b.title_new').isNull()).select(a.title_new).distinct()
    # adding new column of user_id
    remaining_movies = remaining_movies.withColumn("userId",lit(int(user_id)))
    # making recomendation
    recommendations = rec_model.transform(remaining_movies).orderBy("prediction",ascending=False).limit(n)
    # adding movie title
    movie_title = IndexToString(inputCol="title_new",outputCol="title",labels=model.labels)
    final_recomendation = movie_title.transform(recommendations)

    return final_recomendation.show(n,False)


In [17]:
top_movies(60,5)

+---------+------+----------+-----------------------------+
|title_new|userId|prediction|title                        |
+---------+------+----------+-----------------------------+
|681.0    |60    |6.055451  |Kundun (1997)                |
|1277.0   |60    |6.044452  |Mina Tannenbaum (1994)       |
|993.0    |60    |6.0030537 |In the Bleak Midwinter (1995)|
|1288.0   |60    |5.9719954 |Whole Wide World, The (1996) |
|835.0    |60    |5.956031  |Vanya on 42nd Street (1994)  |
+---------+------+----------+-----------------------------+



In [18]:
 spark.stop()

### pyspark examples

In [24]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import *

dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M',4000),
  (('Maria','Anne','Jones'),'1967-12-01','F',4000),
  (('Jen','Mary','Brown'),'1980-02-17','F',-1)
]

schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('gender', IntegerType(), True)
         ])

In [26]:
# parallelize
df = spark.createDataFrame(data= dataDF,schema=schema)
df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



In [27]:
# cambiar nombre
df.withColumnRenamed("dob","date_of_birth").printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- date_of_birth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



In [28]:
new_col = ["newCol1","newCol2","newCol3","newCol4"]
df.toDF(*new_col).printSchema()

root
 |-- newCol1: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- newCol2: string (nullable = true)
 |-- newCol3: string (nullable = true)
 |-- newCol4: integer (nullable = true)



In [29]:
# with column
nuevas_col = ["nombre","nacimiento","genero","salario"]
df3 = df.toDF(*nuevas_col)
df3.printSchema()

root
 |-- nombre: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- nacimiento: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- salario: integer (nullable = true)



In [30]:
df4 = df3.select(col("genero"),col("salario"),lit('usa').alias("pais"))
df4.show(truncate=False)

+------+-------+----+
|genero|salario|pais|
+------+-------+----+
|M     |3000   |usa |
|M     |4000   |usa |
|M     |4000   |usa |
|F     |4000   |usa |
|F     |-1     |usa |
+------+-------+----+



In [31]:
from pyspark.sql.functions import when
df5 = df4.withColumn("tipo_salario",when(col("salario") >= 4000,lit("grande")).otherwise(lit("pequeño")))
df5.show(truncate=False)

+------+-------+----+------------+
|genero|salario|pais|tipo_salario|
+------+-------+----+------------+
|M     |3000   |usa |pequeño     |
|M     |4000   |usa |grande      |
|M     |4000   |usa |grande      |
|F     |4000   |usa |grande      |
|F     |-1     |usa |pequeño     |
+------+-------+----+------------+



In [34]:
df5.filter(df5.genero == "M").show(truncate=False)

+------+-------+----+------------+
|genero|salario|pais|tipo_salario|
+------+-------+----+------------+
|M     |3000   |usa |pequeño     |
|M     |4000   |usa |grande      |
|M     |4000   |usa |grande      |
+------+-------+----+------------+



In [35]:
spark.stop()

### order by and sort explained

In [43]:
simpleData = [("James","Sales","NY",90000,34,10000), \
    ("Michael","Sales","NY",86000,56,20000), \
    ("Robert","Sales","CA",81000,30,23000), \
    ("Maria","Finance","CA",90000,24,23000), \
    ("Raman","Finance","CA",99000,40,24000), \
    ("Scott","Finance","NY",83000,36,19000), \
    ("Jen","Finance","NY",79000,53,15000), \
    ("Jeff","Marketing","CA",80000,25,18000), \
    ("Kumar","Marketing","NY",91000,50,21000) \
  ]
columns= ["employee_name","department","state","salary","age","bonus"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- state: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
+-------------+----------+-----+------+---+-----+



In [44]:
df.sort("department","state").show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Maria        |Finance   |CA   |90000 |24 |23000|
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|James        |Sales     |NY   |90000 |34 |10000|
+-------------+----------+-----+------+---+-----+



In [47]:
df.createOrReplaceTempView("EMP")
spark.sql("select employee_name,department,state,salary,age,bonus from EMP ORDER BY department asc").show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Raman        |Finance   |CA   |99000 |40 |24000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Robert       |Sales     |CA   |81000 |30 |23000|
+-------------+----------+-----+------+---+-----+



In [51]:
spark.sql(""" select department,round(avg(salary),2) as media from EMP GROUP BY 1 ORDER BY 2 DESC """).show(truncate=False)

+----------+--------+
|department|media   |
+----------+--------+
|Finance   |87750.0 |
|Sales     |85666.67|
|Marketing |85500.0 |
+----------+--------+



In [41]:
spark.stop()