***GENERATED CODE FOR newrecom PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ AND WRITE DATA.***

In [None]:
import os
import datetime
import logging
import warnings
# Suppress every Python warning for the whole notebook run.
warnings.filterwarnings(action='ignore')
# Configure the root logger: INFO and above, rendered as "LEVEL:message".
logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(message)s')


class HDFSConnector:
    """Read and write CSV data on HDFS for the generated pipeline.

    Both methods take ``config`` as the string form of a Python dict literal
    (e.g. ``"{'url': '/path.csv', 'delimiter': ',', ...}"``); an already
    parsed ``dict`` is accepted as well. ``fetch`` reads the HDFS host and
    port from the ``HDFS_SERVER`` and ``HDFS_PORT`` environment variables.
    """

    @staticmethod
    def _parse_config(config):
        """Return the stage configuration as a dict.

        ``ast.literal_eval`` is used instead of ``eval`` so a config string can
        only describe literal values and can never execute code.

        Raises:
            ValueError / SyntaxError: if ``config`` is not a valid literal.
        """
        import ast

        if isinstance(config, dict):
            return config
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Load the CSV at ``config['url']`` from HDFS into a Spark DataFrame.

        The file is read with a header line and schema inference. The first two
        rows are shown through the notebook ``display`` helper, and the
        DataFrame is returned.

        Raises:
            KeyError: if ``HDFS_SERVER``/``HDFS_PORT`` is unset or the config
                has no ``'url'`` key.
            ValueError: if ``HDFS_PORT`` is not an integer or the config string
                is not a valid literal.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        settings = HDFSConnector._parse_config(config)
        # header is already set via options(); no need to pass it again to csv().
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{settings['url']}")
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV using the header/delimiter settings in ``config``.

        A header line is written only when ``config['is_header']`` equals
        ``"Use Header Line"``. ``spark`` is unused; it is kept so the signature
        mirrors ``fetch``. Returns whatever the writer's ``save`` returns.
        """
        settings = HDFSConnector._parse_config(config)
        header = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): this produces "<timestamp>_ <url>". There is a space
        # between the parts, and unlike fetch there is no hdfs://host:port
        # prefix, so the target is not an absolute path. It is kept unchanged
        # to avoid moving existing outputs; confirm the intended destination.
        target = ("%s %s") % (timestamp + "_", settings['url'])
        return df.write.format('csv').options(
            header=header, delimiter=settings["delimiter"]).save(target)


***AUTOML FUNCTIONS***

In [None]:
import json
import pandas as pd
from scipy.sparse import csr_matrix
from lightfm import LightFM
import numpy as np


def sparseMatrix(df, user, product, rating):
    """Pivot interaction rows into a dense user x product matrix and its CSR form.

    Args:
        df: pandas DataFrame containing the ``user``, ``product`` and
            ``rating`` columns.
        user: column whose values become the row index.
        product: column whose values become the columns.
        rating: column whose values fill the cells.

    Returns:
        ``(user_df, user_df_csr, user_id, user_dict)``:
        - ``user_df``: the pivot table with missing cells set to 0. Duplicate
          user/product pairs are combined by ``pivot_table``'s default
          aggregation (mean).
        - ``user_df_csr``: the same values as a ``scipy.sparse.csr_matrix``.
        - ``user_id``: list of row labels, in row order.
        - ``user_dict``: maps each row label to its 0-based row position.
    """
    interactions = df[[user, product, rating]]
    dense = pd.pivot_table(interactions, index=user, columns=product,
                           values=rating).fillna(0)
    row_labels = list(dense.index)
    position_of = {label: pos for pos, label in enumerate(row_labels)}
    return dense, csr_matrix(dense.values), row_labels, position_of


def recommenderModel(user_df_csr, loss='logistic', learning_rate=0.05):
    """Fit a LightFM model on an interaction matrix and return it.

    Args:
        user_df_csr: interaction matrix passed straight to ``LightFM.fit``
            (e.g. the CSR matrix returned by ``sparseMatrix``).
        loss: LightFM loss name. It is lower-cased before use, so ``"WARP"``
            and ``"warp"`` are equivalent.
        learning_rate: learning rate forwarded to the ``LightFM`` constructor.

    Returns:
        The value returned by ``LightFM.fit``.
    """
    estimator = LightFM(loss=loss.lower(), learning_rate=learning_rate)
    return estimator.fit(user_df_csr)


def recommendPredict(model, interactions, user_id, user_dict, threshold=0, nrec_items=10, show=True):
    """Rank the items a user has not interacted with yet.

    Args:
        model: fitted model exposing ``predict(user_index, item_indices)``,
            such as the LightFM model from ``recommenderModel``.
        interactions: dense user x item DataFrame (as built by
            ``sparseMatrix``). Its columns label the items in the order the
            model scores them.
        user_id: row label of the user in ``interactions``.
        user_dict: maps a user label to the row index the model uses.
        threshold: an interaction value strictly greater than this marks the
            item as already known to the user.
        nrec_items: maximum number of recommendations returned.
        show: if truthy, print the user's known items and recommendations.

    Returns:
        ``(known_likes, recommendations)``:
        - ``known_likes``: maps 1-based positions to the user's known items,
          ordered by item label descending.
        - ``recommendations``: up to ``nrec_items`` unknown items, highest
          predicted score first.

    Raises:
        KeyError: if ``user_id`` is missing from ``user_dict`` or
            ``interactions``.
    """
    n_items = interactions.shape[1]
    raw_scores = model.predict(user_dict[user_id], np.arange(n_items))
    scores = pd.Series(raw_scores, index=interactions.columns)
    ranked = list(scores.sort_values(ascending=False).index)

    user_row = interactions.loc[user_id, :]
    known_items = list(
        pd.Series(user_row[user_row > threshold].index).sort_values(ascending=False))
    # Set membership keeps the filter linear instead of scanning the list per item.
    known_set = set(known_items)
    recommendations = [item for item in ranked if item not in known_set][:nrec_items]
    known_likes = {pos: item for pos, item in enumerate(known_items, start=1)}

    if show:
        print("User: " + str(user_id))
        print("Known Likes:")
        for pos, item in enumerate(known_items, start=1):
            print(str(pos) + '- ' + str(item))
        print("\n Recommended Items:")
        for pos, item in enumerate(recommendations, start=1):
            print(str(pos) + '- ' + str(item))
    return known_likes, recommendations


def recommend(df, originalfile, user, product, isRating, rating=None, loss='logistic', learning_rate=0.05, nrec_items=10):
    """Train a recommender on ``df`` and return the top recommendations per user.

    Args:
        df: Spark DataFrame of interactions, converted with ``toPandas()``.
        originalfile: path of the source file; not used by this function.
        user: column holding the user id.
        product: column holding the item id.
        isRating: when equal to 0, every row is treated as an implicit
            interaction with rating 1 and ``rating`` is ignored. Any other
            value uses the ``rating`` column.
        rating: column holding explicit ratings.
        loss: forwarded to ``recommenderModel``.
        learning_rate: forwarded to ``recommenderModel``.
        nrec_items: maximum recommendations per user (defaults to the
            previously hard-coded 10).

    Returns:
        pandas DataFrame with one row per distinct user (first-appearance
        order in ``df``). Column ``recommendedOn`` holds the user id and
        ``recomendations`` holds the list of item ids. The misspelled column
        name is kept because downstream consumers may read it.
    """
    df = df.toPandas()
    if isRating == 0:
        # Implicit feedback: every observed row counts as a rating of 1.
        df.insert(1, 'ratingCol', np.ones(len(df)))
        rating = 'ratingCol'
    user_df, user_df_csr, user_id, user_dict = sparseMatrix(df, user, product, rating)
    model = recommenderModel(user_df_csr, loss=loss, learning_rate=learning_rate)

    # Visit each user once. The original loop ran once per interaction row:
    # it recomputed identical predictions for users with many rows and hit
    # KeyError for users absent from the pivot (e.g. a missing user id).
    distinct_users = [us for us in dict.fromkeys(df[user].to_list()) if us in user_dict]
    records = []
    for us in distinct_users:
        _, recommendations = recommendPredict(
            model, user_df, us, user_dict, threshold=0, nrec_items=nrec_items, show=False)
        records.append({"recommendedOn": us, "recomendations": recommendations})
    rec_df = pd.DataFrame(records)
    display(rec_df)
    return rec_df


class Recommender:
    """Pipeline stage that trains the recommender and produces recommendations."""

    @staticmethod
    def run(spark_DF, spark, config):
        """Run the recommendation stage described by a JSON ``config`` string.

        Args:
            spark_DF: Spark DataFrame of interactions, passed to ``recommend``.
            spark: active Spark session; unused, kept for the stage interface.
            config: JSON object string. Required keys: ``model``, ``user``,
                ``product``, ``ratingexists`` and ``originalfile``. ``rating``
                is needed only when ``ratingexists`` is non-zero.

        Returns:
            A one-element list containing the DataFrame returned by
            ``recommend``.

        Raises:
            KeyError: if a required key is missing from ``config``.
        """
        stageAttributes = json.loads(config)
        # Validation only: 'model' must be present, but `recommend` always
        # trains LightFM regardless of its value.
        stageAttributes['model']
        user = stageAttributes['user']
        product = stageAttributes['product']
        # Optional: `recommend` ignores it when ratingexists == 0.
        rating = stageAttributes.get('rating')
        ratingexists = stageAttributes['ratingexists']
        originalfile = stageAttributes['originalfile']
        recommender_obj = recommend(
            spark_DF, originalfile, user, product, ratingexists, rating)
        # The list was previously built but never returned, so the stage
        # always produced None.
        return [recommender_obj]


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run newrecomHooks.ipynb
try:
    #sourcePreExecutionHook()

    moviedataethitoryzesq = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1715238553123_MovieDatasetHistory.csv', 'filename': 'MovieDatasetHistory.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'FilePath': '/Recommendation/MovieRecommendation/DataFiles/MovieDatasetHistory.csv', 'viewFileName': 'MovieDatasetHistory.csv', 'is_header': 'Use Header Line', 'baseType': 'hdfs', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex:
    # logging.exception also records the traceback, which a bare
    # logging.error(ex) dropped. On failure `moviedataethitoryzesq` is left
    # undefined, so the training cell will report a NameError.
    logging.exception(ex)
#spark.stop()


***TRAIN MODEL AND GENERATE RECOMMENDATIONS***

In [None]:
#%run newrecomHooks.ipynb
try:
    #mlPreExecutionHook()

    recommendationsautoml = Recommender.run(moviedataethitoryzesq, spark, json.dumps({"model": "LightFm", "autorecommend": 1, "ratingexists": 1, "rating": "Rating", "user": "UserId", "product": "MovieId", "originalfile": "/FileStore/platform/testdata/1715238553123_MovieDatasetHistory.csv"}))

    #mlPostExecutionHook(recommendationsautoml)

except Exception as ex:
    # Keep the traceback; logging.error(ex) recorded only the message.
    logging.exception(ex)
finally:
    # Release the Spark session however this cell exits (including
    # interrupts that are not Exception subclasses).
    spark.stop()
