***GENERATED CODE FOR movierecommendations PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ AND WRITE DATA.***

In [None]:
import os
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read and write CSV files on HDFS.

    The HDFS host and port come from the ``HDFS_SERVER`` and ``HDFS_PORT``
    environment variables. ``config`` arguments are Python dict literals
    passed as strings (single-quoted keys), e.g. ``"{'url': '/a.csv', ...}"``.
    """

    @staticmethod
    def _parse_config(config):
        """Parse a config string into a dict without executing any code.

        ``ast.literal_eval`` accepts only Python literals (dicts, strings,
        numbers, ...). The generated code used ``eval``, which would run any
        expression embedded in the string.

        Raises:
            ValueError or SyntaxError: if ``config`` is not a Python literal.
        """
        import ast
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Read the CSV at ``config['url']`` from HDFS into a Spark DataFrame.

        Reads with a header row and schema inference. The field separator is
        ``config['delimiter']`` when present and non-empty, otherwise ','.
        Displays the first two rows and returns the DataFrame.

        Raises:
            KeyError: if HDFS_SERVER / HDFS_PORT are unset or 'url' is missing.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        settings = HDFSConnector._parse_config(config)
        # Honor the configured delimiter; previously it was ignored, so any
        # non-comma file was mis-parsed.
        delimiter = settings.get('delimiter') or ','
        df = spark.read.options(header='true', inferschema='true', sep=delimiter).csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{settings['url']}")
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV using the header/delimiter settings in ``config``.

        A header line is written only when ``config['is_header']`` equals
        "Use Header Line". Returns the result of ``DataFrameWriter.save``.
        """
        settings = HDFSConnector._parse_config(config)
        header = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): this produces "<timestamp>_ <url>" (note the space and
        # the missing hdfs:// prefix), which is a relative path rather than a
        # timestamped file next to config['url']. Kept as generated; confirm intent.
        target = "%s %s" % (timestamp + "_", settings['url'])
        return df.write.format('csv').options(
            header=header, delimiter=settings["delimiter"]).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO DATA***

In [None]:
import json
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Replace missing values in Spark DataFrame columns using per-feature strategies.

    ``replaceNullValues`` picks a strategy from each feature config's
    ``"replaceby"`` string: mean, max, min, stddev or random.
    """

    def cleanValueForFE(self, value):
        """Normalise a computed statistic before using it as a fill value.

        Returns "" for None, the string "nan" when ``str(value)`` is 'nan'
        (e.g. a float NaN), and ``value`` unchanged otherwise.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _nonMissingStatistic(self, feature, df, aggregate, alias):
        """Return ``aggregate`` over ``feature``, cleaned by ``cleanValueForFE``.

        Only rows missing *this* column are dropped. Dropping rows with a
        missing value in any column would bias the statistic, and would yield
        "" once every row had some missing field.
        """
        present = df.dropna(subset=[feature.name])
        row = present.select(aggregate(col(feature.name)).alias(alias)).collect()[0]
        return self.cleanValueForFE(row[alias])

    def _fillWith(self, feature, df, value):
        """Fill nulls in ``feature`` with ``value`` and replace cells equal to " " with it."""
        df = df.fillna(value, subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", value).otherwise(col(feature.name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls in ``feature`` with the column mean. ``mean_`` is unused."""
        meanValue = self._nonMissingStatistic(feature, df, mean, 'mean')
        # Only nulls are filled. The generated code also built a withColumn
        # that cast the column to Integer, but discarded the result. Applying
        # it would truncate real-valued columns, so it is omitted.
        return df.fillna(meanValue, subset=[feature.name])

    def replaceByMax(self, feature, df, max_=-1):
        """Fill missing/blank ``feature`` cells with the column maximum. ``max_`` is unused."""
        return self._fillWith(feature, df, self._nonMissingStatistic(feature, df, max, 'max'))

    def replaceByMin(self, feature, df, min_=-1):
        """Fill missing/blank ``feature`` cells with the column minimum. ``min_`` is unused."""
        return self._fillWith(feature, df, self._nonMissingStatistic(feature, df, min, 'min'))

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill missing/blank ``feature`` cells with the column standard deviation. ``stddev_`` is unused."""
        return self._fillWith(feature, df, self._nonMissingStatistic(feature, df, stddev, 'stddev'))

    def replaceDateRandomly(self, feature, df):
        """Fill nulls in ``feature`` with the first non-null value Spark returns.

        Despite the name, this is not random: it uses ``head(1)`` of the
        non-null rows. Nulls are filled with ``str(value)``, and " " cells
        with the raw value. An all-null column is returned unchanged; the
        generated code raised IndexError in that case.
        """
        firstRows = df.where(col(feature.name).isNotNull()).head(1)
        if not firstRows:
            return df
        fillValue = self.cleanValueForFE(firstRows[0][feature.name])
        df = df.fillna(str(fillValue), subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceNullValues(self, fList, df):
        """Apply each feature config's ``"replaceby"`` strategy to its column.

        Args:
            fList: list of dicts with at least ``"feature"`` (column name) and
                ``"replaceby"`` keys.
            df: Spark DataFrame to clean.

        Returns:
            The cleaned DataFrame. Columns are matched by exact name; the
            generated code used substring matching, which also hit unrelated
            columns (e.g. "Id" matching "UserId").
        """
        # Checked in this order; the first keyword contained in "replaceby" wins.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        fields = df.schema.fields
        for featureObj in fList:
            for field in fields:
                if field.name != featureObj["feature"]:
                    continue
                for keyword, handler in strategies:
                    if keyword in featureObj["replaceby"]:
                        df = handler(field, df)
                        break
        return df


def StringIndexerTransform(df, params, transformationData=None):
    """Add a ``<feature>_stringindexer`` column indexing ``params["feature"]``.

    Nulls in the feature are filled with '' before fitting a Spark
    ``StringIndexer`` (handleInvalid="skip") on ``df``. When the indexed
    column has at most 4 distinct values it is cast to IntegerType.

    Args:
        df: Spark DataFrame containing the feature column.
        params: dict; only ``params["feature"]`` is read.
        transformationData: unused; kept for call compatibility. The default
            is None instead of a shared mutable ``{}``.

    Returns:
        The DataFrame with the added index column. The original column is kept.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"
    filled = df.fillna({feature: ''})
    indexer = StringIndexer(inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(filled).transform(filled)
    # Count distinct indices on the cluster instead of collecting them to the driver.
    # NOTE(review): the purpose of the <= 4 integer cast is not visible here; kept as generated.
    if indexed.select(outcol).distinct().count() <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


class TransformationMain:
    """Pipeline stage: clean missing values, then string-index the id/genre columns."""

    # TODO: change df argument in run with following
    def run(transformationDF, config):
        """Apply the feature-engineering config to ``transformationDF``.

        ``config`` is a JSON string whose ``"FE"`` list is passed to
        ``CleanseData.replaceNullValues``. UserId, MovieId and Genres are then
        replaced by their ``<name>_stringindexer`` columns; the originals are
        dropped. Displays two rows and returns the transformed DataFrame.
        """

        def indexerArguments(feature, lowest, highest, distinct):
            # Rebuild the params / transformationData literals emitted by the
            # pipeline generator for a String Indexer feature.
            label = {'feature_label': feature, 'transformation_label': 'String Indexer'}
            params = {
                'transformationsData': [dict(label)],
                'feature': feature,
                'type': 'string',
                'selected': 'True',
                'replaceby': 'max',
                'stats': {
                    'count': '500',
                    'mean': '',
                    'stddev': '',
                    'min': lowest,
                    'max': highest,
                    'missing': '0',
                    'distinct': distinct,
                },
                'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}],
                'updatedLabel': feature,
            }
            return params, label

        featureConfig = json.loads(config)["FE"]
        result = CleanseData().replaceNullValues(featureConfig, transformationDF)
        steps = (
            indexerArguments('UserId', 'A10024', 'A9983', '458'),
            indexerArguments('MovieId', 'A100', 'A99', '51'),
            indexerArguments('Genres', 'Comedy', 'Unknown', '4'),
        )
        for params, transformationData in steps:
            result = StringIndexerTransform(result, params, transformationData)
            result = result.drop(params['feature'])
        display(result.limit(2).toPandas())
        return result


***AUTOML FUNCTIONS***

In [None]:
import json
import pandas as pd
from libreco.algorithms import DeepFM
from libreco.data import DatasetFeat
from libreco.evaluation import evaluate


# Suffix StringIndexerTransform appends to indexed column names.
_INDEXER_SUFFIX = "_stringindexer"


def _resolve_id_column(columns, name):
    """Prefer the raw column over its ``_stringindexer`` variant when it exists.

    Returns the part of ``name`` before the suffix if that column is in
    ``columns``; otherwise returns ``name`` unchanged.
    """
    base = name.split(_INDEXER_SUFFIX)[0]
    return base if base and base in columns else name


def _side_features(features, excluded, columns):
    """Build a user/item feature list from the cold-start config.

    Drops every feature whose name contains one of ``excluded`` (id/rating
    column names). Maps ``<col>_stringindexer`` back to ``<col>`` only when
    ``<col>`` exists in ``columns``, so no nonexistent column is referenced.
    Returns a new list; ``features`` is not modified.
    """
    kept = [f for f in features if not any(token in f for token in excluded)]
    prepared = []
    for feature in kept:
        base = feature.split(_INDEXER_SUFFIX)[0]
        prepared.append(base if _INDEXER_SUFFIX in feature and base in columns else feature)
    return prepared


def _split_sparse_dense(frame, reserved, idColumns):
    """Partition ``frame``'s feature columns into (sparse, dense) name lists.

    Object-dtype columns are sparse and all others dense. Columns in
    ``reserved``, and columns whose name contains any of ``idColumns``, are
    excluded from both lists.
    """
    sparse, dense = [], []
    for column in frame.columns:
        if column in reserved or any(name in column for name in idColumns):
            continue
        (sparse if frame[column].dtype == 'object' else dense).append(column)
    return sparse, dense


def _f1_score(precision, recall):
    """Return the F1 of unrounded precision/recall, rounded to 2 decimals; 0.0 if both are 0."""
    total = precision + recall
    if total == 0:
        return 0.0
    return round(2 * precision * recall / total, 2)


def recommend(df, originalfile, user, product, isRating, coldStartKeys, rating=None):
    """Train a libreco DeepFM ranking model and return top-10 items per user.

    Args:
        df: Spark DataFrame of interactions; converted with ``toPandas()``.
        originalfile: path of the source file; currently unused.
        user: user-id column name, possibly ``_stringindexer``-suffixed.
        product: item-id column name, possibly ``_stringindexer``-suffixed.
        isRating: 0 when there is no rating column. Every row then gets a
            constant 1.0 label.
        coldStartKeys: ``[{'UserFeatures': [...]}, {'ItemFeatures': [...]}]``.
            The lists are read, not modified.
        rating: rating column name, used when ``isRating`` is non-zero.

    Returns:
        pandas DataFrame with one row per user. It holds a user-id column
        (named after the resolved user column) and ``Recommendations``, a
        comma-joined string of item ids. Model metrics and this frame are
        displayed.
    """
    pdf = df.toPandas()
    if isRating == 0:
        # No explicit rating column: every observed interaction gets label 1.0.
        # (The generated code used numpy without importing it and then treated
        # the array as a column name.)
        ratingCol = 'ratingCol'
        pdf.insert(1, ratingCol, 1.0)
    else:
        ratingCol = rating

    columns = pdf.columns
    user = _resolve_id_column(columns, user)
    product = _resolve_id_column(columns, product)

    userFeatures = _side_features(coldStartKeys[0]['UserFeatures'], [user], columns)
    itemFeatures = _side_features(coldStartKeys[1]['ItemFeatures'], [product, ratingCol], columns)

    df1 = pdf.rename(columns={user: 'user', product: 'item'})
    df1.insert(2, 'label', df1[ratingCol])
    # Drop the rating column actually in use (was hard-coded "Rating").
    df1 = df1.drop(ratingCol, axis=1)

    # 'user', 'item' and 'label' are never features, whatever their dtype.
    # Numeric ids previously made the old list.remove('user') raise ValueError.
    sparse_features, dense_features = _split_sparse_dense(
        df1, {'user', 'item', 'label'}, (user, product))

    # NOTE(review): presumably libreco expects user_col + item_col to cover
    # sparse_col + dense_col; verify against the libreco DatasetFeat docs.
    train_data, data_info = DatasetFeat.build_trainset(df1, user_col=userFeatures, item_col=itemFeatures,
                                                       sparse_col=sparse_features, dense_col=dense_features)

    # ---------------------Model Training-----------------------------------------------------------------------------------
    model = DeepFM(data_info=data_info, task='ranking', loss_type='cross_entropy', embed_size=16, n_epochs=20, lr=0.001,
                   lr_decay=False, epsilon=1e-05, batch_size=256, sampler='random', num_neg=1, use_bn=True,
                   hidden_units=(128, 64, 32), multi_sparse_combiner='sqrtn')
    model.fit(train_data, neg_sampling=True, verbose=1, shuffle=True, eval_data=None, metrics=None, k=10,
              eval_batch_size=8192, eval_user_num=None, num_workers=0)

    # -----------------------Recommendation--------------------------------------------
    itemReco = {}
    # One call per distinct user, in first-seen order (was one call per row).
    for userId in dict.fromkeys(df1['user']):
        itemReco.update(model.recommend_user(userId, n_rec=10, user_feats=None, seq=None,
                                             cold_start='average', inner_id=False,
                                             filter_consumed=True, random_rec=False))
    # str() each id so numeric ids join cleanly; any list length is accepted.
    rec_df = pd.DataFrame({
        user: list(itemReco.keys()),
        'Recommendations': [','.join(map(str, items)) for items in itemReco.values()],
    })

    # --------------------Model Evaluation-----------------------------------------------
    # NOTE(review): evaluation uses the training frame, so the metrics are
    # likely optimistic. Confirm whether a held-out split was intended.
    eval_result = evaluate(model=model, data=df1, neg_sampling=True, eval_batch_size=8192, k=10,
                           metrics=["roc_auc", "precision", 'recall'])
    precision = eval_result['precision']
    recall = eval_result['recall']
    ModelMetrics = {"AUC Score": round(eval_result['roc_auc'], 2),
                    "PrecisionScore": round(precision, 2),
                    "RecallScore": round(recall, 2),
                    "F1score": _f1_score(precision, recall)}
    display(ModelMetrics)
    display(rec_df)
    return rec_df


class Recommender:
    """Pipeline stage that trains the recommender through ``recommend``."""

    @staticmethod
    def run(spark_DF, spark, config):
        """Run ``recommend`` with the settings in the JSON ``config`` string.

        Required keys: model, user, product, rating, ratingexists,
        originalfile, coldStartKeys. ``spark`` is unused.

        Returns:
            The recommendations DataFrame from ``recommend``. The generated
            version discarded it and returned None, so callers such as
            ``deepfm = Recommender.run(...)`` got nothing back.

        Raises:
            KeyError: if a required key is missing.
        """
        stageAttributes = json.loads(config)
        # 'model' is only checked for presence: recommend() always trains DeepFM.
        if 'model' not in stageAttributes:
            raise KeyError('model')
        # NOTE(review): 'hyperParameters' (loss, learning_rate) were read but never
        # applied. recommend() hard-codes its training settings.
        return recommend(
            spark_DF,
            stageAttributes['originalfile'],
            stageAttributes['user'],
            stageAttributes['product'],
            stageAttributes['ratingexists'],
            stageAttributes['coldStartKeys'],
            stageAttributes['rating'],
        )


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run movierecommendationsHooks.ipynb
try:
    #sourcePreExecutionHook()

    moviedataethitory = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1711565294108_MovieDatasetHistory.csv', 'filename': 'MovieDatasetHistory.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'FilePath': '/SampleSolutions/MovieRecommender/MovieDatasetHistory.csv', 'viewFileName': 'MovieDatasetHistory.csv', 'is_header': 'Use Header Line', 'baseType': 'hdfs', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex:
    # Best-effort cell: keep the notebook running, but log the full traceback
    # (logging.error(ex) recorded only the message).
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run movierecommendationsHooks.ipynb
try:
    #transformationPreExecutionHook()

    autofe = TransformationMain.run(moviedataethitory,json.dumps( {"FE": [{"transformationsData": [{"feature_label": "UserId", "transformation_label": "String Indexer"}], "feature": "UserId", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "A10024", "max": "A9983", "missing": "0", "distinct": "458"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "UserId"}, {"transformationsData": [{"feature_label": "MovieId", "transformation_label": "String Indexer"}], "feature": "MovieId", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "A100", "max": "A99", "missing": "0", "distinct": "51"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "MovieId"}, {"transformationsData": [{"feature_label": "Genres", "transformation_label": "String Indexer"}], "feature": "Genres", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Comedy", "max": "Unknown", "missing": "0", "distinct": "4"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Genres"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Popularity", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "10.83", "stddev": "8.42", "min": "0.0", "max": "20.48", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Popularity"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Runtime", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "108.43", "stddev": "17.8", "min": "43", "max": "360", "missing": "0"}, "updatedLabel": "Runtime"},
{"transformationsData": [{"transformation_label": "novalue"}], "feature": "vote_average", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "6.86", "stddev": "2.51", "min": "0.0", "max": "8.2", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "vote_average"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Rating", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "3.52", "stddev": "0.94", "min": "1", "max": "5", "missing": "0"}, "updatedLabel": "Rating"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Age", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "43.78", "stddev": "11.61", "min": "30", "max": "70", "missing": "0"}, "updatedLabel": "Age"}]}))

    #transformationPostExecutionHook(autofe)

except Exception as ex:
    # Best-effort cell: keep the notebook running, but log the full traceback
    # (logging.error(ex) recorded only the message).
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run movierecommendationsHooks.ipynb
try:
    #mlPreExecutionHook()

    deepfm = Recommender.run(autofe,spark,json.dumps( {"model": "DeepFm", "autorecommend": 0, "ratingexists": 1, "rating": "Rating", "user": "UserId_stringindexer", "product": "MovieId_stringindexer", "originalfile": "/FileStore/platform/testdata/1711565294108_MovieDatasetHistory.csv", "coldStartKeys": [{"UserFeatures": ["Age", "UserId_stringindexer"]}, {"ItemFeatures": ["Popularity", "Runtime", "vote_average", "Rating", "MovieId_stringindexer", "Genres_stringindexer"]}]}))

    #mlPostExecutionHook(deepfm)

except Exception as ex:
    # Best-effort cell: keep the notebook running, but log the full traceback
    # (logging.error(ex) recorded only the message).
    logging.exception(ex)
#spark.stop()
