# Working with a file & building Recommender System

In [86]:
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()   #here, sc is a spark context which is nothing but a handle to run spark functions and spark codes
import pyspark as pys
from pyspark.sql import SparkSession
pys = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
import numpy as np
#above line is a way to start spark session

In [2]:
from pyspark import Row   #Row can be used to create a row object by using named arguments, the fields will be sorted by names.

In [3]:
#download data from: https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html
rdd = sc.textFile("./CaliforniaHousing/cal_housing.data")

In [4]:
rdd.count()

20640

In [5]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

In [6]:
type(rdd.take(2)[0])

str

In [7]:
a = rdd.take(20640)
type(a)

list

In [8]:
def func(line):
    return line.split(",")

In [9]:
func(a[0])

['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

In [10]:
df_break = rdd.map(func)

In [11]:
df_break.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

In [12]:
# The file contains all the the variables. Specifically, it contains median house value, med ian income, housing median age, 
# total rooms, total bedrooms, population, households, latitude, and longitude in that order.
df = df_break.map(lambda line: Row(MedianHouseValue = line[0], 
                                   MedianIncome = line[1], 
                                   HousingMedianAge = line[2],
                                   TotalRooms = line[3],
                                   TotalBedrooms = line[4],
                                   Population = line[5],
                                   Households = line[6],
                                   Latitude = line[7],
                                   Longitude = line[8])).toDF()

In [13]:
df.columns

['Households',
 'HousingMedianAge',
 'Latitude',
 'Longitude',
 'MedianHouseValue',
 'MedianIncome',
 'Population',
 'TotalBedrooms',
 'TotalRooms']

In [14]:
df.show()

+-----------+----------------+--------+-------------+----------------+------------+-----------+-------------+-----------+
| Households|HousingMedianAge|Latitude|    Longitude|MedianHouseValue|MedianIncome| Population|TotalBedrooms| TotalRooms|
+-----------+----------------+--------+-------------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|8.325200|452600.000000|     -122.230000|   37.880000| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|8.301400|358500.000000|     -122.220000|   37.860000|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|7.257400|352100.000000|     -122.240000|   37.850000| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|5.643100|341300.000000|     -122.250000|   37.850000| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|3.846200|342200.000000|     -122.250000|   37.850000| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.0

In [31]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [15]:
data = sc.textFile("./Data/test.data")
data.take(5)

['1,1,5.0', '1,2,1.0', '1,3,5.0', '1,4,1.0', '2,1,5.0']

In [16]:
type(data)

pyspark.rdd.RDD

In [17]:
data = data.map(lambda line: line.split(","))

In [19]:
data.take(5)

[['1', '1', '5.0'],
 ['1', '2', '1.0'],
 ['1', '3', '5.0'],
 ['1', '4', '1.0'],
 ['2', '1', '5.0']]

In [42]:
ratings = data.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))

In [43]:
ratings.take(5)

[Rating(user=1, product=1, rating=5.0),
 Rating(user=1, product=2, rating=1.0),
 Rating(user=1, product=3, rating=5.0),
 Rating(user=1, product=4, rating=1.0),
 Rating(user=2, product=1, rating=5.0)]

In [96]:
rank = 25  #you can give here any number in the range 10 to 50. Rank is the number of features to use (also referred to as the number of latent factors).
numIterations = 10 
model = ALS.train(ratings, rank, numIterations)

In [97]:
testdata = ratings.map(lambda p: (p[0], p[1]))

In [98]:
testdata.take(5)

[(1, 1), (1, 2), (1, 3), (1, 4), (2, 1)]

In [99]:
predictions = model.predictAll(testdata)
predictions.take(5)

[Rating(user=4, product=4, rating=4.9964969712683445),
 Rating(user=4, product=1, rating=1.0009851367426241),
 Rating(user=4, product=2, rating=4.9964969712683445),
 Rating(user=4, product=3, rating=1.0009851367426241),
 Rating(user=1, product=4, rating=1.0010428930721067)]

In [78]:
predictions = predictions.map(lambda l: ((l[0], l[1]), l[2]))
predictions.take(5)

[((4, 4), 4.9968399937803305),
 ((4, 1), 1.001197134336252),
 ((4, 2), 4.9968399937803305),
 ((4, 3), 1.001197134336252),
 ((1, 4), 1.00136997659397)]

In [79]:
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2]))

In [80]:
ratesAndPreds.take(5)

[((1, 1), 5.0), ((1, 2), 1.0), ((1, 3), 5.0), ((1, 4), 1.0), ((2, 1), 5.0)]

In [81]:
ratesAndPreds = ratesAndPreds.join(predictions)
ratesAndPreds.take(5)

[((1, 3), (5.0, 4.995976405087321)),
 ((1, 4), (1.0, 1.00136997659397)),
 ((3, 2), (5.0, 4.9968399937803305)),
 ((2, 2), (1.0, 1.00136997659397)),
 ((2, 4), (1.0, 1.00136997659397))]

In [91]:
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 7.371230454186882e-06
