In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, lit
from pyspark.sql.functions import sum as _sum, col
from datetime import timezone, timedelta, datetime
from minio import Minio

import os

# MinIO (S3-compatible) connection settings for the S3A filesystem.
# Credentials are taken from the environment so they do not live in the notebook.
# NOTE(review): the fallbacks below reproduce the previously hardcoded values
# (MinIO's default root credentials and a public IP endpoint). Set
# MINIO_ENDPOINT / MINIO_ACCESS_KEY / MINIO_SECRET_KEY and remove these
# fallbacks before sharing or committing this notebook.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://54.180.166.228:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

spark = (
    SparkSession.builder
    .appName("ml")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    # Address buckets as <endpoint>/<bucket> rather than virtual-host style.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # NOTE(review): empty value presumably leaves S3A on its default
    # credential-provider chain (which picks up the access/secret keys above);
    # consider setting SimpleAWSCredentialsProvider explicitly — confirm.
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "")
    # The endpoint above is plain http, so SSL is disabled.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

# Read the JSON Lines files (one JSON object per line, hence multiline=false)
# under the bucket into a Spark DataFrame.
file_path = 's3a://ml-user-log/'
df = spark.read.option("multiline", "false").json(file_path)

In [11]:
# Remove fields that play no part in scoring.
drop_df = df.drop("timestamp", "page", "contentCategory")

# One condition per scored interaction. The conditions are mutually exclusive on
# eventType, so each row contributes to at most one of the score columns below.
# NOTE(review): `liked` is compared with the string 'true'; confirm this still
# matches if JSON schema inference typed the field as boolean.
liked_cond = (col("eventType") == "like_click") & (col("liked") == "true")
reviewed_cond = (col("eventType") == "review_write") & col("review").isNotNull()
rated_cond = (col("eventType") == "rating_submit") & col("rating").isNotNull()
clicked_cond = col("eventType") == "content_click"

# Weights: like = 3, review = 1, rating = the rating value itself, click = 1.
# NOTE(review): cast("int") truncates fractional ratings — confirm ratings are whole numbers.
df_with_score = (
    drop_df
    .withColumn("like_score", when(liked_cond, lit(3)).otherwise(lit(0)))
    .withColumn("review_score", when(reviewed_cond, lit(1)).otherwise(lit(0)))
    .withColumn("rating_score", when(rated_cond, col("rating").cast("int")).otherwise(lit(0)))
    .withColumn("click_score", when(clicked_cond, lit(1)).otherwise(lit(0)))
)


In [12]:
# Drop the raw event fields now that they have been turned into score columns.
df_drop = df_with_score.drop("eventType", "liked", "rating", "review")

# Sum each score, and the per-row total of all four, per (user_id, content_id) pair.
# Built from df_drop (not df_with_score) so the dropped raw fields are really out
# of the lineage; the groupBy/agg output is the same either way.
df_agg = (
    df_drop
    .withColumnRenamed("userId", "user_id")
    .withColumnRenamed("contentId", "content_id")
    .withColumn("total_score", col("like_score") + col("review_score") + col("rating_score") + col("click_score"))
    .groupBy("user_id", "content_id")
    .agg(
        _sum("like_score").alias("like_score"),
        _sum("review_score").alias("review_score"),
        _sum("rating_score").alias("rating_score"),
        _sum("click_score").alias("click_score"),
        _sum("total_score").alias("total_score")
    )
    .orderBy(col("user_id").asc(), col("content_id").asc())
)

df_agg.show()


+--------------------+--------------------+----------+------------+------------+-----------+-----------+
|             user_id|          content_id|like_score|review_score|rating_score|click_score|total_score|
+--------------------+--------------------+----------+------------+------------+-----------+-----------+
|315439d0-8722-4a7...|68340205aec8b2058...|         3|           0|           0|          1|          4|
|315439d0-8722-4a7...|68340205aec8b2058...|         3|           1|           5|          0|          9|
|315439d0-8722-4a7...|68340206aec8b2058...|         3|           0|           0|          1|          4|
|54303116-f86f-400...|68340205aec8b2058...|         3|           0|           0|          2|          5|
|54303116-f86f-400...|68340206aec8b2058...|         3|           0|           0|          2|          5|
+--------------------+--------------------+----------+------------+------------+-----------+-----------+



In [13]:
# Keep only the (user_id, content_id) key and the combined score; the
# per-component score columns are not needed downstream of this cell.
df_score = df_agg.select("user_id", "content_id", "total_score")
df_score.show()

+--------------------+--------------------+-----------+
|             user_id|          content_id|total_score|
+--------------------+--------------------+-----------+
|315439d0-8722-4a7...|68340205aec8b2058...|          4|
|315439d0-8722-4a7...|68340205aec8b2058...|          9|
|315439d0-8722-4a7...|68340206aec8b2058...|          4|
|54303116-f86f-400...|68340205aec8b2058...|          5|
|54303116-f86f-400...|68340206aec8b2058...|          5|
+--------------------+--------------------+-----------+

