# Feature Engineering

In [49]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [50]:
spark = SparkSession.builder.appName('rank').config('spark.driver.memory', '11g').getOrCreate()

## Load Data

In [51]:
anime_df = spark.read.csv('../data/parsed_anime.csv', header=True, inferSchema=True)
anime_df = anime_df.withColumn('aired_from', col('aired_from').cast('int'))
anime_df = anime_df.withColumnRenamed('rating', 'all_rating')
anime_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- all_rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: integer (nullable = true)
 |-- aired_to: integer (nullable = true)



In [52]:
anime_df.show(5)

+--------+--------------------+--------------------+-----+--------+----------+-------+--------------------------+--------------------+--------------------+----------+----------+
|anime_id|                name|               genre| type|episodes|all_rating|members|            japanese_title|               aired|           image_url|aired_from|  aired_to|
+--------+--------------------+--------------------+-----+--------+----------+-------+--------------------------+--------------------+--------------------+----------+----------+
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|      9.37| 200630|                君の名は。|        Aug 26, 2016|https://cdn.myani...|1472140800|1472140800|
|    5114|Fullmetal Alchemi...|Action, Adventure...|   TV|      64|      9.26| 793665|鋼の錬金術師 FULLMETAL ...|Apr 5, 2009 to Ju...|https://cdn.myani...|1238860800|1278172800|
|   28977|            Gintama°|Action, Comedy, H...|   TV|      51|      9.25| 114262|                     銀魂°|Apr 8, 201

In [53]:
rating_df = spark.read.csv('../data/rating.csv', header=True, inferSchema=True)
rating_df = rating_df.filter(rating_df['rating'] > 0)
rating_df.printSchema()



root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



                                                                                

In [54]:
rating_df.show(5)

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      1|    8074|    10|
|      1|   11617|    10|
|      1|   11757|    10|
|      1|   15451|    10|
|      2|   11771|    10|
+-------+--------+------+
only showing top 5 rows



## Merge anime_df with rating_df and Build Label

'rating' > 7.5 ? like(1) : dislike(0)
'7.5' comes from our previous data analysis

In [55]:
like_threshold = 7.5

merged_df = rating_df.join(
    anime_df.select('anime_id', 'name', 'genre', 'type', 'episodes', 
                    'all_rating', 'members', 'aired_from', 'aired_to'),
    on=['anime_id'], how='left'
).withColumn('label', when(col('rating') >= like_threshold, 1).otherwise(0))

In [56]:
merged_df.show(5)

+--------+-------+------+--------------------+--------------------+----+--------+----------+-------+----------+----------+-----+
|anime_id|user_id|rating|                name|               genre|type|episodes|all_rating|members|aired_from|  aired_to|label|
+--------+-------+------+--------------------+--------------------+----+--------+----------+-------+----------+----------+-----+
|    8074|      1|    10|Highschool of the...|Action, Ecchi, Ho...|  TV|      12|      7.46| 535892|1278259200|1284912000|    1|
|   11617|      1|    10|     High School DxD|Comedy, Demons, E...|  TV|      12|       7.7| 398660|1325779200|1332432000|    1|
|   11757|      1|    10|    Sword Art Online|Action, Adventure...|  TV|      25|      7.83| 893100|1341676800|1356192000|    1|
|   15451|      1|    10| High School DxD New|Action, Comedy, D...|  TV|      12|      7.87| 266657|1373126400|1379779200|    1|
|   11771|      2|    10|    Kuroko no Basket|Comedy, School, S...|  TV|      25|      8.46| 3383

## Feature Builder

In [57]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline
import pyspark.sql.types as types
from pyspark.sql import functions as F
import builtins

In [58]:
NUMBER_PRECISION = 2

@udf(types.FloatType())
def extract_float(l):
    r = builtins.round(l[0], NUMBER_PRECISION)
    return float(r)

In [59]:
class NumericScaler:
    def __init__(self, cols):
        self.cols = cols
        self.pipeline = self.__build_min_max_scalers(cols)
        
    def __build_min_max_scalers(self, cols):
        pipelines = [self.__build_one_min_max_scaler(col) for col in cols]
        return Pipeline(stages=pipelines)
        
    def __build_one_min_max_scaler(self, col):
        output_col = f"{col}_min_max"

        vec_assembler = VectorAssembler(inputCols=[col], outputCol=f"{col}_vec", handleInvalid='keep')
        min_max_scaler = MinMaxScaler(inputCol=f"{col}_vec", outputCol=output_col)
        pipeline = Pipeline(stages=[vec_assembler, min_max_scaler])

        return pipeline
        
    def fit(self, df):
        self.model = self.pipeline.fit(df)
        
    def transform(self, df):
        result = self.model.transform(df)
        
        # drop all intermedia cols and convert output to float
        for col in self.cols:
            output_col = f"{col}_min_max"
            result = result \
                .drop(f"{col}_vec") \
                .withColumn(output_col, extract_float(F.col(output_col)))
            
        return result

In [60]:
item_numeric_cols = [
    'all_rating',
    'members',
    'aired_from',
    'aired_to'
]

item_numeric_scaler = NumericScaler(item_numeric_cols)

In [61]:
user_numeric_cols = [
    'user_rating_ave',
    'user_rating_std',
    'user_aired_from_ave',
    'user_aired_to_ave'
]

user_numeric_scaler = NumericScaler(user_numeric_cols)

In [62]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import SparseVector
import numpy as np

In [63]:
def encode_genres_col(index_mapping_broadcasted):
    @udf(returnType='array<int>')
    def encode_genres_col(genres, max_genre_index):
        if genres is None:
            genres = []
        gen_vec = [index_mapping_broadcasted.value.get(gen) for gen in genres]
        gen_vec = list(set(gen_vec)) # dedup

        # convert genre vector to multi-hot
        fill = np.ones(len(gen_vec), dtype=np.int32)
        sorted_index = np.sort(gen_vec)
        multihot_vec = SparseVector(max_genre_index + 1, sorted_index, fill)
        return multihot_vec.toArray().astype(np.int32).tolist()
    
    return encode_genres_col

In [64]:
class CategoricalEncoder:
    def __init__(self, colname):
        self.colname = colname
    
    def fit(self, df):
        exploded_df = df.withColumn(
            'genre_item',
            explode(col('genres'))
        )
        
        genre_string_indexer = StringIndexer(inputCol='genre_item', outputCol='genre_index')
        indexer_model = genre_string_indexer.fit(exploded_df)
        
        # get mapping from string indexer
        gens_df = spark.createDataFrame(
            [{'genre_item': g} for g in indexer_model.labels]
        )
        mapping_df = indexer_model.transform(gens_df).collect()
        mapping_dict = {row.genre_item: int(row.genre_index) for row in mapping_df}
        self.max_genre_index = builtins.max(mapping_dict.values())
        broadcasted = spark.sparkContext.broadcast(mapping_dict)
        
        self.encode_fn = encode_genres_col(broadcasted)
        
    def transform(self, df):
        return df \
            .withColumn(
                f"{self.colname}_multihot", 
                self.encode_fn(col(self.colname), lit(self.max_genre_index))
            )

In [65]:
item_categorical_encoder = CategoricalEncoder('genres')
user_categorical_encoder = CategoricalEncoder('user_liked_genres')

## Add More Relative Info for Each Row

Used sliding window to avoid leak of future information.

In [66]:
from pyspark.sql.window import Window

In [67]:
liked_movie_col = lambda col_name: when(col('label') == 1, col(col_name)).otherwise(lit(None))

@udf(returnType='array<string>')
def genre_to_list(gen_str):
    if gen_str is None:
        return []
    
    gens = gen_str.split(",")
    return [gen.strip() for gen in gens]


@udf(types.ArrayType(types.StringType()))
def most_liked_genres(gen_strs):
    """
    gen_strs = ["Action, Adventure, Drama", "Comedy, Drama, School"]
    """
    gens = [s.split(",") for s in gen_strs]
    gens = [x for l in gens for x in l] # flatten
    gens = [s.strip() for s in gens]
    
    gen_set = set(gens)
    count_occur = lambda gen, l: len([g for g in l if g == gen])
    gen_with_occur = [(gen, count_occur(gen, gens)) for gen in gen_set]
    gen_with_occur.sort(key=lambda x: x[1], reverse=True)
    
    # pick 5 most liked genres
    return [x[0] for x in gen_with_occur[:5]]

In [68]:
window_spec = Window.partitionBy('user_id').orderBy('aired_from').rowsBetween(-100, -1)

In [69]:
feat_df = merged_df \
    .withColumn('genres', genre_to_list(col('genre'))) \
    .withColumn('user_rating_cnt', count(lit(1)).over(window_spec)) \
    .withColumn('user_rating_ave', mean(col('rating')).over(window_spec)) \
    .withColumn('user_rating_ave', F.round(col('user_rating_ave'), NUMBER_PRECISION)) \
    .withColumn('user_rating_std', stddev(col('rating')).over(window_spec)) \
    .withColumn('user_rating_std', F.round(col('user_rating_std'), NUMBER_PRECISION)) \
    .withColumn('user_aired_from_ave', mean(liked_movie_col('aired_from')).over(window_spec)) \
    .withColumn('user_aired_from_ave', F.round(col('user_aired_from_ave'), 0)) \
    .withColumn('user_aired_to_ave', mean(liked_movie_col('aired_to')).over(window_spec)) \
    .withColumn('user_aired_to_ave', F.round(col('user_aired_to_ave'), 0)) \
    .withColumn('user_liked_genres', most_liked_genres(collect_list(liked_movie_col('genre')).over(window_spec)))

In [70]:
feat_df.show(5, vertical=True)



-RECORD 0-----------------------------------
 anime_id            | 523                  
 user_id             | 31                   
 rating              | 7                    
 name                | Tonari no Totoro     
 genre               | Adventure, Comedy... 
 type                | Movie                
 episodes            | 1                    
 all_rating          | 8.48                 
 members             | 271484               
 aired_from          | 577123200            
 aired_to            | 577123200            
 label               | 0                    
 genres              | [Adventure, Comed... 
 user_rating_cnt     | 0                    
 user_rating_ave     | null                 
 user_rating_std     | null                 
 user_aired_from_ave | null                 
 user_aired_to_ave   | null                 
 user_liked_genres   | []                   
-RECORD 1-----------------------------------
 anime_id            | 15                   
 user_id  

                                                                                

In [71]:
item_numeric_scaler.fit(feat_df)
user_numeric_scaler.fit(feat_df)
item_categorical_encoder.fit(feat_df)
user_categorical_encoder.fit(feat_df)

                                                                                

In [72]:
transformed_df = item_numeric_scaler.transform(feat_df)
transformed_df = user_numeric_scaler.transform(transformed_df)
transformed_df = item_categorical_encoder.transform(transformed_df)
transformed_df = user_categorical_encoder.transform(transformed_df)

## Transform Data for Feature Serving

In [73]:
anime_genres_df = anime_df.withColumn('genres', genre_to_list(col('genre')))

In [74]:
item_numeric_transformed_df = item_numeric_scaler.transform(anime_genres_df)
item_both_transformed_df = item_categorical_encoder.transform(item_numeric_transformed_df)

In [75]:
w = Window.partitionBy('user_id')

user_feat_df = feat_df.withColumn('max_aired', max('aired_from').over(w)) \
    .where(col('aired_from') == col('max_aired')) \
    .drop('max_aired') 

In [76]:
user_numeric_transformed_df = user_numeric_scaler.transform(user_feat_df)
user_both_transformed_df = user_categorical_encoder.transform(user_numeric_transformed_df)

## Save

In [77]:
item_features = item_both_transformed_df.collect()

                                                                                

In [78]:
item_features[1]

Row(anime_id=5114, name='Fullmetal Alchemist: Brotherhood', genre='Action, Adventure, Drama, Fantasy, Magic, Military, Shounen', type='TV', episodes='64', all_rating=9.26, members=793665, japanese_title='鋼の錬金術師 FULLMETAL ALCHEMIST', aired='Apr 5, 2009 to Jul 4, 2010', image_url='https://cdn.myanimelist.net/images/anime/1223/96541.jpg', aired_from=1238860800, aired_to=1278172800, genres=['Action', 'Adventure', 'Drama', 'Fantasy', 'Magic', 'Military', 'Shounen'], all_rating_min_max=0.9900000095367432, members_min_max=0.7799999713897705, aired_from_min_max=0.8999999761581421, aired_to_min_max=0.8999999761581421, genres_multihot=[0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [79]:
user_features = user_both_transformed_df.collect()

                                                                                

In [80]:
user_features[1]

Row(anime_id=16001, user_id=53, rating=9, name='Kokoro Connect: Michi Random', genre='Comedy, Drama, Romance, School, Slice of Life, Supernatural', type='Special', episodes='4', all_rating=8.19, members=106989, aired_from=1353254400, aired_to=1355068800, label=1, genres=['Comedy', 'Drama', 'Romance', 'School', 'Slice of Life', 'Supernatural'], user_rating_cnt=39, user_rating_ave=7.62, user_rating_std=1.76, user_aired_from_ave=1238832000.0, user_aired_to_ave=1247838171.0, user_liked_genres=['Romance', 'Comedy', 'School', 'Drama', 'Slice of Life'], user_rating_ave_min_max=0.7400000095367432, user_rating_std_min_max=0.2800000011920929, user_aired_from_ave_min_max=0.9300000071525574, user_aired_to_ave_min_max=0.9300000071525574, user_liked_genres_multihot=[1, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

Save to Redis

In [82]:
from redis import Redis

In [83]:
redis = Redis(
    host='localhost',
    port=6379,
    db=0,
)

In [84]:
item_numeric_feature_prefix = 'rank:item:num'
user_numeric_feature_prefix = 'rank:user:num'
item_categorical_feature_prefix = 'rank:item:cat'
user_categorical_feature_prefix = 'rank:user:cat'

def save_numeric_features(rows, features, idcol, prefix):
    for row in rows:
        mapping = {feat: row[feat] for feat in features}
        key = f"{prefix}:{row[idcol]}"
        redis.hset(key, mapping=mapping)

In [85]:
item_num_features = [
    'all_rating_min_max',
    'members_min_max',
    'aired_from_min_max',
    'aired_to_min_max'
]

save_numeric_features(
    item_features, 
    item_num_features,
    'anime_id',
    item_numeric_feature_prefix
)

In [86]:
user_num_features = [
    'user_rating_ave_min_max',
    'user_rating_std_min_max',
    'user_aired_from_ave_min_max',
    'user_aired_to_ave_min_max'
]

save_numeric_features(
    user_features,
    user_num_features,
    'user_id',
    user_numeric_feature_prefix
)

In [87]:
import json

def list2str(l):
    return json.dumps(l)

def save_categorical_features(rows, feature, idcol, prefix):
    for row in rows:
        mapping = {
            feature: list2str(row[feature])
        }
        key = f"{prefix}:{row[idcol]}"
        redis.hset(key, mapping=mapping)

In [88]:
save_categorical_features(
    item_features,
    'genres_multihot',
    'anime_id',
    item_categorical_feature_prefix
)

In [89]:
save_categorical_features(
    user_features,
    'user_liked_genres_multihot',
    'user_id',
    user_categorical_feature_prefix
)