# Anime LR Problem

Import the data

In [16]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName('Anime').getOrCreate()
sc = spark.sparkContext

import numpy as np
import pandas as pd

In [17]:
# read data
df = spark.read.option("header","true").csv("anime.csv",inferSchema = True)

Preprocess the data

In [18]:
# drop NA value
df = df[df['Score'] != 'Unknown']
df = df[df['Episodes'] != 'Unknown']

In [19]:
# convert string to float & integer
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
df = df.withColumn('Score',col('Score').cast('float'))
df = df.withColumn('Episodes',col('Episodes').cast('int'))
df = df.withColumn('Popularity',col('Popularity').cast('int'))
df = df.dropna()

In [20]:
# create a new column 'Views'
df = df.withColumn('Views',df['Score-10']+df['Score-9']+df['Score-8']+df['Score-7']+df['Score-6']
                  +df['Score-5']+df['Score-4']+df['Score-3']+df['Score-2']+df['Score-1'])

In [21]:
# select column
df = df.select(['Score','Type','Episodes','Source','Rating','Popularity','Members','Favorites','Watching',
                'On-Hold','Dropped','Plan to Watch','Views'])
df.show(5)

+-----+-----+--------+--------+--------------------+----------+---------+---------+--------+-------+-------+-------------+--------+
|Score| Type|Episodes|  Source|              Rating|Popularity|  Members|Favorites|Watching|On-Hold|Dropped|Plan to Watch|   Views|
+-----+-----+--------+--------+--------------------+----------+---------+---------+--------+-------+-------+-------------+--------+
| 8.78|   TV|      26|Original|R - 17+ (violence...|        39|1251960.0|    61971|  105808|  71513|  26678|       329800|641705.0|
| 8.39|Movie|       1|Original|R - 17+ (violence...|       518| 273145.0|     1174|    4143|   1935|    770|        57964|160349.0|
| 8.24|   TV|      26|   Manga|PG-13 - Teens 13 ...|       201| 558913.0|    12944|   29113|  25465|  13925|       146918|286146.0|
| 7.27|   TV|      26|Original|PG-13 - Teens 13 ...|      1467|  94683.0|      587|    4300|   5121|   5378|        33719| 39094.0|
| 6.98|   TV|      52|   Manga|       PG - Children|      4369|  13224.0|   

In [22]:
# x and y variables
df.printSchema()

root
 |-- Score: float (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: integer (nullable = true)
 |-- Source: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Members: double (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Watching: integer (nullable = true)
 |-- On-Hold: integer (nullable = true)
 |-- Dropped: integer (nullable = true)
 |-- Plan to Watch: integer (nullable = true)
 |-- Views: double (nullable = true)



In [23]:
# summary
df.summary().toPandas()

                                                                                

Unnamed: 0,summary,Score,Type,Episodes,Source,Rating,Popularity,Members,Favorites,Watching,On-Hold,Dropped,Plan to Watch,Views
0,count,12312.0,12312,12312.0,12312,12312,12312.0,12312.0,12312.0,12312.0,12312.0,12312.0,12312.0,12008.0
1,mean,6.510845519287622,,11.328866146848602,,,6437.678525016245,48716.99918778428,638.0909681611436,3044.6125730994154,1328.1386452241716,1633.9566276803118,11225.28248862898,25837.245419720188
2,stddev,0.887440196112299,,42.8041021875073,,,3851.467577178708,146707.64126986082,4703.773391208465,14389.574512895428,4701.004098135914,5350.285755354186,27362.514111349985,88884.16350757869
3,min,1.85,Movie,1.0,4-koma manga,G - All Ages,1.0,172.0,0.0,0.0,0.0,0.0,12.0,101.0
4,25%,5.93,,1.0,,,3129.0,1488.0,2.0,53.0,33.0,61.0,514.0,542.0
5,50%,6.52,,2.0,,,6320.0,5326.0,10.0,216.0,133.0,144.0,1736.0,2145.0
6,75%,7.14,,12.0,,,9585.0,27411.0,72.0,1014.0,567.0,548.0,7597.0,12394.0
7,max,9.19,TV,1818.0,Web manga,Unknown,15374.0,2589552.0,183914.0,566239.0,130961.0,174710.0,425531.0,1826691.0


Linear Regression

In [24]:
# convert string variables
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
convert = [StringIndexer(inputCol = column, outputCol = column+"_index")
            .fit(df) for column in ['Type','Source','Rating']]
pipeline = Pipeline(stages = convert)
df = pipeline.fit(df).transform(df)
df = df.drop('Type','Source','Rating')
df.show(10)

+-----+--------+----------+---------+---------+--------+-------+-------+-------------+--------+----------+------------+------------+
|Score|Episodes|Popularity|  Members|Favorites|Watching|On-Hold|Dropped|Plan to Watch|   Views|Type_index|Source_index|Rating_index|
+-----+--------+----------+---------+---------+--------+-------+-------+-------------+--------+----------+------------+------------+
| 8.78|      26|        39|1251960.0|    61971|  105808|  71513|  26678|       329800|641705.0|       0.0|         1.0|         3.0|
| 8.39|       1|       518| 273145.0|     1174|    4143|   1935|    770|        57964|160349.0|       2.0|         1.0|         3.0|
| 8.24|      26|       201| 558913.0|    12944|   29113|  25465|  13925|       146918|286146.0|       0.0|         0.0|         0.0|
| 7.27|      26|      1467|  94683.0|      587|    4300|   5121|   5378|        33719| 39094.0|       0.0|         1.0|         0.0|
| 6.98|      52|      4369|  13224.0|       18|     642|    766|   11

In [25]:
# create vector
feature = VectorAssembler(inputCols = df.columns[1:],outputCol = "Features")
feature_vector = feature.transform(df)

In [26]:
# split data to train and test subset
(traindata,testdata) = feature_vector.randomSplit([0.7, 0.3],seed = 24)

In [27]:
# model
from pyspark.ml.regression import LinearRegression
score_lr = LinearRegression(featuresCol = 'Features',labelCol = 'Score')
train_model = score_lr.fit(traindata)
results = train_model.evaluate(traindata)

22/12/03 16:29:05 WARN Instrumentation: [201a2ba6] regParam is zero, which might cause numerical instability and overfitting.
22/12/03 16:29:05 ERROR Executor: Exception in task 1.0 in stage 25.0 (TID 37)
org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3359/0x0000000801420040: (struct<Episodes_double_VectorAssembler_4315e48d728b:double,Popularity_double_VectorAssembler_4315e48d728b:double,Members:double,Favorites_double_VectorAssembler_4315e48d728b:double,Watching_double_VectorAssembler_4315e48d728b:double,On-Hold_double_VectorAssembler_4315e48d728b:double,Dropped_double_VectorAssembler_4315e48d728b:double,Plan to Watch_double_VectorAssembler_4315e48d728b:double,Views:double,Type_index:double,Source_index:double,Rating_index:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	a

Py4JJavaError: An error occurred while calling o559.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 25.0 failed 1 times, most recent failure: Lost task 1.0 in stage 25.0 (TID 37) (10.232.185.218 executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3359/0x0000000801420040: (struct<Episodes_double_VectorAssembler_4315e48d728b:double,Popularity_double_VectorAssembler_4315e48d728b:double,Members:double,Favorites_double_VectorAssembler_4315e48d728b:double,Watching_double_VectorAssembler_4315e48d728b:double,On-Hold_double_VectorAssembler_4315e48d728b:double,Dropped_double_VectorAssembler_4315e48d728b:double,Plan to Watch_double_VectorAssembler_4315e48d728b:double,Views:double,Type_index:double,Source_index:double,Rating_index:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2291)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:452)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:346)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:328)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:185)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3359/0x0000000801420040: (struct<Episodes_double_VectorAssembler_4315e48d728b:double,Popularity_double_VectorAssembler_4315e48d728b:double,Members:double,Favorites_double_VectorAssembler_4315e48d728b:double,Watching_double_VectorAssembler_4315e48d728b:double,On-Hold_double_VectorAssembler_4315e48d728b:double,Dropped_double_VectorAssembler_4315e48d728b:double,Plan to Watch_double_VectorAssembler_4315e48d728b:double,Views:double,Type_index:double,Source_index:double,Rating_index:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 31 more
