***GENERATED CODE FOR deepfmreco PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ AND WRITE DATA.***

In [None]:
import os
import datetime
import logging
import warnings
# Suppress every Python warning for the rest of the notebook.
# NOTE(review): this also hides deprecation warnings raised by the libraries
# used below (pyspark, pandas, libreco); consider narrowing the filter by
# category or module.
warnings.filterwarnings('ignore')
# Root logger: INFO and above, printed as "LEVEL:message".
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read and write CSV data on HDFS through a Spark session.

    ``config`` is passed as the *string* form of a Python dict literal, e.g.
    ``"{'url': '/path/file.csv', 'delimiter': ',', 'is_header': 'Use Header Line'}"``.
    It is parsed with :func:`ast.literal_eval`, which only accepts literals,
    rather than ``eval``, which would execute any code embedded in the string.
    """

    @staticmethod
    def _parse_config(config):
        """Return *config* as a dict, parsing it when it is a dict-literal string."""
        import ast

        if isinstance(config, dict):
            return config
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Read the CSV at ``config['url']`` from HDFS into a Spark DataFrame.

        The HDFS host and port come from the ``HDFS_SERVER`` and ``HDFS_PORT``
        environment variables (``KeyError`` if either is unset). The file is read
        with a header row and inferred schema. The first two rows are previewed
        with the notebook ``display`` helper (not defined in this file).

        Returns:
            The Spark DataFrame that was read.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        url = HDFSConnector._parse_config(config)['url']
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{url}")
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write *df* as CSV using the header and delimiter settings in *config*.

        A header row is written only when ``config['is_header']`` equals
        ``"Use Header Line"``. ``spark`` is not used; it is accepted for symmetry
        with :meth:`fetch`.

        Returns:
            The result of Spark's ``DataFrameWriter.save``.
        """
        cfg = HDFSConnector._parse_config(config)
        header = 'true' if cfg["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): this produces "<timestamp>_ <url>" (note the space)
        # rather than a path derived from ``url``. It is kept unchanged because
        # consumers may depend on it; confirm the intended target path.
        target = ("%s %s") % (timestamp + "_", cfg['url'])
        return df.write.format('csv').options(
            header=header, delimiter=cfg["delimiter"]).save(target)


***AUTOML FUNCTIONS***

In [None]:
import json
import pandas as pd
from libreco.algorithms import DeepFM
from libreco.data import DatasetFeat
from libreco.evaluation import evaluate


def _drop_matching(features, key):
    """Return a new list of *features* without the entries that contain *key*."""
    return [feature for feature in features if key not in feature]


def _strip_indexer_suffix(features):
    """Return a new list with ``_stringindexer`` and anything after it cut from each name."""
    return [feature.split("_stringindexer")[0] for feature in features]


def _f1(precision, recall):
    """Return the harmonic mean of *precision* and *recall* (0.0 when both are 0)."""
    denominator = precision + recall
    if denominator == 0:
        return 0.0
    return 2 * precision * recall / denominator


def recommend(df, originalfile, user, product, isRating, coldStartKeys, rating=None):
    """Train a DeepFM ranking model on *df* and return top-10 recommendations per user.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Interaction data. It is converted to pandas with ``toPandas()``.
    originalfile :
        Unused; kept for interface compatibility.
    user, product : str
        User and item column names. If a name carries a ``_stringindexer``
        suffix, a column named by the part before the suffix is used instead.
    isRating : int
        ``0`` when *df* has no rating column. A constant ``ratingCol`` column of
        1.0 is then inserted. Otherwise *rating* names the rating column.
    coldStartKeys : list
        ``[{'UserFeatures': [...]}, {'ItemFeatures': [...]}]``, the column names
        used as user-side and item-side features. The lists are not modified.
    rating : str, optional
        Rating column name, used when ``isRating != 0``.

    Returns
    -------
    pandas.DataFrame
        One row per distinct user. It has a column named after the resolved
        *user* column and a ``Recommendations`` column holding the recommended
        items joined with commas.
    """
    df = df.toPandas()

    # ----------------------------Rating Column------------------------------------------
    if isRating == 0:
        # No explicit ratings: every observed interaction counts as 1. The scalar
        # broadcasts over all rows.
        df.insert(1, 'ratingCol', 1.0)
        rating = 'ratingCol'
    # Always the column *name* of the rating, whichever branch ran.
    rating_col = rating

    # ------------------------------Retrieving User and Item Columns---------------------
    user_names = user.split("_stringindexer")
    product_names = product.split("_stringindexer")
    for column in df.columns:
        if column in user_names:
            user = column
        elif column in product_names:
            product = column

    # ----------------------------Cold-start Feature Lists-------------------------------
    # Build new lists rather than removing while iterating, which skips the
    # element after each removal. This also leaves the caller's config lists
    # untouched.
    user_features = _strip_indexer_suffix(
        _drop_matching(coldStartKeys[0]['UserFeatures'], user))
    item_features = _drop_matching(coldStartKeys[1]['ItemFeatures'], product)
    item_features = _strip_indexer_suffix(_drop_matching(item_features, rating_col))

    # ----------------------------Training Frame-----------------------------------------
    # Rename to the 'user' / 'item' / 'label' column names used by LibRecommender.
    df1 = df.rename(columns={user: 'user', product: 'item'})
    df1.insert(2, "label", df1[rating_col])
    # Drop the source rating column, whatever its name; its values now live in 'label'.
    df1 = df1.drop(rating_col, axis=1)

    # ----------------------------Sparse / Dense Features--------------------------------
    # Object (string) columns are treated as sparse; everything else is dense.
    sparse_features = [col for col in df1.columns if df1[col].dtype == 'object']
    dense_features = [col for col in df1.columns if df1[col].dtype != 'object']
    sparse_features = _drop_matching(_drop_matching(sparse_features, user), product)
    # The id and label columns are not features. Filter both lists: a numeric
    # user id, for example, lands in dense_features, so an unconditional
    # list.remove('user') on the sparse list would raise ValueError.
    reserved = {'user', 'item', 'label'}
    sparse_features = [col for col in sparse_features if col not in reserved]
    dense_features = [col for col in dense_features if col not in reserved]

    train_data, data_info = DatasetFeat.build_trainset(df1, user_col=user_features, item_col=item_features,
                                                       sparse_col=sparse_features, dense_col=dense_features)

    # ---------------------Model Training-----------------------------------------------------------------------------------

    model = DeepFM(data_info=data_info, task='ranking', loss_type='cross_entropy', embed_size=16, n_epochs=20, lr=0.001,
                   lr_decay=False, epsilon=1e-05, batch_size=256, sampler='random', num_neg=1, use_bn=True,
                   hidden_units=(128, 64, 32), multi_sparse_combiner='sqrtn')

    history = model.fit(train_data, neg_sampling=True, verbose=1, shuffle=True, eval_data=None, metrics=None, k=10,
                        eval_batch_size=8192, eval_user_num=None, num_workers=0)

    # -----------------------Recommendation--------------------------------------------
    # One call per distinct user. Repeat calls for the same user would only
    # overwrite the same dict key.
    item_reco = {}
    for user_id in df1['user'].drop_duplicates():
        item_reco.update(model.recommend_user(user_id, n_rec=10, user_feats=None, seq=None,
                         cold_start='average', inner_id=False, filter_consumed=True, random_rec=False))
    # Build the frame directly from the dict. This tolerates fewer than 10
    # recommendations per user and non-string item ids.
    rec_df = pd.DataFrame({
        user: list(item_reco.keys()),
        'Recommendations': [','.join(map(str, recs)) for recs in item_reco.values()],
    })

    # --------------------Model Evaluation-----------------------------------------------
    # NOTE(review): df1 is also the training data, so these metrics are in-sample.
    eval_result = evaluate(model=model, data=df1, neg_sampling=True, eval_batch_size=8192, k=10,
                           metrics=["roc_auc", "precision", 'recall'])
    precision = eval_result['precision']
    recall = eval_result['recall']
    ModelMetrics = {"AUC Score": round(eval_result['roc_auc'], 2),
                    "PrecisionScore": round(precision, 2),
                    "RecallScore": round(recall, 2),
                    # Compute F1 from the unrounded values to avoid compounding rounding error.
                    "F1score": round(_f1(precision, recall), 2)}
    display(ModelMetrics)
    display(rec_df)
    return rec_df


class Recommender:
    """Pipeline stage that runs the DeepFM recommender from a JSON stage config."""

    @staticmethod
    def run(spark_DF, spark, config):
        """Parse *config* and run ``recommend`` on *spark_DF*.

        Parameters
        ----------
        spark_DF : pyspark.sql.DataFrame
            Input interactions.
        spark :
            Unused; kept for the common stage signature.
        config : str
            JSON object. It must contain the keys ``model``, ``user``,
            ``product``, ``rating``, ``ratingexists``, ``originalfile`` and
            ``coldStartKeys``. An optional ``hyperParameters`` object is
            accepted but not applied.

        Returns
        -------
        The recommendations frame returned by ``recommend``. Previously this
        method discarded it and returned None.

        Raises
        ------
        KeyError
            If a required key is missing from *config*.
        """
        stage_attributes = json.loads(config)
        # 'model' is required by the stage schema even though only DeepFM is used.
        if 'model' not in stage_attributes:
            raise KeyError('model')
        # NOTE(review): 'hyperParameters' ('loss', 'learning_rate') were read
        # but never used. DeepFM's hyperparameters are hard-coded in recommend().
        user = stage_attributes['user']
        product = stage_attributes['product']
        rating = stage_attributes['rating']
        rating_exists = stage_attributes['ratingexists']
        original_file = stage_attributes['originalfile']
        cold_start_keys = stage_attributes['coldStartKeys']
        return recommend(
            spark_DF, original_file, user, product, rating_exists, cold_start_keys, rating)


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
# To run on a cluster, replace 'local[1]' with your Spark master URL
# (e.g. 'spark://<host>:<port>').
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run deepfmrecoHooks.ipynb
try:
    #sourcePreExecutionHook()

    moviedataethitorycopydwhs = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1715250119760_MovieDatasetHistory_copy.csv', 'filename': 'MovieDatasetHistory_copy.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'FilePath': '/Recommendation/deepMovieNameReco/MovieDatasetHistory_copy.csv', 'viewFileName': 'MovieDatasetHistory_copy.csv', 'is_header': 'Use Header Line', 'baseType': 'hdfs', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex:
    # logging.exception also records the traceback, not just the message.
    logging.exception(ex)
#spark.stop()


***TRAIN MODEL***

In [None]:
#%run deepfmrecoHooks.ipynb
try:
    #mlPreExecutionHook()

    # NOTE: if the read cell failed, moviedataethitorycopydwhs is undefined and
    # the resulting NameError is logged below.
    deepfm = Recommender.run(moviedataethitorycopydwhs, spark, json.dumps({
        "model": "DeepFm",
        "autorecommend": 0,
        "ratingexists": 1,
        "rating": "Rating",
        "user": "UserId",
        "product": "original_title",
        "originalfile": "/FileStore/platform/testdata/1715250119760_MovieDatasetHistory_copy.csv",
        "coldStartKeys": [
            {"UserFeatures": ["UserId", "UserCountry"]},
            {"ItemFeatures": ["original_title", "MovieId", "Genres", "Popularity", "Runtime", "vote_average", "Rating", "Age"]},
        ],
    }))

    #mlPostExecutionHook(deepfm)

except Exception as ex:
    # logging.exception also records the traceback, not just the message.
    logging.exception(ex)
#spark.stop()
