<a href="https://colab.research.google.com/github/AnnisaFitry/Tugas7-BigData/blob/main/Tugas7_BD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

slide 30

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=94bf9efbf18e2b6a63c25dc81a8bb0db9a8ee6ed2b7377ff987dfdb2e8a88639
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
from google.colab import drive

# Accessing My Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

In [7]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

class Rating:
    def __init__(self, userId, movieId, rating, timestamp):
        self.userId = int(userId)
        self.movieId = int(movieId)
        self.rating = float(rating)
        self.timestamp = long(timestamp)

def parseRating(str):
    fields = str.split(",")
    assert(len(fields) == 4)
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))

In [8]:
# Test it
parseRating("1,1193,5,978300760")

NameError: ignored

In [9]:
ratings = spark.read.csv("/content/drive/MyDrive/Tugas7-Bigdata/ml-latest-small/ratings.csv", header=True, inferSchema=True)
# Check if everything is ok
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [10]:
training, test = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Alternating Least Squares (ALS) matrix factorization.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")

model = als.fit(training)
model.write().overwrite().save("mymodel")

# Prepare the recommendations
predictions = model.transform(test)
squaredErrors = predictions.withColumn("squaredError", (col("rating") - col("prediction")) ** 2)
mse = squaredErrors.filter(~col("squaredError").isNull()).selectExpr("sum(squaredError) as sumSquaredError").collect()[0][0] / squaredErrors.filter(~col("squaredError").isNull()).count()

In [11]:
predictions.show(10)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|   1025|   5.0|964982791| 4.1591964|
|     1|     47|   5.0|964983815| 4.8560295|
|     1|    780|   3.0|964984086| 4.3560705|
|     1|    661|   5.0|964982838| 3.0809762|
|     1|     50|   5.0|964982931| 4.4966793|
|     1|     70|   3.0|964982400|  4.433076|
|     1|    480|   4.0|964982346|  4.473155|
|     1|   1023|   5.0|964982681| 4.5363245|
|     1|    940|   5.0|964982176| 4.5123997|
|     1|    260|   5.0|964981680|  4.678892|
+------+-------+------+---------+----------+
only showing top 10 rows



In [12]:
predictions.write.csv("ml-predictions.csv", header=True)

slide 48

In [13]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [14]:
lines = spark.read.csv('/content/drive/MyDrive/Tugas7-Bigdata/ml-latest-small/ratings.csv').rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=int(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

Py4JJavaError: ignored

In [15]:
predictions.show()

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|   1025|   5.0|964982791| 4.1591964|
|     1|   1265|   4.0|964983599|  4.297022|
|     1|     47|   5.0|964983815| 4.8560295|
|     1|   1275|   5.0|964982290|  4.979747|
|     1|    780|   3.0|964984086| 4.3560705|
|     1|   1240|   5.0|964983723| 4.4778376|
|     1|   1073|   5.0|964981680|  5.135961|
|     1|   1377|   3.0|964982653|  4.242219|
|     1|   1060|   4.0|964980924| 3.3260522|
|     1|    661|   5.0|964982838| 3.0809762|
|     1|     50|   5.0|964982931| 4.4966793|
|     1|     70|   3.0|964982400|  4.433076|
|     1|    480|   4.0|964982346|  4.473155|
|     1|   1023|   5.0|964982681| 4.5363245|
|     1|    940|   5.0|964982176| 4.5123997|
|     1|   1573|   5.0|964982290| 3.9817665|
|     1|    260|   5.0|964981680|  4.678892|
|     1|   1954|   5.0|964982176| 4.4843493|
|     1|   2005|   5.0|964981710| 4.4846525|
|     1|  

In [16]:
import math
result = predictions.rdd.map(lambda row: row['prediction'] - row['rating']).map(lambda x: x*x).filter(lambda x: not math.isnan(x))
mse = result.reduce(lambda x,y: x+y)

In [17]:
mse

22913.509731572183

slide 52

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Load and parse the data
data = spark.read.text("/content/spark/kmeans_data.txt").rdd
parsedData = data.map(lambda row: Vectors.dense([float(x) for x in row[0].split(' ')])).toDF(["features"]).cache()

# Cluster the data into two classes using KMeans
numClusters = 2
numIterations = 20
kmeans = KMeans().setK(numClusters).setMaxIter(numIterations)
model = kmeans.fit(parsedData)

# Evaluate clustering by computing Within Set Sum of Squared Errors
wssse = model.computeCost(parsedData)
print("Within Set Sum of Squared Errors = " + str(wssse))

# Save and load model
model.save("/content/KMeansModel1")
sameModel = KMeansModel.load("/content/KMeansModel1")

slide 53-54

In [None]:
!pip install pyspark

from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array
from math import sqrt
from pyspark import SparkContext

sc = SparkContext("local[*]", "KMeans Example")

# Load and parse the data
data = sc.textFile("/content/spark/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, k=2, maxIterations=10, runs=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Errors = " + str(WSSSE))

# Save and load model
clusters.save(sc, "myModelPath")
sameModel = KMeansModel.load(sc, "myModelPath")