In [1]:
# Display the SparkContext as a quick check that it exists.
# NOTE(review): `sc` is not defined in any cell of this notebook; presumably the
# PySpark kernel injects it at start-up. Confirm before running on another kernel.
sc

In [2]:
# Display the SparkSession used by every cell below.
# NOTE(review): `spark` is not defined in any cell of this notebook; presumably the
# PySpark kernel injects it at start-up. Confirm before running on another kernel.
spark

In [3]:
def _read_kafka_topic(topic):
    """Open a streaming DataFrame subscribed to one Kafka topic.

    Args:
        topic: name of the Kafka topic to subscribe to.

    Returns:
        The streaming DataFrame produced by the Kafka source, read from the
        brokers in ``KAFKA_BROKER`` with ``failOnDataLoss`` set to "false".
    """
    # NOTE(review): KAFKA_BROKER is not defined in any visible cell; it must
    # already exist in the kernel namespace before this cell runs.
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("subscribe", topic)
        .option("failOnDataLoss", "false")
        .load()
    )


# Consume the user behaviour log messages; keep only the payload, cast to text.
df = _read_kafka_topic("lecture-recsys-log")
ds = df.selectExpr("CAST(value AS STRING)")

# Consume the recommendation result messages; keep only the payload, cast to text.
df2 = _read_kafka_topic("lecture-recommenders")
ds2 = df2.selectExpr("CAST(value AS STRING)")

In [4]:
# Print the schema of both raw Kafka source frames (key/value arrive as binary).
df.printSchema()
df2.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
# Print the schema after the cast: a single string column named `value`.
ds.printSchema()
ds2.printSchema()

root
 |-- value: string (nullable = true)

root
 |-- value: string (nullable = true)



In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType, LongType
# Fields expected in the JSON payload of a user behaviour log message.
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("action", StringType()),
    StructField("lecture_id", IntegerType()),
])
# Fields expected in the JSON payload of a recommendation result message.
schema2 = (
    StructType()
    .add("user_id", IntegerType())
    .add("best", ArrayType(IntegerType()))
)

In [7]:
from pyspark.sql.functions import from_json
import pyspark.sql.functions as F

In [8]:
# Parse the JSON text in `value` into a single struct column, one frame per topic.
# NOTE(review): .alias(...) is called on the DataFrame returned by select(), not on
# the from_json column, so the struct column keeps Spark's auto-generated name (see
# the schema printed below). The following cell selects the column by that
# generated name, so do not rename it here without updating that cell too.
data = ds.select(from_json("value", schema)).alias("parsed")
data2 = ds2.select(from_json("value", schema2)).alias("parsed2")
data.printSchema()
data2.printSchema()

root
 |-- jsontostructs(value): struct (nullable = true)
 |    |-- user_id: integer (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- lecture_id: integer (nullable = true)

root
 |-- jsontostructs(value): struct (nullable = true)
 |    |-- user_id: integer (nullable = true)
 |    |-- best: array (nullable = true)
 |    |    |-- element: integer (containsNull = true)



In [9]:
# Expand the parsed struct into top-level columns.
# Each frame holds exactly one column (the struct produced by from_json), and its
# name is generated by Spark: "jsontostructs(value)" in the schema printed by the
# previous cell, but later Spark releases generate a different name. Look the name
# up instead of hard-coding it, and back-quote it so the parentheses (or any dot)
# in the generated name are treated as part of the identifier.
logdf = data.select("`{}`.*".format(data.columns[0]))
logdf2 = data2.select("`{}`.*".format(data2.columns[0]))
logdf.printSchema()
logdf2.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- lecture_id: integer (nullable = true)

root
 |-- user_id: integer (nullable = true)
 |-- best: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [10]:
from pyspark.sql.functions import to_json
import pickle
import numpy as np
import pandas as pd

# Load the precomputed similarity data from a pickle file in the working directory.
# NOTE(review): pickle.load can execute arbitrary code - only load a file from a
# trusted source. The relative path also ties this cell to the kernel's cwd.
with open('cosine_sim_final.pkl','rb') as f:
    cosine_sim=pickle.load(f)
    
# Index labels of entry 1 of the similarity data, shifted by one. get_recommendations
# indexes into `x` by position to obtain the ids it returns.
# presumably this maps 0-based matrix positions to 1-based lecture ids - TODO confirm
x = pd.Series(cosine_sim[1]).index+1
def get_recommendations(i, action):
    """Return the lectures most similar to lecture ``i``, each paired with a score.

    Reads the module-level ``cosine_sim`` (row ``i - 1`` holds the similarities of
    lecture ``i``) and ``x`` (maps a row position to the id that is returned).

    Args:
        i: 1-based lecture id, or None when the log message could not be parsed.
        action: event type; 'CLICK' contributes a score of 1 and 'WISH' a score of 3.

    Returns:
        Up to five ``[lecture_id, score]`` pairs of plain Python ints, ordered from
        most to least similar. An empty list is returned when ``i`` is None or
        outside ``1..len(cosine_sim)``, or when ``action`` has no score. Every
        non-empty result therefore matches the ``array<array<int>>`` type the
        Spark UDF declares, and a bad event simply contributes nothing.
    """
    action_scores = {'CLICK': 1, 'WISH': 3}
    score = action_scores.get(action)

    # Reject bad input up front: None would raise on `i - 1`, and 0 or a negative
    # id would silently wrap around to a row from the end of the matrix.
    if i is None or score is None or not 1 <= i <= len(cosine_sim):
        return []

    own_position = i - 1
    ranked = sorted(
        enumerate(cosine_sim[own_position]),
        key=lambda pair: pair[1],
        reverse=True,
    )
    # Exclude the lecture itself by position instead of assuming it sorts first;
    # when another lecture ties with it at the top, slicing off element 0 would
    # drop the wrong entry and recommend the lecture the user just acted on.
    neighbours = [pos for pos, _ in ranked if pos != own_position][:5]

    # int(...) keeps numpy/pandas scalars out of the value handed back to Spark.
    return [[int(x[pos]), score] for pos in neighbours]





In [11]:
# Expose get_recommendations to Spark as a UDF returning a list of integer lists,
# then attach its result to every log row as the `recommenders` column.
recommender_udf = F.udf(get_recommendations, ArrayType(ArrayType(IntegerType())))
recsys_df = logdf.withColumn(
    'recommenders',
    recommender_udf(logdf['lecture_id'], logdf['action']),
)

In [12]:
# Print the schema to confirm `recommenders` was added as an array of integer arrays.
recsys_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- action: string (nullable = true)
 |-- lecture_id: integer (nullable = true)
 |-- recommenders: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = true)



In [13]:
import functools

def reduce_f(val):
    """Flatten a list of lists by one level, preserving element order.

    Args:
        val: a sequence whose items are themselves lists, or None.

    Returns:
        A new list holding the items of every inner list in order. Returns an
        empty list when ``val`` is None or empty; None or empty inner lists
        contribute nothing.
    """
    if not val:
        # reduce() without an initial value raises on an empty sequence.
        return []
    # One pass instead of repeated `x + y`, which copies the accumulated list
    # at every step.
    return [item for group in val if group for item in group]

# UDF that merges a user's per-event recommendation lists into one flat list.
flatten_udf = F.udf(reduce_f, ArrayType(ArrayType(IntegerType())))

# Gather every event's recommendation list per user (a list of lists of pairs).
per_user = recsys_df.groupBy("user_id")
temp_df = per_user.agg(F.collect_list("recommenders"))
temp_df.printSchema()

# Flatten the gathered lists; the aggregate column carries Spark's generated name.
flattened = flatten_udf("collect_list(recommenders)").alias("recommenders")
group_recsys_df = temp_df.select("user_id", flattened)
group_recsys_df.printSchema()


root
 |-- user_id: integer (nullable = true)
 |-- collect_list(recommenders): array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: integer (containsNull = true)

root
 |-- user_id: integer (nullable = true)
 |-- recommenders: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = true)



In [14]:
def recsys_count(recommenders, top_n=5):
    """Sum the scores per lecture and return the ids with the highest totals.

    Args:
        recommenders: iterable of ``[lecture_id, score]`` pairs, or None.
        top_n: maximum number of lecture ids to return (default 5).

    Returns:
        Lecture ids ordered by descending total score. Lectures with equal
        totals keep the order in which they first appeared. Fewer than
        ``top_n`` ids are returned when fewer distinct lectures are present,
        and an empty list when ``recommenders`` is None or empty.
    """
    totals = {}
    for rec in recommenders or []:
        lecture_id, score = rec[0], rec[1]
        totals[lecture_id] = totals.get(lecture_id, 0) + score

    # A stable descending sort gives the same order as repeatedly taking max()
    # and popping it, without raising once the dict runs out of entries.
    ranked = sorted(totals, key=totals.get, reverse=True)
    return ranked[:top_n]

In [15]:
# Expose recsys_count to Spark as a UDF returning a list of integers and store
# its result for each user in the `best` column.
count_udf = F.udf(recsys_count, ArrayType(IntegerType()))
best_column = count_udf(group_recsys_df['recommenders'])
final_recsys_df = group_recsys_df.withColumn('best', best_column)
final_recsys_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- recommenders: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = true)
 |-- best: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [16]:
# Register the per-user result so it can be queried with SQL.
final_recsys_df.createOrReplaceTempView('logtable')
# Pack (user_id, best) into one struct column named `value` ...
packed_df = spark.sql("select (user_id, best) as value from logtable")
# ... and serialise it to JSON text, the payload the Kafka sink writes.
final_recsys_df_sql = packed_df.select(to_json("value").alias("value"))
logdf2.printSchema()

#recsys_df_sql = spark.sql("select (user_id, action, lecture_id, recommenders) as value from logtable").select(to_json("value").alias("value"))

root
 |-- user_id: integer (nullable = true)
 |-- best: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [17]:
# Produce the personalised recommendation results to Kafka every 30 seconds.
query1 = (
    final_recsys_df_sql.writeStream
    .trigger(processingTime='30 seconds')
    .outputMode("update")
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER)
    .option("checkpointLocation", "pyspark/streaming/checkpointLocation4")
    .option("topic", "lecture-recommenders")
    .option("failOnDataLoss", "false")
    .start()
)

# Load the personalised recommendation results (read back from the
# "lecture-recommenders" topic as logdf2) into Elasticsearch.
# NOTE(review): ELASTICSEARCH_DB is not defined in any visible cell; it must
# already exist in the kernel namespace before this cell runs.
query2 = (
    logdf2.writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .option("checkpointLocation", "pyspark/streaming/checkpointLocation5")
    .option("es.resource", "recommenders_db")
    .option("es.nodes", ELASTICSEARCH_DB)
    .option("failOnDataLoss", "false")
    .start()
)


In [None]:
# Block until either streaming query stops or fails.
# Waiting on query1 and then query2 in sequence never reaches the second wait
# while query1 is running, so a failure of query2 would go unnoticed in this
# cell. awaitAnyTermination() watches every active query on the session and
# raises the exception of a query that failed.
spark.streams.awaitAnyTermination()