In [1]:
# Run findspark before any pyspark import so the local Spark installation is
# discoverable (presumably via SPARK_HOME — confirm for your environment).
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("concrec-rank")
    .config("spark.driver.memory", "11g")
    .getOrCreate()
)

### load data

In [4]:
from pyspark.sql.types import IntegerType

# Load the anime catalogue, cast `aired_from` to an integer column, and rename
# the catalogue-wide `rating` to `all_rating` so it does not collide with the
# per-user `rating` column once the two tables are joined.
anime_df = (
    spark.read.csv('../dataset/parsed_anime.csv', header=True, inferSchema=True)
    .withColumn('aired_from', col('aired_from').cast('int'))
    .withColumnRenamed('rating', 'all_rating')
)

anime_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- all_rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)



In [5]:
# Load the user ratings and keep only rows with a strictly positive score.
rating_df = (
    spark.read.csv('../dataset/rating.csv', header=True, inferSchema=True)
    .filter(col('rating') > 0)
)
rating_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [6]:
# A rating at or above this value counts as "liked" (label = 1).
like_threshold = 7.5

# Attach the catalogue attributes to each rating row (left join keeps ratings
# whose anime is missing from the catalogue) and derive the binary label.
merged_df = (
    rating_df
    .join(
        anime_df.select(
            'anime_id', 'name', 'genre', 'type', 'episodes',
            'all_rating', 'members', 'aired_from', 'aired_to',
        ),
        on=['anime_id'],
        how='left',
    )
    .withColumn('label', when(col('rating') >= like_threshold, 1).otherwise(0))
)

## Feature Builder


In [7]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline
import pyspark.sql.types as types
from pyspark.sql import functions as F


In [8]:
# Number of decimals kept for scaled / aggregated numeric features.
NUMBER_PRECISION = 2

@udf(types.FloatType())
def extract_float(l):
    """Return the first element of vector ``l`` rounded to NUMBER_PRECISION decimals.

    Used to unwrap the single-element vectors produced by the min-max scaler
    into plain float columns.

    Args:
        l: an indexable vector-like value (only ``l[0]`` is read), or None.

    Returns:
        The rounded first element as a Python float, or None when ``l`` is None.
    """
    # `from pyspark.sql.functions import *` shadows the built-in round(), so the
    # built-in is reached through the `builtins` module. The previous
    # `__builtin__` spelling is the Python 2 module name and only resolves when
    # IPython injects that alias into the namespace.
    import builtins

    if l is None:
        return None

    return float(builtins.round(l[0], NUMBER_PRECISION))

In [9]:
class NumericScaler:
    """Min-max scale a set of numeric columns.

    For every input column ``c`` the fitted scaler adds a ``c_min_max`` column
    holding the scaled value as a rounded float (unwrapped by ``extract_float``).
    """

    def __init__(self, cols):
        """Build the (unfitted) scaling pipeline for the column names in ``cols``."""
        self.cols = list(cols)
        self.model = None  # fitted PipelineModel; populated by fit()
        self.pipeline = self.__build_min_max_scalers(self.cols)

    def __build_min_max_scalers(self, cols):
        """Return one Pipeline whose stages are the per-column scaling pipelines."""
        stages = [self.__build_one_min_max_scaler(colname) for colname in cols]
        return Pipeline(stages=stages)

    def __build_one_min_max_scaler(self, colname):
        """Return a two-stage pipeline: wrap ``colname`` in a vector, then min-max scale it."""
        vec_col = f"{colname}_vec"

        # MinMaxScaler works on vector columns, hence the single-column assembler.
        # handleInvalid='keep' lets rows with invalid (e.g. null) values through
        # instead of failing the whole job.
        vec_assembler = VectorAssembler(inputCols=[colname], outputCol=vec_col, handleInvalid='keep')
        min_max_scaler = MinMaxScaler(inputCol=vec_col, outputCol=f"{colname}_min_max")

        return Pipeline(stages=[vec_assembler, min_max_scaler])

    def fit(self, df):
        """Fit the scaling pipeline on ``df`` and return ``self`` for chaining."""
        self.model = self.pipeline.fit(df)
        return self

    def transform(self, df):
        """Return ``df`` with a float ``<col>_min_max`` column added per input column.

        Raises:
            RuntimeError: if called before ``fit``.
        """
        if self.model is None:
            raise RuntimeError("NumericScaler.transform() called before fit()")

        result = self.model.transform(df)

        # Drop the intermediate vector columns and unwrap each scaled
        # single-element vector into a plain float.
        for colname in self.cols:
            output_col = f"{colname}_min_max"
            result = result \
                .drop(f"{colname}_vec") \
                .withColumn(output_col, extract_float(F.col(output_col)))

        return result

In [10]:

# Numeric catalogue columns that get a min-max scaled counterpart.
item_numeric_cols = ['all_rating', 'members', 'aired_from', 'aired_to']

item_numeric_scaler = NumericScaler(item_numeric_cols)

In [11]:
# Numeric per-user history columns that get a min-max scaled counterpart.
user_numeric_cols = ['user_rating_ave', 'user_rating_std', 'user_aired_from_ave', 'user_aired_to_ave']


In [12]:
# Unfitted scaler for the per-user numeric columns; fitted later on the feature frame.
user_numeric_scaler = NumericScaler(user_numeric_cols)


3. Categorical data: multi-hot encoding


In [13]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
import pyspark.sql.types as types
from pyspark.ml.linalg import SparseVector
import numpy as np

In [14]:
# Encode an array of genres with an already-fitted string-index mapping.
def encode_genres_col(index_mapping_broadcasted):
    """Build a UDF that multi-hot encodes an array-of-genre-names column.

    Args:
        index_mapping_broadcasted: broadcast variable whose ``.value`` is a dict
            mapping genre name -> integer index.

    Returns:
        A UDF ``(genres, max_genre_index) -> array<int>`` producing a 0/1 list
        of length ``max_genre_index + 1``.
    """
    @udf(returnType='array<int>')
    def _encode_genres(genres, max_genre_index):
        mapping = index_mapping_broadcasted.value

        # A set dedups repeated genres. Null input and genres absent from the
        # fitted mapping are skipped: previously an unknown genre produced a
        # None index, which made the encoding step fail.
        indices = {mapping[gen] for gen in (genres or []) if gen in mapping}

        multihot = [0] * (max_genre_index + 1)
        for index in indices:
            multihot[index] = 1
        return multihot

    return _encode_genres

In [15]:
class CategoricalEncoder:
    """Multi-hot encode an array-of-genres column.

    The genre vocabulary is learned from ``vocab_col`` of the frame passed to
    ``fit``; ``transform`` then adds ``<colname>_multihot`` to a frame.
    """

    def __init__(self, colname, vocab_col='genres'):
        """
        Args:
            colname: array column to encode in ``transform``.
            vocab_col: array column the vocabulary is learned from in ``fit``.
                Defaults to ``'genres'``, the column ``fit`` always used, so
                encoders fitted on the same frame share one index space.
        """
        self.colname = colname
        self.vocab_col = vocab_col
        self.max_genre_index = None  # populated by fit()
        self.encode_fn = None        # populated by fit()

    def fit(self, df):
        """Learn the genre -> index mapping from ``df`` and return ``self``."""
        exploded_df = df.withColumn(
            'genre_item',
            explode(col(self.vocab_col))
        )

        genre_string_indexer = StringIndexer(inputCol='genre_item', outputCol='genre_index')
        indexer_model = genre_string_indexer.fit(exploded_df)

        # `labels` is ordered by assigned index, so the position of a label is
        # its index; no extra Spark job is needed to recover the mapping.
        mapping_dict = {label: index for index, label in enumerate(indexer_model.labels)}
        # -1 for an empty vocabulary, which yields zero-length multi-hot vectors.
        self.max_genre_index = len(mapping_dict) - 1
        broadcasted = spark.sparkContext.broadcast(mapping_dict)

        self.encode_fn = encode_genres_col(broadcasted)
        return self

    def transform(self, df):
        """Return ``df`` with the ``<colname>_multihot`` column added.

        Raises:
            RuntimeError: if called before ``fit``.
        """
        if self.encode_fn is None:
            raise RuntimeError("CategoricalEncoder.transform() called before fit()")

        return df \
            .withColumn(
                f"{self.colname}_multihot",
                self.encode_fn(col(self.colname), lit(self.max_genre_index))
            )
        
        

In [16]:
# Encoder that will add `genres_multihot` from the item `genres` array column.
item_categorical_encoder = CategoricalEncoder('genres')


In [17]:
# Encoder that will add `user_liked_genres_multihot` from the `user_liked_genres` array column.
user_categorical_encoder = CategoricalEncoder('user_liked_genres')


Build Dataset for DNN Training


In [18]:
from pyspark.sql.window import Window


In [19]:
# Helper for aggregations: rows the user did not like contribute null, so the
# aggregate only reflects liked titles.
def likedMoviesCol(colname):
    """Return a column equal to ``colname`` where ``label == 1`` and null elsewhere."""
    is_liked = col('label') == 1
    return when(is_liked, col(colname)).otherwise(lit(None))


@udf(returnType='array<string>')
def genre_to_list(gen_str):
    """Split a comma-separated genre string into a list of trimmed genre names.

    Args:
        gen_str: e.g. ``"Action, Adventure, Drama"``, or None.

    Returns:
        The trimmed genre names in order; an empty list for None. Blank tokens
        (from an empty string or stray commas) are dropped so that ``''`` never
        becomes a genre of its own.
    """
    if gen_str is None:
        return []

    stripped = (gen.strip() for gen in gen_str.split(","))
    return [gen for gen in stripped if gen]


@udf(types.ArrayType(types.StringType()))
def most_liked_genres(gen_strs):
    """Return up to five genres that occur most often across ``gen_strs``.

    Args:
        gen_strs: list of comma-separated genre strings, e.g.
            ``["Action, Adventure, Drama", "Comedy, Drama, School"]``; may be
            None or contain None entries, which are ignored.

    Returns:
        Genre names ordered by descending occurrence count. Ties keep the order
        in which the genres first appear in the input, instead of depending on
        set iteration order.
    """
    from collections import Counter

    top_k = 5

    # A single counting pass replaces the per-genre rescans of the whole list.
    counts = Counter(
        gen.strip()
        for gen_str in (gen_strs or [])
        if gen_str is not None
        for gen in gen_str.split(",")
        if gen.strip()
    )

    return [gen for gen, _ in counts.most_common(top_k)]

In [20]:
# Per-user history window: up to the 100 rows preceding the current one when
# ordered by `aired_from`; the current row itself is excluded.
windowSpec = (
    Window.partitionBy('user_id')
    .orderBy('aired_from')
    .rowsBetween(-100, -1)
)


In [21]:
# Add the item genre list plus per-user history features, all computed over
# `windowSpec` (i.e. from the rows preceding the current one).
feat_df = (
    merged_df
    .withColumn('genres', genre_to_list(col('genre')))
    .withColumn('user_rating_cnt', count(lit(1)).over(windowSpec))
    .withColumn(
        'user_rating_ave',
        F.round(mean(col('rating')).over(windowSpec), NUMBER_PRECISION),
    )
    .withColumn(
        'user_rating_std',
        F.round(stddev(col('rating')).over(windowSpec), NUMBER_PRECISION),
    )
    # Air-date averages and liked genres only consider titles the user liked.
    .withColumn(
        'user_aired_from_ave',
        F.round(mean(likedMoviesCol('aired_from')).over(windowSpec), 0),
    )
    .withColumn(
        'user_aired_to_ave',
        F.round(mean(likedMoviesCol('aired_to')).over(windowSpec), 0),
    )
    .withColumn(
        'user_liked_genres',
        most_liked_genres(collect_list(likedMoviesCol('genre')).over(windowSpec)),
    )
)

In [22]:
# Sanity check: print the first 5 feature rows, one field per line.
feat_df.show(5, vertical=True)


-RECORD 0-----------------------------------
 anime_id            | 1090                 
 user_id             | 148                  
 rating              | 6                    
 name                | Mobile Suit Gundam I 
 genre               | Action, Adventure... 
 type                | Movie                
 episodes            | 1                    
 all_rating          | 7.43                 
 members             | 12877                
 aired_from          | 353347200            
 aired_to            | 353347200            
 label               | 0                    
 genres              | [Action, Adventur... 
 user_rating_cnt     | 0                    
 user_rating_ave     | null                 
 user_rating_std     | null                 
 user_aired_from_ave | null                 
 user_aired_to_ave   | null                 
 user_liked_genres   | []                   
-RECORD 1-----------------------------------
 anime_id            | 1091                 
 user_id  

Fit the feature transformers and build the training set


In [23]:
# Fit the item min-max ranges on the joined rating/feature frame.
item_numeric_scaler.fit(feat_df)


In [24]:
# Fit the user-history min-max ranges on the same feature frame.
user_numeric_scaler.fit(feat_df)


In [25]:
# Learn the genre -> index vocabulary used for `genres_multihot`.
item_categorical_encoder.fit(feat_df)




In [26]:
# Learn the genre -> index vocabulary used for `user_liked_genres_multihot`.
user_categorical_encoder.fit(feat_df)


In [27]:
# Add the scaled item columns first, then the scaled user columns.
transformed_df = user_numeric_scaler.transform(
    item_numeric_scaler.transform(feat_df)
)

In [28]:
# Sanity check: print the first 5 rows after numeric scaling, one field per line.
transformed_df.show(5, vertical=True)


-RECORD 0-------------------------------------------
 anime_id                    | 1090                 
 user_id                     | 148                  
 rating                      | 6                    
 name                        | Mobile Suit Gundam I 
 genre                       | Action, Adventure... 
 type                        | Movie                
 episodes                    | 1                    
 all_rating                  | 7.43                 
 members                     | 12877                
 aired_from                  | 353347200            
 aired_to                    | 353347200            
 label                       | 0                    
 genres                      | [Action, Adventur... 
 user_rating_cnt             | 0                    
 user_rating_ave             | null                 
 user_rating_std             | null                 
 user_aired_from_ave         | null                 
 user_aired_to_ave           | null           

In [29]:
# Add the item multi-hot column first, then the user multi-hot column.
transformed_df = user_categorical_encoder.transform(
    item_categorical_encoder.transform(transformed_df)
)

Transform data for Feature Serving


In [30]:
# Catalogue frame with the comma-separated `genre` string split into a `genres` array.
anime_genres_df = anime_df.withColumn('genres', genre_to_list(col('genre')))

In [31]:
# Apply the already-fitted item scaler to the full catalogue (one row per anime).
item_numeric_transformed_df = item_numeric_scaler.transform(anime_genres_df)


In [32]:
# Add `genres_multihot` on top of the scaled numeric item features.
item_both_transformed_df = item_categorical_encoder.transform(item_numeric_transformed_df)


2.1 build user feature df



In [33]:
w = Window.partitionBy('user_id')

# Keep exactly one row per user: the one with the latest `aired_from`.
# Filtering on `aired_from == max(aired_from)` returned several rows for a user
# whenever two rated titles shared the latest air date; row_number() with
# `anime_id` as a tie-breaker makes the pick unique and deterministic.
latest_first = w.orderBy(F.col('aired_from').desc(), F.col('anime_id').desc())

user_feat_df = feat_df \
    .where(F.col('aired_from').isNotNull()) \
    .withColumn('recency_rank', F.row_number().over(latest_first)) \
    .where(F.col('recency_rank') == 1) \
    .drop('recency_rank')

In [34]:
# Apply the already-fitted user scaler to the per-user feature rows.
user_numeric_transformed_df = user_numeric_scaler.transform(user_feat_df)


In [35]:
# Add `user_liked_genres_multihot` on top of the scaled numeric user features.
user_both_transformed_df = user_categorical_encoder.transform(user_numeric_transformed_df)


In [36]:
# Pull every item feature row into driver memory as a list of Row objects.
# NOTE(review): collect() materialises the whole frame on the driver — confirm the catalogue stays small enough.
item_features = item_both_transformed_df.collect()


In [37]:
# Preview a few rows only; echoing the full collected list floods the cell output.
item_features[:3]

[Row(anime_id=32281, name='Kimi no Na wa.', genre='Drama, Romance, School, Supernatural', type='Movie', episodes='1', all_rating=9.37, members=200630, japanese_title='君の名は。', aired='Aug 26, 2016', image_url='https://cdn.myanimelist.net/images/anime/5/87048.jpg', aired_from=1472140800, aired_to=1472140800, genres=['Drama', 'Romance', 'School', 'Supernatural'], all_rating_min_max=1.0, members_min_max=0.20000000298023224, aired_from_min_max=0.9700000286102295, aired_to_min_max=0.949999988079071, genres_multihot=[0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 Row(anime_id=5114, name='Fullmetal Alchemist: Brotherhood', genre='Action, Adventure, Drama, Fantasy, Magic, Military, Shounen', type='TV', episodes='64', all_rating=9.26, members=793665, japanese_title='鋼の錬金術師 FULLMETAL ALCHEMIST', aired='Apr 5, 2009 to Jul 4, 2010', image_url='https://cdn.myanimelist.net/images/anime/1223/96541.jpg', aired_from=1238

In [38]:
# Pull every user feature row into driver memory as a list of Row objects.
# NOTE(review): collect() materialises the whole frame on the driver — confirm the user count stays small enough.
user_features = user_both_transformed_df.collect()


In [39]:
# Inspect a single collected user row as a spot check.
user_features[1]


Row(anime_id=30276, user_id=463, rating=10, name='One Punch Man', genre='Action, Comedy, Parody, Sci-Fi, Seinen, Super Power, Supernatural', type='TV', episodes='12', all_rating=8.82, members=552458, aired_from=1443974400, aired_to=1450627200, label=1, genres=['Action', 'Comedy', 'Parody', 'Sci-Fi', 'Seinen', 'Super Power', 'Supernatural'], user_rating_cnt=74, user_rating_ave=8.46, user_rating_std=1.09, user_aired_from_ave=1168336312.0, user_aired_to_ave=1188773573.0, user_liked_genres=['Comedy', 'Romance', 'Drama', 'Shounen', 'Adventure'], user_rating_ave_min_max=0.8299999833106995, user_rating_std_min_max=0.17000000178813934, user_aired_from_ave_min_max=0.8999999761581421, user_aired_to_ave_min_max=0.9100000262260437, user_liked_genres_multihot=[1, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

2.2 Save to Redis


In [40]:
from redis import Redis


In [41]:
# Redis client created with the library's default connection settings.
# NOTE(review): no host/port/credentials are passed here — confirm the defaults match the target feature store.
redis = Redis()


In [42]:
# Key prefixes for the four feature groups (item/user x numeric/categorical).
# Presumably combined with an id to form the Redis keys — confirm in the cells that write them.
item_numeric_feature_prefix = 'rank:item:num'
user_numeric_feature_prefix = 'rank:user:num'
item_categorical_feature_prefix = 'rank:item:cat'
user_categorical_feature_prefix = 'rank:user:cat'