In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
spark = SparkSession \
    .builder \
    .appName("feat-eng") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/04 11:47:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 57775)
Traceback (most recent call last):
  File "/opt/anaconda3/envs/concrec/lib/python3.13/socketserver.py", line 318, in _handle_request_noblock
    self.process_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/concrec/lib/python3.13/socketserver.py", line 349, in process_request
    self.finish_request(request, client_address)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/concrec/lib/python3.13/socketserver.py", line 362, in finish_request
    self.RequestHandlerClass(request, client_address, self)
    ~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/envs/concrec/lib/python3.13/socketserver.py", line 766, in __init__
    self.handle()
    ~~~~~~~~~~~^^
  File "/opt/anaconda3/envs/concrec/lib/python3.13/site-packages/pyspark/accumulators.py", line 295, in 

In [7]:
anime_df = spark.read.csv('../dataset/parsed_anime.csv', header=True, inferSchema=True)
anime_df.printSchema()

                                                                                

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: string (nullable = true)
 |-- aired_to: integer (nullable = true)



# Multi-hot encode genres

In [8]:
from pyspark.ml.feature import StringIndexer

In [9]:
genre_df = anime_df \
    .withColumn('genre_item', explode(split(col('genre'), '[,]'))) \
    .withColumn('genre_item', trim(col('genre_item')))

In [10]:
genre_df.select(['anime_id', 'genre_item']).show(10)

+--------+------------+
|anime_id|  genre_item|
+--------+------------+
|   32281|       Drama|
|   32281|     Romance|
|   32281|      School|
|   32281|Supernatural|
|    5114|      Action|
|    5114|   Adventure|
|    5114|       Drama|
|    5114|     Fantasy|
|    5114|       Magic|
|    5114|    Military|
+--------+------------+
only showing top 10 rows



In [11]:
string_indexer = StringIndexer(inputCol='genre_item', outputCol='genre_index')
genre_indexed_df = string_indexer \
    .fit(genre_df) \
    .transform(genre_df) \
    .withColumn('genre_index', col('genre_index').cast('int'))

                                                                                

In [12]:
genre_indexed_df[['anime_id', 'genre_item', 'genre_index']].show(10)

+--------+------------+-----------+
|anime_id|  genre_item|genre_index|
+--------+------------+-----------+
|   32281|       Drama|          5|
|   32281|     Romance|          8|
|   32281|      School|          9|
|   32281|Supernatural|         12|
|    5114|      Action|          1|
|    5114|   Adventure|          2|
|    5114|       Drama|          5|
|    5114|     Fantasy|          3|
|    5114|       Magic|         16|
|    5114|    Military|         23|
+--------+------------+-----------+
only showing top 10 rows



In [13]:
pre_multihot_df = genre_indexed_df \
    .groupby('anime_id') \
    .agg(collect_list('genre_index').alias('genre_indexes'))

In [14]:
pre_multihot_df.show(10)

[Stage 7:>                                                          (0 + 1) / 1]

+--------+--------------------+
|anime_id|       genre_indexes|
+--------+--------------------+
|       1| [1, 2, 0, 5, 4, 25]|
|       5|   [1, 5, 21, 4, 25]|
|       6|           [1, 0, 4]|
|       7|[1, 5, 16, 21, 32...|
|       8|       [2, 3, 6, 12]|
|      15|       [1, 0, 6, 20]|
|      16|       [0, 5, 40, 8]|
|      17|      [0, 6, 10, 20]|
|      18|  [1, 37, 5, 19, 20]|
|      19|[5, 26, 21, 32, 3...|
+--------+--------------------+
only showing top 10 rows



                                                                                

In [15]:
max_genre_index = genre_indexed_df \
    .agg(max(col('genre_index'))).head()['max(genre_index)']

In [16]:
max_genre_index

42

In [17]:
import numpy as np

@udf(returnType='array<int>')
def multihot_list(l, max_index):
    fill = np.zeros(max_index + 1, dtype=np.int32)
    for i in l:
        fill[i] = 1
    return fill.tolist()

In [18]:
multihot_df = pre_multihot_df \
    .withColumn(
        'genre_multihot',
        multihot_list(col('genre_indexes'), lit(max_genre_index))
    )

In [19]:
multihot_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- genre_indexes: array (nullable = false)
 |    |-- element: integer (containsNull = false)
 |-- genre_multihot: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [21]:
multihot_df.show(10)

+--------+--------------------+--------------------+
|anime_id|       genre_indexes|      genre_multihot|
+--------+--------------------+--------------------+
|       1| [1, 2, 0, 5, 4, 25]|[1, 1, 1, 0, 1, 1...|
|       5|   [1, 5, 21, 4, 25]|[0, 1, 0, 0, 1, 1...|
|       6|           [1, 0, 4]|[1, 1, 0, 0, 1, 0...|
|       7|[1, 5, 16, 21, 32...|[0, 1, 0, 0, 0, 1...|
|       8|       [2, 3, 6, 12]|[0, 0, 1, 1, 0, 0...|
|      15|       [1, 0, 6, 20]|[1, 1, 0, 0, 0, 0...|
|      16|       [0, 5, 40, 8]|[1, 0, 0, 0, 0, 1...|
|      17|      [0, 6, 10, 20]|[1, 0, 0, 0, 0, 0...|
|      18|  [1, 37, 5, 19, 20]|[0, 1, 0, 0, 0, 1...|
|      19|[5, 26, 21, 32, 3...|[0, 0, 0, 0, 0, 1...|
+--------+--------------------+--------------------+
only showing top 10 rows



                                                                                

# Average Rating Min-Max Scale

In [24]:
rating_df = spark.read.csv('../dataset/rating.csv', header=True, inferSchema=True) \
    .filter(col('rating') > 0)

                                                                                

In [27]:
ave_rating_df = rating_df \
    .groupby('anime_id') \
    .agg(mean('rating').alias('ave_rating'))

In [28]:
ave_rating_df.show(10)



+--------+-----------------+
|anime_id|       ave_rating|
+--------+-----------------+
|   24171|7.386666666666667|
|    9465|8.098352214212152|
|   17679|7.293103448275862|
|    1829|7.341757827235005|
|    8086|7.939071817474721|
|   17389|8.601839684625492|
|   22097| 8.13076923076923|
|   30654|8.687342833193629|
|    5300|8.694010416666666|
|    6336|8.497902097902099|
+--------+-----------------+
only showing top 10 rows



                                                                                

In [29]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline

In [30]:
vec_assembler = VectorAssembler(inputCols=['ave_rating'], outputCol='ave_rating_vec')
ave_rating_scaler = MinMaxScaler(inputCol='ave_rating_vec', outputCol='ave_rating_scaled')
pipeline = Pipeline(stages=[vec_assembler, ave_rating_scaler])

rating_scaled_df = pipeline \
    .fit(ave_rating_df) \
    .transform(ave_rating_df)

                                                                                

In [31]:
rating_scaled_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- ave_rating: double (nullable = true)
 |-- ave_rating_vec: vector (nullable = true)
 |-- ave_rating_scaled: vector (nullable = true)



In [32]:
rating_scaled_df.show(10)



+--------+-----------------+-------------------+--------------------+
|anime_id|       ave_rating|     ave_rating_vec|   ave_rating_scaled|
+--------+-----------------+-------------------+--------------------+
|   24171|7.386666666666667|[7.386666666666667]|[0.7096296296296296]|
|    9465|8.098352214212152|[8.098352214212152]| [0.788705801579128]|
|   17679|7.293103448275862|[7.293103448275862]|[0.6992337164750958]|
|    1829|7.341757827235005|[7.341757827235005]|[0.7046397585816672]|
|    8086|7.939071817474721|[7.939071817474721]|[0.7710079797194134]|
|   17389|8.601839684625492|[8.601839684625492]|[0.8446488538472768]|
|   22097| 8.13076923076923| [8.13076923076923]|[0.7923076923076922]|
|   30654|8.687342833193629|[8.687342833193629]|[0.8541492036881809]|
|    5300|8.694010416666666|[8.694010416666666]|[0.8548900462962962]|
|    6336|8.497902097902099|[8.497902097902099]|[0.8331002331002332]|
+--------+-----------------+-------------------+--------------------+
only showing top 10 

                                                                                

In [33]:
@udf(returnType='float')
def unwrap_list(rating):
    return rating.toArray().tolist()[0]

In [34]:
rating_scaled_df = rating_scaled_df \
    .withColumn('ave_rating_minmax', unwrap_list(col('ave_rating_scaled')))

In [35]:
rating_scaled_df.show(10)

[Stage 30:====>                                                   (1 + 11) / 12]

+--------+-----------------+-------------------+--------------------+-----------------+
|anime_id|       ave_rating|     ave_rating_vec|   ave_rating_scaled|ave_rating_minmax|
+--------+-----------------+-------------------+--------------------+-----------------+
|   24171|7.386666666666667|[7.386666666666667]|[0.7096296296296296]|       0.70962965|
|    9465|8.098352214212152|[8.098352214212152]| [0.788705801579128]|        0.7887058|
|   17679|7.293103448275862|[7.293103448275862]|[0.6992337164750958]|        0.6992337|
|    1829|7.341757827235005|[7.341757827235005]|[0.7046397585816672]|       0.70463973|
|    8086|7.939071817474721|[7.939071817474721]|[0.7710079797194134]|       0.77100796|
|   17389|8.601839684625492|[8.601839684625492]|[0.8446488538472768]|       0.84464884|
|   22097| 8.13076923076923| [8.13076923076923]|[0.7923076923076922]|        0.7923077|
|   30654|8.687342833193629|[8.687342833193629]|[0.8541492036881809]|        0.8541492|
|    5300|8.694010416666666|[8.6

                                                                                

In [36]:
rating_result_df = rating_scaled_df \
    .select(['anime_id', 'ave_rating_minmax'])

In [37]:
result_df = anime_df \
    .join(rating_result_df, on='anime_id')

In [38]:
result_df.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)
 |-- japanese_title: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- aired_from: string (nullable = true)
 |-- aired_to: integer (nullable = true)
 |-- ave_rating_minmax: float (nullable = true)



In [51]:
result_df.show(10)

                                                                                

+--------+--------------------+--------------------+-----+--------+------+-------+---------------------------------+--------------------+--------------------+----------+----------+-----------------+
|anime_id|                name|               genre| type|episodes|rating|members|                   japanese_title|               aired|           image_url|aired_from|  aired_to|ave_rating_minmax|
+--------+--------------------+--------------------+-----+--------+------+-------+---------------------------------+--------------------+--------------------+----------+----------+-----------------+
|   24171|     Mushibugyou OVA|Action, Fantasy, ...|  OVA|       3|   7.2|   3636|          ムシブギョー 虫奉行 OVA|Jul 18, 2014 to J...|https://cdn.myani...|1405612800|1421337600|       0.70962965|
|    9465|Break Blade 4: Sa...|Action, Fantasy, ...|Movie|       1|  7.99|  41598|ブレイク ブレイド 第四章 惨禍ノ地|        Oct 30, 2010|https://cdn.myani...|1288368000|1288368000|        0.7887058|
|   17679|               Gamb

Exception ignored in: <_io.BufferedWriter name=5>
Traceback (most recent call last):
  File "/opt/anaconda3/envs/concrec/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 193, in manager
BrokenPipeError: [Errno 32] Broken pipe
Exception ignored in: <_io.BufferedWriter name=5>
Traceback (most recent call last):
  File "/opt/anaconda3/envs/concrec/lib/python3.13/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 193, in manager
BrokenPipeError: [Errno 32] Broken pipe


In [None]:
result_df \
    .write.format('csv') \
    .save('../dataset/featured_anime.csv')

                                                                                

25/03/04 15:07:15 WARN TransportChannelHandler: Exception in connection from wirelessprv-10-192-107-250.near.illinois.edu/10.192.107.250:57691
java.io.IOException: Operation timed out
	at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:47)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:330)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:284)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:259)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:417)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:254)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.chan