In [1]:
import os
import sys

os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jdk1.8.0_281'

In [2]:
import findspark
findspark.init("E:\spark-3.1.1")

import pyspark

In [3]:
import pyspark
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, QuantileDiscretizer, MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.sql.functions import max as sparkMax
from pyspark.sql.functions import udf
spark = SparkSession.builder.getOrCreate()
# spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [4]:
df = spark.read.csv('archive/anime_cleaned.csv',
                    header='true', 
                    inferSchema='true')
# df2 = spark.read.csv('archive/users_cleaned.csv',
#                     header='true', 
#                     inferSchema='true')

In [5]:
useful_columns = ['type','episodes','status','airing','score','scored_by','rank', 
                  'popularity','members','favorites','genre',  'anime_id',]
useless_columns = [col for col in df.columns if col not in useful_columns]

df = df.drop(*useless_columns)

In [6]:
df = df.fillna(0)

In [7]:
print((df.count(), len(df.columns)))

(6668, 12)


In [8]:
def apply_onehot_encoder(_df, column_name, drop_origin=True):
    ohe = pyspark.ml.feature.OneHotEncoder()
    ohe.setInputCols([column_name])
    ohe.setOutputCols([column_name+'_vector'])
    ohe_model = ohe.fit(_df)
    ohe_model.setOutputCols([column_name+'_vector'])
    ohe_model.getHandleInvalid()
    _df = ohe_model.transform(_df)
    if drop_origin:
        _df = _df.drop(column_name)
    
    return _df


def array2vec(genreIndexes, indexSize):
    genreIndexes.sort()

    fill_list = [1.0 for _ in range(len(genreIndexes))]
    return Vectors.sparse(indexSize, genreIndexes, fill_list)


def apply_multihot_encoder(movieSamples, col_name, _splitter = ", ", drop_origin=True):
    if col_name not in movieSamples.columns:
        return None
    all_cols_list = movieSamples.columns
    all_cols_list.remove("anime_id")
    samplesWithGenre = movieSamples.withColumn(col_name+'_x',F.explode(
        F.split(F.col(col_name), _splitter).cast(ArrayType(StringType()))))
    genreIndexer = StringIndexer(inputCol=col_name+'_x', outputCol=col_name+"Index")
    StringIndexerModel = genreIndexer.fit(samplesWithGenre)
    genreIndexSamples = StringIndexerModel.transform(samplesWithGenre).withColumn(col_name+"IndexInt",
                                                                                  F.col(col_name+"Index").cast(IntegerType()))
    indexSize = genreIndexSamples.agg(sparkMax(F.col(col_name+"IndexInt"))).head()[0] + 1
    processedSamples = genreIndexSamples.groupBy('anime_id').agg(
        F.array_distinct(F.collect_list(col_name+'IndexInt')).alias(col_name+"Indexes"), 
        *[F.max(i).alias(i) for i in all_cols_list]
    ).withColumn("indexSize", F.lit(indexSize))
    finalSample = processedSamples.withColumn(col_name+"_vector", udf(array2vec, VectorUDT())(F.col(col_name+"Indexes"), F.col("indexSize")))
#     finalSample.printSchema()
    finalSample = finalSample.drop(col_name+"Indexes", "indexSize")
    

    finalSample.show(1, False, vertical=True)
    return finalSample
#     movieSamples = movieSamples.join(finalSample, on=['anime_id'], how='left_outer')
#     movieSamples.show(10)

In [9]:
anime_id_list = df.select('anime_id').sample(0.15, seed=0).collect()
anime_id_set = set([row.anime_id for row in anime_id_list])
del anime_id_list

# user_id_list = df2.select('user_id').sample(0.03, seed=0).collect()
# user_id_set = set([row.user_id for row in user_id_list])
# del user_id_list

cat_feature = ['anime_id', 'type', 'status','airing', 'genre']
num_feature = ['score', 'scored_by', 'rank', 'popularity','members','favorites']

In [10]:
df = df.filter(df["anime_id"].isin(anime_id_set))

In [11]:
df = apply_multihot_encoder(df, 'genre')
df = df.drop('genre')

-RECORD 0-------------------------------------------------------
 anime_id     | 6336                                            
 type         | OVA                                             
 episodes     | 7                                               
 status       | Finished Airing                                 
 airing       | False                                           
 score        | 8.31                                            
 scored_by    | 23492                                           
 rank         | 232.0                                           
 popularity   | 1481.0                                          
 members      | 52688.0                                         
 favorites    | 1066                                            
 genre        | Action, Drama, Mecha, Military, Sci-Fi, Space   
 genre_vector | (84,[1,4,5,12,23,26],[1.0,1.0,1.0,1.0,1.0,1.0]) 
only showing top 1 row



In [12]:
from pyspark.ml.feature import StringIndexer
to_label = ['type', 'status', 'airing']
for col in to_label:
    indexer = StringIndexer(inputCol=col, outputCol=col+"_index") 
    df = indexer.fit(df).transform(df) 
    df = df.drop(col)
# df.show(1, False, True)
index_columns = [i+'_index' for i in to_label]
for i in index_columns:
    df = apply_onehot_encoder(df, i)
df.show(1, False, True)

-RECORD 0--------------------------------------------------------------
 anime_id            | 6336                                            
 episodes            | 7                                               
 score               | 8.31                                            
 scored_by           | 23492                                           
 rank                | 232.0                                           
 popularity          | 1481.0                                          
 members             | 52688.0                                         
 favorites           | 1066                                            
 genre_vector        | (84,[1,4,5,12,23,26],[1.0,1.0,1.0,1.0,1.0,1.0]) 
 type_index_vector   | (6,[1],[1.0])                                   
 status_index_vector | (2,[0],[1.0])                                   
 airing_index_vector | (2,[0],[1.0])                                   
only showing top 1 row



In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

In [14]:
df.printSchema()
df.show(1, False, True)

root
 |-- anime_id: integer (nullable = true)
 |-- episodes: string (nullable = true)
 |-- score: string (nullable = true)
 |-- scored_by: string (nullable = true)
 |-- rank: double (nullable = true)
 |-- popularity: double (nullable = true)
 |-- members: double (nullable = true)
 |-- favorites: integer (nullable = true)
 |-- genre_vector: vector (nullable = true)
 |-- type_index_vector: vector (nullable = true)
 |-- status_index_vector: vector (nullable = true)
 |-- airing_index_vector: vector (nullable = true)

-RECORD 0--------------------------------------------------------------
 anime_id            | 6336                                            
 episodes            | 7                                               
 score               | 8.31                                            
 scored_by           | 23492                                           
 rank                | 232.0                                           
 popularity          | 1481.0                    

In [15]:
df = df.fillna(0)

In [16]:
to_num_and_then_scale = ["episodes", "scored_by", "favorites",
               'rank', 'popularity', 'members', 'score']

from pyspark.ml.feature import StandardScaler



for i in to_num_and_then_scale:
    df = df.withColumn(i+"_num", df[i].cast(DoubleType()))
    df = df.drop(i)
    df = df.fillna(0, subset=[i+"_num"])
    print('processing: column ', i)
    assembler = VectorAssembler(inputCols=[i+"_num"],outputCol=i+"_Vect")
    df = assembler.transform(df)
#     df.show(1, False ,True)
#     print(df.schema[i+"_Vect"].dataType)
#     df = df.withColumn(i+"_Vect", Vectors.dense())
#     print(df.schema[i+"_Vect"].dataType)

    sscaler = StandardScaler(inputCol=i+"_Vect", outputCol='standard'+i+'_v')
    df = sscaler.fit(df).transform(df)
#     df = smodel.transform(df).drop(i+"_Vect")
    unlist = udf(lambda x: float(list(x)[0]), DoubleType())

    df = df.withColumn(i+'_stdscaled', unlist('standard'+i+'_v'))        

    df = df.drop(i+"_num", 'standard'+i+'_v', i+"_Vect")

processing: column  episodes
VectorUDT
processing: column  scored_by
VectorUDT
processing: column  favorites
VectorUDT
processing: column  rank
VectorUDT
processing: column  popularity
VectorUDT
processing: column  members
VectorUDT
processing: column  score
VectorUDT


In [17]:
df = df.drop('anime_id')

In [18]:
df.printSchema()
df.show(1, False, True)

root
 |-- genre_vector: vector (nullable = true)
 |-- type_index_vector: vector (nullable = true)
 |-- status_index_vector: vector (nullable = true)
 |-- airing_index_vector: vector (nullable = true)
 |-- episodes_stdscaled: double (nullable = true)
 |-- scored_by_stdscaled: double (nullable = true)
 |-- favorites_stdscaled: double (nullable = true)
 |-- rank_stdscaled: double (nullable = true)
 |-- popularity_stdscaled: double (nullable = true)
 |-- members_stdscaled: double (nullable = true)
 |-- score_stdscaled: double (nullable = true)

-RECORD 0---------------------------------------------------------------
 genre_vector         | (84,[1,4,5,12,23,26],[1.0,1.0,1.0,1.0,1.0,1.0]) 
 type_index_vector    | (6,[1],[1.0])                                   
 status_index_vector  | (2,[0],[1.0])                                   
 airing_index_vector  | (2,[0],[1.0])                                   
 episodes_stdscaled   | 0.2543923113347032                              
 scored_by_stds

In [None]:
to_num_scale = ["favorites"]


from pyspark.ml.feature import StandardScaler



for i in to_num_scale:
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
    pipeline = Pipeline(stages=[assembler, scaler])

    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled"))

    scaler = StandardScaler(inputCol=i+"_Vect", outputCol='standard'+i+'_v',
                        withStd=True, withMean=False)
    smodel = scaler.fit(df)
    df = smodel.transform(df).drop(i+"_Vect")
    unlist = udf(lambda x: float(list(x)[0]), DoubleType())

    df = df.withColumn('standard'+i, unlist('standard'+i+'_v'))        

    df = df.drop(i, 'standard'+i+'_v')


In [17]:
to_num_scale = ["num_scored_by"]

to_array = F.udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))
from pyspark.ml.feature import StandardScaler



for i in to_num_scale:
    print(i)
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
    df = assembler.transform(df)
    df.show(1)
    print(df.schema[i+"_Vect"].dataType)
#     df = df.withColumn(i+"_Vect", Vectors.dense())
#     print(df.schema[i+"_Vect"].dataType)
    mmscaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
    df = mmscaler.fit(df).transform(df)
    df = df.withColumn(i+"_Scaled", unlist(i+"_Scaled"))
    df.show(1)
    sscaler = StandardScaler(inputCol=i+"_Vect", outputCol='standard'+i+'_v',
                        withStd=True, withMean=False)
    smodel = sscaler.fit(df)
    df = smodel.transform(df).drop(i+"_Vect")
    unlist = udf(lambda x: float(list(x)[0]), DoubleType())

    df = df.withColumn('standard'+i, unlist('standard'+i+'_v'))        

    df = df.drop(i, 'standard'+i+'_v')


num_scored_by
+--------+-----+----------+-------+--------------------+---------+-----------+-----------+---------+-------------+------------+----------------+-------------------+------------------+
|anime_id| rank|popularity|members|        genre_vector|typeIndex|statusIndex|airingIndex|num_score|num_scored_by|num_episodes|favorites_Scaled|  standardfavorites|num_scored_by_Vect|
+--------+-----+----------+-------+--------------------+---------+-----------+-----------+---------+-------------+------------+----------------+-------------------+------------------+
|    6336|232.0|    1481.0|52688.0|(84,[1,4,5,12,23,...|      1.0|        0.0|        0.0|     8.31|      23492.0|         7.0|           0.026|0.46619812595480015|         [23492.0]|
+--------+-----+----------+-------+--------------------+---------+-----------+-----------+---------+-------------+------------+----------------+-------------------+------------------+
only showing top 1 row

VectorUDT
+--------+-----+----------+-----

In [18]:
df.printSchema()
df.show(1, False, True)

root
 |-- anime_id: integer (nullable = true)
 |-- rank: double (nullable = true)
 |-- popularity: double (nullable = true)
 |-- members: double (nullable = true)
 |-- genre_vector: vector (nullable = true)
 |-- typeIndex: double (nullable = false)
 |-- statusIndex: double (nullable = false)
 |-- airingIndex: double (nullable = false)
 |-- num_score: double (nullable = true)
 |-- num_episodes: double (nullable = true)
 |-- favorites_Scaled: double (nullable = true)
 |-- standardfavorites: double (nullable = true)
 |-- num_scored_by_Scaled: double (nullable = true)
 |-- standardnum_scored_by: double (nullable = true)

-RECORD 0----------------------------------------------------------------
 anime_id              | 6336                                            
 rank                  | 232.0                                           
 popularity            | 1481.0                                          
 members               | 52688.0                                         
 genr

In [28]:
with open('allnums.json', 'w') as f:
    f.write(df.toPandas().to_json(orient='records', lines=True))