# PySpark

Spark is a data processing framework written mostly in Scala that runs on the Java Virtual Machine (JVM). It's made to run on clusters (several computers connected together) to process "big data" (data that doesn't fit on one computer).

The Python interface to Spark is `PySpark`.

The entry point to Spark SQL is an object called `SparkSession`. It starts a Spark application, and all the code for that session runs on it.

In [1]:
# This can help getting pyspark running
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
# !tar xf spark-2.4.3-bin-hadoop2.7.tgz
# !pip install -q findspark
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"
# import findspark
# findspark.init()

In [2]:
# you may have to conda or pip install pyspark
# you also need java installed (sigh, I know)
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .master("local[*]")
    .appName("Learning_Spark")
    .getOrCreate()
)

`.builder`: gives access to the Builder API, which is used to configure the session.

`.master()`: determines where the program will run. `"local[*]"` runs it locally on all available cores; `"local[1]"`, for example, runs it on a single core. "Locally" means the machine that executes this notebook, so on a hosted service such as Google Colab the programs run on Google’s servers.

`.appName()`: optional method to name the Spark application.

`.getOrCreate()`: gets an existing SparkSession, or creates a new one if none exists.

In [3]:
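# inferSchema=True makes Spark guess column types; header=True uses the first row as column names
data = spark.read.csv('data/vgsales.csv', inferSchema=True, header=True)
# show(n) prints the first n rows, much like head() in pandas
data.show(5)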

+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|                Name|Platform|Year_of_Release|       Genre|Publisher|NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer|Rating|
+--------------------+--------+---------------+------------+---------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+---------+------+
|          Wii Sports|     Wii|           2006|      Sports| Nintendo|   41.36|   28.96|    3.77|       8.45|       82.53|          76|          51|         8|       322| Nintendo|     E|
|   Super Mario Bros.|     NES|           1985|    Platform| Nintendo|   29.08|    3.58|    6.81|       0.77|       40.24|        null|        null|      null|      null|     null|  null|
|      Mario Kart Wii|     Wii|           2008|      Racing|

In [4]:
data.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year_of_Release: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: double (nullable = true)
 |-- EU_Sales: double (nullable = true)
 |-- JP_Sales: double (nullable = true)
 |-- Other_Sales: double (nullable = true)
 |-- Global_Sales: double (nullable = true)
 |-- Critic_Score: integer (nullable = true)
 |-- Critic_Count: integer (nullable = true)
 |-- User_Score: string (nullable = true)
 |-- User_Count: integer (nullable = true)
 |-- Developer: string (nullable = true)
 |-- Rating: string (nullable = true)



In [5]:
(data.select("Name","Platform","EU_Sales")
     .show(15, truncate=False)
)

+---------------------------+--------+--------+
|Name                       |Platform|EU_Sales|
+---------------------------+--------+--------+
|Wii Sports                 |Wii     |28.96   |
|Super Mario Bros.          |NES     |3.58    |
|Mario Kart Wii             |Wii     |12.76   |
|Wii Sports Resort          |Wii     |10.93   |
|Pokemon Red/Pokemon Blue   |GB      |8.89    |
|Tetris                     |GB      |2.26    |
|New Super Mario Bros.      |DS      |9.14    |
|Wii Play                   |Wii     |9.18    |
|New Super Mario Bros. Wii  |Wii     |6.94    |
|Duck Hunt                  |NES     |0.63    |
|Nintendogs                 |DS      |10.95   |
|Mario Kart DS              |DS      |7.47    |
|Pokemon Gold/Pokemon Silver|GB      |6.18    |
|Wii Fit                    |Wii     |8.03    |
|Kinect Adventures!         |X360    |4.89    |
+---------------------------+--------+--------+
only showing top 15 rows



In [6]:
(data.groupBy("Platform")
     .count()
     .orderBy("count", ascending=False)
     .show(10)
)

+--------+-----+
|Platform|count|
+--------+-----+
|     PS2| 2161|
|      DS| 2152|
|     PS3| 1331|
|     Wii| 1320|
|    X360| 1262|
|     PSP| 1209|
|      PS| 1197|
|      PC|  974|
|      XB|  824|
|     GBA|  822|
+--------+-----+
only showing top 10 rows



# Filtering

In [7]:
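# Keep rows that have a user score or a user count
condition1 = (data.User_Score.isNotNull()) | (data.User_Count.isNotNull())
# Drop rows whose score is the placeholder "tbd"; rows with a null User_Score
# also fail this comparison, so they are removed here as well
condition2 = data.User_Score != "tbd"
data = data.filter(condition1).filter(condition2)
data.show(15, False)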

+--------------------------------------------+--------+---------------+--------+----------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+-------------------+------+
|Name                                        |Platform|Year_of_Release|Genre   |Publisher             |NA_Sales|EU_Sales|JP_Sales|Other_Sales|Global_Sales|Critic_Score|Critic_Count|User_Score|User_Count|Developer          |Rating|
+--------------------------------------------+--------+---------------+--------+----------------------+--------+--------+--------+-----------+------------+------------+------------+----------+----------+-------------------+------+
|Wii Sports                                  |Wii     |2006           |Sports  |Nintendo              |41.36   |28.96   |3.77    |8.45       |82.53       |76          |51          |8         |322       |Nintendo           |E     |
|Mario Kart Wii                              |Wii     |2008           |Racin

# Building Models in PySpark

Building models in PySpark looks a little different from what you might be used to, and you’ll see terms like Transformer, Estimator, and Param. This guide won’t go in depth into what those terms mean; the ML Pipelines page of the official Spark documentation describes them briefly.



In [8]:
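# Keep only rows with a real release year; the filtered result is stored in data2
data2 = data.filter(data.Year_of_Release != "N/A")
# The distinct years of the unfiltered `data` still include the "N/A" placeholder
data.select("Year_of_Release").distinct().orderBy("Year_of_Release", ascending=True).show(50, False)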

+---------------+
|Year_of_Release|
+---------------+
|1985           |
|1988           |
|1992           |
|1994           |
|1996           |
|1997           |
|1998           |
|1999           |
|2000           |
|2001           |
|2002           |
|2003           |
|2004           |
|2005           |
|2006           |
|2007           |
|2008           |
|2009           |
|2010           |
|2011           |
|2012           |
|2013           |
|2014           |
|2015           |
|2016           |
|N/A            |
+---------------+



In [9]:
from pyspark.sql.types import DoubleType
# withColumn either creates a new column or replaces one that already exists;
# here it replaces each column with a numeric (double) version for the model
data = data.withColumn("Year_of_Release", data["Year_of_Release"].cast(DoubleType()))
data = data.withColumn("User_Score", data["User_Score"].cast(DoubleType()))
data = data.withColumn("Critic_Score", data["Critic_Score"].cast(DoubleType()))

Next we list the features we want our model to use as predictors so that `VectorAssembler` can take those columns and combine them into a single column (named “predictors”) that contains all the data we want to predict with.

`VectorAssembler.transform()` creates a new DataFrame with a new column at the end, where each row contains a vector of all the features listed in the `inputCols` parameter when we created the assembler.

In [10]:
from pyspark.ml.feature import VectorAssembler
inputcols = ["Year_of_Release", "Global_Sales", "Critic_Score"]
assembler = VectorAssembler(inputCols=inputcols,
                            outputCol="predictors")

# "skip" drops rows where any input column is null or NaN
predictors = assembler.setHandleInvalid("skip").transform(data)
# keep only rows that have both a feature vector and a label
predictors = predictors.filter(
    predictors.predictors.isNotNull()
    & predictors.User_Score.isNotNull()
)
predictors.columns

['Name',
 'Platform',
 'Year_of_Release',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales',
 'Critic_Score',
 'Critic_Count',
 'User_Score',
 'User_Count',
 'Developer',
 'Rating',
 'predictors']

In [11]:
model_data = predictors.select("predictors", "User_Score")
model_data.show(10,truncate=False)

+-------------------+----------+
|predictors         |User_Score|
+-------------------+----------+
|[2006.0,82.53,76.0]|8.0       |
|[2008.0,35.52,82.0]|8.3       |
|[2009.0,32.77,80.0]|8.0       |
|[2006.0,29.8,89.0] |8.5       |
|[2006.0,28.92,58.0]|6.6       |
|[2009.0,28.32,87.0]|8.4       |
|[2005.0,23.21,91.0]|8.6       |
|[2007.0,22.7,80.0] |7.7       |
|[2010.0,21.81,61.0]|6.3       |
|[2009.0,21.79,80.0]|7.4       |
+-------------------+----------+
only showing top 10 rows



In [None]:
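# Hold out 20% of the rows for testing (the split is random, so results vary between runs)
train_data, test_data = model_data.randomSplit([0.8, 0.2])

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    featuresCol='predictors',
    labelCol='User_Score')
lrModel = lr.fit(train_data)
# evaluate() returns a summary of metrics on the test set (e.g. pred.rootMeanSquaredError, pred.r2)
pred = lrModel.evaluate(test_data)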