In [0]:
%spark.conf

# Spark installation directory and the Python interpreter used for PySpark
SPARK_HOME  /usr/local/spark
PYSPARK_PYTHON /usr/bin/python3
spark.pyspark.python  /usr/bin/python3

# set driver memory to 8g
spark.driver.memory 8g

# set the number of executors to 3
spark.executor.instances  3

# set executor memory to 2g
spark.executor.memory  2g

In [1]:
%MySQL
TRUNCATE TABLE favorite_recommend

In [2]:
%MySQL
SELECT * FROM favorite_goods

In [3]:
%pyspark
# module import
import math
import os

import requests
## SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import (
    col, current_date, datediff, lit, mean, regexp_extract, split, udf, when,
)
## ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.recommendation import ALS


# select goods_id, user_id, reg_date from favorite_goods
# NOTE(security): connection settings can be supplied through environment
# variables so they do not have to live in the notebook. The literal fallbacks
# keep the current deployment working unchanged.
# TODO: rotate this password and drop the hard-coded fallbacks.
fav_sdf = (
    spark.read.format("jdbc")
    .option("url", os.environ.get(
        "GAMULGAMUL_JDBC_URL",
        "jdbc:mysql://j7d108.p.ssafy.io:3306/gamulgamul_test3?useSSL=false&characterEncoding=UTF-8&useUnicode=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Seoul",
    ))
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("query", "select goods_id, user_id, reg_date from favorite_goods")
    .option("user", os.environ.get("GAMULGAMUL_DB_USER", "ji"))
    .option("password", os.environ.get("GAMULGAMUL_DB_PASSWORD", "cvgkbhs234r#8tdx"))
    .load()
)

# user_id list who has favorite_goods (collected to the driver as plain Python values)
users_id = [row['user_id'] for row in fav_sdf.select('user_id').distinct().collect()]


# sigmoid function to weight date
def sigmoid(x):
    """Return the logistic sigmoid 1 / (1 + e^-x) of a real number.

    The negative branch uses the algebraically equivalent form
    e^x / (1 + e^x) so that very negative inputs evaluate to 0.0 instead of
    raising OverflowError from math.exp. Results for x >= 0 are computed
    exactly as before.

    Args:
        x: int or float input.

    Returns:
        float in the closed interval [0.0, 1.0].
    """
    if x >= 0:
        return 1 / (1 + math.exp(-x))
    # For x < 0, exp(x) is at most 1, so it can underflow to 0.0 but never overflow.
    exp_x = math.exp(x)
    return exp_x / (1 + exp_x)
# Wrap the Python function as a Spark UDF. The return type is declared
# explicitly: udf() defaults to StringType, which would pass every score
# through a string before the cast below.
sigmoid = udf(sigmoid, 'double')


# get score by weighting the age of each favorite (days between today and
# reg_date) with the sigmoid function
# NOTE(review): datediff(current_date, reg_date) grows with age, so older
# favorites receive a HIGHER score than recent ones - confirm this is intended.
# The result is cached because it is reused by the train/test split and is
# filtered and joined once per user inside top_fav().
fav_sdf = fav_sdf.select(
    'goods_id',
    'user_id',
    sigmoid(datediff(current_date(), col('reg_date'))).cast('float').alias('score')
    ).cache()


# ALS training
# NOTE(review): the model used by top_fav() is fitted on the 75% train split
# only, so favorites that land in the test split do not influence the
# recommendations - confirm whether the serving model should use all of fav_sdf.
train, test = fav_sdf.randomSplit([0.75, 0.25], seed=1)

fav_rec = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='user_id',
    itemCol='goods_id',
    ratingCol='score',
    nonnegative=True,
    # drop rows whose user or item was not seen during fit instead of emitting NaN
    coldStartStrategy='drop',
    # fixed seed so the factor initialisation is reproducible between runs
    seed=1,
)

fav_rec_model = fav_rec.fit(train)
fav_pred_ratings = fav_rec_model.transform(test)


# goods recommendation function to user_id, top n
# The goods lookup DataFrame is built once per run of this paragraph and reused,
# so the CSV schema is not re-inferred for every user.
_GOODS_TABLE_CACHE = {}


def _load_goods_table(path='/DB_data/goods_table.csv'):
    """Return the goods CSV as a DataFrame whose goods_id column is renamed to idx."""
    if path not in _GOODS_TABLE_CACHE:
        goods_table = spark.read.csv(path, header=True, inferSchema=True)
        _GOODS_TABLE_CACHE[path] = goods_table.withColumnRenamed('goods_id', 'idx')
    return _GOODS_TABLE_CACHE[path]


def top_fav(user_id, n):
    """
    Recommend up to n goods that the given user is likely to favorite.

    Candidates are every goods_id present in fav_sdf that this user has not
    favorited yet. They are scored with fav_rec_model, the n highest
    predictions are kept, and the result is inner-joined with the goods CSV.

    Args:
        user_id: id of the user to recommend for (converted with int()).
        n: maximum number of recommendations.

    Returns:
        DataFrame with columns user_id, goods_id, score, ordered by score
        descending. It can hold fewer than n rows: the model drops
        cold-start rows, and the inner join with the goods CSV is applied
        after the limit.
    """
    # goods the user has already favorited
    favorited_goods = fav_sdf.filter(fav_sdf['user_id'] == user_id).select('goods_id')

    # every known goods_id minus the ones already favorited (anti join), tagged
    # with the target user so the ALS model can score each (user, goods) pair
    candidate_goods = (
        fav_sdf.select('goods_id').distinct()
        .join(favorited_goods, on='goods_id', how='left_anti')
        .withColumn('user_id', lit(int(user_id)))
    )

    # score the candidates with the trained ALS model and keep the n best
    recommender = (
        fav_rec_model.transform(candidate_goods)
        .orderBy('prediction', ascending=False)
        .limit(n)
    )

    # keep only recommendations whose goods_id exists in the goods table
    goods_table = _load_goods_table()
    final_recommendations = goods_table.join(
        recommender, goods_table['idx'] == recommender['goods_id'], how='inner'
    )
    final_recommendations = final_recommendations.withColumnRenamed('prediction', 'score')
    final_recommendations = (
        final_recommendations
        .select(['user_id', 'goods_id', 'score'])
        .orderBy('score', ascending=False)
    )

    return final_recommendations


# favorite_recommend truncate: run the paragraph that empties the table via the
# Zeppelin notebook REST API before new rows are appended.
truncate_response = requests.post(
    "http://3.36.106.26:8081/api/notebook/run/2HDR64PXZ/paragraph_1664290153789_2140948920",
    timeout=(10, 300),  # (connect, read) seconds, so an unreachable server cannot hang the job
)
# Fail fast on an HTTP error: appending to a table that was not emptied would
# leave stale or duplicate recommendations behind.
# NOTE(review): this checks the HTTP status only; whether a failed paragraph
# is reported through the status code depends on the Zeppelin version - confirm.
truncate_response.raise_for_status()


# favorite_recommend append
# NOTE(security): connection settings can be supplied through environment
# variables; the literal fallbacks keep the current deployment working.
# TODO: rotate this password and drop the hard-coded fallbacks.
url = os.environ.get(
    "GAMULGAMUL_JDBC_URL",
    "jdbc:mysql://j7d108.p.ssafy.io:3306/gamulgamul_test3?useSSL=false&characterEncoding=UTF-8&useUnicode=true&allowPublicKeyRetrieval=true&serverTimezone=Asia/Seoul",
)
prop = {
    "user": os.environ.get("GAMULGAMUL_DB_USER", "ji"),
    "password": os.environ.get("GAMULGAMUL_DB_PASSWORD", "cvgkbhs234r#8tdx"),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# number of recommendations stored per user
TOP_N = 5

# One Spark job and one JDBC append per user that has at least one favorite.
# The loop variable is named user_id so it does not shadow the builtin id().
for user_id in users_id:
    rec_sdf = top_fav(user_id, TOP_N)
    rec_sdf.write.jdbc(
        url=url,
        table="favorite_recommend",
        mode="append",
        properties=prop,
    )

In [4]:
%pyspark
# 매일 자동화 실행
requests.post("http://3.36.106.26:8081/api/notebook/run/2HDR64PXZ/paragraph_1664289615264_120085320")

In [5]:
%MySQL
SELECT * FROM favorite_recommend