In [None]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Install PySpark helper
!pip install -q findspark pyspark


In [None]:
import os

# Point the current process at the JDK and the unpacked Spark distribution
# so that later cells can locate them through the environment.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
    }
)


In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Build (or reuse) a local Spark session named "BookRecommenderALS";
# local[*] runs Spark inside this machine using all available cores.
spark = (
    SparkSession.builder
    .appName("BookRecommenderALS")
    .master("local[*]")
    .getOrCreate()
)

# Leave the session as the last expression so the notebook renders its summary.
spark


### Setup PySpark Environment

PySpark digunakan untuk membangun model collaborative filtering
karena mampu menangani dataset berukuran besar secara terdistribusi.


In [None]:
from google.colab import drive

# Attach Google Drive under /content/drive; the dataset paths used in the
# following cells live below this mount point.
drive.mount("/content/drive")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# List the dataset folder on Drive to confirm the CSV files are reachable.
!ls "/content/drive/MyDrive/Colab Notebooks/BigDataBooks"


books_clean.csv  ratings_clean.csv  users_clean.csv
books.csv	 ratings.csv	    users.csv


In [None]:
import pandas as pd

# Read the cleaned ratings table from Drive into pandas.
# NOTE(review): the preview shows "Unnamed: 0" / "Unnamed: 0.1" columns, which
# look like index columns persisted by an earlier to_csv call -- confirm.
ratings = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/BigDataBooks/ratings_clean.csv")

# Preview the first rows.
ratings.head()


Unnamed: 0.1,Unnamed: 0,User-ID,ISBN,Book-Rating
0,0,276725,034545104X,0
1,1,276726,0155061224,5
2,2,276727,0446520802,0
3,3,276729,052165615X,3
4,4,276729,0521795028,6


In [None]:
# Map each distinct User-ID / ISBN to an integer category code; the ALS cell
# later uses these integer columns as its user and item identifiers.
ratings['user_idx'] = pd.Categorical(ratings['User-ID']).codes
ratings['book_idx'] = pd.Categorical(ratings['ISBN']).codes


In [None]:
# Rescale Book-Rating by a factor of 1/10 (e.g. 7 -> 0.7).
ratings['rating_scaled'] = ratings['Book-Rating'].div(10)


In [None]:
# Keep only the three columns the ALS model consumes and persist them
# (without the pandas index) to the Colab working directory.
ratings_als = ratings.loc[:, ['user_idx', 'book_idx', 'rating_scaled']]
ratings_als.to_csv("/content/ratings_als.csv", index=False)


In [None]:
from sklearn.model_selection import train_test_split

# 80/20 random split of the full ratings frame; the fixed random_state keeps
# the partition identical across runs.
train, test = train_test_split(ratings, test_size=0.2, random_state=42)

# Persist both partitions as CSV (no index column) so Spark can read them.
train.to_csv('train_ratings.csv', index=False)
test.to_csv('test_ratings.csv', index=False)


In [None]:
# List the working directory to confirm the exported CSV files exist.
!ls


drive				 spark-3.1.2-bin-hadoop2.7.tgz.3
ratings_als.csv			 spark-3.1.2-bin-hadoop2.7.tgz.4
sample_data			 spark-3.1.2-bin-hadoop2.7.tgz.5
spark-3.1.2-bin-hadoop2.7	 spark-3.1.2-bin-hadoop2.7.tgz.6
spark-3.1.2-bin-hadoop2.7.tgz	 test_ratings.csv
spark-3.1.2-bin-hadoop2.7.tgz.1  train_ratings.csv
spark-3.1.2-bin-hadoop2.7.tgz.2


In [None]:
# Load both partitions into Spark DataFrames. The first CSV line is treated as
# the header and column types are inferred from the data.
train_spark = spark.read.options(header=True, inferSchema=True).csv('train_ratings.csv')
test_spark = spark.read.options(header=True, inferSchema=True).csv('test_ratings.csv')

# Sanity check: a few training rows and the inferred schema.
train_spark.show(5)
train_spark.printSchema()


+----------+-------+----------+-----------+--------+--------+-------------+
|Unnamed: 0|User-ID|      ISBN|Book-Rating|user_idx|book_idx|rating_scaled|
+----------+-------+----------+-----------+--------+--------+-------------+
|    686565| 167349|0446611239|          0|   63068|  108387|          0.0|
|     62456|  12576|0140105832|          0|    4516|   21407|          0.0|
|   1122931| 269566|0394540654|          7|  101831|   86132|          0.7|
|    636841| 153662|0803754051|          5|   58113|  202013|          0.5|
|    878589| 212898|0373093764|          0|   80185|   60802|          0.0|
+----------+-------+----------+-----------+--------+--------+-------------+
only showing top 5 rows

root
 |-- Unnamed: 0: integer (nullable = true)
 |-- User-ID: integer (nullable = true)
 |-- ISBN: string (nullable = true)
 |-- Book-Rating: integer (nullable = true)
 |-- user_idx: integer (nullable = true)
 |-- book_idx: integer (nullable = true)
 |-- rating_scaled: double (nullable = true)

### Load Dataset ke PySpark

Dataset hasil preprocessing dimuat ke dalam DataFrame PySpark
untuk proses pelatihan dan evaluasi model ALS.


In [None]:
from pyspark.ml.recommendation import ALS

# Configure the ALS collaborative-filtering estimator on the integer-encoded
# user/book columns and the scaled rating.
als = ALS(
    userCol="user_idx",
    itemCol="book_idx",
    ratingCol="rating_scaled",
    rank=10,                   # number of latent factors
    maxIter=10,
    regParam=0.1,
    nonnegative=True,          # constrain the learned factors to be >= 0
    coldStartStrategy="drop",  # drop rows whose prediction would be NaN
    seed=42                    # fixed seed so the fit is repeatable, matching
                               # random_state=42 used for the train/test split
)

# Fit the model on the training partition.
model = als.fit(train_spark)


### Pembangunan Model ALS

Model Collaborative Filtering dibangun menggunakan algoritma
Alternating Least Squares (ALS) yang efektif untuk data sparse.


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE between the model's "prediction" column and the scaled rating label.
evaluator = RegressionEvaluator(
    labelCol="rating_scaled",
    predictionCol="prediction",
    metricName="rmse",
)

# Score the held-out partition with the fitted model, then evaluate it.
predictions = model.transform(test_spark)
rmse = evaluator.evaluate(predictions)

# Show the metric as the cell output.
rmse


0.37315440098743025

### Evaluasi Model

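Model dievaluasi menggunakan Root Mean Square Error (RMSE).
Nilai RMSE yang lebih kecil menunjukkan prediksi yang lebih akurat.
Karena RMSE dihitung terhadap kolom `rating_scaled` (Book-Rating / 10),
nilainya berada pada skala rating yang telah dibagi 10, bukan pada skala Book-Rating asli.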


In [None]:
from pyspark.sql.functions import explode, col

# Generate the top-10 book recommendations for every user known to the model.
# `user_recs` was previously used without being defined in any visible cell,
# so this cell failed with NameError on a fresh kernel.
user_recs = model.recommendForAllUsers(10)

# Flatten the per-user "recommendations" array into one row per
# (user, recommended book) pair with the predicted rating.
user_recs_flat = user_recs \
    .withColumn("rec", explode("recommendations")) \
    .select(
        col("user_idx"),
        col("rec.book_idx").alias("book_idx"),
        col("rec.rating").alias("predicted_rating")
    )


In [None]:
# Cap the flattened recommendations at 5000 rows before exporting.
# NOTE(review): no ordering is applied first, so which rows are kept is
# presumably arbitrary -- confirm this is acceptable for the consumer.
user_recs_limited = user_recs_flat.limit(num=5000)

In [None]:
# Write the capped recommendations to Drive as CSV with a header row,
# replacing any previous export at the same location.
(
    user_recs_limited.write
    .mode("overwrite")
    .option("header", True)
    .csv("/content/drive/MyDrive/Colab Notebooks/BigDataBooks/user_recommendations")
)


### Hasil Rekomendasi

Model menghasilkan rekomendasi Top-10 buku untuk setiap pengguna
berdasarkan pola rating yang telah dipelajari.


In [None]:
# Target folder on Drive for exported artifacts.
SAVE_PATH = '/content/drive/MyDrive/Colab Notebooks/BigDataBooks/'

# Collect the per-user recommendations into pandas. This relies on
# `user_recs` having been created by an earlier cell.
user_recs_pd = user_recs.toPandas()

# Persist without the pandas index; the nested `recommendations` column is
# written in its string form.
user_recs_pd.to_csv(f"{SAVE_PATH}user_recommendations.csv", index=False)

# Preview the exported frame.
user_recs_pd.head()


Unnamed: 0,user_idx,recommendations
0,148,"[(264004, 0.9522560834884644), (83345, 0.95225..."
1,463,"[(264282, 1.5774879455566406), (310380, 1.5774..."
2,471,"[(244915, 1.0643168687820435), (244256, 1.0643..."
3,496,"[(98983, 1.5785084962844849), (179453, 1.54676..."
4,833,"[(217595, 0.9132230281829834), (155220, 0.8909..."


### Penyimpanan Hasil Rekomendasi

Hasil rekomendasi disimpan dalam format CSV
untuk digunakan pada aplikasi web recommender system.


In [None]:
# Target folder on Drive for the cleaned datasets.
SAVE_PATH = '/content/drive/MyDrive/Colab Notebooks/BigDataBooks/'

# NOTE(review): books_clean, users_clean and ratings_clean are not assigned in
# any visible cell of this notebook, so this cell presumably belongs to a
# separate preprocessing step -- confirm before running. It also overwrites
# ratings_clean.csv, the file the ratings were loaded from above.
books_clean.to_csv(f"{SAVE_PATH}books_clean.csv", index=False)
users_clean.to_csv(f"{SAVE_PATH}users_clean.csv", index=False)
ratings_clean.to_csv(f"{SAVE_PATH}ratings_clean.csv", index=False)
