<B>Setup your dataframe using SQL</B>

In [0]:
df = sqlContext.sql("SELECT \
  a.Name \
, a.Platform as Console \
, CASE \
    WHEN a.Platform IN ('PS3','PS','PS2','PS4','PSP','PSV') THEN 'Sony' \
    WHEN a.Platform IN ('3DS','WiiU','GBA','GC','Wii','DS') THEN 'Nintendo' \
    WHEN a.Platform IN ('XOne','X360','XB') THEN 'Microsoft' \
  END Console_Company \
, CAST(a.Year_of_Release AS INT) Year_of_Release \
, a.Genre \
, CASE \
    WHEN a.Publisher LIKE 'Sony%' THEN 1 \
    WHEN a.Publisher IN ('Microsoft','Nintendo') THEN 1 \
    ELSE 0 \
  END AS is_Console_Publisher_Match \
, a.Global_Sales \
, a.Critic_Score \
, a.Critic_Count \
, CAST(a.User_Score * 10 AS INT) User_Score \
, a.User_Count \
, CASE \
    WHEN a.Rating IN ('E','EC','K-A') THEN 7 \
    WHEN a.Rating = 'E10+' THEN 10 \
    WHEN a.Rating = 'T' THEN 13 \
    WHEN a.Rating = 'M' THEN 17 \
    WHEN a.Rating = 'AO' THEN 18 \
  END Rating_Age \
, CASE \
    WHEN a.Platform IN ('3DS','PSP','GBA','PSV','DS') THEN 1 ELSE 0 END AS is_Handheld \
, CASE WHEN c.Name IS NULL THEN 0 ELSE 1 END AS is_Exclusive \
, CASE WHEN a.NAME LIKE '%Mario%' THEN 1 ELSE 0 END AS is_Mario \
FROM \
  video_games_sales a LEFT JOIN \
  (\
    SELECT \
    Name \
  , Year_of_Release \
  FROM \
    video_games_sales \
  GROUP BY \
    Name \
  , Year_of_Release \
  HAVING \
    COUNT(DISTINCT platform) = 1 \
  ) c ON a.Name = c.Name AND a.Year_of_Release = c.Year_of_Release \
WHERE \
  a.Platform NOT IN ('PC','DC') \
  AND a.User_Score NOT IN ('','tbd')\
  AND a.Critic_Score IS NOT NULL\
  AND a.User_Score IS NOT NULL\
  AND a.Publisher IS NOT NULL\
  AND a.Developer IS NOT NULL\
  AND a.Rating <> 'RP' \
  AND a.Year_of_Release IS NOT NULL \
  AND a.Year_of_Release NOT IN ('N/A') \
  AND a.Name NOT IN ('Wii Sports Resort','Wii Sports','Wii Play','Wii Fit Plus','Wii Fit','Kinect Adventures!') \
  ")
                    

In [0]:
df.printSchema()

In [0]:
df.groupBy('Rating_Age').count().show()

In [0]:
df.show()

<B>Convert Catagorical Values into Numeric Values</B>

Use one-hot encoding (StringIndexer,OneHotEncoder)

In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [0]:
categoricalCols = ['Console_Company','Console','Genre']
numericCols = ['Year_of_Release','Critic_Score','Critic_Count','User_Score','User_Count','Rating_Age','is_Exclusive', 'is_Console_Publisher_Match','is_Handheld','is_Mario']

In [0]:
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

In [0]:
stringIndexer = StringIndexer(inputCols=categoricalCols
                             ,outputCols=indexOutputCols)
oheEncoder = OneHotEncoder(inputCols=indexOutputCols
                           ,outputCols=oheOutputCols)

In [0]:
indexed = stringIndexer.fit(df).transform(df)
encoded = oheEncoder.fit(indexed).transform(indexed)

In [0]:
encoded.printSchema()

<B>Create Feature column</B>

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [0]:
train_data,test_data = encoded.randomSplit([0.7,0.3])

In [0]:
assemblerInputs = oheOutputCols + numericCols

In [0]:
assembler = VectorAssembler(inputCols=assemblerInputs,outputCol="features")

In [0]:
output = assembler.transform(train_data)

<B> Linear Regression </B>

In [0]:
from pyspark.ml.regression import LinearRegression

In [0]:
lr = LinearRegression(featuresCol='features',labelCol="Global_Sales",maxIter=20)
lrModel = lr.fit(output)

In [0]:
type(lrModel)

<B>Create a Pipeline</B>

Apply the Model to the Test Data Set

In [0]:
from pyspark.ml import Pipeline

In [0]:
pipeline = Pipeline(stages=[assembler, lr])
pipelineModel = pipeline.fit(test_data)

In [0]:
test_df = pipelineModel.transform(test_data)

Review Actual North American Sales vs Prediction

In [0]:
test_df.select('Name','Console','Year_of_Release','Global_Sales','prediction').sort(test_df.prediction.desc()).show()

Evaluate Model

In [0]:
sales_prediction = lrModel.evaluate(test_df.select('features','Global_Sales'))

In [0]:
sales_prediction.rootMeanSquaredError

In [0]:
test_data.select('Global_Sales').describe().show()

In [0]:
sales_prediction.r2