***GENERATED CODE FOR movierecombinbuck PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ AND WRITE DATA.***

In [None]:
import ast
import datetime
import logging
import os
import warnings
# Emit INFO-and-above log records formatted as "LEVEL:message".
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
# Hide every Python warning so notebook output stays readable.
warnings.filterwarnings('ignore')


class HDFSConnector:
    """Read and write delimited files on HDFS for this pipeline.

    ``config`` is the string form of a Python dict literal (single-quoted keys,
    so it is not JSON), or an already-parsed dict. The methods are invoked on
    the class itself, e.g. ``HDFSConnector.fetch(spark, config)``.
    """

    @staticmethod
    def _parse_config(config):
        """Return ``config`` as a dict.

        ``ast.literal_eval`` accepts exactly the dict literals this pipeline
        passes. Unlike the ``eval`` used previously, it refuses to execute
        arbitrary expressions embedded in the string (raises ``ValueError``).
        """
        if isinstance(config, dict):
            return config
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Load the CSV at ``config['url']`` from HDFS as a Spark DataFrame.

        The HDFS host and port come from the ``HDFS_SERVER`` / ``HDFS_PORT``
        environment variables (``KeyError`` if unset). The first line is used
        as the header and column types are inferred. A two-row preview is shown
        via ``display``, which the notebook provides; it is not defined in this file.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        cfg = HDFSConnector._parse_config(config)
        # header is already set through .options(); it is not repeated here.
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{cfg['url']}")
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV with the header/delimiter settings from ``config``.

        ``spark`` is unused and kept only for interface compatibility. Returns
        whatever ``DataFrameWriter.save`` returns.
        """
        cfg = HDFSConnector._parse_config(config)
        header = 'true' if cfg["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): the target path is kept exactly as before: "<timestamp>_ <url>".
        # The timestamp is prepended to the URL with a space in between and there is
        # no hdfs:// scheme, so this looks unintended. Confirm the desired output
        # location before changing it.
        path = "%s %s" % (timestamp + "_", cfg['url'])
        return df.write.format('csv').options(
            header=header, delimiter=cfg["delimiter"]).save(path)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import json
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import round
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Fill missing values in Spark DataFrame columns from per-column statistics.

    Every ``replaceBy*`` / ``replaceDateRandomly`` method takes ``feature`` as a
    schema field (any object with a ``.name``, e.g. an entry of
    ``df.schema.fields``) and returns the updated DataFrame. ``mean``, ``max``,
    ``min`` and ``stddev`` used below are the ``pyspark.sql.functions`` imported
    at the top of this cell, which shadow the Python builtins.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregate result before using it as a fill value.

        ``None`` becomes ``""``. A value whose ``str()`` is ``'nan'`` becomes the
        string ``"nan"``. Anything else is returned unchanged.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _aggregate(self, feature, df, aggFunc, alias):
        """Compute ``aggFunc`` over column ``feature.name`` and clean the result.

        Spark aggregate functions skip nulls in the aggregated column, so the
        statistic is taken over that column's non-null values directly. The
        previous code ran ``df.dropna()`` first, which discarded every row with
        a null in *any* column. That biased the statistic whenever other columns
        had gaps, and produced ``""`` whenever every row had at least one null.
        """
        row = df.select(aggFunc(col(feature.name)).alias(alias)).collect()[0]
        return self.cleanValueForFE(row[alias])

    def _fillAndReplaceBlank(self, feature, df, value):
        """Fill nulls in ``feature`` with ``value``, then replace cells equal to ``" "`` with it too."""
        df = df.fillna(value, subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", value).otherwise(col(feature.name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls in ``feature`` with the column mean (``mean_`` is unused).

        Only ``fillna`` is applied. The original also built a blank-string
        replacement with ``withColumn`` but discarded its result, so it never
        took effect. It is removed rather than activated because its
        ``otherwise`` branch cast the column to Integer, which would alter
        non-integer values.
        """
        meanValue = self._aggregate(feature, df, mean, 'mean')
        return df.fillna(meanValue, subset=[feature.name])

    def replaceByMax(self, feature, df, max_=-1):
        """Fill nulls (and ``" "`` cells) in ``feature`` with the column maximum (``max_`` is unused)."""
        return self._fillAndReplaceBlank(feature, df, self._aggregate(feature, df, max, 'max'))

    def replaceByMin(self, feature, df, min_=-1):
        """Fill nulls (and ``" "`` cells) in ``feature`` with the column minimum (``min_`` is unused)."""
        return self._fillAndReplaceBlank(feature, df, self._aggregate(feature, df, min, 'min'))

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill nulls (and ``" "`` cells) in ``feature`` with the column standard deviation.

        ``stddev_`` is unused.
        NOTE(review): filling gaps with the standard deviation, rather than a
        location statistic, is unusual. Confirm this is the intended strategy.
        """
        return self._fillAndReplaceBlank(feature, df, self._aggregate(feature, df, stddev, 'stddev'))

    def replaceDateRandomly(self, feature, df):
        """Fill nulls in ``feature`` with the first non-null value of that column.

        Despite the name, the choice is not random: it takes the row returned by
        ``head(1)``. If the column has no non-null value there is nothing to fill
        with, so ``df`` is returned unchanged (previously this raised ``IndexError``).
        """
        firstRows = df.where(col(feature.name).isNotNull()).head(1)
        if not firstRows:
            return df
        fillValue = self.cleanValueForFE(firstRows[0][feature.name])
        df = df.fillna(str(fillValue), subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceNullValues(self, fList, df):
        """Apply each feature's configured null-replacement strategy to ``df``.

        ``fList`` is a list of dicts with at least ``"feature"`` (column name) and
        ``"replaceby"`` (strategy). The strategy is picked by substring test in
        the order mean, max, min, stddev, random. Unrecognised strategies are
        skipped. A feature is matched to the column with exactly that name;
        previously this was a substring match, so feature ``"Age"`` would also
        have hit a column named ``"Agent"``.
        """
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        # The schema is captured once up front, as before.
        fields = df.schema.fields
        for featureObj in fList:
            for field in fields:
                if field.name != featureObj["feature"]:
                    continue
                replaceBy = featureObj["replaceby"]
                for key, strategy in strategies:
                    if key in replaceBy:
                        df = strategy(field, df)
                        break
        return df


def StringIndexerTransform(df, params, transformationData=None):
    """Add a ``<feature>_stringindexer`` column indexing string column ``params['feature']``.

    Nulls in the feature column are first replaced with ``""`` so they form
    their own category. If the indexed column has at most four distinct values
    it is cast to ``IntegerType``. Otherwise it is returned as ``StringIndexer``
    produced it. ``transformationData`` is accepted for signature parity with
    the other transforms and is unused. Its default is now ``None`` instead of a
    shared mutable ``{}``.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"
    filled = df.fillna({feature: ''})
    indexer = StringIndexer(inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(filled).transform(filled)
    # Only the number of categories matters, so count on the cluster instead of
    # collecting every distinct value to the driver.
    if indexed.select(outcol).distinct().count() <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


def bucketizerTransform(df, params, transformationData=None):
    """Add a ``<feature>_bucketizer`` column bucketing column ``params['feature']``.

    Split points are the distinct ``lower_value`` / ``upper_value`` bounds from
    ``transformationData["buckets"]``. These are strings such as ``"05"`` or
    ``"-Infinity"``, parsed with ``float``. The splits are sorted ascending
    because ``Bucketizer`` requires strictly increasing splits; previously they
    kept input order, so buckets listed out of order failed. Nulls in the
    feature column are filled with ``'0.0'`` first, and the feature column
    itself is rounded to two decimals afterwards (``round`` here is
    ``pyspark.sql.functions.round``).
    """
    # None replaces the old shared mutable default; a missing "buckets" key still raises KeyError.
    transformationData = {} if transformationData is None else transformationData
    feature = params['feature']
    filled = df.fillna({feature: '0.0'})

    bounds = set()
    for bucket in transformationData["buckets"]:
        bounds.add(float(bucket["lower_value"]))
        bounds.add(float(bucket["upper_value"]))
    splits = sorted(bounds)

    outcol = feature + "_bucketizer"
    bucketizer = Bucketizer(splits=splits, inputCol=feature, outputCol=outcol)
    bucketized = bucketizer.transform(filled)
    return bucketized.withColumn(feature, round(bucketized[feature], 2))


def BinarizerTransform(df, params, transformationData=None):
    """Add a ``<feature>_binarizer`` column thresholding column ``params['feature']``.

    The feature column is cast to double in place and its nulls are filled with
    ``0.0``. It is then binarized by ``Binarizer`` against
    ``float(transformationData['threshold'])``. The feature column itself is
    rounded to two decimals afterwards (``round`` is
    ``pyspark.sql.functions.round``).

    The cast used to go through a temporary ``feature_cast`` column. That
    overwrote any existing column of that name and moved the feature to the
    end of the schema.
    """
    # None replaces the old shared mutable default; a missing "threshold" key still raises KeyError.
    transformationData = {} if transformationData is None else transformationData
    feature = params['feature']
    outcol = feature + "_binarizer"

    numeric = df.withColumn(feature, df[feature].cast("double"))
    numeric = numeric.fillna({feature: 0.0})
    binarizer = Binarizer(threshold=float(transformationData['threshold']),
                          inputCol=feature, outputCol=outcol)
    binarized = binarizer.transform(numeric)
    return binarized.withColumn(feature, round(binarized[feature], 2))


class TransformationMain:
    """Run the generated cleansing and feature-engineering steps of the pipeline."""

    # TODO: change df argument in run with following
    def run(transformationDF, config):
        """Cleanse ``transformationDF`` and apply the generated feature transforms.

        ``config`` is a JSON string whose ``"FE"`` list drives null replacement
        via ``CleanseData.replaceNullValues``. The three transforms below are
        fixed at generation time. After each one its source column is dropped,
        so the raw ``Genres``, ``Popularity`` and ``vote_average`` columns are
        replaced by ``Genres_stringindexer``, ``Popularity_bucketizer`` and
        ``vote_average_binarizer``. A two-row preview is shown with ``display``
        and the resulting DataFrame is returned.

        Invoked on the class itself: ``TransformationMain.run(df, config)``.
        """
        fePlan = json.loads(config)["FE"]
        result = CleanseData().replaceNullValues(fePlan, transformationDF)

        genresData = {'feature_label': 'Genres', 'transformation_label': 'String Indexer'}
        genresParams = {'transformationsData': [genresData], 'feature': 'Genres', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '600', 'mean': '', 'stddev': '', 'min': 'Comedy', 'max': 'Unknown', 'missing': '0', 'distinct': '5'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Genres'}

        popularityBuckets = [{'lower_value': '-Infinity', 'upper_value': '05'}, {'lower_value': '05', 'upper_value': '010'}, {'lower_value': '010', 'upper_value': '15'}, {'lower_value': '15', 'upper_value': '20'}, {'lower_value': '20', 'upper_value': '+Infinity'}]
        popularityData = {'feature_label': 'Popularity', 'buckets': popularityBuckets, 'transformation_label': 'Bucketizer'}
        popularityParams = {'transformationsData': [popularityData], 'feature': 'Popularity', 'type': 'real', 'selected': 'True', 'replaceby': 'mean', 'stats': {
            'count': '600', 'mean': '9.72', 'stddev': '7.94', 'min': '0.0', 'max': '20.0', 'missing': '0'}, 'transformation': [{'transformation': 'Bucketizer', 'selectedAsDefault': 1}], 'updatedLabel': 'Popularity'}

        voteData = {'feature_label': 'vote_average', 'threshold': '5.0', 'transformation_label': 'Binarizer'}
        voteParams = {'transformationsData': [voteData], 'feature': 'vote_average', 'type': 'real', 'selected': 'True', 'replaceby': 'mean', 'stats': {
            'count': '600', 'mean': '6.46', 'stddev': '2.21', 'min': '0.0', 'max': '8.0', 'missing': '0'}, 'transformation': [{'transformation': 'Binarizer', 'selectedAsDefault': 1}], 'updatedLabel': 'vote_average'}

        pipeline = (
            (StringIndexerTransform, genresParams, genresData),
            (bucketizerTransform, popularityParams, popularityData),
            (BinarizerTransform, voteParams, voteData),
        )
        for transform, params, data in pipeline:
            # Each transform adds a derived column; the raw source column is then dropped.
            result = transform(result, params, data).drop(params['feature'])

        display(result.limit(2).toPandas())
        return result


***AUTOML FUNCTIONS***

In [None]:
import json
import pandas as pd
from scipy.sparse import csr_matrix
from lightfm import LightFM
import numpy as np


def sparseMatrix(df, user, product, rating):
    """Pivot a long interactions table into a user x product matrix.

    Parameters
    ----------
    df : pandas.DataFrame containing the columns named by ``user``, ``product``
        and ``rating``.
    user, product, rating : column names.

    Returns
    -------
    (user_df, user_df_csr, user_id, user_dict)
        user_df     -- pivoted DataFrame (rows = users, columns = products) with
                       missing pairs filled with 0. Repeated user/product pairs
                       are aggregated with ``pivot_table``'s default ``aggfunc``
                       (mean).
        user_df_csr -- the same values as a ``scipy.sparse.csr_matrix``.
        user_id     -- list of row labels (users) in matrix row order.
        user_dict   -- mapping from user label to its row position.
    """
    user_df = pd.pivot_table(df[[user, product, rating]], index=user,
                             columns=product, values=rating).fillna(0)
    user_id = list(user_df.index)
    user_dict = {uid: row for row, uid in enumerate(user_id)}
    return user_df, csr_matrix(user_df.values), user_id, user_dict


def recommenderModel(user_df_csr, loss='logistic', learning_rate=0.05):
    """Fit a LightFM model on the user-item interaction matrix and return it.

    ``loss`` is lower-cased before it is handed to ``LightFM``, so e.g.
    ``'WARP'`` and ``'warp'`` are equivalent. ``fit`` is called with its
    default arguments and its return value is returned.
    """
    estimator = LightFM(loss=loss.lower(), learning_rate=learning_rate)
    return estimator.fit(user_df_csr)


def recommendPredict(model, interactions, user_id, user_dict, threshold=0, nrec_items=10, show=True):
    """Recommend items a single user has not interacted with yet.

    Parameters
    ----------
    model : fitted model whose ``predict(user_index, item_indices)`` returns one
        score per item index (LightFM in this pipeline).
    interactions : pandas DataFrame of users (rows) by items (columns), such as
        the ``user_df`` from ``sparseMatrix``. Column ``i`` is scored as item
        index ``i``.
    user_id : row label of the user.
    user_dict : mapping from row label to the model's user index.
    threshold : items whose interaction value is ``> threshold`` count as known.
    nrec_items : maximum number of recommendations returned.
    show : print the known and recommended items when truthy.

    Returns
    -------
    (known_likes, recommend_dict)
        known_likes    -- dict ``{1: item, 2: item, ...}`` of known items,
                          ordered by item label descending.
        recommend_dict -- list of up to ``nrec_items`` items not already known,
                          highest predicted score first.
    """
    user_x = user_dict[user_id]
    n_items = interactions.shape[1]
    scores = pd.Series(model.predict(user_x, np.arange(n_items)))
    scores.index = interactions.columns
    ranked_items = scores.sort_values(ascending=False).index

    user_row = interactions.loc[user_id, :]
    known_items = list(pd.Series(user_row[user_row > threshold].index).sort_values(ascending=False))
    # Set membership keeps the filter linear; the old list lookup was O(items * known).
    known_set = set(known_items)
    recommend_dict = [item for item in ranked_items if item not in known_set][:nrec_items]
    known_likes = dict(enumerate(known_items, start=1))

    if show:
        print("User: " + str(user_id))
        print("Known Likes:")
        for position, item in known_likes.items():
            print(str(position) + '- ' + str(item))
        print("\n Recommended Items:")
        for position, item in enumerate(recommend_dict, start=1):
            print(str(position) + '- ' + str(item))
    return known_likes, recommend_dict


def recommend(df, originalfile, user, product, isRating, rating=None, loss='logistic', learning_rate=0.05):
    """Train a recommender on a Spark DataFrame and recommend items per interaction row.

    Parameters
    ----------
    df : Spark DataFrame, converted with ``toPandas()``.
    originalfile : unused; kept for interface compatibility.
    user, product : column names of the user and item identifiers.
    isRating : when ``0`` there is no rating column. A constant ``1.0`` column
        ``'ratingCol'`` is inserted at position 1 and used as the rating.
    rating : rating column name (overridden when ``isRating == 0``).
    loss, learning_rate : forwarded to ``recommenderModel``.

    Returns
    -------
    pandas.DataFrame with one row per input row, in input order. Its columns are
    ``recommendedOn`` (the user) and ``recomendations`` (up to 10 unseen items).
    The key spelling ``recomendations`` is kept as-is because consumers may rely
    on it. The DataFrame is also shown via the notebook's ``display``.
    """
    pdf = df.toPandas()
    if isRating == 0:
        # No explicit rating: every interaction counts as a rating of 1.0.
        pdf.insert(1, 'ratingCol', np.ones(len(pdf)))
        rating = 'ratingCol'
    interaction_users = pdf[user].to_list()
    user_df, user_df_csr, user_id, user_dict = sparseMatrix(pdf, user, product, rating)
    model = recommenderModel(user_df_csr, loss=loss, learning_rate=learning_rate)

    # A user appears once per interaction, and recommendPredict receives identical
    # arguments for each repeat. Compute once per distinct user and reuse the
    # result; this assumes predict is deterministic, as LightFM's is.
    per_user = {}
    rows = []
    for us in interaction_users:
        if us not in per_user:
            per_user[us] = recommendPredict(
                model, user_df, us, user_dict, threshold=0, nrec_items=10, show=False)[1]
        # Copy so each row owns its list, as when every row was computed separately.
        rows.append({"recommendedOn": us, "recomendations": list(per_user[us])})
    rec_df = pd.DataFrame(rows)
    display(rec_df)
    return rec_df


class Recommender:
    """Pipeline stage that trains the recommender via ``recommend``."""

    @staticmethod
    def run(spark_DF, spark, config):
        """Train the recommender on ``spark_DF`` and return its recommendations.

        Parameters
        ----------
        spark_DF : Spark DataFrame holding the interaction data.
        spark : SparkSession; unused, kept for the stage interface.
        config : JSON string with keys ``model``, ``user``, ``product``,
            ``rating``, ``ratingexists`` and ``originalfile``.

        Returns
        -------
        The pandas DataFrame produced by ``recommend``. Previously the result was
        only evaluated as the bare expression ``[recommender_obj]``, so ``run``
        returned ``None``.
        NOTE(review): if a one-element list was intended, return
        ``[recommender_obj]`` instead. Confirm against the post-execution hook.
        """
        stageAttributes = json.loads(config)
        # The value is unused (recommend always trains LightFM). The lookup only
        # raises KeyError when the config omits 'model'.
        stageAttributes['model']
        user = stageAttributes['user']
        product = stageAttributes['product']
        rating = stageAttributes['rating']
        ratingexists = stageAttributes['ratingexists']
        originalfile = stageAttributes['originalfile']
        recommender_obj = recommend(
            spark_DF, originalfile, user, product, ratingexists, rating)
        return recommender_obj


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run movierecombinbuckHooks.ipynb
try:
    #sourcePreExecutionHook()

    moviedataethitorynjup = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1715238553123_MovieDatasetHistory.csv', 'filename': 'MovieDatasetHistory.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'FilePath': '/Recommendation/MovieRecommendation/DataFiles/MovieDatasetHistory.csv', 'viewFileName': 'MovieDatasetHistory.csv', 'is_header': 'Use Header Line', 'baseType': 'hdfs', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex:
    # logging.exception logs the same message at ERROR level and appends the
    # traceback, which logging.error(ex) dropped. If this fails,
    # `moviedataethitorynjup` stays undefined and the next cell fails too.
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run movierecombinbuckHooks.ipynb
try:
    #transformationPreExecutionHook()

    autofe = TransformationMain.run(moviedataethitorynjup,json.dumps( {"FE": [{"transformationsData": [{"feature_label": "UserId", "transformation_label": "novalue"}], "feature": "UserId", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "600", "mean": "", "stddev": "", "min": "A10024", "max": "A9983", "missing": "0", "distinct": "517"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "UserId"}, {"transformationsData": [{"feature_label": "MovieId", "transformation_label": "novalue"}], "feature": "MovieId", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "600", "mean": "", "stddev": "", "min": "A100", "max": "A99", "missing": "0", "distinct": "61"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "MovieId"}, {"transformationsData": [{"feature_label": "Genres", "transformation_label": "String Indexer"}], "feature": "Genres", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "600", "mean": "", "stddev": "", "min": "Comedy", "max": "Unknown", "missing": "0", "distinct": "5"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Genres"}, {"transformationsData": [{"feature_label": "Popularity", "buckets": [{"lower_value": "-Infinity", "upper_value": "05"}, {"lower_value": "05", "upper_value": "010"}, {"lower_value": "010", "upper_value": "15"}, {"lower_value": "15", "upper_value": "20"}, {"lower_value": "20", "upper_value": "+Infinity"}], "transformation_label": "Bucketizer"}], "feature": "Popularity", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "600", "mean": "9.72", "stddev": "7.94", "min": "0.0", "max": "20.0", "missing": "0"}, "transformation": [{"transformation": "Bucketizer", "selectedAsDefault": 1}], "updatedLabel": "Popularity"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Runtime", 
"transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "600", "mean": "109.3", "stddev": "16.92", "min": "43", "max": "360", "missing": "0"}, "updatedLabel": "Runtime"}, {"transformationsData": [{"feature_label": "vote_average", "threshold": "5.0", "transformation_label": "Binarizer"}], "feature": "vote_average", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "600", "mean": "6.46", "stddev": "2.21", "min": "0.0", "max": "8.0", "missing": "0"}, "transformation": [{"transformation": "Binarizer", "selectedAsDefault": 1}], "updatedLabel": "vote_average"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Rating", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "600", "mean": "3.38", "stddev": "1.03", "min": "1", "max": "5", "missing": "0"}, "updatedLabel": "Rating"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Age", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "600", "mean": "43.76", "stddev": "11.56", "min": "30", "max": "70", "missing": "0"}, "updatedLabel": "Age"}]}))

    #transformationPostExecutionHook(autofe)

except Exception as ex:
    # logging.exception logs the same message at ERROR level and appends the
    # traceback, which logging.error(ex) dropped. A failure here leaves
    # `autofe` undefined for the training cell.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run movierecombinbuckHooks.ipynb
try:
    #mlPreExecutionHook()

    recommendationsautoml = Recommender.run(autofe,spark,json.dumps( {"model": "LightFm", "autorecommend": 1, "ratingexists": 1, "rating": "Rating", "user": "UserId", "product": "MovieId", "originalfile": "/FileStore/platform/testdata/1715238553123_MovieDatasetHistory.csv"}))

    #mlPostExecutionHook(recommendationsautoml)

except Exception as ex:
    # logging.exception logs the same message at ERROR level and appends the
    # traceback, which logging.error(ex) dropped.
    logging.exception(ex)
finally:
    # Release the Spark session whether or not training succeeded.
    spark.stop()
