# Neanderthal's Guide to Apache Spark

## Getting PySpark Running

In [0]:
# first install java development kit
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark with hadoop
# notice the example now is in archie
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
# untar and unzip the files
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
# installing spark finder
!pip install -q findspark
import os
# now we tell our enviroment where to find Java and Spark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"
# finally install findspark to locate spark libraries
import findspark
findspark.init()

In [0]:
## Source : https://towardsdatascience.com/a-neanderthals-guide-to-apache-spark-in-python-9ef1f156d427

## Setting up

In [3]:
# https://realpython.com/pyspark-intro/
import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile('file:////usr/share/doc/python/copyright')
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())

316
52


In [5]:
type(txt)

pyspark.rdd.RDD

In [79]:
lineLength = python_lines.map(lambda s: len(s))
lineLength.persist()

PythonRDD[254] at RDD at PythonRDD.scala:53

In [81]:
totalLength = lineLength.reduce(lambda a,b:a+b)
print(totalLength)

3282


In [0]:
type(txt)

pyspark.rdd.RDD

In [82]:
python_lines.reduce(lambda a,b: a+b)

'This is the Debian GNU/Linux prepackaged version of the Python programminglanguage. Python was written by Guido van Rossum <guido@cwi.nl> and others.sources from ftp.python.org:/pub/python, based on the Debianization byPython was created in the early 1990s by Guido van Rossum at Stichtingas a successor of a language called ABC.  Guido remains Python\'sIn 1995, Guido continued his work on Python at the Corporation forIn May 2000, Guido and the Python core development team moved toBeOpen.com to form the BeOpen PythonLabs team.  In October of the sameyear, the PythonLabs team moved to Digital Creations (now ZopeCorporation, see http://www.zope.com).  In 2001, the Python SoftwareFoundation (PSF, see http://www.python.org/psf/) was formed, anon-profit organization created specifically to own Python-relatedAll Python releases are Open Source (see http://www.opensource.org forthe Open Source Definition).  Historically, most, but not all, Python(1) GPL-compatible doesn\'t mean that we\'re dis

In [0]:
# Find cubes of first numbers 1-20 which end with the digit 3
# List comprehensions example
mycubes = [el**3 for el in range(1,21) if el%2 == 1 and el**3 % 10 == 3]
mycubes

[343, 4913]

In [0]:
results = []
for el in range(1,21):
  if el%2 == 1:
    cube = el**3
    if cube % 10 == 3:
      results.append(cube)
print(results)

[343, 4913]


In [0]:
# filter example
myodds = list(filter(lambda el: el%2 == 1, range(1,10)))
myodds

[1, 3, 5, 7, 9]

In [0]:
def myOddFilter(num):
  return num%2 == 1

In [0]:
myOddFilter(5),myOddFilter(12)

(True, False)

In [0]:
list(filter(myOddFilter, range(1,10)))

[1, 3, 5, 7, 9]

In [0]:
list(map(lambda el: el**3, myodds))

[1, 27, 125, 343, 729]

In [0]:
myoddcubes = list(map(lambda el: el**3, filter(lambda el: el%2 == 1, range(1,21))))
myoddcubes

[1, 27, 125, 343, 729, 1331, 2197, 3375, 4913, 6859]

In [0]:
list(filter(lambda num: num%10 == 3,myoddcubes))

[343, 4913]

In [0]:
list(filter(lambda num: num%10 == 3,(map(lambda el: el**3, filter(lambda el: el%2 == 1, range(1,21))))))

[343, 4913]

In [0]:
# Reducer
from functools import reduce


In [0]:
reduce(lambda a,b: a/b, [1,2,3,5])

0.03333333333333333

In [0]:
reduce(lambda a,b: a+b, [1,2,3,5], 20)

31

In [0]:
reduce(lambda a,b: a+b, [2,3,5], 21)

31

In [0]:
1/2/3/5

0.03333333333333333

In [0]:
19**3

6859

Remember, a **PySpark program isn’t that much different from a regular Python program**, but the **execution model can be very different from a regular Python program**, especially if you’re **running on a cluster**.

In [83]:
type(python_lines)

pyspark.rdd.PipelinedRDD

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .getOrCreate()

## Load Data

In [0]:
# how to upload into google Colab from your computer
from google.colab import files


In [11]:
files.upload()

Saving vgsales.csv to vgsales.csv




In [0]:
data = spark.read.csv('vgsales.csv',inferSchema=True, header =True)

In [13]:
type(data)

pyspark.sql.dataframe.DataFrame

In [14]:
data.columns

['Rank',
 'Name',
 'Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales']

In [15]:
# in Spark Dataframe something that we could do in Pandas or numpy with mydata.shape
data.count(), len(data.columns)


(16598, 11)

In [0]:
import pandas as pd

In [0]:
pdf = data.toPandas()

In [18]:
pdf.head()

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0
4,5,Pokemon Red/Pokemon Blue,GB,1996,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37


## Viewing DataFrames

In [19]:
data.show(5)

+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|Rank|                Name|Platform|Year|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|
+----+--------------------+--------+----+------------+---------+--------+--------+--------+-----------+------------+
|   1|          Wii Sports|     Wii|2006|      Sports| Nintendo|   41.49|   29.02|    3.77|       8.46|       82.74|
|   2|   Super Mario Bros.|     NES|1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|
|   3|      Mario Kart Wii|     Wii|2008|      Racing| Nintendo|   15.85|   12.88|    3.79|       3.31|       35.82|
|   4|   Wii Sports Resort|     Wii|2009|      Sports| Nintendo|   15.75|   11.01|    3.28|       2.96|        33.0|
|   5|Pokemon Red/Pokem...|      GB|1996|Role-Playing| Nintendo|   11.27|    8.89|   10.22|        1.0|       31.37|
+----+--------------------+--------+----+------------+---------+

In [20]:
data.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)



In [21]:
data.dtypes

[('Rank', 'int'),
 ('Name', 'string'),
 ('Platform', 'string'),
 ('Year', 'string'),
 ('Genre', 'string'),
 ('Publisher', 'string'),
 ('NA_Sales', 'double'),
 ('EU_Sales', 'double'),
 ('JP_Sales', 'double'),
 ('Other_Sales', 'double'),
 ('Global_Sales', 'double')]

In [0]:
data.columns

['Rank',
 'Name',
 'Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales']

In [23]:
data.select("Name","Platform","EU_Sales", "Global_Sales").show(15, truncate=False)

+---------------------------+--------+--------+------------+
|Name                       |Platform|EU_Sales|Global_Sales|
+---------------------------+--------+--------+------------+
|Wii Sports                 |Wii     |29.02   |82.74       |
|Super Mario Bros.          |NES     |3.58    |40.24       |
|Mario Kart Wii             |Wii     |12.88   |35.82       |
|Wii Sports Resort          |Wii     |11.01   |33.0        |
|Pokemon Red/Pokemon Blue   |GB      |8.89    |31.37       |
|Tetris                     |GB      |2.26    |30.26       |
|New Super Mario Bros.      |DS      |9.23    |30.01       |
|Wii Play                   |Wii     |9.2     |29.02       |
|New Super Mario Bros. Wii  |Wii     |7.06    |28.62       |
|Duck Hunt                  |NES     |0.63    |28.31       |
|Nintendogs                 |DS      |11.0    |24.76       |
|Mario Kart DS              |DS      |7.57    |23.42       |
|Pokemon Gold/Pokemon Silver|GB      |6.18    |23.1        |
|Wii Fit                

In [0]:
# So we have a bad dataset with missing columns
# Turns out we can get a different dataset at https://github.com/Yorko/mlcourse.ai/blob/master/data/video_games_sales.csv
# So try to upload that one!

In [25]:
files.upload()

KeyboardInterrupt: ignored

In [0]:
data = spark.read.csv('video_games_sales.csv',inferSchema=True, header =True)

In [27]:
data.select("Name","Platform","User_Score","User_Count").show(15, truncate=False)

+---------------------------+--------+----------+----------+
|Name                       |Platform|User_Score|User_Count|
+---------------------------+--------+----------+----------+
|Wii Sports                 |Wii     |8         |322       |
|Super Mario Bros.          |NES     |null      |null      |
|Mario Kart Wii             |Wii     |8.3       |709       |
|Wii Sports Resort          |Wii     |8         |192       |
|Pokemon Red/Pokemon Blue   |GB      |null      |null      |
|Tetris                     |GB      |null      |null      |
|New Super Mario Bros.      |DS      |8.5       |431       |
|Wii Play                   |Wii     |6.6       |129       |
|New Super Mario Bros. Wii  |Wii     |8.4       |594       |
|Duck Hunt                  |NES     |null      |null      |
|Nintendogs                 |DS      |null      |null      |
|Mario Kart DS              |DS      |8.6       |464       |
|Pokemon Gold/Pokemon Silver|GB      |null      |null      |
|Wii Fit                

## Summary Statistics

In [28]:
data.count()

16719

In [29]:
import pyspark.sql.functions as fun
dir(fun)

['Column',
 'DataFrame',
 'DataType',
 'PandasUDFType',
 'PythonEvalType',
 'SparkContext',
 'StringType',
 'UserDefinedFunction',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__spec__',
 '_binary_mathfunctions',
 '_collect_list_doc',
 '_collect_set_doc',
 '_create_binary_mathfunction',
 '_create_column_from_literal',
 '_create_function',
 '_create_udf',
 '_create_window_function',
 '_functions',
 '_functions_1_4',
 '_functions_1_6',
 '_functions_2_1',
 '_functions_2_4',
 '_functions_deprecated',
 '_lit_doc',
 '_message',
 '_string_functions',
 '_test',
 '_to_java_column',
 '_to_seq',
 '_window_functions',
 '_wrap_deprecated_function',
 'abs',
 'acos',
 'add_months',
 'approxCountDistinct',
 'approx_count_distinct',
 'array',
 'array_contains',
 'array_distinct',
 'array_except',
 'array_intersect',
 'array_join',
 'array_max',
 'array_min',
 'array_position',
 'array_remove',
 'array_repeat',
 'array_sort',
 'array_

In [30]:
data.show(5)

+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|                Name|Platform|Year_of_Release|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|          Wii Sports|     Wii|           2006|      Sports| Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322| Nintendo|     E|
|   Super Mario Bros.|     NES|           1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|        null|        null|      null|      null|     null|  null|
|      Mario Kart Wii|     Wii|           2008|      Racing|

In [31]:
data.filter("Critic_Score is null").show(10)

+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|                Name|Platform|Year_of_Release|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|   Super Mario Bros.|     NES|           1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|        null|        null|      null|      null|     null|  null|
|Pokemon Red/Pokem...|      GB|           1996|Role-Playing| Nintendo|   11.27|    8.89|   10.22|        1.0|       31.37|        null|        null|      null|      null|     null|  null|
|              Tetris|      GB|           1989|      Puzzle|

In [32]:
data.select(fun.isnull("User_Score")).count()

16719

In [33]:
data.describe(["User_Score","User_Count"]).show()

+-------+------------------+------------------+
|summary|        User_Score|        User_Count|
+-------+------------------+------------------+
|  count|             10015|              7590|
|   mean|7.1250461133070315|162.22990777338603|
| stddev|1.5000060936257986| 561.2823262473789|
|    min|                 0|                 4|
|    max|               tbd|             10665|
+-------+------------------+------------------+



In [34]:
data.describe(["User_Score","User_Count"]).count()

5

In [0]:
# this way we can filter out null reviews and count all rows with user score
data.count()-data.filter("User_Score is null").count()

10015

In [0]:
# how to filter now null
data.filter("User_Score is not null").count()

10015

In [35]:
data.groupBy("Platform").count().show(50)

+--------+-----+
|Platform|count|
+--------+-----+
|     3DO|    3|
|      PC|  974|
|     PS3| 1331|
|     NES|   98|
|      PS| 1197|
|      DC|   52|
|     GEN|   29|
|     PS2| 2161|
|     3DS|  520|
|    PCFX|    1|
|      GG|    1|
|    WiiU|  147|
|    SNES|  239|
|      GB|   98|
|     SCD|    6|
|     N64|  319|
|     PS4|  393|
|     PSP| 1209|
|    2600|  133|
|    XOne|  247|
|    X360| 1262|
|     GBA|  822|
|      WS|    6|
|     Wii| 1320|
|      GC|  556|
|     PSV|  432|
|      XB|  824|
|      DS| 2152|
|    TG16|    2|
|      NG|   12|
|     SAT|  173|
+--------+-----+



In [36]:
data.groupBy("Platform").count().orderBy("count", ascending=False).show(10)

+--------+-----+
|Platform|count|
+--------+-----+
|     PS2| 2161|
|      DS| 2152|
|     PS3| 1331|
|     Wii| 1320|
|    X360| 1262|
|     PSP| 1209|
|      PS| 1197|
|      PC|  974|
|      XB|  824|
|     GBA|  822|
+--------+-----+
only showing top 10 rows



In [37]:
data.groupBy("Publisher").count().orderBy("count",ascending=False).show(50,False)

+--------------------------------------+-----+
|Publisher                             |count|
+--------------------------------------+-----+
|Electronic Arts                       |1356 |
|Activision                            |985  |
|Namco Bandai Games                    |939  |
|Ubisoft                               |933  |
|Konami Digital Entertainment          |834  |
|THQ                                   |715  |
|Nintendo                              |706  |
|Sony Computer Entertainment           |687  |
|Sega                                  |638  |
|Take-Two Interactive                  |422  |
|Capcom                                |386  |
|Atari                                 |367  |
|Tecmo Koei                            |348  |
|Warner Bros. Interactive Entertainment|235  |
|Square Enix                           |234  |
|Disney Interactive Studios            |218  |
|Unknown                               |201  |
|Eidos Interactive                     |198  |
|Midway Games

## Filtering DataFrames

In [38]:
condition1 = (data.User_Score.isNotNull()) | (data.User_Count.isNotNull())
condition2 = data.User_Score != "tbd"
data2 = data.filter(condition1).filter(condition2)

data2.orderBy("EU_Sales", ascending=False).show(15,False)

+--------------------------------------------+--------+---------------+------------+--------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+----------------------+------+
|Name                                        |Platform|Year_of_Release|Genre       |Publisher           |NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer             |Rating|
+--------------------------------------------+--------+---------------+------------+--------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+----------------------+------+
|Wii Sports                                  |Wii     |2006           |Sports      |Nintendo            |41.36   |28.96   |3.77    |8.45       |82.53       |76          |51          |8         |322       |Nintendo              |E     |
|Mario Kart Wii                              |Wii     |2

In [39]:
data.select("Name","Platform","User_Score","User_Count").filter(data.User_Score == "tbd").show(truncate=False)

+----------------------------------------+--------+----------+----------+
|Name                                    |Platform|User_Score|User_Count|
+----------------------------------------+--------+----------+----------+
|Zumba Fitness                           |Wii     |tbd       |null      |
|Namco Museum: 50th Anniversary          |PS2     |tbd       |null      |
|Zumba Fitness 2                         |Wii     |tbd       |null      |
|uDraw Studio                            |Wii     |tbd       |null      |
|Frogger's Adventures: Temple of the Frog|GBA     |tbd       |null      |
|Just Dance Kids                         |Wii     |tbd       |null      |
|Dance Dance Revolution X2               |PS2     |tbd       |null      |
|The Incredibles                         |GBA     |tbd       |null      |
|Who wants to be a millionaire           |PC      |tbd       |null      |
|Tetris Worlds                           |GBA     |tbd       |null      |
|Imagine: Teacher                     

In [40]:
data2.select("Name","Platform","User_Score","User_Count").describe(["User_Score","User_Count"]).show()

+-------+------------------+------------------+
|summary|        User_Score|        User_Count|
+-------+------------------+------------------+
|  count|              7590|              7590|
|   mean|7.1250461133070315|162.22990777338603|
| stddev|1.5000060936257986| 561.2823262473789|
|    min|                 0|                 4|
|    max|               9.7|             10665|
+-------+------------------+------------------+



In [41]:
data2.select("Name","Platform","User_Score","User_Count").filter(data.User_Count > 100).orderBy("User_Score", ascending=False).show(15, truncate=False)

+-------------------------------------+--------+----------+----------+
|Name                                 |Platform|User_Score|User_Count|
+-------------------------------------+--------+----------+----------+
|Harvest Moon: Friends of Mineral Town|GBA     |9.6       |116       |
|Golden Sun: The Lost Age             |GBA     |9.5       |150       |
|Cory in the House                    |DS      |9.5       |1273      |
|Metal Gear Solid                     |PS      |9.4       |918       |
|Shenmue                              |DC      |9.4       |201       |
|Shenmue II                           |DC      |9.4       |201       |
|Paper Mario: The Thousand-Year Door  |GC      |9.4       |306       |
|Resident Evil 4                      |GC      |9.4       |767       |
|Castlevania: Symphony of the Night   |PS      |9.4       |358       |
|Metal Gear Solid 3: Snake Eater      |PS2     |9.3       |955       |
|Metroid Prime                        |GC      |9.3       |747       |
|Metro

In [42]:
data2.show()

+--------------------+--------+---------------+--------+--------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+--------------------+------+
|                Name|Platform|Year_of_Release|   Genre|           Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|           Developer|Rating|
+--------------------+--------+---------------+--------+--------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+--------------------+------+
|          Wii Sports|     Wii|           2006|  Sports|            Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322|            Nintendo|     E|
|      Mario Kart Wii|     Wii|           2008|  Racing|            Nintendo|   15.68|   12.76|    3.79|       3.29|       35.52|          82|          73|       8.3|       709

## Building a Model in PySpark

### Linear Regression

In [43]:
data2.show(5)

+--------------------+--------+---------------+--------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|                Name|Platform|Year_of_Release|   Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+--------------------+--------+---------------+--------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|          Wii Sports|     Wii|           2006|  Sports| Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322| Nintendo|     E|
|      Mario Kart Wii|     Wii|           2008|  Racing| Nintendo|   15.68|   12.76|    3.79|       3.29|       35.52|          82|          73|       8.3|       709| Nintendo|     E|
|   Wii Sports Resort|     Wii|           2009|  Sports| Nintendo|   15.61|   10

In [44]:
data2.select("Year_of_Release").distinct().orderBy("Year_of_Release", ascending=True).show(50,False)

+---------------+
|Year_of_Release|
+---------------+
|1985           |
|1988           |
|1992           |
|1994           |
|1996           |
|1997           |
|1998           |
|1999           |
|2000           |
|2001           |
|2002           |
|2003           |
|2004           |
|2005           |
|2006           |
|2007           |
|2008           |
|2009           |
|2010           |
|2011           |
|2012           |
|2013           |
|2014           |
|2015           |
|2016           |
|N/A            |
+---------------+



In [45]:
data2.groupBy("Publisher").count().orderBy("count",ascending=False).show()

+--------------------+-----+
|           Publisher|count|
+--------------------+-----+
|     Electronic Arts| 1026|
|          Activision|  573|
|             Ubisoft|  557|
|                 THQ|  342|
|Sony Computer Ent...|  327|
|Take-Two Interactive|  302|
|                Sega|  297|
|            Nintendo|  294|
|Konami Digital En...|  270|
|  Namco Bandai Games|  265|
|              Capcom|  204|
|               Atari|  186|
|Warner Bros. Inte...|  169|
|Microsoft Game St...|  146|
|          Tecmo Koei|  144|
|         Square Enix|  140|
|   Eidos Interactive|  131|
|       Vivendi Games|  123|
|         Codemasters|  116|
|        Midway Games|  111|
+--------------------+-----+
only showing top 20 rows



In [46]:
data2 = data2.filter(data2.Year_of_Release != "N/A")
data2.select("Year_of_Release").distinct().orderBy("Year_of_Release", ascending=True).show(50,False)

+---------------+
|Year_of_Release|
+---------------+
|1985           |
|1988           |
|1992           |
|1994           |
|1996           |
|1997           |
|1998           |
|1999           |
|2000           |
|2001           |
|2002           |
|2003           |
|2004           |
|2005           |
|2006           |
|2007           |
|2008           |
|2009           |
|2010           |
|2011           |
|2012           |
|2013           |
|2014           |
|2015           |
|2016           |
+---------------+



In [47]:
# Here we create 4 new columns
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
data2 = data2.withColumn("Year_of_Release", data2["Year_of_Release"].cast(DoubleType()))
data2 = data2.withColumn("User_Score", data2["User_Score"].cast(DoubleType()))
data2 = data2.withColumn("User_Count", data2["User_Count"].cast(DoubleType()))
data2 = data2.withColumn("Critic_Score", data2["Critic_Score"].cast(DoubleType()))

data2.dtypes

[('Name', 'string'),
 ('Platform', 'string'),
 ('Year_of_Release', 'double'),
 ('Genre', 'string'),
 ('Publisher', 'string'),
 ('NA_Sales', 'double'),
 ('EU_Sales', 'double'),
 ('JP_Sales', 'double'),
 ('Other_Sales', 'double'),
 ('Global_Sales', 'double'),
 ('Critic_Score', 'double'),
 ('Critic_Count', 'int'),
 ('User_Score', 'double'),
 ('User_Count', 'double'),
 ('Developer', 'string'),
 ('Rating', 'string')]

In [0]:
# https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [49]:
#Input all the features in one vector column
assembler = VectorAssembler(inputCols=['Year_of_Release', 'Global_Sales', 'Critic_Score', 'User_Count'], outputCol = 'predictors')
output = assembler.transform(data2)
#Input vs Output
finalized_data = output.select("predictors","User_Score")
finalized_data.show(5)


+--------------------+----------+
|          predictors|User_Score|
+--------------------+----------+
|[2006.0,82.53,76....|       8.0|
|[2008.0,35.52,82....|       8.3|
|[2009.0,32.77,80....|       8.0|
|[2006.0,29.8,89.0...|       8.5|
|[2006.0,28.92,58....|       6.6|
+--------------------+----------+
only showing top 5 rows



In [50]:
finalized_data.select("predictors").show(5)

+--------------------+
|          predictors|
+--------------------+
|[2006.0,82.53,76....|
|[2008.0,35.52,82....|
|[2009.0,32.77,80....|
|[2006.0,29.8,89.0...|
|[2006.0,28.92,58....|
+--------------------+
only showing top 5 rows



In [51]:
a2 = VectorAssembler(inputCols=['Year_of_Release', 'Global_Sales'], outputCol = 'predictors')
o2 = a2.transform(data2)
#Input vs Output
f2 = o2.select("predictors","User_Score")
f2.show(5)

+--------------+----------+
|    predictors|User_Score|
+--------------+----------+
|[2006.0,82.53]|       8.0|
|[2008.0,35.52]|       8.3|
|[2009.0,32.77]|       8.0|
| [2006.0,29.8]|       8.5|
|[2006.0,28.92]|       6.6|
+--------------+----------+
only showing top 5 rows



In [52]:
finalized_data = output.select("predictors","User_Score")
finalized_data.show(5)


+--------------------+----------+
|          predictors|User_Score|
+--------------------+----------+
|[2006.0,82.53,76....|       8.0|
|[2008.0,35.52,82....|       8.3|
|[2009.0,32.77,80....|       8.0|
|[2006.0,29.8,89.0...|       8.5|
|[2006.0,28.92,58....|       6.6|
+--------------------+----------+
only showing top 5 rows



In [0]:
# https://spark.apache.org/docs/latest/ml-features.html
from pyspark.ml.regression import LinearRegression

In [0]:
tr,td = f2.randomSplit([0.8,0.2])

In [55]:
tr.show(5)

+-------------+----------+
|   predictors|User_Score|
+-------------+----------+
|[1985.0,0.03]|       5.8|
|[1992.0,0.03]|       8.2|
|[1994.0,1.27]|       6.3|
|[1996.0,0.14]|       7.4|
|[1996.0,1.03]|       8.5|
+-------------+----------+
only showing top 5 rows



In [56]:
lr = LinearRegression(
    featuresCol = 'predictors', 
    labelCol = 'User_Score')

lrModel = lr.fit(tr)

#here we evaluate mode against test data the ones that we randomly split from whole dataset
p2 = lrModel.evaluate(td)

p2.predictions.show(5)

+-------------+----------+-----------------+
|   predictors|User_Score|       prediction|
+-------------+----------+-----------------+
|[1988.0,0.03]|       2.2|8.879214931431108|
|[1996.0,0.25]|       8.7|8.161797353374112|
|[1996.0,1.59]|       8.7|8.257563576189284|
|[1996.0,4.63]|       8.6|8.474824260784857|
|[1996.0,5.05]|       9.0|8.504840539577685|
+-------------+----------+-----------------+
only showing top 5 rows



In [58]:
assembler.setHandleInvalid("skip").transform(finalized_data).show

IllegalArgumentException: ignored

In [59]:
type(finalized_data)

pyspark.sql.dataframe.DataFrame

In [0]:
# https://stackoverflow.com/questions/37262762/filter-pyspark-dataframe-column-with-none-value/37262973

In [0]:
df = finalized_data

In [62]:
df.columns

['predictors', 'User_Score']

In [0]:
df = df.na.drop(subset=["predictors"])

In [0]:
df = df.na.drop(subset=["User_Score"])

In [75]:
df.filter("predictors is not NULL").show()

+--------------------+----------+
|          predictors|User_Score|
+--------------------+----------+
|[2006.0,82.53,76....|       8.0|
|[2008.0,35.52,82....|       8.3|
|[2009.0,32.77,80....|       8.0|
|[2006.0,29.8,89.0...|       8.5|
|[2006.0,28.92,58....|       6.6|
|[2009.0,28.32,87....|       8.4|
|[2005.0,23.21,91....|       8.6|
|[2007.0,22.7,80.0...|       7.7|
|[2010.0,21.81,61....|       6.3|
|[2009.0,21.79,80....|       7.4|
|[2013.0,21.04,97....|       8.2|
|[2004.0,20.81,95....|       9.0|
|[2005.0,20.15,77....|       7.9|
|[2013.0,16.27,97....|       8.1|
|[2002.0,16.15,95....|       8.7|
|[2005.0,15.29,77....|       7.1|
|[2001.0,14.98,95....|       8.4|
|[2011.0,14.73,88....|       3.4|
|[2010.0,14.61,87....|       6.3|
|[2012.0,13.79,83....|       5.3|
+--------------------+----------+
only showing top 20 rows



In [76]:
df.filter("predictors is NULL").show()

Py4JJavaError: ignored

In [71]:
#Split training and testing data
train_data,test_data = df.randomSplit([0.8,0.2])

lr = LinearRegression(
    featuresCol = 'predictors', 
    labelCol = 'User_Score')

lrModel = lr.fit(train_data)

pred = lrModel.evaluate(test_data)

pred.predictions.show(5)

Py4JJavaError: ignored

In [0]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

Coefficients: [-0.08981772769332257,0.08749933314802805]
Intercept: 187.38738531475448
numIterations: 1
objectiveHistory: [0.0]
+--------------------+
|           residuals|
+--------------------+
|   -3.30182082350363|
|  -6.632367640423655|
| -0.2730967296503657|
| -0.7234507455233707|
|  0.5669243278303604|
| 0.29867484797489396|
|  0.4496752214119901|
| -0.8791995585268406|
| 0.08367724864198145|
| 0.44692752871981156|
|  0.2865529888476832|
| -0.5222581045208017|
| 0.07774189547919796|
| -0.2275080645096752|
|  0.9584920421866343|
| -0.2581328311114932|
| 0.13486722223667513|
| 0.10074248230894156|
| 0.19024256233116787|
|-0.22288233764104426|
+--------------------+
only showing top 20 rows

RMSE: 1.441367
r2: 0.075587


### Evaluating model

In [0]:
pred = p2

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
eval = RegressionEvaluator(
    labelCol="User_Score", 
    predictionCol="prediction", 
    metricName="rmse")
# Root Mean Square Error
rmse = eval.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)
# Mean Square Error
mse = eval.evaluate(pred.predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)
# Mean Absolute Error
mae = eval.evaluate(pred.predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)
# r2 - coefficient of determination
r2 = eval.evaluate(pred.predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)

RMSE: 1.452
MSE: 2.107
MAE: 1.107
r2: 0.087


In [0]:
data2.columns

['Name',
 'Platform',
 'Year_of_Release',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales',
 'Critic_Score',
 'Critic_Count',
 'User_Score',
 'User_Count',
 'Developer',
 'Rating']