# Verinin Yüklenmesi & Ön Hazırlık

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [19]:
spark = SparkSession.builder.appName("RecModel").getOrCreate()

In [20]:
data_path = "/home/projects/bigdata-recommendation-engine/data/"

In [21]:
books_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(f"{data_path}books.csv")
)
ratings_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(f"{data_path}ratings.csv")
)

                                                                                

In [22]:
books_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- best_book_id: integer (nullable = true)
 |-- work_id: integer (nullable = true)
 |-- books_count: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: double (nullable = true)
 |-- authors: string (nullable = true)
 |-- original_publication_year: double (nullable = true)
 |-- original_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- work_ratings_count: string (nullable = true)
 |-- work_text_reviews_count: string (nullable = true)
 |-- ratings_1: double (nullable = true)
 |-- ratings_2: integer (nullable = true)
 |-- ratings_3: integer (nullable = true)
 |-- ratings_4: integer (nullable = true)
 |-- ratings_5: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- small_image_url: string (nullable = true)


In [34]:
final_books_df = ( 
    books_df
    .withColumn("best_book_id", F.col("best_book_id").cast("string"))
    .withColumn("work_id", F.col("work_id").cast("string"))
    .withColumn("isbn13", F.col("isbn13").cast("string"))
    .withColumn("original_publication_year", F.col("original_publication_year").cast("int"))
    .withColumn("average_rating", F.col("average_rating").cast("double"))
    .withColumn("ratings_count", F.col("ratings_count").cast("int"))
    .withColumn("work_ratings_count", F.col("work_ratings_count").cast("int"))
    .withColumn("work_text_reviews_count", F.col("work_text_reviews_count").cast("int"))
    .withColumn("ratings_1", F.col("ratings_1").cast("int"))
)

In [36]:
final_books_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- best_book_id: string (nullable = true)
 |-- work_id: string (nullable = true)
 |-- books_count: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- original_publication_year: integer (nullable = true)
 |-- original_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- work_ratings_count: integer (nullable = true)
 |-- work_text_reviews_count: integer (nullable = true)
 |-- ratings_1: integer (nullable = true)
 |-- ratings_2: integer (nullable = true)
 |-- ratings_3: integer (nullable = true)
 |-- ratings_4: integer (nullable = true)
 |-- ratings_5: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- small_image_url: string (nullable = tru

In [25]:
ratings_df.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [33]:
final_ratings_df = (
    ratings_df
)

In [35]:
final_ratings_df.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



# Modelin Eğitilmesi

In [28]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [37]:
(training_set, test_set) = final_ratings_df.randomSplit([0.8,0.2], seed=42)
training_set.cache()
test_set.cache()

DataFrame[book_id: int, user_id: int, rating: int]

In [38]:
print(f"Toplam Oy:{final_ratings_df.count()}")
print(f"Eğitim Seti Toplam Oy: {training_set.count()}")
print(f"Test Seti Toplam Oy:{test_set.count()}")

                                                                                

Toplam Oy:981756


                                                                                

Eğitim Seti Toplam Oy: 785323




Test Seti Toplam Oy:196433


                                                                                

In [31]:
als = ALS(
    rank=20,
    userCol="user_id",
    itemCol="book_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True
)

In [39]:
model = als.fit(training_set)

25/09/04 13:29:16 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [40]:
preds = model.transform(test_set)

In [41]:
preds.select("user_id", "book_id", "rating", "prediction").show(10)

                                                                                

+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|  32592|      3|     5| 3.1839614|
|  19984|      7|     5|  5.047631|
|  32592|     20|     4| 3.6634092|
|  19984|     22|     5|  4.670302|
|  35982|     26|     2| 2.2080925|
|  32592|     27|     4| 4.2122602|
|  35982|     38|     3|   3.41518|
|   1088|     44|     1| 2.6129036|
|  32592|     46|     4|  4.054374|
|  19984|     47|     5|  5.284743|
+-------+-------+------+----------+
only showing top 10 rows



In [43]:
evaluator = RegressionEvaluator(
    metricName="rmse",      # Hesaplamak istediğimiz metrik: RMSE
    labelCol="rating",      # "Doğru cevapların" bulunduğu sütunun adı
    predictionCol="prediction" # "Modelin tahminlerinin" bulunduğu sütunun adı
)

# Değerlendiriciyi, tahminleri içeren DataFrame üzerinde çalıştırıyoruz.
rmse = evaluator.evaluate(preds)

print(f"Test Verisi Üzerindeki Kök Ortalama Kare Hatası (RMSE) = {rmse}")

                                                                                

Test Verisi Üzerindeki Kök Ortalama Kare Hatası (RMSE) = 0.9030659515241622


In [44]:
print("Her kullanıcı için en iyi 5 kitap önerisi hesaplanıyor...")
user_recs = model.recommendForAllUsers(5)

print("Ham öneri formatı:")
user_recs.printSchema()
user_recs.show(5, truncate=False)

Her kullanıcı için en iyi 5 kitap önerisi hesaplanıyor...
Ham öneri formatı:
root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- book_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)





+-------+---------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                              |
+-------+---------------------------------------------------------------------------------------------+
|1      |[{6285, 3.8414783}, {9842, 3.815668}, {7831, 3.793963}, {2051, 3.7828848}, {7008, 3.7477252}]|
|3      |[{8362, 1.033042}, {2590, 1.0188516}, {307, 1.0170922}, {7902, 1.012861}, {7593, 1.0115753}] |
|5      |[{862, 4.8895006}, {723, 4.876078}, {6590, 4.829024}, {2292, 4.822625}, {3093, 4.7901845}]   |
|6      |[{6089, 5.211527}, {4868, 5.1143374}, {3628, 5.062479}, {6902, 5.047463}, {8926, 5.043027}]  |
|9      |[{8109, 4.1205454}, {9024, 4.065214}, {8946, 4.0373516}, {8926, 4.0175223}, {9531, 3.985509}]|
+-------+---------------------------------------------------------------------------------------------+
only showing top 5 rows



                                                                                

In [45]:
print("\nHer kitap için en uygun 5 kullanıcı önerisi hesaplanıyor...")
item_recs = model.recommendForAllItems(5)

print("Ham öneri formatı:")
item_recs.printSchema()
item_recs.show(5, truncate=False)


Her kitap için en uygun 5 kullanıcı önerisi hesaplanıyor...
Ham öneri formatı:
root
 |-- book_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- user_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)





+-------+---------------------------------------------------------------------------------------------------+
|book_id|recommendations                                                                                    |
+-------+---------------------------------------------------------------------------------------------------+
|1      |[{34886, 5.7956386}, {23753, 5.7770925}, {38263, 5.749137}, {34272, 5.7251973}, {30757, 5.7018046}]|
|3      |[{31285, 4.9232116}, {15842, 4.845913}, {26861, 4.828252}, {7373, 4.7432876}, {7124, 4.6918855}]   |
|5      |[{35816, 5.31054}, {47347, 5.3077493}, {40088, 5.2869205}, {6701, 5.2667537}, {30757, 5.203312}]   |
|6      |[{30440, 5.491889}, {19137, 5.410286}, {34886, 5.383967}, {41085, 5.3579617}, {22638, 5.341396}]   |
|9      |[{19137, 5.0235996}, {40181, 4.964751}, {38867, 4.94023}, {31602, 4.8850975}, {41819, 4.872061}]   |
+-------+---------------------------------------------------------------------------------------------------+
only showi

                                                                                