In [25]:
# Import necessary libraries.
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import numpy as np

plt.rcParams['figure.figsize'] = 8, 5
plt.rcParams['image.cmap'] = 'viridis'
import pandas as pd

In [26]:
# Create dataFrame_Initial from VideoGamesSales.csv file.
dataFrame_Initial = spark.read.csv('../Datasets/VideoGamesSales.csv', header=True, inferSchema=True)

In [27]:
# Data point count for dataFrame_Initial.
print("Total data points:", dataFrame_Initial.count())

Total data points: 16719


In [28]:
# Schema table for dataFrame_Initial.
dataFrame_Initial.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: string (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)



In [29]:
# General statistics for 'Year_of_Release'
dataFrame_Initial.select('Year_of_Release').describe().show()

+-------+------------------+
|summary|   Year_of_Release|
+-------+------------------+
|  count|             16719|
|   mean|2006.4873556231003|
| stddev|5.8789947683491475|
|    min|              1980|
|    max|               N/A|
+-------+------------------+



In [30]:
dataFrame_Initial.filter("Year_of_Release > 2000 AND Year_of_Release < 2017").select('Year_of_Release').count()

14120

In [31]:
# Filter dataFrame_Initial to remove empty values.
dataFrame_Filtered = dataFrame_Initial.na.drop()

# Filter dataFrame_Filtered by 'Year_of_Release'
dataFrame_Filtered = dataFrame_Filtered.orderBy('Year_of_Release')

# Check dataFrame has been filtered. 
dataFrame_Filtered.head(5)

[Row(Name='Alter Ego', Platform='PC', Year_of_Release='1985', Genre='Simulation', Publisher='Activision', NA_Sales=0.0, EU_Sales=0.03, JP_Sales=0.0, Other_Sales=0.01, Global_Sales=0.03, Critic_Score=59, Critic_Count=9, User_Score='5.8', User_Count=19, Developer='Viva Media, Viva Media, LLC', Rating='T'),
 Row(Name='SimCity', Platform='PC', Year_of_Release='1988', Genre='Simulation', Publisher='Maxis', NA_Sales=0.0, EU_Sales=0.02, JP_Sales=0.0, Other_Sales=0.01, Global_Sales=0.03, Critic_Score=64, Critic_Count=75, User_Score='2.2', User_Count=4572, Developer='Maxis', Rating='E10+'),
 Row(Name='Doom', Platform='PC', Year_of_Release='1992', Genre='Shooter', Publisher='id Software', NA_Sales=0.02, EU_Sales=0.0, JP_Sales=0.0, Other_Sales=0.0, Global_Sales=0.03, Critic_Score=85, Critic_Count=44, User_Score='8.2', User_Count=1796, Developer='id Software', Rating='M'),
 Row(Name='Battle Arena Toshinden', Platform='PS', Year_of_Release='1994', Genre='Fighting', Publisher='Sony Computer Entertai

In [32]:
# Data point count for dataFrame_Filtered.
print("Total data points:", dataFrame_Filtered.count())

Total data points: 6947


In [33]:
# Add key column, 'ID' to dataFrame.
dataFrame_wKey = dataFrame_Filtered.select('*').withColumn('ID', monotonically_increasing_id())

In [34]:
# Set column types to accurately reflect data.
dataFrame_wKey = dataFrame_wKey.withColumn('User_Score', dataFrame_wKey['User_Score'].cast('float'))
dataFrame_wKey = dataFrame_wKey.withColumn('Year_of_Release', dataFrame_wKey['Year_of_Release'].cast('int'))
dataFrame_wKey = dataFrame_wKey.withColumn('User_Count', dataFrame_wKey['User_Count'].cast('int'))
dataFrame_wKey = dataFrame_wKey.withColumn('Critic_Count', dataFrame_wKey['Critic_Count'].cast('int'))

In [35]:
# Schema table with columns set to correct data type.
dataFrame_wKey.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: float (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- ID: long (nullable = false)



In [36]:
# Create list of columns deemed useful.
columns_Useful = ['ID', 'Name', 'Platform', 'Year_of_Release', 'Genre', 
               'Global_Sales', 'Critic_Score', 'Critic_Count',
               'User_Score', 'User_Count', 'Rating']

In [37]:
# Create new dataframe which contains only useful columns.
dataFrame_Useful = dataFrame_wKey[columns_Useful]

In [38]:
# Show first 5 rows of dataFrame_Useful
dataFrame_Useful.head(5)

[Row(ID=0, Name='Alter Ego', Platform='PC', Year_of_Release=1985, Genre='Simulation', Global_Sales=0.03, Critic_Score=59, Critic_Count=9, User_Score=5.800000190734863, User_Count=19, Rating='T'),
 Row(ID=1, Name='SimCity', Platform='PC', Year_of_Release=1988, Genre='Simulation', Global_Sales=0.03, Critic_Score=64, Critic_Count=75, User_Score=2.200000047683716, User_Count=4572, Rating='E10+'),
 Row(ID=2, Name='Doom', Platform='PC', Year_of_Release=1992, Genre='Shooter', Global_Sales=0.03, Critic_Score=85, Critic_Count=44, User_Score=8.199999809265137, User_Count=1796, Rating='M'),
 Row(ID=3, Name='Battle Arena Toshinden', Platform='PS', Year_of_Release=1994, Genre='Fighting', Global_Sales=1.27, Critic_Score=69, Critic_Count=4, User_Score=6.300000190734863, User_Count=4, Rating='T'),
 Row(ID=4, Name='Tekken 2', Platform='PS', Year_of_Release=1996, Genre='Fighting', Global_Sales=5.74, Critic_Score=89, Critic_Count=8, User_Score=8.899999618530273, User_Count=102, Rating='T')]

In [39]:
pd_df_Useful = dataFrame_Useful.describe().toPandas().transpose()

In [40]:
# Create list of columns for input.
input_Columns = ['ID', 'Year_of_Release', 'Critic_Score', 'User_Score']

vector_Assembler = VectorAssembler(inputCols = input_Columns, outputCol = 'Features')

# Transform the data.
vector_Output = vector_Assembler.transform(dataFrame_Useful)

# Schema table with Features column added.
vector_Output.printSchema()

root
 |-- ID: long (nullable = false)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: float (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Features: vector (nullable = true)



In [41]:
# Create new dataframe with only Features and GlobalSales columns
vector_output = vector_Output.select(['Features','Global_Sales'])

# dataFrame_Features now has only 2 columns
print(vector_output.head(5))

[Row(Features=DenseVector([0.0, 1985.0, 59.0, 5.8]), Global_Sales=0.03), Row(Features=DenseVector([1.0, 1988.0, 64.0, 2.2]), Global_Sales=0.03), Row(Features=DenseVector([2.0, 1992.0, 85.0, 8.2]), Global_Sales=0.03), Row(Features=DenseVector([3.0, 1994.0, 69.0, 6.3]), Global_Sales=1.27), Row(Features=DenseVector([4.0, 1996.0, 89.0, 8.9]), Global_Sales=5.74)]


In [44]:
# Split data by amounts stated above
data_train,data_test = vector_output.randomSplit([0.7,0.3])

# Show data_Train
data_train.show()

# Show data_Test
#data_test.show()


+--------------------+------------+
|            Features|Global_Sales|
+--------------------+------------+
|[0.0,1985.0,59.0,...|        0.03|
|[1.0,1988.0,64.0,...|        0.03|
|[2.0,1992.0,85.0,...|        0.03|
|[3.0,1994.0,69.0,...|        1.27|
|[4.0,1996.0,89.0,...|        5.74|
|[6.0,1996.0,91.0,...|        4.63|
|[8.0,1996.0,94.0,...|        1.59|
|[9.0,1996.0,86.0,...|        1.03|
|[10.0,1996.0,83.0...|        0.14|
|[11.0,1997.0,96.0...|       10.95|
|[12.0,1997.0,92.0...|        9.72|
|[14.0,1997.0,83.0...|        2.45|
|[15.0,1997.0,87.0...|        1.99|
|[16.0,1997.0,93.0...|        1.27|
|[17.0,1997.0,85.0...|        1.24|
|[18.0,1997.0,83.0...|        1.16|
|[19.0,1997.0,83.0...|        0.89|
|[20.0,1997.0,66.0...|         0.5|
|[22.0,1997.0,86.0...|        0.23|
|[24.0,1998.0,96.0...|        7.16|
+--------------------+------------+
only showing top 20 rows



In [21]:
regression = LinearRegression(featuresCol='Features', labelCol='Global_Sales')

# Fit the training data.
regression_Model = regression.fit(data_Train)

# Print the coefficients and intercept.
print("Liner Regression Coefficients: " + str(regression_Model.coefficients))
print("Linear Regression Intercept: " + str(regression_Model.intercept))

# Summarise the model
summary = regression_Model.summary

# Print RMSE and R2
print("Linear Regression RMSE on training data: " + str(summary.rootMeanSquaredError))
print("Linear Regression R2 on training data: " + str(summary.r2))

NameError: name 'data_Train' is not defined

In [None]:
# Visualize the coefficients.
beta = np.sort(regression_Model.coefficient)

# Initial plot of data.
plt.plot(beta)

# Add a label to y-axis of plot.
plt.ylabel('Beta Coefficients')

In [None]:
results_Test = regression_Model.evaluate(data_Test)

# Print RMSE and R2
print("Linear Regression RMSE on test data: " + str(results_test.rootMeanSquaredError))
print("Linear Regression R2 on test data: " + str(results_Test.r2))