***GENERATED CODE FOR recom2 PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ AND WRITE DATA.***

In [None]:
import os
import datetime
import logging
import warnings
# Silence every Python warning for the rest of the notebook session.
warnings.filterwarnings(action='ignore')
# Root logger: INFO and above, each record rendered as "LEVEL:message".
logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(message)s')


class HDFSConnector:
    """Read and write delimited files on HDFS, driven by a stringified config.

    ``config`` is a string holding a Python dict literal (single-quoted keys,
    e.g. "{'url': '/path/file.csv', ...}"). It is parsed with
    ``ast.literal_eval``, which accepts only literals and never executes code,
    unlike the ``eval`` this replaces.
    """

    @staticmethod
    def _parse_config(config):
        """Return ``config`` as a dict.

        A dict is returned unchanged. A string is parsed as a Python literal;
        anything that is not a plain literal (calls, names, attribute access)
        raises ValueError or SyntaxError instead of being evaluated.
        """
        if isinstance(config, dict):
            return config
        import ast  # local import keeps this notebook cell self-contained
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Read the CSV at ``config['url']`` from HDFS into a Spark DataFrame.

        The HDFS host and port come from the ``HDFS_SERVER`` and ``HDFS_PORT``
        environment variables (KeyError if unset). The file is read with a
        header row and inferred schema; a two-row preview is displayed.
        """
        cfg = HDFSConnector._parse_config(config)
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        df = spark.read.options(header='true', inferSchema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{cfg['url']}")
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV using the header flag and delimiter from ``config``.

        ``config['is_header'] == "Use Header Line"`` enables the header row.
        ``spark`` is unused.
        """
        cfg = HDFSConnector._parse_config(config)
        header = 'true' if cfg["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): this yields "<timestamp>_ <url>" -- a space separates the
        # timestamp prefix from the configured path, and no hdfs:// prefix is
        # added (unlike fetch). Kept as-is; confirm the intended target path.
        target = "%s %s" % (timestamp + "_", cfg['url'])
        return df.write.format('csv').options(
            header=header, delimiter=cfg["delimiter"]).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import json
from pyspark.sql.functions import col, when
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Fill missing values in selected columns of a Spark DataFrame.

    Each ``replaceBy*`` method takes ``feature`` (a ``StructField`` from
    ``df.schema.fields``) and ``df``, and returns a new DataFrame whose nulls in
    that column are filled with a statistic of the column. In string columns,
    cells equal to a single space (" ") are replaced with the same value.

    NOTE: ``mean``/``stddev``/``min``/``max`` used here are the
    ``pyspark.sql.functions`` aggregates imported at the top of this cell; they
    shadow the Python builtins ``min``/``max`` in this namespace.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregate result before it is used as a fill value.

        Returns "" for None (e.g. the aggregate of an all-null column), the
        string "nan" for a NaN value, and ``value`` unchanged otherwise.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _aggregateValue(self, feature, df, aggregate, alias):
        """Return ``aggregate`` of column ``feature.name``, cleaned for filling.

        Spark aggregate functions ignore nulls, so the statistic covers exactly
        the column's non-null values. Calling ``df.dropna()`` first would drop
        every row with a null in *any* column and skew the statistic.
        """
        row = df.select(aggregate(col(feature.name)).alias(alias)).collect()[0]
        return self.cleanValueForFE(row[alias])

    def _fillFeature(self, feature, df, fillValue):
        """Fill nulls of ``feature.name`` with ``fillValue``; in string columns also replace " "."""
        # fillna ignores subset columns whose type does not match fillValue's.
        df = df.fillna(fillValue, subset=[feature.name])
        # Only a string column can hold " ". For a numeric column the comparison
        # never matches (in Spark's default, non-ANSI mode), while the
        # when/otherwise would still coerce the column to a type shared with
        # fillValue (e.g. int -> double when filling with a mean).
        if feature.dataType.typeName() == "string":
            df = df.withColumn(
                feature.name,
                when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))
        return df

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill ``feature`` with its mean. ``mean_`` is unused (kept for compatibility)."""
        return self._fillFeature(feature, df, self._aggregateValue(feature, df, mean, 'mean'))

    def replaceByMax(self, feature, df, max_=-1):
        """Fill ``feature`` with its maximum. ``max_`` is unused (kept for compatibility)."""
        return self._fillFeature(feature, df, self._aggregateValue(feature, df, max, 'max'))

    def replaceByMin(self, feature, df, min_=-1):
        """Fill ``feature`` with its minimum. ``min_`` is unused (kept for compatibility)."""
        return self._fillFeature(feature, df, self._aggregateValue(feature, df, min, 'min'))

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill ``feature`` with its standard deviation. ``stddev_`` is unused (kept for compatibility)."""
        return self._fillFeature(feature, df, self._aggregateValue(feature, df, stddev, 'stddev'))

    def replaceDateRandomly(self, feature, df):
        """Fill ``feature`` with the string form of its first non-null value.

        Despite the name, the value is not random: it is whichever non-null row
        ``head(1)`` returns. A column with no non-null value is returned
        unchanged rather than raising IndexError.
        """
        firstRows = df.where(col(feature.name).isNotNull()).head(1)
        if not firstRows:
            return df
        fillValue = str(self.cleanValueForFE(firstRows[0][feature.name]))
        return self._fillFeature(feature, df, fillValue)

    def replaceNullValues(self, fList, df):
        """Apply each configured fill strategy in ``fList`` to ``df``.

        Each entry needs "feature" (the exact column name) and "replaceby".
        The first of mean/max/min/stddev/random found as a substring of
        "replaceby" selects the strategy. Entries naming no existing column are
        skipped. Column names must match exactly: a substring match would let a
        feature such as "id" hit every "*_id" column.
        """
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        fieldsByName = {field.name: field for field in df.schema.fields}
        for featureObj in fList:
            field = fieldsByName.get(featureObj["feature"])
            if field is None:
                continue
            replaceBy = featureObj["replaceby"]
            for keyword, strategy in strategies:
                if keyword in replaceBy:
                    df = strategy(field, df)
                    break
        return df


class TransformationMain:
    """Entry point for the feature-engineering (null-replacement) stage."""

    # TODO: change df argument in run with following
    @staticmethod
    def run(transformationDF, config):
        """Apply the null-replacement rules in ``config`` to ``transformationDF``.

        ``config`` is a JSON string whose "FE" key holds the list of per-feature
        rules passed to ``CleanseData.replaceNullValues``. A two-row preview is
        displayed and the transformed DataFrame is returned.
        """
        featureRules = json.loads(config)["FE"]
        transformationDF = CleanseData().replaceNullValues(featureRules, transformationDF)
        display(transformationDF.limit(2).toPandas())
        return transformationDF


***AUTOML FUNCTIONS***

In [None]:
import json
import pandas as pd
from scipy.sparse import csr_matrix
from lightfm import LightFM
import numpy as np


def sparseMatrix(df, user, product, rating):
    """Pivot (user, product, rating) rows into a user x product table.

    Parameters
    ----------
    df : pandas.DataFrame containing the ``user``, ``product`` and ``rating``
        columns.

    Returns
    -------
    (user_df, user_df_csr, user_id, user_dict)
        user_df     -- pivot table (index = user values, columns = product
                       values). Duplicate (user, product) pairs are averaged
                       (pivot_table's default aggfunc); missing cells are 0.
        user_df_csr -- scipy CSR matrix of ``user_df.values``.
        user_id     -- list of the pivot's user labels, in row order.
        user_dict   -- maps each user label to its row position.
    """
    user_df = pd.pivot_table(df[[user, product, rating]], index=user,
                             columns=product, values=rating).fillna(0)
    user_id = list(user_df.index)
    user_dict = {label: row for row, label in enumerate(user_id)}
    user_df_csr = csr_matrix(user_df.values)
    return user_df, user_df_csr, user_id, user_dict


def recommenderModel(user_df_csr, loss='logistic', learning_rate=0.05, epochs=1, num_threads=1):
    """Fit a LightFM model on a sparse interaction matrix and return it.

    Parameters
    ----------
    user_df_csr : scipy sparse matrix of interactions (as built by
        ``sparseMatrix``: rows are users, columns are products).
    loss : LightFM loss name; lower-cased before use, so "WARP" works too.
    learning_rate : LightFM learning rate.
    epochs, num_threads : forwarded to ``LightFM.fit``. The defaults (1, 1)
        equal LightFM's own defaults, so existing callers train exactly as
        before; pass larger values to train longer or in parallel.
    """
    model = LightFM(loss=loss.lower(), learning_rate=learning_rate)
    return model.fit(user_df_csr, epochs=epochs, num_threads=num_threads)


def recommendPredict(model, interactions, user_id, user_dict, threshold=0, nrec_items=10, show=True):
    """Rank items the user has not interacted with, using a fitted model.

    Parameters
    ----------
    model : object with ``predict(user_index, item_indices)``, e.g. LightFM.
    interactions : pandas.DataFrame, user x item (``user_df`` from
        ``sparseMatrix``); columns are item labels, index is user labels.
    user_id : row label of the user in ``interactions``.
    user_dict : maps user label -> row position used by the model.
    threshold : items whose interaction value is > threshold count as known.
    nrec_items : maximum number of recommendations returned.
    show : if truthy, print the known and recommended items.

    Returns
    -------
    (known_likes, recommend_dict)
        known_likes    -- {1: item, 2: item, ...}: the user's known items in
                          descending label order.
        recommend_dict -- list of up to ``nrec_items`` unknown item labels,
                          highest predicted score first.
    """
    n_items = interactions.shape[1]
    user_x = user_dict[user_id]
    scores = pd.Series(model.predict(user_x, np.arange(n_items)), index=interactions.columns)
    ranked_items = list(scores.sort_values(ascending=False).index)

    user_row = interactions.loc[user_id, :]
    known_items = list(pd.Series(user_row[user_row > threshold].index).sort_values(ascending=False))

    # Set membership keeps the filter linear in the number of items.
    known_set = set(known_items)
    recommend_dict = [item for item in ranked_items if item not in known_set][0:nrec_items]
    known_likes = {rank: item for rank, item in enumerate(known_items, start=1)}

    if show:
        print("User: " + str(user_id))
        print("Known Likes:")
        for rank, item in enumerate(known_items, start=1):
            print(str(rank) + '- ' + str(item))
        print("\n Recommended Items:")
        for rank, item in enumerate(recommend_dict, start=1):
            print(str(rank) + '- ' + str(item))
    return known_likes, recommend_dict


def recommend(df, originalfile, user, product, isRating, rating=None, loss='logistic', learning_rate=0.05):
    """Train a LightFM recommender on ``df`` and recommend items per user.

    Parameters
    ----------
    df : Spark DataFrame; converted with ``toPandas()``.
    originalfile : unused; kept for interface compatibility.
    user, product : column names of the user and item identifiers.
    isRating : 0 means there is no rating column; every interaction then gets
        an implicit rating of 1.0 in an added "ratingCol" column.
    rating : name of the rating column (replaced when ``isRating == 0``).
    loss, learning_rate : forwarded to ``recommenderModel``.

    Returns a pandas DataFrame (also displayed) with one row per distinct user
    present in the interaction matrix: "recommendedOn" (the user) and
    "recomendations" (up to 10 item labels the user has not interacted with).
    """
    df = df.toPandas()
    if isRating == 0:
        df.insert(1, 'ratingCol', np.ones(len(df)))
        rating = 'ratingCol'
    user_df, user_df_csr, user_id, user_dict = sparseMatrix(df, user, product, rating)
    model = recommenderModel(user_df_csr, loss=loss, learning_rate=learning_rate)

    # One prediction per distinct user, in first-appearance order. Iterating the
    # raw column repeated the same prediction for every interaction row of a
    # returning user, and users dropped by the pivot (e.g. rows whose product is
    # missing) would raise KeyError in recommendPredict.
    users = [u for u in dict.fromkeys(df[user]) if u in user_dict]

    recommend_dict_all = []
    for us in users:
        _, recommend_dict = recommendPredict(
            model, user_df, us, user_dict, threshold=0, nrec_items=10, show=False)
        # "recomendations" (sic) is the output column name; kept byte-for-byte
        # because downstream consumers may read it.
        recommend_dict_all.append({"recommendedOn": us, "recomendations": recommend_dict})
    rec_df = pd.DataFrame(recommend_dict_all)
    display(rec_df)
    return rec_df


class Recommender:
    """AutoML stage that trains the recommender from a JSON stage config."""

    @staticmethod
    def run(spark_DF, spark, config):
        """Run ``recommend`` with the settings in ``config`` and return its result.

        ``config`` is a JSON string with the keys "model", "user", "product",
        "rating", "ratingexists" and "originalfile"; a missing key raises
        KeyError. ``spark`` is unused. Returns the pandas DataFrame produced by
        ``recommend``. The previous body left ``[recommender_obj]`` as a bare
        expression, so the result was discarded and None was returned.
        """
        stageAttributes = json.loads(config)
        # "model" selects nothing yet (only LightFM is implemented), but it is
        # a required key of the stage config.
        if 'model' not in stageAttributes:
            raise KeyError('model')
        user = stageAttributes['user']
        product = stageAttributes['product']
        rating = stageAttributes['rating']
        ratingexists = stageAttributes['ratingexists']
        originalfile = stageAttributes['originalfile']
        return recommend(spark_DF, originalfile, user, product, ratingexists, rating)


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run recom2Hooks.ipynb
try:
    #sourcePreExecutionHook()

    hotelcurrentcutomertxav = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1715337737433_HotelCurrentCustomers.csv', 'filename': 'HotelCurrentCustomers.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'FilePath': '/DNM_TESTING 9th May/Test data/Recommendations/HotelCurrentCustomers.csv', 'viewFileName': 'HotelCurrentCustomers.csv', 'is_header': 'Use Header Line', 'baseType': 'hdfs', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex:
    # Best-effort, but keep the traceback: if this fails, the next cell only
    # reports a NameError for hotelcurrentcutomertxav, hiding the real cause.
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run recom2Hooks.ipynb
try:
    #transformationPreExecutionHook()

    autofe = TransformationMain.run(hotelcurrentcutomertxav,json.dumps( {"FE": [{"transformationsData": [{"transformation_label": "novalue"}], "feature": "user_id", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "5294.79", "stddev": "1959.63", "min": "12", "max": "7209", "missing": "0"}, "updatedLabel": "user_id"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "hotel_id", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "52.78", "stddev": "26.96", "min": "0", "max": "99", "missing": "0"}, "updatedLabel": "hotel_id"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "user_location_country", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "83.71", "stddev": "52.42", "min": "46", "max": "205", "missing": "0"}, "updatedLabel": "user_location_country"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "user_location_city", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "21857.99", "stddev": "13529.64", "min": "902", "max": "54122", "missing": "0"}, "updatedLabel": "user_location_city"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "user_hotel_distance", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "1789.97", "stddev": "2040.44", "min": "0.5397", "max": "7469.6325", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "user_hotel_distance"}, {"transformationsData": [{"transformation_label": 
"novalue"}], "feature": "is_mobile", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "0.3", "stddev": "0.46", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "is_mobile"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "is_package", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "0.49", "stddev": "0.5", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "is_package"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "channel", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "5.09", "stddev": "3.54", "min": "0", "max": "9", "missing": "0"}, "updatedLabel": "channel"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "adults_cnt", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "2.06", "stddev": "0.85", "min": "1", "max": "8", "missing": "0"}, "updatedLabel": "adults_cnt"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "children_cnt", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "0.14", "stddev": "0.47", "min": "0", "max": "2", "missing": "0"}, "updatedLabel": "children_cnt"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "room_cnt", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "1.07", "stddev": "0.29", 
"min": "1", "max": "4", "missing": "0"}, "updatedLabel": "room_cnt"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "is_click_booking", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "0.09", "stddev": "0.28", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "is_click_booking"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Hotel_destination_type_id", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "2.78", "stddev": "2.1", "min": "1", "max": "6", "missing": "0"}, "updatedLabel": "Hotel_destination_type_id"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "hotel_continent", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "3.33", "stddev": "1.39", "min": "0", "max": "6", "missing": "0"}, "updatedLabel": "hotel_continent"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "hotel_country", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "79.55", "stddev": "59.2", "min": "0", "max": "208", "missing": "0"}, "updatedLabel": "hotel_country"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "hotel_location", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "14784.69", "stddev": "8870.36", "min": "356", "max": "53940", "missing": "0"}, "updatedLabel": "hotel_location"}]}))

    #transformationPostExecutionHook(autofe)

except Exception as ex:
    # Best-effort, but keep the traceback so the root cause is visible.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run recom2Hooks.ipynb
try:
    #mlPreExecutionHook()

    recommendationsautoml = Recommender.run(autofe,spark,json.dumps( {"model": "LightFm", "autorecommend": 1, "ratingexists": 0, "rating": "no value selected", "user": "user_id", "product": "hotel_id", "originalfile": "/FileStore/platform/testdata/1715337737433_HotelCurrentCustomers.csv"}))

    #mlPostExecutionHook(recommendationsautoml)

except Exception as ex:
    # Best-effort, but keep the traceback so the root cause is visible.
    logging.exception(ex)
spark.stop()
