## implicit ALS(협업필터링)

- 비슷한 사용자의 행동 패턴을 기반으로 추천 (이 뉴스를 클릭한 다른 사용자는 이런 뉴스도 봤어요)

In [1]:
pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [3]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
# Load the behaviors table and preview its first five rows.
behaviors_df = pd.read_csv("Data/behaviors.csv")
behaviors_df.head(5)

Unnamed: 0,impression_id,user_id,time,history,impressions
0,1,U13740,11/11/2019 9:05:58 AM,N55189 N42782 N34694 N45794 N18445 N63302 N104...,N55689-1 N35729-0
1,2,U91836,11/12/2019 6:11:30 PM,N31739 N6072 N63045 N23979 N35656 N43353 N8129...,N20678-0 N39317-0 N58114-0 N20495-0 N42977-0 N...
2,3,U73700,11/14/2019 7:01:48 AM,N10732 N25792 N7563 N21087 N41087 N5445 N60384...,N50014-0 N23877-0 N35389-0 N49712-0 N16844-0 N...
3,4,U34670,11/11/2019 5:28:05 AM,N45729 N2203 N871 N53880 N41375 N43142 N33013 ...,N35729-0 N33632-0 N49685-1 N27581-0
4,5,U8125,11/12/2019 4:11:21 PM,N10078 N56514 N14904 N33740,N39985-0 N36050-0 N16096-0 N8400-1 N22407-0 N6...


In [5]:
# Load the (user_id, history_news_id) history table and preview its first five rows.
history_df = pd.read_csv("./Data/history.csv")
history_df.head(5)

Unnamed: 0,user_id,history_news_id
0,U13740,N55189
1,U13740,N42782
2,U13740,N34694
3,U13740,N45794
4,U13740,N18445


In [6]:

# Count how many rows each (user, news) pair has in the history table.
pair_sizes = history_df.groupby(["user_id", "history_news_id"]).size()
user_item_counts = pair_sizes.reset_index(name="click_count")

# user_id -> contiguous integer index, numbered in order of first appearance.
# The integer columns are what the ALS cell later uses as userCol / itemCol.
unique_users = user_item_counts["user_id"].unique()
user_mapping = dict(zip(unique_users, range(len(unique_users))))
user_item_counts["user_idx"] = user_item_counts["user_id"].map(user_mapping)

# history_news_id -> contiguous integer index, built the same way.
unique_news = user_item_counts["history_news_id"].unique()
news_mapping = dict(zip(unique_news, range(len(unique_news))))
user_item_counts["news_idx"] = user_item_counts["history_news_id"].map(news_mapping)

user_item_counts.head()


Unnamed: 0,user_id,history_news_id,click_count,user_idx,news_idx
0,U100,N18870,1,0,0
1,U100,N20121,1,0,1
2,U100,N20575,1,0,2
3,U100,N33998,1,0,3
4,U100,N42330,1,0,4


In [7]:
# Summary statistics of the per-(user, news) click counts.
user_item_counts['click_count'].describe()

count    915011.000000
mean          5.582052
std           5.592618
min           1.000000
25%           2.000000
50%           4.000000
75%           7.000000
max         310.000000
Name: click_count, dtype: float64

In [8]:
# Persist the indexed counts (without the pandas index column); the Spark cell
# below reads this same CSV path.
user_item_counts.to_csv("./Data/history_preprocessed.csv", index=False)

In [None]:

from pyspark.sql import SparkSession

# Create (or reuse) the Spark session, binding the driver to localhost.
session_builder = (
    SparkSession.builder
    .appName("MIND-ALS-Recommendation")
    .config("spark.driver.bindAddress", "127.0.0.1")
)
spark = session_builder.getOrCreate()

# Load the preprocessed counts; column types are inferred from the CSV contents.
history_spark_df = spark.read.csv(
    "./Data/history_preprocessed.csv",
    header=True,
    inferSchema=True,
)

history_spark_df.show(5)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/14 16:40:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-------+---------------+-----------+--------+--------+
|user_id|history_news_id|click_count|user_idx|news_idx|
+-------+---------------+-----------+--------+--------+
|   U100|         N18870|          1|       0|       0|
|   U100|         N20121|          1|       0|       1|
|   U100|         N20575|          1|       0|       2|
|   U100|         N33998|          1|       0|       3|
|   U100|         N42330|          1|       0|       4|
+-------+---------------+-----------+--------+--------+
only showing top 5 rows



In [9]:
# Echo the active SparkSession object as the cell result.
spark


In [None]:
from pyspark.ml.recommendation import ALS

# Fixed seed so that the train/test split and the ALS factor initialisation
# are reproducible between runs (both were unseeded before, so every re-run
# produced a different split, different factors and different metrics).
SEED = 42

# ALS model definition.
# NOTE(review): implicitPrefs is left at its default (False), so click_count is
# fitted as an explicit rating even though the notebook title says
# "implicit ALS" — confirm whether implicitPrefs=True was intended. It is not
# changed here because the RMSE/MAE evaluation further down compares the
# prediction directly against click_count.
als = ALS(
    maxIter=20,
    rank=100,  # number of latent factors
    regParam=0.1,  # regularisation strength (guards against overfitting)
    userCol="user_idx",
    itemCol="news_idx",
    ratingCol="click_count",
    coldStartStrategy="drop",  # drop rows whose prediction would be NaN
    seed=SEED,
)

# Split into train / test (80% / 20%).
(training, test) = history_spark_df.randomSplit([0.8, 0.2], seed=SEED)

als_model = als.fit(training)


25/03/14 16:45:40 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/03/14 16:45:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [None]:
# Produce the top-N recommended news items for every user known to the model.
top_n_items = 5
user_recommendations = als_model.recommendForAllUsers(top_n_items)

# Preview a few users without truncating the recommendation arrays.
preview_rows = 5
user_recommendations.show(preview_rows, truncate=False)




+--------+---------------------------------------------------------------------------------------------------+
|user_idx|recommendations                                                                                    |
+--------+---------------------------------------------------------------------------------------------------+
|28      |[{29274, 9.579383}, {17461, 8.745498}, {28200, 8.017481}, {7963, 8.002184}, {3056, 7.9854193}]     |
|31      |[{29274, 4.5560412}, {17461, 4.1117077}, {28200, 3.8102448}, {20961, 3.8098457}, {7963, 3.7952142}]|
|34      |[{29274, 2.3651016}, {17461, 2.1770184}, {28200, 1.9805305}, {7963, 1.9775585}, {20961, 1.9733173}]|
|53      |[{29274, 14.370767}, {17461, 12.854523}, {28200, 11.951962}, {7963, 11.922479}, {3056, 11.884595}] |
|65      |[{29274, 4.7638774}, {17461, 4.3260727}, {28200, 3.9467835}, {7963, 3.938887}, {3056, 3.9275014}]  |
+--------+---------------------------------------------------------------------------------------------------+
only showing top 5 rows

                                                                                

In [15]:
# Row counts of the two splits (each count triggers a Spark job).
train_size, test_size = training.count(), test.count()

print(f"train count:{train_size}", f"test count:{test_size}", sep="\n")

                                                                                

train count:732563
test count:182448


                                                                                

In [47]:
def _n_distinct(frame, column):
    """Return the number of distinct values of `column` in the Spark DataFrame `frame`."""
    return frame.select(column).distinct().count()


# Distinct users / news items present in the training split.
distinct_user_idx_train = _n_distinct(training, "user_idx")
distinct_news_idx_train = _n_distinct(training, "news_idx")

# Distinct users / news items present in the test split.
distinct_user_idx_test = _n_distinct(test, "user_idx")
distinct_news_idx_test = _n_distinct(test, "news_idx")

print(
    f"Training dataset - Distinct user_idx count: {distinct_user_idx_train}",
    f"Training dataset - Distinct news_idx count: {distinct_news_idx_train}",
    f"Test dataset - Distinct user_idx count: {distinct_user_idx_test}",
    f"Test dataset - Distinct news_idx count: {distinct_news_idx_test}",
    sep="\n",
)

Training dataset - Distinct user_idx count: 49108
Training dataset - Distinct news_idx count: 33195
Test dataset - Distinct user_idx count: 49108
Test dataset - Distinct news_idx count: 33195


In [17]:
# Grab the learned latent-factor tables for users and for news items.
user_factors, news_factors = als_model.userFactors, als_model.itemFactors

In [49]:
# Preview the latent-factor vectors: users first, then news items.
for factor_frame in (user_factors, news_factors):
    factor_frame.show(10)

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[0.0, 0.0, 0.0, 0...|
| 10|[0.0, 0.0, 0.0, 0...|
| 20|[3.792684E-4, -0....|
| 30|[-0.029989516, -0...|
| 40|[0.0, 0.0, 0.0, 0...|
| 50|[0.0, 0.0, 0.0, 0...|
| 60|[0.0012925876, -0...|
| 70|[-0.10578365, 0.0...|
| 80|[0.0, 0.0, 0.0, 0...|
| 90|[0.0, 0.0, 0.0, 0...|
+---+--------------------+
only showing top 10 rows

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[-0.59969795, 0.1...|
| 10|[-0.0029465796, -...|
| 20|[-0.0011880719, -...|
| 30|[0.062401876, -0....|
| 40|[0.0, 0.0, 0.0, 0...|
| 50|[0.012454009, -0....|
| 60|[0.024392206, 0.0...|
| 70|[0.14256148, 0.06...|
| 80|[0.004484646, -0....|
| 90|[-0.016925707, 0....|
+---+--------------------+
only showing top 10 rows



In [39]:
# Shape of the user factor table: row count and number of DataFrame columns.
num_rows, num_columns = user_factors.count(), len(user_factors.columns)
print(f"rows: {num_rows},columns: {num_columns}")

rows: 48862,columns: 2


25/03/09 17:08:00 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


In [40]:
# Shape of the news factor table: row count and number of DataFrame columns.
num_rows, num_columns = news_factors.count(), len(news_factors.columns)
print(f"rows: {num_rows},columns: {num_columns}")

rows: 30421,columns: 2


25/03/09 17:08:04 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


In [18]:
# Print the SparkSession object to confirm the session is still available.
print(spark)


<pyspark.sql.session.SparkSession object at 0x175783b50>


In [19]:
# Score the held-out test rows; the result carries an added "prediction" column.
predictions = als_model.transform(test)

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

# Every evaluator compares the true click_count with the model's prediction;
# only the metric name differs.
_eval_columns = {"labelCol": "click_count", "predictionCol": "prediction"}

rmse_evaluator = RegressionEvaluator(metricName="rmse", **_eval_columns)  # root mean squared error
mae_evaluator = RegressionEvaluator(metricName="mae", **_eval_columns)  # mean absolute error
r2_evaluator = RegressionEvaluator(metricName="r2", **_eval_columns)  # R-squared
exp_var_evaluator = RegressionEvaluator(metricName="var", **_eval_columns)  # explained variance

# Evaluate in the same order as the report below.
rmse = rmse_evaluator.evaluate(predictions)
mae = mae_evaluator.evaluate(predictions)
r2 = r2_evaluator.evaluate(predictions)
exp_var = exp_var_evaluator.evaluate(predictions)

print(
    f"RMSE score = {rmse:.4f}",
    f"MAE score = {mae:.4f}",
    f"R² score = {r2:.4f}",
    f"Explained Variance score = {exp_var:.4f}",
    sep="\n",
)




RMSE score = 1.5432
MAE score = 0.2738
R² score = 0.9242
Explained Variance score = 27.2848


                                                                                

In [None]:
# Inspect the columns (schema) of the test split.
test.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- history_news_id: string (nullable = true)
 |-- click_count: integer (nullable = true)
 |-- user_idx: integer (nullable = true)
 |-- news_idx: integer (nullable = true)



In [18]:
# Inspect the columns (schema) of the prediction DataFrame.
predictions.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- history_news_id: string (nullable = true)
 |-- click_count: integer (nullable = true)
 |-- user_idx: integer (nullable = true)
 |-- news_idx: integer (nullable = true)
 |-- prediction: float (nullable = false)

