In [1]:
# import packages 
import sys
import pandas as pd
import seaborn as sns
from time import time
from pyspark.sql import Row
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit,CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline, PipelineModel


In [2]:
# dataset path

def processing(df_raw):                 
    df_raw = df_raw.withColumn("rating", df_raw["score"].cast("float")).drop(df_raw["score"])
    # transform user_name and whisky_name string to index using spark StringIndexer function
    whiskyIndexer = StringIndexer(inputCol="whisky_name", outputCol="item",handleInvalid='error') # create indexer for asins
    userIndexer = StringIndexer(inputCol='user_name',outputCol='userid',handleInvalid='error') # create indexer for user
    whiskyIndexed = whiskyIndexer.fit(df_raw).transform(df_raw) # apply whisky_name indexer
    userIndexed = userIndexer.fit(whiskyIndexed).transform(whiskyIndexed) # apply user indexer
    df_indexed = userIndexed.drop('whisky_name').drop('user_name') # remove old columns with alphanumeric strings
    return df_indexed

In [3]:
def train(df_indexed):  
    # 70-30 train-test split
    (df_train, df_test) = df_indexed.randomSplit([0.7,0.3])
    # cache them in memory across clusters since we access this data frequently 
    df_train.cache() 
    df_test.cache()
    print("==============訓練中==============")
    # Display dataset size
#     print('Train set size: {}'.format(df_train.count()))
#     print('Test set size: {}'.format(df_test.count()))
    return df_train,df_test

In [4]:
from pyspark.sql.functions import lit
from pyspark.ml.feature import IndexToString

def recommendwhiskys(model,df_train,user, num_rec):
    # Create a dataset with distinct whisky as one column and the user of interest as another column
    itemsuser = df_train.select("item").distinct().withColumn("userid", lit(user))
#     itemsuser.show(n=3)

    # filter out games that user has already rated 
    whiskysrated = df_train.filter(df_train.userid == user).select("item", "userid")

    # apply trained recommender system
    predictions = model.transform(itemsuser.subtract(whiskysrated)).dropna().orderBy("prediction", ascending=False).limit(num_rec).select("item", "prediction")
#     predictions.show()
    
    # convert index back to original whisky 
    converter = IndexToString(inputCol="item", outputCol="recommend_whisky")
    converted = converter.transform(predictions)
    converted.show()
    return converted
    
   
    
#     return converted
    
    
      #轉成rdd去做推薦
#     convertedtordd = converted.rdd.take(num_rec) 
#     convertedtordd.take()
#     for item in convertedtordd:
#         print(item)
# #         print(type(item))
#         print("推薦whisky:{}, 推薦評分:{}".format(str(item[2]),item[1]))

In [5]:
#輸入酒款id 推薦給3個人
#再從人的id去看上一個程式碼可每人推3個酒款
#最後產出的9個酒款便是最終前端產生的結果
from pyspark.sql.functions import lit
from pyspark.ml.feature import IndexToString

def recommendwhiskys_user(model,df_train,item, num_rec):
    # Create a dataset with distinct whisky as one column and the user of interest as another column
    useritems = df_train.select("userid").distinct().withColumn("item", lit(item))
#     useritems.show(n=3)

    # filter out games that user has already rated  (曾經評論過的)
    whiskysrated = df_train.filter(df_train.item == item).select("userid", "item")
#     whiskysrated.show()
    # apply trained recommender system  (以下為預測使用者可能喜歡的酒款)
    predictions = model.transform(useritems.subtract(whiskysrated)).dropna().orderBy("prediction", ascending=False).limit(num_rec).select("userid", "prediction")
#     predictions.show()
    
    # convert index back to original whisky 
    converter = IndexToString(inputCol="userid", outputCol="recommend_user")
    converted = converter.transform(predictions)
#     converted.show()
    convertedtordd = converted.rdd.take(num_rec)
#     id_list = [] 
    for item in convertedtordd:
        result = recommendwhiskys(model,df_train,item[0],num_rec)
    return result 
        
#         print(item)
#         id_list.append(item[0])
#         print("使用者id:{}".format(str(item[0])))
#         print("推薦給使用者:{}, 推薦評分:{}".format(str(item[2]),item[1]))
#     print (id_list)
#     data= []
#     for i in id_list :
#         result = recommendwhiskys(model,df_train,i,num_rec)
#         print(result)
#         data.append(result)
#     data.show()
        
        

In [6]:
df_raw = spark.read.csv("hdfs://devenv/user/spark/whisky/user_whisky_score.csv",header = True)        
df_train = train(processing(df_raw))[0]
    
from pyspark.ml.tuning import TrainValidationSplitModel
model = TrainValidationSplitModel.load("hdfs://devenv/user/spark/whisky/AlSmodel")

print("=========輸入推薦項目數量==============")
usernum=int(2)
print("=========輸入威士忌id==============")
itemnum = int(input())
result = recommendwhiskys_user(model,df_train,itemnum,usernum)


# df = pd.DataFrame(
#     columns=['item', 'prediction','recommend_whisky'])

# dff = df = pd.DataFrame(all,columns=['酒名', '酒譜', 'url','圖片','步驟', '介紹', '評論'])


2500
+------+----------+--------------------+
|  item|prediction|    recommend_whisky|
+------+----------+--------------------+
|8550.0|  6.852953|sivo-rebel-le-moo...|
|6089.0| 6.7254953|old-elk-straight-...|
+------+----------+--------------------+

+------+----------+--------------------+
|  item|prediction|    recommend_whisky|
+------+----------+--------------------+
|8550.0|  6.732022|sivo-rebel-le-moo...|
|4989.0| 6.1625566|belgrove-rye-whiskey|
+------+----------+--------------------+



In [7]:
result.show()

+------+----------+--------------------+
|  item|prediction|    recommend_whisky|
+------+----------+--------------------+
|8550.0|  6.732022|sivo-rebel-le-moo...|
|4989.0| 6.1625566|belgrove-rye-whiskey|
+------+----------+--------------------+



In [8]:
if __name__ == "__main__":
    print("==============訓練模型==============")
    df_raw = spark.read.csv("hdfs://devenv/user/spark/whisky/user_whisky_score.csv",header = True)        
    
    df_train = train(processing(df_raw))[0]
    
    
    print("==========載入模型==============")
    
    from pyspark.ml.tuning import TrainValidationSplitModel
    model = TrainValidationSplitModel.load("hdfs://devenv/user/spark/whisky/AlSmodel")

    print("==========進行推薦===============")
    
    print("=========輸入推薦項目數量==============")
    usernum=int(2)
    print("=========輸入威士忌id==============")
    itemnum = int(input())
    result = recommendwhiskys_user(model,df_train,itemnum,usernum)
    
    
    #貯存所有結果至json



3000
+------+----------+--------------------+
|  item|prediction|    recommend_whisky|
+------+----------+--------------------+
|5828.0| 6.4259686|nikka-single-cask...|
|6982.0| 6.2807374|bruichladdich-18-...|
+------+----------+--------------------+

+------+----------+--------------------+
|  item|prediction|    recommend_whisky|
+------+----------+--------------------+
|8550.0|  6.852953|sivo-rebel-le-moo...|
|6089.0| 6.7254953|old-elk-straight-...|
+------+----------+--------------------+



In [9]:
    result

DataFrame[item: double, prediction: float, recommend_whisky: string]

In [10]:
import pandas
df3 = df2.toPandas() 
df3.to_json('~/Desktop/spark101/movies/data40n',orient = "records")

##################insert to mongo##########################
import json
from pymongo import MongoClient

myclient = MongoClient("mongodb://10.120.26.13:27017/")
db = myclient["MOVIE"]
Collection = db["copy_name"]

with open('/home/spark/Desktop/spark101/movies/data40n') as file:
    file_data = json.load(file)

if isinstance(file_data, list):
    Collection.insert_many(file_data)
else:
    Collection.insert_one(file_data)

NameError: name 'df2' is not defined