In [None]:
import warnings
warnings.filterwarnings('ignore')

import pickle
import os
import copy
import time
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
# import seaborn as sns
# sns.set()

from sklearn.base import BaseEstimator, TransformerMixin

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

from sklearn.metrics import roc_auc_score

from xgboost import XGBClassifier

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials
from hyperopt.pyll import scope
from hyperopt import space_eval

In [None]:
class NumImputer(BaseEstimator, TransformerMixin):
    """Fill missing values in numeric columns with a per-column statistic learned in ``fit``.

    :param method: aggregation name passed to ``Series.agg`` for each numeric
        training column (e.g. "median", "mean").
    """

    def __init__(self, method="median"):
        # sklearn's get_params()/clone()/repr read constructor arguments back as
        # attributes with the *same* name, so the value must live on ``self.method``.
        self.method = method

    @property
    def _method(self):
        # Backward-compatible read-only alias for the previous attribute name.
        return self.method

    def fit(self, df_train, y=None):
        """Learn one fill value per numeric column of ``df_train``.

        :param df_train: pandas DataFrame used to compute the fill values
        :param y: ignored; accepted so ``fit_transform(X, y)`` and Pipelines work
        :return: self
        """
        num_cols = df_train.select_dtypes(["number"]).columns.to_list()
        self._train_cols = df_train.columns.to_list()
        self._impute_values = {col: df_train[col].agg(self.method) for col in num_cols}
        return self

    def transform(self, df):
        """Return a copy of ``df`` with numeric NaNs filled, columns in training order.

        :param df: pandas DataFrame with exactly the training columns (any order)
        :return: new DataFrame; ``df`` itself is not modified
        :raises AssertionError: if the column set differs from the one seen in ``fit``
        """
        df = df.copy()
        cols = df.columns.to_list()
        assert set(cols) == set(self._train_cols), "Do not have the same set of cols as train"

        for col, val in self._impute_values.items():
            if df[col].isnull().any():
                df[col] = df[col].fillna(val)

        # align columns to the order seen during fit
        return df[self._train_cols]
    

class CatImputer(BaseEstimator, TransformerMixin):
    """Fill missing values in object/category/bool columns with a constant label.

    :param val: label written into missing cells (default "MISSING").
    """

    def __init__(self, val="MISSING"):
        # sklearn's get_params()/clone()/repr read constructor arguments back as
        # attributes with the *same* name, so the value must live on ``self.val``.
        self.val = val

    @property
    def _val(self):
        # Backward-compatible read-only alias for the previous attribute name.
        return self.val

    def fit(self, df_train, y=None):
        """Record the training columns and which of them are categorical-like.

        :param df_train: pandas DataFrame used to find object/category/bool columns
        :param y: ignored; accepted so ``fit_transform(X, y)`` and Pipelines work
        :return: self
        """
        cat_cols = df_train.select_dtypes(["object", "category", "bool"]).columns.to_list()
        self._train_cols = df_train.columns.to_list()
        self._impute_values = {col: self.val for col in cat_cols}
        return self

    def transform(self, df):
        """Return a copy of ``df`` with categorical NaNs filled, columns in training order.

        Only columns that actually contain nulls are rewritten; those come back as
        ``category`` dtype. NOTE(review): columns without nulls keep their original
        dtype, so the output dtype of a column can differ between batches.

        :param df: pandas DataFrame with exactly the training columns (any order)
        :return: new DataFrame; ``df`` itself is not modified
        :raises AssertionError: if the column set differs from the one seen in ``fit``
        """
        df = df.copy()
        cols = df.columns.to_list()
        assert set(cols) == set(self._train_cols), "Do not have the same set of cols as train"

        for col, val in self._impute_values.items():
            if df[col].isnull().any():
                # go through object so a new label can be added to category columns
                df[col] = df[col].astype("object").fillna(val).astype("category")

        # align columns to the order seen during fit
        return df[self._train_cols]

In [None]:
def feature_importance(estimator, features):
    """
    Rank features by the estimator's ``feature_importances_``, most important first.

    :param estimator: fitted estimator exposing a ``feature_importances_`` attribute
    :param features: list of str, feature names aligned with ``feature_importances_``
    :return: DataFrame with columns ``feature``, ``importance`` and a 1-based ``rank``;
             the original positional index is kept
    """
    table = pd.DataFrame({"feature": features, "importance": estimator.feature_importances_})
    ranked = table.sort_values(by=["importance"], ascending=False)
    return ranked.assign(rank=np.arange(1, len(ranked) + 1))


def roc_auc(estimator, X_eval, y_eval):
    """
    ROC AUC of a binary classifier on an evaluation set.

    :param estimator: fitted estimator with a ``predict_proba()`` method
    :param X_eval: evaluation features
    :param y_eval: evaluation target
    :return: float, AUC computed from the second ``predict_proba`` column
    """
    positive_scores = estimator.predict_proba(X_eval)[:, 1]
    return roc_auc_score(y_eval, positive_scores)

In [27]:
from datetime import datetime

class CF:
    """User-user collaborative filtering on Spark DataFrames.

    The ratings DataFrame is read through the columns ``user``, ``item`` and
    ``rating``; the similarity table (``self.S``) has the columns ``id_1``,
    ``id_2`` and ``sim``.

    NOTE(review): several methods use a notebook-global SparkSession named
    ``spark_ss``, and ``checkpoint()`` needs a checkpoint directory set on its
    SparkContext. Neither is created in this class -- confirm both exist before
    calling ``similarity`` / ``recommend_all*``.
    """

    def __init__(self, Y_data, k, filename_sims="hdfs://master:9000/test/similarity.parquet"):
        """
        :param Y_data: Spark DataFrame of ratings with ``user``, ``item``, ``rating`` columns
        :param k: number of nearest neighbours used in each weighted-average prediction
        :param filename_sims: parquet path the similarity table is written to / read from
        """
        self.k_nearest = k
        self.Ybar_data = Y_data
        self.S = None
        self.mu = None  # NOTE(review): not read or written by any other method here
        self.filename_sims = filename_sims

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _weighted_rating(ratings, sims):
        """Similarity-weighted mean of ``ratings``; the 1e-8 term avoids division by zero."""
        r = np.asarray(ratings)
        s = np.asarray(sims)
        return (r * s).sum() / (np.abs(s).sum() + 1e-8)

    @staticmethod
    def _neighbour_ids(sims_match, u):
        """For each similarity row involving ``u``, return the id on the other side of the pair."""
        return np.where(sims_match['id_1'] == u, sims_match['id_2'], sims_match['id_1'])

    @staticmethod
    def _predict_group(df, gb_col, k_nearest):
        """Predict ratings for one target user from a pandas group of neighbour rows.

        :param df: rows sharing one ``gb_col`` value; ``sim`` is the neighbour's
            similarity and every column other than user/user_sim/sim/idx is an item
            holding the neighbour's rating (null when not rated)
        :param gb_col: column holding the target user id
        :param k_nearest: maximum number of neighbours averaged per item
        :return: pandas DataFrame with columns ``user_id``, ``item_id``, ``predict_rating``
        """
        out_cols = ['user_id', 'item_id', 'predict_rating']
        user_id = df[gb_col].iloc[0]
        excluded = {'user_sim', 'sim', 'idx', 'user'}
        rows = []
        for c in [col for col in df.columns if col not in excluded]:
            nearest = df[df[c].notnull()].sort_values(by='sim', ascending=False).head(k_nearest)
            if len(nearest) > 0:
                # Average only the k nearest neighbours that rated `c`; using the whole
                # group would include nulls (NaN result) and ignore k_nearest.
                r = nearest[c].to_numpy(dtype=float)
                s = nearest['sim'].to_numpy(dtype=float)
                rows.append([user_id, c, float((r * s).sum() / (np.abs(s).sum() + 1e-8))])
        return pd.DataFrame(rows, columns=out_cols)

    def _build_pivot(self, index_col, fill_zero, checkpoint):
        """Pivot ``self.Ybar_data`` to one row per user and one column per item.

        Adds a consecutive 0-based integer column ``index_col``. It comes from a
        ``row_number`` over a window with no ``partitionBy``, so Spark evaluates that
        step in a single partition.

        :param index_col: name of the row-index column to add (e.g. "row_idx", "idx")
        :param fill_zero: replace missing ratings with 0 (needed for cosine similarity)
        :param checkpoint: checkpoint the result to cut the lineage
        """
        import pyspark.sql.functions as F
        from pyspark.sql.window import Window

        pivot = self.Ybar_data.groupBy("user").pivot("item").sum("rating")
        if fill_zero:
            pivot = pivot.fillna(0)
        order = Window.orderBy("mid")
        pivot = (pivot.withColumn("mid", F.monotonically_increasing_id())
                 .withColumn(index_col, F.row_number().over(order) - 1)
                 .drop("mid"))
        return pivot.checkpoint() if checkpoint else pivot

    # ------------------------------------------------------------- similarities

    def normalize_Y(self):
        """Replace ``rating`` with a per-user z-score shifted by +3 and rounded to 2 decimals.

        Uses the population standard deviation; the 1e-8 term guards users whose
        ratings are all equal. ``self.Ybar_data`` is overwritten in place.
        """
        import pyspark.sql.functions as F
        from pyspark.sql.window import Window

        print('Doing normalize_Y', datetime.now().strftime("%H:%M:%S"))

        per_user = Window.partitionBy("user")
        z_score = ((F.col("rating") - F.mean("rating").over(per_user))
                   / (F.stddev_pop("rating").over(per_user) + 1e-8))
        self.Ybar_data = self.Ybar_data.withColumn("rating", F.round(z_score + 3, 2))

    def pivot_save(self, filename_pivot):
        """Normalize ratings, then write the zero-filled pivot (with ``row_idx``) to parquet."""
        self.normalize_Y()
        ps_pivot = self._build_pivot("row_idx", fill_zero=True, checkpoint=False)
        ps_pivot.write.mode("overwrite").parquet(filename_pivot)

    def similarity(self, filename_pivot=None):
        """Compute user-user cosine similarities chunk by chunk into ``self.filename_sims``.

        :param filename_pivot: optional parquet path written by ``pivot_save``; when
            None the zero-filled pivot is rebuilt from ``self.Ybar_data``
        Side effects: overwrites ``self.filename_sims`` and sets ``self.S`` from it.

        NOTE(review): similarities are only computed between users in the same chunk
        of ``step`` rows, so pairs that straddle two chunks are never scored.
        """
        import gc
        import pyspark.sql.functions as F
        from pyspark.sql.window import Window
        from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

        print('Doing similarity', datetime.now().strftime("%H:%M:%S"))

        if filename_pivot is None:
            ps_pivot = self._build_pivot("row_idx", fill_zero=True, checkpoint=True)
        else:
            ps_pivot = spark_ss.read.parquet(filename_pivot)

        total = ps_pivot.count()
        step = 10000
        # IndexedRow takes the chunk-local index first, then only the item ratings.
        cols = ['idx'] + [c for c in ps_pivot.columns if c not in ['user', 'row_idx', 'idx']]
        order = Window.orderBy('row_idx')

        for i in range(0, total, step):
            print(i, "/", total)

            pst = (ps_pivot.filter((F.col('row_idx') >= i) & (F.col('row_idx') < i + step))
                   .withColumn("idx", F.row_number().over(order) - 1))

            mat = IndexedRowMatrix(pst.select(cols).rdd.map(lambda row: IndexedRow(row[0], row[1:])))
            # `mat` is users x items, so columnSimilarities() on it would score item
            # pairs. Transposing makes the columns users, so entries i/j are the
            # chunk-local user indices that the joins below map back via `idx`.
            user_mat = mat.toCoordinateMatrix().transpose().toIndexedRowMatrix()
            sims = spark_ss.createDataFrame(user_mat.columnSimilarities().entries)

            spf = (sims.join(pst, sims.i == pst.idx, "left")
                   .select(F.col("i"), F.col("j"), F.col("user").alias("id_1"), F.col("value"))
                   .join(pst, sims.j == pst.idx, "left")
                   .select(F.col("id_1"), F.col("user").alias("id_2"), F.col("value").alias("sim"))
                   .filter(~F.col("sim").isNull() & ~F.isnan(F.col("sim")) & (F.col("sim") != 0)))

            mode = "overwrite" if i == 0 else "append"
            spf.write.mode(mode).parquet(self.filename_sims)

            del mat, user_mat, sims, spf, pst
            gc.collect()

        self.S = spark_ss.read.parquet(self.filename_sims)

    def refresh(self, normalized=True):
        """
        Normalize data (when ``normalized``) and calculate the similarity matrix again
        (e.g. after a few ratings were added).
        """
        if normalized:
            self.normalize_Y()
        self.similarity()

    def fit(self):
        """Normalize ratings and build the similarity table."""
        self.refresh()

    def load_sim(self, filename_sims=None):
        """Load ``self.S`` from parquet, optionally switching ``self.filename_sims`` first."""
        if filename_sims is not None:
            self.filename_sims = filename_sims
        self.S = spark_ss.read.parquet(self.filename_sims)

    # ------------------------------------------------------ single-user queries

    def __pred(self, u, i, ps_ratings):
        """Weighted-average rating of user ``u`` for item ``i`` from its k most similar raters.

        :return: prediction rounded to 2 decimals, or None when ``u`` has no stored
            similarities or none of its neighbours rated ``i``
        """
        import pyspark.sql.functions as F

        sims_match = self.S.filter((self.S.id_1 == u) | (self.S.id_2 == u)).toPandas()
        if len(sims_match) == 0:
            return None
        sims_match['id_x'] = self._neighbour_ids(sims_match, u)
        ls_ids = list(sims_match['id_x'].values)

        ratings = ps_ratings.filter(F.col("user").isin(ls_ids) & (F.col("item") == i)).toPandas()

        sim_ratings = ratings.merge(sims_match, left_on='user', right_on='id_x')
        sim_nearest = sim_ratings.sort_values(by=['sim'], ascending=False).head(self.k_nearest)
        if len(sim_nearest) == 0:
            return None
        return round(self._weighted_rating(sim_nearest['rating'], sim_nearest['sim']), 2)

    def pred(self, u, i):
        """
        Predict the rating of user ``u`` for item ``i`` from ``self.Ybar_data``.

        After ``normalize_Y()`` has run, ``self.Ybar_data`` holds normalized ratings,
        so the prediction is on that normalized scale. Returns None when no
        prediction can be made.
        """
        return self.__pred(u, i, self.Ybar_data)

    def recommend(self, u):
        """Return ``(u, item, predicted_rating)`` tuples for items ``u`` has not rated yet."""
        return self.__recommend(u, self.Ybar_data)

    def __recommend(self, u, ps_ratings):
        """Predict every item ``u`` has not rated from the ratings of its similar users.

        :return: list of ``(u, item, rating rounded to 2 decimals)``; empty when ``u``
            has no stored similarities
        """
        import pyspark.sql.functions as F

        sims = self.S
        sims_match = sims.filter((sims.id_1 == u) | (sims.id_2 == u)).toPandas()
        if len(sims_match) == 0:
            return []
        sims_match['id_x'] = self._neighbour_ids(sims_match, u)
        ls_ids = list(sims_match['id_x'].values) + [u]

        # One query for the neighbours' ratings and u's own (to know what u already rated).
        ratings = ps_ratings.filter(F.col("user").isin(ls_ids)).toPandas()
        rated_items = set(ratings.loc[ratings['user'] == u, 'item'])
        ratings = ratings[ratings['user'] != u]

        sim_ratings = ratings.merge(sims_match, left_on='user', right_on='id_x')

        # Only items some neighbour rated can get a prediction, so iterate those directly.
        ratings_ret = []
        for item, group in sim_ratings.groupby('item', sort=False):
            if item in rated_items:
                continue
            nearest = group.sort_values(by=['sim'], ascending=False).head(self.k_nearest)
            ret = self._weighted_rating(nearest['rating'], nearest['sim'])
            ratings_ret.append((u, item, round(float(ret), 2)))
        return ratings_ret

    # ----------------------------------------------------------- batch queries

    def recommend_all(self, top_product=None, db_write=None, file_recommend=None):
        """Recommend for all users and optionally persist the result (see ``__recommend_all``)."""
        return self.__recommend_all(top_product, db_write, file_recommend)

    def __recommend_all_pandas(self, ps_ratings, top_product=None):
        """Driver-side fallback: run ``__recommend`` once per distinct user and union the results.

        :return: Spark DataFrame ``user_id, item_id, predict_rating`` or None when
            no user received a prediction
        """
        import pyspark.sql.functions as F
        from pyspark.sql.types import DoubleType, StructField, StringType, StructType
        from pyspark.sql.window import Window

        schema = StructType([
            StructField("user_id", StringType(), True),
            StructField("item_id", StringType(), True),
            StructField("predict_rating", DoubleType(), True)])

        df_new_rating = None
        users = [row['user'] for row in ps_ratings.select('user').distinct().collect()]
        for user in users:
            ratings = self.__recommend(user, ps_ratings)
            if ratings:
                new_rows = spark_ss.createDataFrame(data=ratings, schema=schema)
                df_new_rating = new_rows if df_new_rating is None else df_new_rating.union(new_rows)

        if df_new_rating is not None and top_product is not None:
            by_rating = Window.partitionBy("user_id").orderBy(F.col("predict_rating").desc())
            df_new_rating = (df_new_rating.withColumn("top", F.row_number().over(by_rating))
                             .filter(F.col("top") <= top_product)
                             .select("user_id", "item_id", "predict_rating"))

        return df_new_rating

    def recommend_all_users(self, top_product=None, ps_pivot=None):
        """Predict ratings for every pivot user from its similar users' ratings, in chunks.

        :param top_product: keep at most this many predictions per user (highest first)
        :param ps_pivot: optional pivot with an ``idx`` column; when None an un-filled,
            checkpointed pivot is built from ``self.Ybar_data``
        :return: tuple ``(predictions DataFrame, ps_pivot)``
        """
        import pyspark.sql.functions as F

        sims = self.S
        ps_ret = None

        if ps_pivot is None:
            print('Building pivot ', datetime.now().strftime("%H:%M:%S"))
            ps_pivot = self._build_pivot("idx", fill_zero=False, checkpoint=True)

        print('Start loop predict rating ', datetime.now().strftime("%H:%M:%S"))
        total = ps_pivot.count()
        step = 5000
        item_cols = [F.col('pv.' + c) for c in ps_pivot.columns if c not in ['user']]
        for i in range(0, total, step):
            print(i, "/", total)

            pst = ps_pivot.filter((F.col('idx') >= i) & (F.col('idx') < i + step))

            # target = `user` (this chunk), neighbour = `user_sim`; ratings are the neighbour's.
            sims_rating = (sims.join(pst.select('user'),
                                     (F.col('id_1') == F.col('user')) | (F.col('id_2') == F.col('user')),
                                     "inner")
                           .filter(~F.col("user").isNull())
                           .withColumn('user_sim',
                                       F.when(F.col('id_1') == F.col('user'), F.col('id_2')).otherwise(F.col('id_1')))
                           .select('user', 'user_sim', 'sim').alias('sm')
                           .join(ps_pivot.alias('pv'), F.col('sm.user_sim') == F.col('pv.user'), "inner")
                           .select(['sm.user', 'sm.user_sim', 'sm.sim'] + item_cols))

            pstt = self.__predict_rating(sims_rating, 'user')
            ps_ret = pstt if ps_ret is None else ps_ret.union(pstt)

        print('End loop predict rating ', datetime.now().strftime("%H:%M:%S"))
        ps_ret = self.__recommend_sub(ps_ret, top_product)

        return ps_ret, ps_pivot

    def __recommend_sub(self, ps_ret, top_product):
        """Drop predictions for already-rated (user, item) pairs and keep the top N per user."""
        import pyspark.sql.functions as F
        from pyspark.sql.window import Window

        ps_ratings = self.Ybar_data

        # Anti-join: keep predictions with no matching existing rating.
        ps_ret = ps_ret.alias("a").join(ps_ratings.select('user', 'item', 'rating').alias("b"),
                   (ps_ret.user_id == ps_ratings.user) & (ps_ret.item_id == ps_ratings.item), "left")\
                    .filter(F.col("rating").isNull())\
                    .select(F.col("a.user_id"), F.col("a.item_id"), F.col("a.predict_rating"))

        if top_product is not None:
            by_rating = Window.partitionBy("user_id").orderBy(F.col("predict_rating").desc())
            ps_ret = (ps_ret.withColumn("top", F.row_number().over(by_rating))
                      .filter(F.col("top") <= top_product)
                      .select("user_id", "item_id", "predict_rating"))

        return ps_ret

    def __recommend_all(self, top_product=None, db_write=None, file_recommend=None):
        """Combine both batch strategies, stamp today's date and write to parquet and/or JDBC.

        :param db_write: optional dict with ``host``, ``database``, ``table``,
            ``username``, ``password`` for a SQL Server JDBC overwrite
        :param file_recommend: optional parquet path appended to, partitioned by ``par_col``
        """
        import pyspark.sql.functions as F

        ps_ret_sim_users, ps_pivot = self.recommend_all_sim_users(top_product)
        ps_ret_users, _ = self.recommend_all_users(top_product, ps_pivot)
        ps_ret = ps_ret_sim_users.union(ps_ret_users).dropDuplicates()

        ps_ret = ps_ret.withColumn('par_col', F.lit(datetime.now().strftime("%Y-%m-%d")))
        # count() and each write would otherwise recompute the whole pipeline.
        ps_ret = ps_ret.cache()

        if ps_ret.count() > 0:
            if file_recommend is not None:
                ps_ret.write.mode("append").partitionBy('par_col').parquet(file_recommend)

            if db_write is not None:
                ps_ret.write.mode("overwrite") \
                    .format("jdbc") \
                    .option("url", f"jdbc:sqlserver://{db_write['host']};databaseName={db_write['database']}") \
                    .option("dbtable", db_write['table']) \
                    .option("user", db_write['username']) \
                    .option("password", db_write['password']) \
                    .save()
        else:
            print('No existing record to write into DB / HDFS')

        return ps_ret

    def recommend_all_sim_users(self, top_product=None, ps_pivot=None):
        """Predict ratings for every user appearing in the similarity table, in chunks of targets.

        Each target (``user_sim``) is processed in exactly one chunk together with
        all of its neighbours, whose ratings drive the prediction.

        :return: tuple ``(predictions DataFrame, ps_pivot)``
        """
        import pyspark.sql.functions as F
        from itertools import chain

        sims = self.S
        ps_ret = None

        if ps_pivot is None:
            print('Building pivot ', datetime.now().strftime("%H:%M:%S"))
            ps_pivot = self._build_pivot("idx", fill_zero=False, checkpoint=True)

        print('Get similarity list', datetime.now().strftime("%H:%M:%S"))
        # One row per target user_sim with the list of pivot users similar to it.
        neighbours = (sims.join(ps_pivot.select('user'),
                                (F.col('id_1') == F.col('user')) | (F.col('id_2') == F.col('user')),
                                "inner")
                      .filter(~F.col("user").isNull())
                      .withColumn('user_sim',
                                  F.when(F.col('id_1') == F.col('user'), F.col('id_2')).otherwise(F.col('id_1')))
                      .select('user', 'user_sim', 'sim')
                      .groupby('user_sim').agg(F.collect_list("user")))

        print('Start loop predict rating ', datetime.now().strftime("%H:%M:%S"))
        # Bring the table to the driver once and slice locally per chunk.
        df_sims_rating = neighbours.toPandas()
        step = 5000
        total = len(df_sims_rating)
        for i in range(0, total, step):
            chunk = df_sims_rating.iloc[i:i + step]
            target_ids = list(chunk.iloc[:, 0].values)
            neighbour_ids = list(set(chain(*chunk.iloc[:, 1].values)))

            sims_spl = sims.filter(F.col('id_1').isin(target_ids) | F.col('id_2').isin(target_ids))
            pivot_spl = ps_pivot.filter(F.col('user').isin(neighbour_ids))

            sims_rating = (sims_spl.join(pivot_spl.select('user'),
                                         (F.col('id_1') == F.col('user')) | (F.col('id_2') == F.col('user')),
                                         "inner")
                           .filter(~F.col("user").isNull())
                           .withColumn('user_sim',
                                       F.when(F.col('id_1') == F.col('user'), F.col('id_2')).otherwise(F.col('id_1')))
                           # Only this chunk's targets: their full neighbour set is in pivot_spl.
                           .filter(F.col('user_sim').isin(target_ids))
                           .select('user', 'user_sim', 'sim').alias('sm')
                           # Ratings come from the neighbour (`user`), grouped by the target below.
                           .join(pivot_spl.alias('pv'), F.col('sm.user') == F.col('pv.user'), "inner")
                           .select(['sm.user', 'sm.user_sim', 'sm.sim']
                                   + [F.col('pv.' + c) for c in pivot_spl.columns if c not in ['user']]))

            pst = self.__predict_rating(sims_rating, 'user_sim')
            ps_ret = pst if ps_ret is None else ps_ret.union(pst)

        print('End loop predict rating ', datetime.now().strftime("%H:%M:%S"))
        ps_ret = self.__recommend_sub(ps_ret, top_product)

        return ps_ret, ps_pivot

    def __predict_rating(self, sims_rating, gb_col):
        """Group neighbour rows by target user and predict one rating per item.

        :param sims_rating: Spark DataFrame with ``user``, ``user_sim``, ``sim`` and
            one column per item (plus ``idx``) holding the neighbour's rating
        :param gb_col: column holding the target user id ('user' or 'user_sim')
        :return: Spark DataFrame ``user_id, item_id, predict_rating`` without nulls
        """
        import pyspark.sql.functions as F
        from pyspark.sql.types import DoubleType, StructField, StringType, StructType

        schema = StructType([StructField('user_id', StringType()),
                             StructField('item_id', StringType()),
                             StructField('predict_rating', DoubleType())])
        k_nearest = self.k_nearest
        # Capture the plain function (not `self`) so only it is shipped to executors.
        predict_group = self._predict_group

        def sim_udf(df):
            return predict_group(df, gb_col, k_nearest)

        return sims_rating.groupBy(gb_col).applyInPandas(sim_udf, schema)\
                    .filter(~F.col("predict_rating").isNull())
