In [1]:

from pyspark.sql import SparkSession

# @hidden_cell
# This function is used to setup the access of Spark to your Object Storage. The definition contains your credentials.
# You might want to remove those credentials before you share your notebook.
def set_hadoop_config_with_credentials_3a500b3a65a940508dadf71c13437e1d(name):
    """This function sets the Hadoop configuration so it is possible to
    access data from Bluemix Object Storage using Spark"""

    prefix = 'fs.swift.service.' + name
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + '.auth.url', 'https://identity.open.softlayer.com'+'/v3/auth/tokens')
    hconf.set(prefix + '.auth.endpoint.prefix', 'endpoints')
    hconf.set(prefix + '.tenant', '266ba3381db2401faf8aaa2af05a9146')
    hconf.set(prefix + '.username', '88d895f0866c474693d73f58ef63c579')
    hconf.set(prefix + '.password', 'XI}9#K8V~!VoafSy')
    hconf.setInt(prefix + '.http.port', 8080)
    hconf.set(prefix + '.region', 'dallas')
    hconf.setBoolean(prefix + '.public', False)

# you can choose any name
name = 'keystone'
set_hadoop_config_with_credentials_3a500b3a65a940508dadf71c13437e1d(name)

spark = SparkSession.builder.getOrCreate()

df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load('swift://VideoGamesSalesanalysisandpredictivemodel.' + name + '/Video_Games_Sales_as_at_22_Dec_2016.csv')


In [2]:
df.show(2)

+-----------------+--------+---------------+--------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|             Name|Platform|Year_of_Release|   Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+-----------------+--------+---------------+--------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|       Wii Sports|     Wii|           2006|  Sports| Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322| Nintendo|     E|
|Super Mario Bros.|     NES|           1985|Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|        null|        null|      null|      null|     null|  null|
+-----------------+--------+---------------+--------+---------+--------+--------+--------+-----

In [3]:
df.registerTempTable("df")
cols = ["Critic_Score", "Critic_Count", "User_Score", "User_Count"]
queries = ["SELECT  Name, Platform, Year_of_Release, Genre, Publisher, CAST(NA_Sales AS FLOAT), CAST(EU_Sales AS FLOAT), CAST(JP_Sales AS FLOAT),\
           CAST(Other_Sales AS FLOAT), CAST(Global_Sales AS FLOAT), CAST(" + col + " AS FLOAT) AS label FROM df WHERE "+col for col in cols]
# Preparing training & testing dataframes
test_critic_score = sqlContext.sql(queries[0]+" IS NULL")
train_critic_score = sqlContext.sql(queries[0]+" IS NOT NULL")

test_Critic_Count = sqlContext.sql(queries[1]+" IS NULL")
train_Critic_Count = sqlContext.sql(queries[1]+" IS NOT NULL")

test_User_Score = sqlContext.sql(queries[2]+" IS NULL")
train_User_Score = sqlContext.sql(queries[2]+" IS NOT NULL")
train_User_Score.registerTempTable("us")
train_User_Score = sqlContext.sql("SELECT Name, Platform, Year_of_Release, Genre, Publisher, NA_Sales, EU_Sales, JP_Sales, Other_Sales, Global_Sales, COALESCE(label, 0.0) AS label FROM us")

test_User_Count = sqlContext.sql(queries[3]+" IS NULL")
train_User_Count = sqlContext.sql(queries[3]+" IS NOT NULL")

In [4]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

assembler = VectorAssembler(
    inputCols=["NA_Sales", "EU_Sales", "JP_Sales", "Other_Sales"],
    outputCol="features")
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[assembler, lr])

critic_score_model = pipeline.fit(train_critic_score)
Critic_Count_model = pipeline.fit(train_Critic_Count)
User_Score_model = pipeline.fit(train_User_Score)
User_Count_model = pipeline.fit(train_User_Count)

In [5]:
cs = critic_score_model.transform(test_critic_score)
cc = Critic_Count_model.transform(test_Critic_Count)
us = User_Score_model.transform(test_User_Score)
uc = User_Count_model.transform(test_User_Count)

In [6]:
# cols = ["Critic_Score", "Critic_Count", "User_Score", "User_Count"]
new_queries = ["SELECT  Name, Platform, Year_of_Release, Genre, Publisher, NA_Sales, EU_Sales, JP_Sales, Other_Sales, Global_Sales, label AS " + col + " FROM "+col for col in cols]
train_critic_score.registerTempTable("Critic_Score")
tcs = sqlContext.sql(new_queries[0])
train_Critic_Count.registerTempTable("Critic_Count")
tcc = sqlContext.sql(new_queries[1])
train_User_Score.registerTempTable("User_Score")
tus = sqlContext.sql(new_queries[2])
train_User_Count.registerTempTable("User_Count")
tuc = sqlContext.sql(new_queries[3])

In [7]:
cs_filled = cs.select(cs.Name, cs.Platform, cs.Year_of_Release, cs.Genre, cs.Publisher, cs.NA_Sales, cs.EU_Sales, cs.JP_Sales, cs.Other_Sales, cs.Global_Sales, cs.prediction.alias("Critic_Score"))\
              .unionAll(tcs)
            
cc_filled = cc.select(cc.Name, cc.Platform, cc.Year_of_Release, cc.Genre, cc.Publisher, cc.NA_Sales, cc.EU_Sales, cc.JP_Sales, cc.Other_Sales, cc.Global_Sales, cc.prediction.alias("Critic_Count"))\
            .unionAll(tcc)
    
us_filled = us.select(us.Name, us.Platform, us.Year_of_Release, us.Genre, us.Publisher, us.NA_Sales, us.EU_Sales, us.JP_Sales, us.Other_Sales, us.Global_Sales, us.prediction.alias("User_Score"))\
            .unionAll(tus)
    
uc_filled = uc.select(uc.Name, uc.Platform, uc.Year_of_Release, uc.Genre, uc.Publisher, uc.NA_Sales, uc.EU_Sales, uc.JP_Sales, uc.Other_Sales, uc.Global_Sales, uc.prediction.alias("User_Count"))\
            .unionAll(tuc)

In [8]:
new_cols = [cs_filled.Name, cs_filled.Platform, cs_filled.Year_of_Release, cs_filled.Genre, cs_filled.Publisher, cs_filled.NA_Sales,
            cs_filled.EU_Sales, cs_filled.JP_Sales, cs_filled.Other_Sales, cs_filled.Global_Sales, cs_filled.Critic_Score, cc_filled.Critic_Count]
df_filled = cs_filled.join(cc_filled, (cs_filled.Name == cc_filled.Name) & (cs_filled.Platform == cc_filled.Platform) & (cs_filled.Year_of_Release == cc_filled.Year_of_Release)).select(new_cols)
new_cols.append(us_filled.User_Score)
df_filled = df_filled.join(us_filled, (df_filled.Name == us_filled.Name) & (df_filled.Platform == us_filled.Platform) & (df_filled.Year_of_Release == us_filled.Year_of_Release)).select(new_cols)
new_cols.append(uc_filled.User_Count)
df_filled = df_filled.join(uc_filled, (df_filled.Name == uc_filled.Name) & (df_filled.Platform == uc_filled.Platform) & (df_filled.Year_of_Release == uc_filled.Year_of_Release)).select(new_cols)

In [9]:
# df_filled.show()

In [10]:
from pyspark.sql import functions as F
platforms = df_filled.select("Platform").distinct().rdd.flatMap(lambda x: x).map(lambda x: "c"+x).collect()
dummy_platforms = [F.when(F.col("Platform") == pf[1:], 1).otherwise(0).alias(pf)
         for pf in platforms]

genres = df_filled.select("Genre").distinct().rdd.flatMap(lambda x: x).map(lambda x: "c"+x).collect()
dummy_genres = [F.when(F.col("Genre") == ge[1:], 1).otherwise(0).alias(ge)
         for ge in genres]
publishers = df_filled.select("Publisher").distinct().rdd.flatMap(lambda x: x).map(lambda x: "c"+x+str("_pub")).collect()
dummy_publishers = [F.when(F.col("Publisher") == pub[1:-4], 1).otherwise(0).alias(pub)
         for pub in publishers]


In [11]:
dummifier = dummy_platforms + dummy_genres
df_dummified = df_filled.select("Name", df_filled.Year_of_Release.cast("integer"), "Global_Sales", *dummifier).na.drop()

In [12]:
(training, testing) = df_dummified.randomSplit([0.8, 0.2])
dummy_assembler = VectorAssembler(
    inputCols=df_dummified.columns[2:],
    outputCol="features")
lr_gs = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, labelCol="Global_Sales", featuresCol="features")
pipeline_gs = Pipeline(stages=[dummy_assembler, lr_gs])

In [13]:
# df_dummified.show()

In [14]:
model_gs = pipeline_gs.fit(training)

In [15]:
preds = model_gs.transform(testing).select("Name", "Global_Sales", "prediction")
withMSE = preds.withColumn("MSE", (preds.Global_Sales - preds.prediction)**2)
withMSE.show()

+--------------------+------------+-------------------+--------------------+
|                Name|Global_Sales|         prediction|                 MSE|
+--------------------+------------+-------------------+--------------------+
| Haikyu!! Cross T...|        0.04|0.13093090428568666|0.008268429516809835|
|.hack//G.U. Vol.2...|        0.16|0.22885244000927907|0.004740658987702413|
|.hack//Quarantine...|        0.18|  0.245172705082865|0.004247480555515126|
|007: Tomorrow Nev...|        3.21| 2.7176915626605163| 0.24236763503579883|
|        1/2 Summer +|        0.01|0.10645052035478854|0.009302702919826222|
|   100 Classic Games|        0.04|0.13093090428568666|0.008268429516809835|
|1080°: TenEighty ...|        2.03| 1.7547963853814084| 0.07573701375186198|
|                1942|         1.0| 0.9143032083137337|0.007343940105319324|
|2 Games in 1 Doub...|        0.12| 0.1962119281014149|0.005808258393767884|
|2 Games in 1: Son...|        1.83|  1.591593880560011| 0.05683749824899754|

In [16]:
MSE = withMSE.select("MSE").groupBy().sum().select(F.col("sum(MSE)").alias("mse")).collect()[0].mse / withMSE.count()

In [17]:
print MSE

0.0767408779833


In [18]:
# Clustering
from pyspark.ml.clustering import BisectingKMeans
# Trains a k-means model.
bkm = BisectingKMeans().setK(10).setSeed(1)
clust_pipeline = Pipeline(stages=[dummy_assembler, bkm])

In [19]:
cluster_model = clust_pipeline.fit(df_dummified)

In [21]:
# Evaluate clustering by computing Within Set Sum of Squared Errors.
bkm_model = cluster_model.stages[1]
new_df = dummy_assembler.transform(df_dummified)

cost = bkm_model.computeCost(new_df)
print("Within Set Sum of Squared Errors = " + str(cost))

# Shows the result.
print("Cluster Centers: ")
centers = bkm_model.clusterCenters()
for center in centers:
    print(center)

Within Set Sum of Squared Errors = 27116.5368132
Cluster Centers: 
[  2.85362595e-01   4.40399295e-04   9.38050499e-02   9.18966530e-02
   6.60598943e-03   1.17586612e-01   6.31238990e-03   1.90839695e-03
   0.00000000e+00   2.53963594e-02   0.00000000e+00   1.46799765e-04
   6.89958896e-03   2.06987669e-02   5.43159131e-03   4.40399295e-04
   3.36171462e-02   2.08455666e-02   9.80622431e-02   6.89958896e-03
   1.57075749e-02   9.20434527e-02   6.57662948e-02   2.93599530e-04
   1.03640634e-01   5.46095126e-02   2.51027598e-02   8.36758661e-02
   0.00000000e+00   2.93599530e-04   1.76159718e-03   2.01115678e-02
   1.23605402e-01   2.43394011e-01   1.29036994e-01   0.00000000e+00
   1.41368174e-01   0.00000000e+00   8.66118614e-02   4.38931298e-02
   8.98414563e-02   0.00000000e+00   7.47210804e-02   6.75278920e-02]
[  2.45186656e-01   0.00000000e+00   3.01826847e-02   5.90415674e-02
   2.11808313e-03   4.20969023e-02   1.05904157e-03   1.05904157e-03
   0.00000000e+00   3.36245698e-02 

In [22]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans().setK(2).setSeed(1)
kmeans_pipeline = Pipeline(stages=[dummy_assembler, kmeans])
model = kmeans_pipeline.fit(df_dummified)

In [23]:
# Evaluate clustering by computing Within Set Sum of Squared Errors.
k_model = cluster_model.stages[1]
new_df = dummy_assembler.transform(df_dummified)

cost = k_model.computeCost(new_df)
print("Within Set Sum of Squared Errors = " + str(cost))

# Shows the result.
print("Cluster Centers: ")
centers = k_model.clusterCenters()
for center in centers:
    print(center)

Within Set Sum of Squared Errors = 27116.5368132
Cluster Centers: 
[  2.85362595e-01   4.40399295e-04   9.38050499e-02   9.18966530e-02
   6.60598943e-03   1.17586612e-01   6.31238990e-03   1.90839695e-03
   0.00000000e+00   2.53963594e-02   0.00000000e+00   1.46799765e-04
   6.89958896e-03   2.06987669e-02   5.43159131e-03   4.40399295e-04
   3.36171462e-02   2.08455666e-02   9.80622431e-02   6.89958896e-03
   1.57075749e-02   9.20434527e-02   6.57662948e-02   2.93599530e-04
   1.03640634e-01   5.46095126e-02   2.51027598e-02   8.36758661e-02
   0.00000000e+00   2.93599530e-04   1.76159718e-03   2.01115678e-02
   1.23605402e-01   2.43394011e-01   1.29036994e-01   0.00000000e+00
   1.41368174e-01   0.00000000e+00   8.66118614e-02   4.38931298e-02
   8.98414563e-02   0.00000000e+00   7.47210804e-02   6.75278920e-02]
[  2.45186656e-01   0.00000000e+00   3.01826847e-02   5.90415674e-02
   2.11808313e-03   4.20969023e-02   1.05904157e-03   1.05904157e-03
   0.00000000e+00   3.36245698e-02 

In [24]:
kmeans = KMeans().setK(10).setSeed(1)
kmeans_pipeline = Pipeline(stages=[dummy_assembler, kmeans])
model = kmeans_pipeline.fit(df_dummified)

# Evaluate clustering by computing Within Set Sum of Squared Errors.
k_model = cluster_model.stages[1]
new_df = dummy_assembler.transform(df_dummified)

cost = k_model.computeCost(new_df)
print("Within Set Sum of Squared Errors = " + str(cost))

# Shows the result.
print("Cluster Centers: ")
centers = k_model.clusterCenters()
for center in centers:
    print(center)

Within Set Sum of Squared Errors = 27116.5368132
Cluster Centers: 
[  2.85362595e-01   4.40399295e-04   9.38050499e-02   9.18966530e-02
   6.60598943e-03   1.17586612e-01   6.31238990e-03   1.90839695e-03
   0.00000000e+00   2.53963594e-02   0.00000000e+00   1.46799765e-04
   6.89958896e-03   2.06987669e-02   5.43159131e-03   4.40399295e-04
   3.36171462e-02   2.08455666e-02   9.80622431e-02   6.89958896e-03
   1.57075749e-02   9.20434527e-02   6.57662948e-02   2.93599530e-04
   1.03640634e-01   5.46095126e-02   2.51027598e-02   8.36758661e-02
   0.00000000e+00   2.93599530e-04   1.76159718e-03   2.01115678e-02
   1.23605402e-01   2.43394011e-01   1.29036994e-01   0.00000000e+00
   1.41368174e-01   0.00000000e+00   8.66118614e-02   4.38931298e-02
   8.98414563e-02   0.00000000e+00   7.47210804e-02   6.75278920e-02]
[  2.45186656e-01   0.00000000e+00   3.01826847e-02   5.90415674e-02
   2.11808313e-03   4.20969023e-02   1.05904157e-03   1.05904157e-03
   0.00000000e+00   3.36245698e-02 

In [25]:
from pyspark.ml.clustering import LDA, LDAModel
lda = LDA(k=5, maxIter=10)
lda_pipeline = Pipeline(stages=[dummy_assembler, lda])
model = lda_pipeline.fit(df_dummified)

In [26]:
lda_model = model.stages[1]
games_indices = lda_model.describeTopics(5)

In [27]:
games_indices.show()

+-----+-------------------+--------------------+
|topic|        termIndices|         termWeights|
+-----+-------------------+--------------------+
|    0|[34, 36, 0, 21, 27]|[0.18539371401488...|
|    1| [0, 37, 32, 28, 8]|[0.35003419232833...|
|    2| [0, 22, 38, 43, 2]|[0.21794375839907...|
|    3|  [33, 0, 8, 5, 41]|[0.19239178826433...|
|    4| [41, 3, 0, 18, 35]|[0.26867594974359...|
+-----+-------------------+--------------------+



In [28]:
# Shows the result
transformed = model.transform(df_dummified)
transformed.select("Name","topicDistribution").show()

+--------------------+--------------------+
|                Name|   topicDistribution|
+--------------------+--------------------+
|   Super Mario Bros.|[0.00455799043403...|
|Pokemon Red/Pokem...|[0.00574606639449...|
|              Tetris|[0.00592659574989...|
|           Duck Hunt|[0.00641472615818...|
|          Nintendogs|[0.00712262544412...|
|Pokemon Gold/Poke...|[0.00757279784693...|
|   Super Mario World|[0.00859513676214...|
|Pokemon Diamond/P...|[0.00930063720866...|
|    Super Mario Land|[0.00932543946233...|
| Super Mario Bros. 3|[0.00972176118638...|
|Pokemon Ruby/Poke...|[0.01049553162710...|
|Pokemon Black/Pok...|[0.01090190843330...|
|Pokémon Yellow: S...|[0.01122444682682...|
|Call of Duty: Bla...|[0.01164670616502...|
| Pokemon X/Pokemon Y|[0.01123554837791...|
|      Super Mario 64|[0.01324844020005...|
|Pokemon HeartGold...|[0.01334532395045...|
|Pokemon Omega Rub...|[0.01348183477211...|
|Super Mario Land ...|[0.01389685933960...|
|Super Mario All-S...|[0.0149827

In [29]:
games_names = df_dummified.select("Name").distinct().rdd.flatMap(lambda x: x).collect()
games = games_indices.select("termIndices").rdd\
                     .map(lambda row:[games_names[row.termIndices[i]] for i in range(5)]).collect()
for group in range(len(games)):
    print "Group" + str(group)
    for game in games[group]:
        print game
    print

Group0
All-Star Baseball 2003
Sherlock Holmes: The Mystery of the Mummy
The Legend of Zelda: Oracle of Ages
July
Calcio Bit

Group1
The Legend of Zelda: Oracle of Ages
RalliSport Challenge 2
Poupee Girl DS 2: Elegant Mint / Sweet Pink Style
Angelique: Maren no Rokukishi
Yu-Gi-Oh! GX: Tag Force 2

Group2
The Legend of Zelda: Oracle of Ages
Raw Danger! (JP sales)
The Last Airbender
Dungeons 2
Joust

Group3
Call of Duty Black Ops: Declassified
The Legend of Zelda: Oracle of Ages
Yu-Gi-Oh! GX: Tag Force 2
Paws & Claws: Dogs & Cats Best Friends
Men of War: Assault Squad

Group4
Men of War: Assault Squad
J-League Soccer: Prime Goal
The Legend of Zelda: Oracle of Ages
Crayon Shin-Chan Shokkugan! Densetsu o Yobu Omake Daiketsusen!!
RIFT

