In [10]:
# Import necessary libraries.
import findspark
findspark.init('/home/user/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GeneralizedLinearRegression

import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import numpy as np

plt.rcParams['figure.figsize'] = 8, 5
plt.rcParams['image.cmap'] = 'viridis'
import pandas as pd

In [11]:
# Create dataFrame_Initial from VideoGamesSales.csv file.
dataFrame_Initial = spark.read.csv('../Datasets/VideoGamesSales.csv', header=True, inferSchema=True)

In [12]:
# Data point count for dataFrame_Initial.
print("Total data points:", dataFrame_Initial.count())

Total data points: 16719


In [13]:
# Schema table for dataFrame_Initial.
dataFrame_Initial.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: string (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)



In [14]:
# General statistics for 'Year_of_Release'
dataFrame_Initial.select('Year_of_Release').describe().show()

+-------+------------------+
|summary|   Year_of_Release|
+-------+------------------+
|  count|             16719|
|   mean|2006.4873556231003|
| stddev|5.8789947683491475|
|    min|              1980|
|    max|               N/A|
+-------+------------------+



In [15]:
dataFrame_Initial.filter("Year_of_Release > 2000 AND Year_of_Release < 2017").select('Year_of_Release').count()

14120

In [16]:
# Filter dataFrame_Initial to remove empty values.
dataFrame_Filtered = dataFrame_Initial.na.drop()

# Filter dataFrame_Filtered by 'Year_of_Release'
dataFrame_Filtered = dataFrame_Filtered.orderBy('Year_of_Release')

# Check dataFrame has been filtered. 
dataFrame_Filtered.head(5)

[Row(Name='Alter Ego', Platform='PC', Year_of_Release='1985', Genre='Simulation', Publisher='Activision', NA_Sales=0.0, EU_Sales=0.03, JP_Sales=0.0, Other_Sales=0.01, Global_Sales=0.03, Critic_Score=59, Critic_Count=9, User_Score='5.8', User_Count=19, Developer='Viva Media, Viva Media, LLC', Rating='T'),
 Row(Name='SimCity', Platform='PC', Year_of_Release='1988', Genre='Simulation', Publisher='Maxis', NA_Sales=0.0, EU_Sales=0.02, JP_Sales=0.0, Other_Sales=0.01, Global_Sales=0.03, Critic_Score=64, Critic_Count=75, User_Score='2.2', User_Count=4572, Developer='Maxis', Rating='E10+'),
 Row(Name='Doom', Platform='PC', Year_of_Release='1992', Genre='Shooter', Publisher='id Software', NA_Sales=0.02, EU_Sales=0.0, JP_Sales=0.0, Other_Sales=0.0, Global_Sales=0.03, Critic_Score=85, Critic_Count=44, User_Score='8.2', User_Count=1796, Developer='id Software', Rating='M'),
 Row(Name='Battle Arena Toshinden', Platform='PS', Year_of_Release='1994', Genre='Fighting', Publisher='Sony Computer Entertai

In [17]:
# Add key column, 'ID' to dataFrame.
dataFrame_wKey = dataFrame_Filtered.select('*').withColumn('ID', monotonically_increasing_id())

In [18]:
# Set column types to accurately reflect data.
dataFrame_wKey = dataFrame_wKey.withColumn('User_Score', dataFrame_wKey['User_Score'].cast('float'))
dataFrame_wKey = dataFrame_wKey.withColumn('Year_of_Release', dataFrame_wKey['Year_of_Release'].cast('int'))

In [19]:
# Schema table with columns set to correct data type.
dataFrame_wKey.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: float (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- ID: long (nullable = false)



In [20]:
# Create list of columns deemed useful.
columns_Useful = ['ID', 'Name', 'Platform', 'Year_of_Release', 'Genre', 
               'Global_Sales', 'Critic_Score', 'Critic_Count',
               'User_Score', 'User_Count']

In [21]:
# Create new dataframe which contains only useful columns.
dataFrame_Useful = dataFrame_wKey[columns_Useful]

In [22]:
# Show first 5 rows of dataFrame_Useful
dataFrame_Useful.head(5)

[Row(ID=0, Name='Alter Ego', Platform='PC', Year_of_Release=1985, Genre='Simulation', Global_Sales=0.03, Critic_Score=59, Critic_Count=9, User_Score=5.800000190734863, User_Count=19, Rating='T'),
 Row(ID=1, Name='SimCity', Platform='PC', Year_of_Release=1988, Genre='Simulation', Global_Sales=0.03, Critic_Score=64, Critic_Count=75, User_Score=2.200000047683716, User_Count=4572, Rating='E10+'),
 Row(ID=2, Name='Doom', Platform='PC', Year_of_Release=1992, Genre='Shooter', Global_Sales=0.03, Critic_Score=85, Critic_Count=44, User_Score=8.199999809265137, User_Count=1796, Rating='M'),
 Row(ID=3, Name='Battle Arena Toshinden', Platform='PS', Year_of_Release=1994, Genre='Fighting', Global_Sales=1.27, Critic_Score=69, Critic_Count=4, User_Score=6.300000190734863, User_Count=4, Rating='T'),
 Row(ID=4, Name='Tekken 2', Platform='PS', Year_of_Release=1996, Genre='Fighting', Global_Sales=5.74, Critic_Score=89, Critic_Count=8, User_Score=8.899999618530273, User_Count=102, Rating='T')]

In [23]:
pd_df_Useful = dataFrame_Useful.describe().toPandas().transpose()

In [24]:
# Create list of columns for input.
input_Columns = ['ID', 'Year_of_Release', 'Critic_Score', 'User_Score']

vector_Assembler = VectorAssembler(inputCols = input_Columns, outputCol = 'Features')

# Transform the data.
vector_Output = vector_Assembler.transform(dataFrame_Useful)

# Schema table with Features column added.
vector_Output.printSchema()

root
 |-- ID: long (nullable = false)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: float (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Features: vector (nullable = true)



In [25]:
# Create new dataframe with only Features and GlobalSales columns
dataFrame_Features = vector_Output.select(['Features','Global_Sales'])

# dataFrame_Features now has only 2 columns
print(dataFrame_Features.head(5))

[Row(Features=DenseVector([0.0, 1985.0, 59.0, 5.8]), Global_Sales=0.03), Row(Features=DenseVector([1.0, 1988.0, 64.0, 2.2]), Global_Sales=0.03), Row(Features=DenseVector([2.0, 1992.0, 85.0, 8.2]), Global_Sales=0.03), Row(Features=DenseVector([3.0, 1994.0, 69.0, 6.3]), Global_Sales=1.27), Row(Features=DenseVector([4.0, 1996.0, 89.0, 8.9]), Global_Sales=5.74)]


In [26]:
# Split data by amounts stated above
data_train,data_test = dataFrame_Features.randomSplit([0.7,0.3])

# Show data_Train
data_train.describe().show()

# Show data_Test
data_test.describe().show()

Py4JJavaError: An error occurred while calling o155.describe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 32.0 failed 1 times, most recent failure: Lost task 19.0 in stage 32.0 (TID 72, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(: (struct<ID_double_VectorAssembler_4b76905d0806db53dc13:double,Year_of_Release_double_VectorAssembler_4b76905d0806db53dc13:double,Critic_Score_double_VectorAssembler_4b76905d0806db53dc13:double,User_Score_double_VectorAssembler_4b76905d0806db53dc13:double>) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2386)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2136)
	at org.apache.spark.sql.Dataset$$anonfun$describe$1.apply(Dataset.scala:2100)
	at org.apache.spark.sql.Dataset$$anonfun$describe$1.apply(Dataset.scala:2082)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2845)
	at org.apache.spark.sql.Dataset.describe(Dataset.scala:2082)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(: (struct<ID_double_VectorAssembler_4b76905d0806db53dc13:double,Year_of_Release_double_VectorAssembler_4b76905d0806db53dc13:double,Critic_Score_double_VectorAssembler_4b76905d0806db53dc13:double,User_Score_double_VectorAssembler_4b76905d0806db53dc13:double>) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
	... 14 more


In [None]:
regression = GeneralizedLinearRegression(featuresCol='Features', labelCol='Global_Sales')

# Fit the model. 
regression_Model = regression.fit(data_train)

print("Coefficients: " + str(regression_Model.coefficients))
print("Intercept: " + str(regression_Model.intercept))

# Summarize the model over the training set and print out some metrics
summary = regression_Model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()

In [None]:
# Visualize the coefficients
beta = np.sort(regression_Model.coefficients)

# Then plot the data
plt.plot(beta)

plt.ylabel('Beta Coefficients')