# Feature Engineering for Deep Learning
### 数据预处理：分别对数值型和类别型数据进行encode

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Local SparkSession for this notebook.
# NOTE: getOrCreate() returns an already-running session if one exists, in which
# case spark.driver.memory set here has no effect (the driver JVM is already up).
spark = (
    SparkSession.builder
    .appName("concrec-rank")
    .config("spark.driver.memory", "11g")
    .getOrCreate()
)

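Py4JError: org.apache.spark.api.python.PythonUtils.isEncryptionEnabled does not exist in the JVM

(This error usually means the installed `pyspark` Python package and the Spark installation it launches (`SPARK_HOME`) are different versions. Install a `pyspark` version matching the Spark installation, or call `findspark.init()` before importing pyspark. Because session creation failed here, the cells below were not executed in this saved run; they are marked `In [None]`.)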

## Load Data

In [None]:
# Load anime metadata. header=True takes column names from the first row;
# inferSchema=True makes Spark scan the data to guess column types.
anime_df = spark.read.csv('../data/anime/parsed_anime.csv', header=True, inferSchema=True)

In [None]:
# Inspect the inferred column types before any casting.
anime_df.printSchema()

In [None]:
# Cast aired_from to an integer column. It is used later as the ordering key
# of the per-user sliding window and as a numeric feature.
from pyspark.sql.types import IntegerType
anime_df = anime_df.withColumn('aired_from', anime_df['aired_from'].cast(IntegerType()))

In [None]:
# Confirm the type of aired_from after the cast.
anime_df.printSchema()

In [None]:
# Preview the first 5 anime rows.
anime_df.show(5)

In [None]:
# Load user ratings (columns used later include user_id, anime_id and rating).
rating_df = spark.read.csv('../data/anime/rating.csv', header=True, inferSchema=True)

In [None]:
# Inspect the inferred rating schema.
rating_df.printSchema()

In [None]:
# valid rating only: keep rows whose rating is strictly positive.
# NOTE(review): presumably non-positive values (e.g. -1) mark "watched but not
# rated" in this dataset — confirm against the dataset description.
rating_df = rating_df.filter(rating_df['rating'] > 0)

In [None]:
# Preview the filtered ratings.
rating_df.show(5)

## Merge rating with anime

In [None]:
# Attach anime metadata to every rating. anime_df's own `rating` column is
# renamed to all_rating so it does not clash with the user's `rating`.
# The left join keeps ratings whose anime_id has no metadata row
# (their metadata columns are null).
_anime_side_df = anime_df \
    .select('anime_id', 'name', 'genre', 'type', 'episodes',
            'rating', 'members', 'aired_from', 'aired_to') \
    .withColumnRenamed('rating', 'all_rating')
merged_df = rating_df.join(_anime_side_df, on=['anime_id'], how='left')

In [None]:
# Preview the joined rows (user rating plus anime metadata).
merged_df.show(5)

## Build Label

把用户的打分转换成是否喜欢：评分不低于7.5分（rating >= 7.5）为喜欢（1），否则为不喜欢（0）        
也可以尝试其他标签策略，例如直接使用评分值

In [None]:
like_threshold = 7.5


def build_label(df, threshold=None):
    """Add a binary ``label`` column derived from the user's ``rating``.

    A row gets label 1 ("liked") when ``rating >= threshold`` and 0 otherwise.
    A null ``rating`` makes the condition null, so such rows fall through to 0.

    Args:
        df: DataFrame with a numeric ``rating`` column.
        threshold: cut-off score; when None, the module-level ``like_threshold``
            is read at call time (so changing ``like_threshold`` still applies).

    Returns:
        ``df`` with an extra integer ``label`` column.
    """
    if threshold is None:
        threshold = like_threshold
    return df.withColumn('label', when(col('rating') >= threshold, 1).otherwise(0))

In [None]:
# Add the binary `label` column and preview it.
labeled_df = build_label(merged_df)
labeled_df.show(5)

## Sliding Window
这里要在df的每一行上额外增加与该用户相关的信息，比如该用户最喜欢的动画类型、该用户看过多少部动画、平均打分是多少        
为了防止泄露未来信息，需要把所有评分按时间顺序排序，然后用滑动窗口（只取当前行之前的最多100条评分）进行聚合        
理论上应该使用评分时间，但数据中没有评分时间，所以改用动画的开播时间（aired_from）

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import pyspark.sql.types as types

In [None]:
# Per-user sliding window over at most the 100 rows that precede the current
# row in aired_from order. The upper bound -1 excludes the current row, so a
# row's features never include its own rating/label.
# anime_id is a secondary sort key: in a ROWS frame, rows with equal aired_from
# would otherwise be ordered arbitrarily, making the features differ between runs.
windowSpec = Window \
    .partitionBy('user_id') \
    .orderBy('aired_from', 'anime_id') \
    .rowsBetween(-100, -1)

In [None]:
from collections import Counter

# How many genres most_liked_genres keeps.
TOP_LIKED_GENRES = 5


def likedMoviesCol(cname):
    """Column expression: `cname` for rows the user liked (label == 1), else null.

    Used inside window aggregates so that only liked anime contribute;
    mean() and collect_list() skip the nulls.
    """
    return when(col('label') == 1, col(cname)).otherwise(lit(None))


@udf(types.ArrayType(types.StringType()))
def most_liked_genres(gen_strs):
    """Return the TOP_LIKED_GENRES most frequent genres in `gen_strs`.

    gen_strs = ["Action, Adventure, Drama", "Comedy, Drama, School"]
    -> ["Drama", "Action", "Adventure", "Comedy", "School"]

    Each element is a comma-separated genre string; names are whitespace-
    stripped before counting. Ties keep first-seen order (Counter.most_common),
    so the result does not depend on hash-randomised set ordering.
    Null or empty input yields [].
    """
    if not gen_strs:
        return []
    genre_counts = Counter(
        gen.strip()
        for gen_str in gen_strs
        if gen_str is not None
        for gen in gen_str.split(",")
    )
    return [gen for gen, _ in genre_counts.most_common(TOP_LIKED_GENRES)]

In [None]:
NUMBER_PRECISION = 2

# Per-user window aggregates, applied in this order:
# (output column, aggregate expression, rounding digits or None for no rounding).
# Over an empty window (a user's first row) count is 0 and mean/stddev are null.
# NOTE(review): stddev over a single row is null (or NaN on older Spark).
_user_window_aggs = [
    ('user_rating_cnt', count(lit(1)), None),
    ('user_rating_ave', mean(col('rating')), NUMBER_PRECISION),
    ('user_rating_std', stddev(col('rating')), NUMBER_PRECISION),
    ('user_aired_from_ave', mean(likedMoviesCol('aired_from')), 0),
    ('user_aired_to_ave', mean(likedMoviesCol('aired_to')), 0),
]

feat_df = labeled_df
for _out_col, _agg_expr, _digits in _user_window_aggs:
    feat_df = feat_df.withColumn(_out_col, _agg_expr.over(windowSpec))
    if _digits is not None:
        feat_df = feat_df.withColumn(_out_col, F.round(col(_out_col), _digits))
del _out_col, _agg_expr, _digits

# Most frequent genres among the liked anime inside the window.
feat_df = feat_df.withColumn(
    'user_liked_genres',
    most_liked_genres(collect_list(likedMoviesCol('genre')).over(windowSpec)),
)

In [None]:
# Check the user_* window feature columns and their types.
feat_df.printSchema()

In [None]:
# Spot-check the genre features: each row's own genre string next to the
# user's liked genres computed from the preceding window.
# Other window columns that can be added to the preview:
#   'user_rating_cnt', 'user_rating_ave', 'user_rating_std',
#   'user_aired_from_ave', 'user_aired_to_ave'
preview_cols = ['anime_id', 'user_id', 'rating', 'genre', 'user_liked_genres']
feat_df.select(*preview_cols).head(10)

## Encoding
### 分别对数值型和类别型特征进行encode

### 1. Genres: multi-hot

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
import pyspark.sql.types as types
from pyspark.ml.linalg import SparseVector
import numpy as np

In [None]:
# 1. Parse the comma-separated `genre` string into a list of trimmed names.
@udf(returnType='array<string>')
def genre_to_list(gen_str):
    """Split e.g. "Action, Drama" into ["Action", "Drama"]; a null string gives []."""
    return [] if gen_str is None else [part.strip() for part in gen_str.split(",")]


genres_df = feat_df.withColumn('genres', genre_to_list(col('genre'))).drop('genre')

In [None]:
# Preview rows with the parsed `genres` list column.
genres_df.head(5)

In [None]:
# NOTE(review): this prints feat_df's schema again; genres_df (built in the
# previous cells) is probably the DataFrame meant to be inspected — confirm.
feat_df.printSchema()

In [None]:
def encode_genres_col(index_mapping_broadcasted):
    """Build a UDF that multi-hot encodes an array of genre names.

    Args:
        index_mapping_broadcasted: Spark broadcast variable whose ``.value`` is a
            dict mapping genre name -> integer index.

    Returns:
        A UDF ``(genres: array<string>, max_genre_index: int) -> array<int>``.
        The result has length ``max_genre_index + 1``, with 1 at the index of
        every mapped genre in ``genres`` and 0 elsewhere. A null ``genres``
        gives the all-zero vector. Genres missing from the mapping are ignored
        instead of crashing the worker.
    """
    @udf(returnType='array<int>')
    def _encode(genres, max_genre_index):
        mapping = index_mapping_broadcasted.value
        multihot = [0] * (max_genre_index + 1)
        for gen in genres or []:
            idx = mapping.get(gen)
            if idx is not None:
                # Repeated genres simply set the same slot again (dedup).
                multihot[idx] = 1
        return multihot
    return _encode


def multi_hot_encode_genres(featdf):
    """Add multi-hot encodings of the ``genres`` and ``user_liked_genres`` columns.

    A StringIndexer is fitted on the individual genre names exploded from
    ``genres``; that vocabulary is then used for both columns.

    Returns:
        ``featdf`` with two extra array<int> columns, ``genres_multihot`` and
        ``user_liked_genres_multihot``, each of length = number of genres.
    """
    genre_items_df = featdf.withColumn('genre_item', explode(col('genres')))
    indexer_model = StringIndexer(inputCol='genre_item', outputCol='genre_index') \
        .fit(genre_items_df)

    # StringIndexerModel.labels is ordered by assigned index, so a label's
    # position in the list is its index; no extra transform pass is needed.
    mapping_dict = {genre: idx for idx, genre in enumerate(indexer_model.labels)}
    # len - 1 rather than max(...): works in plain Python (no IPython-only
    # `__builtin__`) and does not raise when the vocabulary is empty.
    max_genre_index = len(mapping_dict) - 1
    broadcasted = spark.sparkContext.broadcast(mapping_dict)

    encode_fn = encode_genres_col(broadcasted)

    return featdf \
        .withColumn('genres_multihot', encode_fn(col('genres'), lit(max_genre_index))) \
        .withColumn('user_liked_genres_multihot',
                    encode_fn(col('user_liked_genres'), lit(max_genre_index)))

In [None]:
# Fit the genre vocabulary on genres_df and add the two multi-hot columns.
genre_encoded_df = multi_hot_encode_genres(genres_df)

In [None]:
# Check that genres_multihot / user_liked_genres_multihot were added.
genre_encoded_df.printSchema()

In [None]:
# Optional inspection, left disabled: head(5) fetches 5 rows, while collect()
# pulls the entire DataFrame to the driver and can exhaust its memory.
# genre_encoded_df.head(5)
# genre_encoded_df.collect()

### 2. Min-max scaling for numeric features

In [None]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

In [None]:
import builtins


@udf(types.FloatType())
def extract_float(l):
    """Unwrap a one-element ML vector into a float rounded to NUMBER_PRECISION digits.

    Returns None for a null vector instead of failing inside the Python worker.
    """
    if l is None:
        return None
    # builtins.round, not round: `from pyspark.sql.functions import *` shadows
    # Python's round with Spark's column function. `builtins` is the Python 3
    # module name; `__builtin__` only exists when IPython injects it.
    return float(builtins.round(l[0], NUMBER_PRECISION))


def min_max_scale(featdf, col):
    """Min-max scale the numeric column `col` of `featdf`.

    Fits a VectorAssembler + MinMaxScaler pipeline (default output range
    [0, 1]) on `featdf` itself. Returns `featdf` with an extra float column
    `<col>_min_max`; the temporary `<col>_vec` column is dropped. Null inputs
    do not drop the row: the assembler uses handleInvalid='keep' (NaN entries).

    The parameter name `col` shadows pyspark's `col` function in this body,
    hence the use of F.col below.
    """
    vec_col = f"{col}_vec"
    output_col = f"{col}_min_max"
    pipeline = Pipeline(stages=[
        VectorAssembler(inputCols=[col], outputCol=vec_col, handleInvalid='keep'),
        MinMaxScaler(inputCol=vec_col, outputCol=output_col),
    ])

    return pipeline \
        .fit(featdf) \
        .transform(featdf) \
        .drop(vec_col) \
        .withColumn(output_col, extract_float(F.col(output_col)))

In [None]:
# Working DataFrame that the min-max scaling steps extend column by column.
scaled_df = genre_encoded_df

In [None]:
# Min-max scale each numeric feature in turn; every call fits its own scaler
# and appends a `<name>_min_max` column.
for _numeric_col in ['all_rating', 'members', 'aired_from', 'aired_to',
                     'user_rating_ave', 'user_rating_std',
                     'user_aired_from_ave', 'user_aired_to_ave']:
    scaled_df = min_max_scale(scaled_df, _numeric_col)
del _numeric_col

In [None]:
# Spot-check one feature: raw user_aired_from_ave next to its scaled value
# (show(1000) prints up to 1000 rows).
scaled_df.select('anime_id', 'user_id', 
                 'user_aired_from_ave', 'user_aired_from_ave_min_max'
                ).show(1000)

### Pick useful features

In [None]:
# List every column available before picking the model inputs.
scaled_df.printSchema()

In [None]:
# Keep only ids, the label and the encoded/scaled feature columns.
_key_cols = ['anime_id', 'user_id', 'label']
_anime_feature_cols = [
    'all_rating_min_max', 'members_min_max',
    'aired_from_min_max', 'aired_to_min_max',
    'genres_multihot',
]
_user_feature_cols = [
    'user_rating_ave_min_max', 'user_rating_std_min_max',
    'user_aired_from_ave_min_max', 'user_aired_to_ave_min_max',
    'user_liked_genres_multihot',
]
output_df = scaled_df.select(*_key_cols, *_anime_feature_cols, *_user_feature_cols)

In [None]:
# Final schema of the feature table that gets written out.
output_df.printSchema()

### Output

In [None]:
# Persist the model-ready features.
# fillna(0) replaces null/NaN only in numeric columns; the array<int>
# multi-hot columns are left untouched.
# The format is explicit: without .format(...), save() uses the session default
# (spark.sql.sources.default). CSV is not an option because the CSV source
# cannot store the array<int> multi-hot columns.
output_df.fillna(0) \
    .write \
    .format('parquet') \
    .mode('overwrite') \
    .save('../data/anime/dnn_feat_eng')