In [1]:
import sklearn
import random 

# Pyspark Library #
# SQL
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
# ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.recommendation import ALS

import os
import pandas as pd
import numpy as np
from pandas import Series, DataFrame

In [2]:
# Create (or reuse, via getOrCreate) the local Spark session. The MySQL JDBC
# driver jar is put on the classpath so the JDBC reads in the next cell can load
# com.mysql.cj.jdbc.Driver. The jar location can be overridden with the
# MYSQL_CONNECTOR_JAR environment variable instead of editing this absolute path.
MYSQL_CONNECTOR_JAR = os.environ.get(
    "MYSQL_CONNECTOR_JAR",
    "/root/hadoop/share/hadoop/tools/lib/mysql-connector-java-8.0.27.jar",
)
spark = (
    SparkSession.builder
    .config("spark.jars", MYSQL_CONNECTOR_JAR)
    .master("local")
    .appName("ml_reco")
    .getOrCreate()
)

In [3]:
# Load tables (buylist, product, category_small, user_recommand, cart,
# search_user, like) from the b2c MySQL database over JDBC.
# Credentials come from the environment rather than being hard-coded in the
# notebook. The password previously committed here should be rotated.
#   B2C_DB_PASSWORD  (required)
#   B2C_DB_USER      (optional, default "admin")
#   B2C_JDBC_URL     (optional, default: the b2c RDS endpoint below)
B2C_JDBC_URL = os.environ.get(
    "B2C_JDBC_URL",
    "jdbc:mysql://b2cdb.chy6lfqzk3p1.ap-northeast-2.rds.amazonaws.com:3306/b2c",
)
B2C_DB_USER = os.environ.get("B2C_DB_USER", "admin")
B2C_DB_PASSWORD = os.environ["B2C_DB_PASSWORD"]  # KeyError here means the variable is unset


def read_b2c_table(dbtable):
    """Read one table from the b2c database as a Spark DataFrame via JDBC.

    Args:
        dbtable: value for the JDBC ``dbtable`` option, i.e. a table name,
            optionally schema-qualified (e.g. ``"b2c.like"``).

    Returns:
        The lazily-loaded Spark DataFrame, read with the notebook's ``spark`` session.
    """
    return (
        spark.read.format("jdbc")
        .option("url", B2C_JDBC_URL)
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("dbtable", dbtable)
        .option("user", B2C_DB_USER)
        .option("password", B2C_DB_PASSWORD)
        .load()
    )


buylist = read_b2c_table("buylist")
product = read_b2c_table("product")
category_small = read_b2c_table("category_small")
user_recommand = read_b2c_table("user_recommand")
cart = read_b2c_table("cart")
search_user = read_b2c_table("search_user")
# Schema-qualified, presumably because LIKE is a MySQL reserved word
# (an identifier after a "." need not be quoted).
like = read_b2c_table("b2c.like")

In [4]:
# Optional: down-sample each table by a fixed fraction (disabled; uncomment to use)
# buylist = buylist.sample(withReplacement=False,fraction=0.3)
# cart = cart.sample(withReplacement=False,fraction=1.0)
# search_user = search_user.sample(withReplacement=False,fraction=0.5)
# like = like.sample(withReplacement=False,fraction=0.8)

In [5]:
# Keep only the needed columns and discard the rest
# (needed: user_id, product_id, category_(small,mid)_id, avg_star, keyword).
product = product.drop(*('regist_time', 'name', 'price', 'brand', 'image'))
buylist = buylist.drop(*('buy_date', 'id', 'count'))
search_user = search_user.drop('id')
category_small = category_small.drop('name')

In [6]:
# Give each table's primary key an explicit name, so that product's key matches
# the `product_id` column of the interaction tables it is joined with.
category_small = category_small.withColumnRenamed('id', 'category_small_id')
product = product.withColumnRenamed('id', 'product_id')

In [7]:
# Stack the user-product interaction sources (cart, like, buylist) into one
# frame. allowMissingColumns=True fills columns a source lacks with nulls.
# search_user is currently excluded; to include it, add:
#     .unionByName(search_user, allowMissingColumns=True)
df = (
    cart
    .unionByName(like, allowMissingColumns=True)
    .unionByName(buylist, allowMissingColumns=True)
)

In [8]:
# Attach product attributes (category_small_id, avg_star) to every interaction.
# A left join keeps interactions whose product_id has no match in `product`;
# those rows carry nulls in the product columns.
# NOTE(review): despite its name, inner_df holds the result of a LEFT join.
inner_df = (
    df
    .join(product, on=["product_id"], how="left")
    .orderBy("product_id")
)

In [9]:
# Preview the first 20 joined rows, with long values truncated.
inner_df.show(n=20, truncate=True)

                                                                                

+----------+-------+-----------------+--------+
|product_id|user_id|category_small_id|avg_star|
+----------+-------+-----------------+--------+
|         1|      1|                1|     0.0|
|         1|      2|                1|     0.0|
|         2|      1|                1|     4.2|
|         2|      2|                1|     4.2|
|         2|      2|                1|     4.2|
|         3|      1|                1|     4.0|
|         4|      2|                1|     4.8|
|         5|      1|                1|     3.7|
|         8|      2|                2|     4.8|
|        10|      2|                3|     2.8|
+----------+-------+-----------------+--------+



In [10]:
# Hold out 25% of the interactions for evaluation. Fixed seeds make both the
# split and the ALS factor initialisation reproducible, so Restart & Run All
# yields the same predictions every time.
SEED = 42
train, test = inner_df.randomSplit([0.75, 0.25], seed=SEED)

rec = ALS(maxIter=10,
          regParam=0.01,
          userCol='user_id',
          itemCol='product_id',
          # Training label; not needed when calling transform() to predict.
          # NOTE(review): avg_star comes from the product table, so every user
          # gets the same "rating" for a given product. Confirm this is the
          # intended signal rather than a per-user rating.
          ratingCol='avg_star',
          nonnegative=True,
          # Users/products unseen in `train` would get NaN predictions; drop
          # those rows instead so metrics are not poisoned by NaN.
          coldStartStrategy='drop',
          seed=SEED)

In [11]:
# Train the ALS factorisation on the training split.
rec_model = rec.fit(dataset=train)

22/02/24 06:32:06 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/02/24 06:32:06 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

In [12]:
# Score the held-out split. The model appends a `prediction` column. Since ALS
# was configured with coldStartStrategy='drop', rows with unseen users or
# products are omitted.
pred_ratings = rec_model.transform(dataset=test)
pred_ratings.toPandas()

                                                                                

Unnamed: 0,product_id,user_id,category_small_id,avg_star,prediction
0,1,2,1,0.0,0.0
1,2,2,1,4.2,4.199393
