<h1 align='center'>Apache Spark Briefed</h1>

![Spark-bigdata_feature.jpg](attachment:Spark-bigdata_feature.jpg)

# Table of Contents

1.   <a href='#1'>What is Apache Spark?</a>
    2. <a href='#2'>Spark Context in Big Data Environments</a>
    3. <a href='#3'>Spark Core also brings its own set of useful APIs to the table</a>
4.   <a href='#4'>What Makes Spark, Spark?</a>
    5. <a href='#5'>The Driver and the Executor</a>
    6. <a href='#6'>The DAG</a>
7.   <a href="#7">Install and setup Apache Spark</a>
8.   <a href="#8">PySpark Basics: RDDs</a>
    9. <a href='#9'>The Python Spark Shell</a>
    10. <a href='#10'>Creating RDDs</a>
    11. <a href='#11'>RDD Operations</a>
10.   <a href='#12'>The Data</a>
    11. <a href='#13'>Loading And Exploring Your Data</a>
        12. <a href='#14'>Creating Your First Spark Program</a>
        13. <a href='#15'>Loading In Your Data</a>
    14. <a href='#16'>Data Exploration</a>
    15. <a href='#17'>Data Preprocessing</a>
        16. <a href='#18'>Preprocessing The Target Values</a>
        17. <a href='#19'>Feature Engineering</a>
        18. <a href='#20'>Standardization</a>
19.   <a href="#21">Building A Machine Learning Model With Spark ML</a>
20.   <a href="#22">Evaluating the Model</a>
21.   <a href='#23'> Difference between SparkML and SciKit-ML</a>

<a id='1'></a>
# What is Apache Spark?

* Spark is a general-purpose, in-memory engine for distributed computing.
* It lets you run both real-time (streaming) and batch workloads from several languages (Scala, Java, Python and R), with built-in fault tolerance.
* Why should you care what Spark is? To put it bluntly, it addresses many of the shortcomings of Hadoop MapReduce and, depending on the workload, can run roughly 10 to 100 times faster.
* Spark is a big deal in data science; some notable organisations that use Spark are Amazon, NASA Jet Propulsion Laboratory, IBM and Hitachi.

<a id='2'></a>
## Spark Context in Big Data Environments

* Spark is designed to work with an external cluster manager or with its own standalone manager.
* Spark also relies on a distributed storage system, from which it reads the data it is meant to process.


* The following systems are supported:
    * ### Cluster Managers:
        * Spark Standalone Manager
        * Hadoop YARN
        * Apache Mesos
    * ### Distributed Storage Systems:
        * Hadoop Distributed File System (HDFS)
        * MapR File System (MapR-FS)
        * Cassandra
        * OpenStack Swift
        * Amazon S3
        * Kudu

* Spark Core is very versatile and has been designed with the Hadoop ecosystem in mind; it can work alongside MapReduce or provide an alternative platform for Pig, Hive and Search to work on top of.

![1_z0Vm749Pu6mHdlyPsznMRg.png](attachment:1_z0Vm749Pu6mHdlyPsznMRg.png)

<a id='3'></a>
## Spark Core also brings its own set of useful APIs to the table

* ### Spark Streaming:
    * Manages live micro-batches of data from a variety of sources. It allows real-time results to be computed by enabling the use of MLlib and GraphX on the live streams.

* ### GraphX:
    * A very powerful library for graph-parallel computation. Don’t confuse this with “PowerPoint graphs”: this library is all about the field of mathematics called graph theory, which models pairwise relationships between objects.

* ### MLlib:
    * A library for running machine learning algorithms on large data sets in a native distributed environment. The library is still in its infancy compared to the more robust machine learning libraries found in Python or Matlab.

* ### Spark SQL:
    * Allows the use of SQL queries to query non-relational distributed databases.

<a id='4'></a>
# What Makes Spark, Spark?

* At the highest level of abstraction, Spark consists of three components that make it uniquely Spark:
    * The Driver,
    * The Executor, and
    * The DAG.

<a id='5'></a>
## The Driver and the Executor

* Spark uses a master-slave architecture. 
* A driver coordinates many distributed workers in order to execute tasks in a distributed manner while a resource manager deals with the resource allocation to get the tasks done.

### DRIVER
* Think of it as the “Orchestrator”. 
* The driver is where the main method runs. 
* It converts the program into tasks and then schedules the tasks to the executors.
* The driver has three different ways of communicating with the executors at its disposal:
    * Broadcast,
    * Take,
    * DAG.

### EXECUTOR: “WORKERS”
* Executors execute the tasks delegated by the driver within a JVM instance.
* Executors are launched at the beginning of a Spark application and normally run for the whole life span of the application.
* This allows data to persist in memory while different tasks are loaded in and out of the executor throughout the application’s lifespan.

* JVM stands for Java Virtual Machine, a memory space where classes (code) are loaded and objects (data) are shared. Each JVM instance runs as an operating system process.

![1_sCv4GPI4qThyFL8Ej3-D9w.png](attachment:1_sCv4GPI4qThyFL8Ej3-D9w.png)

## DRIVER COMMUNICATION WITH EXECUTORS

* There are several ways in which a driver can communicate with executors.
* As a developer or data scientist, it’s important that you are aware of the different types of communication and their use cases.

1. ### Broadcast Action:
    * The driver transmits the necessary data to each executor. This action works best for data sets under a million records, or roughly 1 GB of data. Beyond that, it can become a very expensive task.
2. ### Take Action:
    * The driver takes data from all executors. This can be a very expensive and dangerous action, as the driver might run out of memory and the network could become overwhelmed.
3. ### DAG Action:
    * This is by far the least expensive action of the three. It transmits control flow logic from the driver to the executors.

### System Requirements
* Spark offers a considerable performance gain over Hadoop MapReduce, but it also has a higher operating cost, as it operates in memory and requires a high-bandwidth network environment (10 Gb/s or more is advised).
* It is recommended that the memory in the Spark cluster should be at least as large as the amount of data you need to process.
* If there isn’t enough memory for a job, Spark has several methods to spill the data over onto disk.

<a id='6'></a>
## The DAG

* The DAG is a Directed Acyclic Graph that outlines the series of steps needed to get from point A to point B.
* Hadoop MapReduce, like most other computing engines, works independently of the DAG.
* These DAG-independent computing engines rely on scripting platforms like Hive or Pig to link jobs together to achieve the desired result.
* What makes Spark so powerful in comparison is that it is aware of the DAG and actively manages it.
* This allows Spark to optimise job flows for performance and allows for rollback and job redundancy features.

![1_WGXwt6BVCEyknQZh7_Vpzw.png](attachment:1_WGXwt6BVCEyknQZh7_Vpzw.png)

1. ### SOURCE
    * A source can be any data source supported by Spark, for example HDFS, a relational database or a CSV file.
    * You will see later that we define this within our environment context setup.

2. ### RDD
    * Resilient Distributed Datasets are essentially sets of data that cannot be changed.
    * These entities exist in memory and by their very nature are immutable.
    * Because of this immutability, a new RDD is created after every transformation performed on an existing RDD.
    * A consequence of this design is redundancy: if there is a failure at any point in the DAG’s execution, it is possible to roll back to a functioning state and reattempt the failed action or transformation.

    * RDDs in their original form don’t have a schema attached to them, but they can be extended with DataFrames.
    * DataFrames add schema functionality to the data set contained within; this is very useful when dealing with relational datasets.

3. ### TRANSFORMATION
    * Transformations transform an RDD into another RDD. Some example transformations are:
        * map
        * reduceByKey
        * groupByKey
        * join
        * Spark SQL queries

4. ### ACTION
    * An action is anything that retrieves data to answer a question. Some examples are count, take and foreach.

##  EXECUTING THE DAG
* Spark does something called lazy evaluation. 
* The DAG itself is constructed by the Transformations but nothing happens until an Action is called. 
* When an action is executed, Spark will look at the DAG and then optimise it in the context of what jobs it needs to execute to reach the action step it has been asked to do.
* When the DAG is finally executed, the driver sends out the transformation commands to the executors on the cluster.
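
To make lazy evaluation concrete, here is a minimal plain-Python sketch. The `LazyDataset` class is hypothetical and is not part of Spark: it only mimics the idea that transformations are recorded and nothing runs until an action (`collect()` here) is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, source, steps=()):
        self._source = source        # the input data, never modified
        self._steps = tuple(steps)   # recorded transformations (the lineage)

    def map(self, fn):
        # Transformation: return a NEW dataset with one more recorded step.
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, fn):
        # Transformation: again, only record the step.
        return LazyDataset(self._source, self._steps + (("filter", fn),))

    def collect(self):
        # Action: only now is the recorded chain of steps executed.
        data = iter(self._source)
        for kind, fn in self._steps:
            data = map(fn, data) if kind == "map" else filter(fn, data)
        return list(data)


calls = []

def double(x):
    calls.append(x)  # side effect that shows when the work really happens
    return x * 2

base = LazyDataset(range(5))
doubled = base.map(double)               # nothing is computed yet
big = doubled.filter(lambda x: x >= 4)   # still nothing
calls_before_action = len(calls)         # no call to double() so far
result = big.collect()                   # the whole chain runs here
print(calls_before_action, result)
```

Because every transformation returns a new object and leaves `base` untouched, the sketch also mirrors the immutability of RDDs described earlier.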

<a id='7'></a>
# Install and setup Apache Spark

* Installing Spark and getting it to work can be a challenge.
* This section walks you through the steps to get it installed on your PC.


* The first thing to do is to check whether you meet the prerequisites.
* Spark is written in the Scala programming language and runs on the Java Virtual Machine (JVM).
* That’s why you need to check whether you have a Java Development Kit (JDK) installed.
* You do this because the JDK provides you with one or more implementations of the JVM.
* Pick a JDK version that your Spark release supports; the Spark 2.3.0 documentation lists Java 8+ as the requirement.

* Download the pre-built Spark package from the following link:
    * http://www-us.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

* Make sure you untar the downloaded file

![Screenshot%20from%202018-05-19%2011-12-08.png](attachment:Screenshot%20from%202018-05-19%2011-12-08.png)

* Next, move the untarred folder to /usr/local/spark by running the following line

![Screenshot%20from%202018-05-19%2011-14-11.png](attachment:Screenshot%20from%202018-05-19%2011-14-11.png)

* Note that if you get an error that says that the permission is denied to move this folder to the new location, you should add sudo in front of this command.
* You’ll be prompted to give your password, which is usually the one that you also use to unlock your pc when you start it up :)

For Detailed steps, follow the <a href='http://jmedium.com/pyspark-in-python/'>link</a>

* Then run the following command from the terminal:

<h3 align='center'>$SPARK_HOME/bin/pyspark</h3>

<a id='8'></a>
# PySpark Basics: RDDs

## Resilient Distributed Datasets
* The Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark.
* It is an immutable distributed collection of objects.
* Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster.
* RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

* There are two ways to create RDDs:
    * parallelizing an existing collection in your driver program, or
    * referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop InputFormat.

<a id='10'></a>
## Creating RDDs

* Now, let’s start small and make an RDD, which is the most basic building block of Spark.
* An RDD represents data, but not as a single object, result set or data set that lives in one place.
* That is because it’s intended for data that resides on multiple computers: a single RDD could be spread over thousands of Java Virtual Machines (JVMs), because Spark automatically partitions the data under the hood to get this parallelism. Of course, you can adjust the parallelism to get more partitions.
* That’s why an RDD is actually a collection of partitions.
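
The idea of "a collection of partitions" can be sketched in plain Python. The helper `split_into_partitions` below is hypothetical and only illustrates the concept; it does not claim to be how Spark partitions data internally.

```python
def split_into_partitions(records, num_partitions):
    """Split a list into num_partitions contiguous slices of near-equal size."""
    n = len(records)
    # Slice boundaries: 0, n/k, 2n/k, ..., n (integer arithmetic)
    bounds = [(i * n) // num_partitions for i in range(num_partitions + 1)]
    return [records[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]


partitions = split_into_partitions(list(range(10)), 3)

# Each partition could be processed on a different machine;
# the partial results are combined afterwards.
partial_sums = [sum(p) for p in partitions]
total = sum(partial_sums)
print(partitions, partial_sums, total)
```

In PySpark itself, `parallelize()` accepts an optional second argument for the number of slices, and `getNumPartitions()` reports how many partitions an RDD has.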

* You can easily create a simple RDD by using the parallelize() function and by simply passing some data (an iterable, like a list, or a collection) to it:

In [2]:
# SparkSession is available as spark
spark

In [3]:
rdd1 = spark.sparkContext.parallelize([('a',7),('a',2),('b',2)])

In [4]:
rdd2 = spark.sparkContext.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])

In [5]:
rdd3 = spark.sparkContext.parallelize(range(100))

* Note that the SparkSession object has the SparkContext object, which you can access with spark.sparkContext. 
* For backwards compatibility reasons, it’s also still possible to call the SparkContext with sc, as in **rdd1 = sc.parallelize([('a',7),('a',2),('b',2)]).**

<a id='11'></a>
## RDD Operations

* Now that you have created the RDDs, you can use the distributed data in rdd1 and rdd2 to operate on in parallel. 
* You have two types of operations: 
    * Transformations and actions

* To get an intuitive feel for the difference between these two, consider that
    * some of the most common transformations are map(), filter(), flatMap(), sample(), randomSplit(), coalesce() and repartition(), and
    * some of the most common actions are reduce(), collect(), first(), take(), count() and saveAsHadoopFile().

* Transformations are lazy operations on an RDD that create one or many new RDDs, while actions produce non-RDD values: they return a result set, a number, a file, …

* You can, for example, aggregate all the elements of rdd1 using the following, simple lambda function and return the results to the driver program:

In [6]:
rdd1.reduce(lambda a,b: a+b)

('a', 7, 'a', 2, 'b', 2)

* The result is one flat tuple, because the + operator concatenates the tuples that make up rdd1.
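
To see where that flat tuple comes from, the same lambda can be tried with Python's built-in `functools.reduce` on an ordinary list. This is only a local sketch of the logic; Spark's `reduce()` expects a commutative and associative function and combines partial results across partitions.

```python
from functools import reduce

pairs = [('a', 7), ('a', 2), ('b', 2)]

# `+` on two tuples concatenates them, so the pairs are glued together.
flat = reduce(lambda a, b: a + b, pairs)

# To add up only the numbers, extract the values first.
value_sum = reduce(lambda a, b: a + b, (value for _, value in pairs))

print(flat)
print(value_sum)
```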

* An example of a transformation is flatMapValues(), which you run on key-value pair RDDs, such as rdd2.
* In this case, you pass each value in the key-value pair RDD rdd2 through the lambda function defined below, without changing the keys. After that, you perform an action by collecting the results with collect().

In [7]:
rdd2.flatMapValues(lambda x: x).collect()

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
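
The behaviour of flatMapValues() can be mimicked with a list comprehension. The helper `flat_map_values` is a hypothetical plain-Python equivalent, written only to show what happens to each key-value pair.

```python
def flat_map_values(pairs, fn):
    """Apply fn to each value and emit one (key, item) pair per item it returns."""
    return [(key, item) for key, value in pairs for item in fn(value)]


data = [("a", ["x", "y", "z"]), ("b", ["p", "r"])]
flattened = flat_map_values(data, lambda x: x)
print(flattened)
```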

<a id='12'></a>
# The Data

* Now that you have covered some basics with the interactive shell, it’s time to get started with some real data. 
* For this tutorial, you’ll make use of the California Housing data set. 
* Note, of course, that this is actually ‘small’ data and that using Spark in this context might be overkill.
* This tutorial is for educational purposes only and is meant to give you an idea of how you can use PySpark to build a machine learning model.

<a id='13'></a>
## Loading And Exploring Your Data

* Even though you know a bit more about your data, you should take the time to go ahead and explore it more thoroughly
* Before you do this, however, you will set up your Jupyter Notebook with Spark and you’ll take some first steps to defining the SparkContext.

<a id='14'></a>
### Creating Your First Spark Program

* You simply import the findspark library and use the init() function.
* In this case, you’re going to supply the path to the folder to init() because you’re certain that this is the path where you installed Spark.

In [8]:
# Import findspark 
import findspark

# Initialize and provide path
findspark.init("/usr/local/spark/")

* The first thing to do is to import SparkSession from the pyspark.sql module and initialize it; the SparkSession also gives you access to the SparkContext.
* Remember that you didn’t have to do this before because the interactive Spark shell automatically created and initialized it for you!
* Here, you’ll need to do a little bit more work yourself :)

* Build a SparkSession through its builder attribute.
* On the builder, you can set the master URL to connect to and the application name, add additional configuration such as the executor memory, and lastly call getOrCreate() to either get the current Spark session or create one if none is running.

In [9]:
# Import SparkSession
from pyspark.sql import SparkSession

# Build the SparkSession
spark = (
    SparkSession.builder
    .master("local")
    .appName("Linear Regression Model")
    .config("spark.executor.memory", "1gb")
    .getOrCreate()
)

# The SparkContext is available through the session
sc = spark.sparkContext

<a id='15'></a>
### Loading In Your Data

* This tutorial makes use of the California Housing data set.
* It appeared in a 1997 paper titled Sparse Spatial Autoregressions, written by R. Kelley Pace and Ronald Barry and published in the journal Statistics & Probability Letters.
* The researchers built this data set by using the 1990 California census data.

* The data contains one row per census block group.
* A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). 
* In this sample a block group on average includes 1425.5 individuals living in a geographically compact area. 
* You’ll gather this information from <a href='http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html'>this web page</a> or by reading the paper which was mentioned above and which you can find <a href='http://www.spatial-statistics.com/pace_manuscripts/spletters_ms_dir/statistics_prob_lets/html/ms_sp_lets1.html'>here</a>.

* These spatial data contain 20,640 observations on housing prices with 9 economic variables:
    * Longitude refers to the angular distance of a geographic place east or west of the prime meridian for each block group;
    * Latitude refers to the angular distance of a geographic place north or south of the earth’s equator for each block group;
    * Housing median age is the median age of the houses in a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values;
    * Total rooms is the total number of rooms in the houses per block group;
    * Total bedrooms is the total number of bedrooms in the houses per block group;
    * Population is the number of inhabitants of a block group;
    * Households refers to units of houses and their occupants per block group;
    * Median income is used to register the median income of people that belong to a block group; And,
    * Median house value is the dependent variable and refers to the median house value per block group.


* Next, you’ll use the textFile() method to read the data from the folder you downloaded it to into RDDs.
* This method takes a URI for the file, which in this case is a local path on your machine, and reads it as a collection of lines.
* For convenience, you’ll read in not only the .data file but also the .domain file that contains the header. This will allow you to double-check the order of the variables.

In [10]:
# Load in the data
rdd = sc.textFile('cal_housing.data')

# Load in the header
header = sc.textFile('cal_housing.domain')

<a id='16'></a>
### Data Exploration

* You already gathered a lot of information by just looking at the web page where you found the data set, but it’s always better to get hands-on and inspect your data yourself, in this case with Spark and Python.

* It is important to understand that, because Spark’s execution is “lazy”, nothing has been executed yet.
* Your data hasn’t actually been read in. The rdd and header variables only describe how the data should be read; they don’t hold any data yet.
* You have to push Spark to work for you, so let’s use the collect() method to look at the header:

In [11]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous.']
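
If you want the bare column names rather than the raw header lines, a small plain-Python sketch like the following would do. It assumes each header line has the form `name: type.`, as in the output above; `header_lines` is simply that output copied into a list.

```python
header_lines = [
    'longitude: continuous.',
    'latitude: continuous.',
    'housingMedianAge: continuous. ',
    'totalRooms: continuous. ',
    'totalBedrooms: continuous. ',
    'population: continuous. ',
    'households: continuous. ',
    'medianIncome: continuous. ',
    'medianHouseValue: continuous.',
]

# Keep only the part before the colon and drop surrounding whitespace.
column_names = [line.split(":")[0].strip() for line in header_lines]
print(column_names)
```

With the RDD itself, the same lambda could be applied through `header.map(...)` before calling `collect()`.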

* Call the take() method on your RDD:

In [12]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

* By executing the previous line of code, you take the first 2 elements of the RDD.
* The result is as you expected: because you read in the file with the textFile() function, each line has been read in as one single string.
* Each row is already a separate element of the RDD, but the entries within a row are still glued together, separated only by commas.

* You definitely need to solve this.
* To work with individual values, you need to split every row into its separate entries.
* To do so, you’ll use the map() function, to which you pass a lambda function that splits each line at the commas.
* Then, check your result by running the same line with the take() method, just like you did before:

In [13]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

# Inspect the first 2 lines 
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

Alternatively, you can also use the following functions to inspect your data:

In [14]:
# Inspect the first line 
rdd.first()

# Take top elements
rdd.top(2)


[['-124.350000',
  '40.540000',
  '52.000000',
  '1820.000000',
  '300.000000',
  '806.000000',
  '270.000000',
  '3.014700',
  '94600.000000'],
 ['-124.300000',
  '41.840000',
  '17.000000',
  '2677.000000',
  '531.000000',
  '1244.000000',
  '456.000000',
  '3.031300',
  '103600.000000']]
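
Note that top(2) returns the two largest elements, and at this point every entry is still a string, so the rows are compared character by character rather than numerically. The plain-Python sketch below illustrates the difference with `heapq.nlargest`. The rows are shortened to longitude and latitude; the first three are taken from the outputs above, and the fourth is a made-up row added for contrast.

```python
import heapq

rows = [
    ['-122.230000', '37.880000'],
    ['-124.350000', '40.540000'],
    ['-124.300000', '41.840000'],
    ['-114.310000', '34.190000'],  # hypothetical row, for contrast only
]

# String comparison: '-124...' sorts above '-122...' because '4' > '2'.
top_as_strings = heapq.nlargest(2, rows)

# Numeric comparison: -114.31 is the largest longitude.
top_as_numbers = heapq.nlargest(2, rows, key=lambda r: [float(v) for v in r])

print(top_as_strings)
print(top_as_numbers)
```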

* If you’re used to working with Pandas or data frames, you’ll have probably also expected to see a header, but there is none. 
* To make your life easier, you will move on from the RDD and convert it to a DataFrame. 
* DataFrames are preferred over RDDs whenever you can use them.
* Especially when you’re working with Python, the performance of DataFrames is better than RDDs.

* **But what is the difference between the two?**

* You can use RDDs when you want to perform low-level transformations and actions on your unstructured data.
* This means that you don’t care about imposing a schema while processing or accessing the attributes by name or column.
* Tying in to what was said before about performance: choose RDDs only when you can do without the performance benefits that DataFrames offer for (semi-)structured data.
* Use RDDs when you want to manipulate the data with functional programming constructs rather than domain specific expressions.

* To recapitulate, you’ll switch to DataFrames now to use high-level expressions, to perform SQL queries to explore your data further and to gain columnar access.

* The first step is to make a SchemaRDD or an RDD of Row objects with a schema. 
* This is normal, because just like a DataFrame, you eventually want to come to a situation where you have rows and columns. 
* Each entry is linked to a row and a certain column and columns have data types.

* You’ll use the map() function again and another lambda function in which you’ll map each entry to a field in a Row.
* The lambda function says that you’re going to construct a row in a SchemaRDD and that the element at index 0 will have the name “longitude”, and so on.

**With this SchemaRDD in place, you can easily convert the RDD to a DataFrame with the toDF() method.**

In [15]:
# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

* Now that you have your DataFrame df, you can inspect it with the methods that you have also used before, namely first() and take(), but also with head() and show():

In [16]:
# Show the top 20 rows 
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37...
(output truncated)

* The data seems all nicely ordered into columns, but what about the data types? 
* By reading in your data, Spark will try to infer a schema, but has this been successful here? Use either df.dtypes or df.printSchema() to get to know more about the data types that are contained within your DataFrame.

In [17]:
# Print the data types of all `df` columns
# df.dtypes

# Print the schema of `df`
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



* All columns are still of data type string… That’s disappointing!

* If you want to continue with this DataFrame, you’ll need to rectify this situation and assign “better” or more accurate data types to all columns. 
* Your performance will also benefit from this. Intuitively, you could go for a solution like the following, where you declare that each column of the DataFrame df should be cast to a FloatType():

In [18]:
df

DataFrame[households: string, housingMedianAge: string, latitude: string, longitude: string, medianHouseValue: string, medianIncome: string, population: string, totalBedRooms: string, totalRooms: string]

In [19]:
from pyspark.sql.types import FloatType

# Cast every column from string to float, one column per line
df = (
    df.withColumn("longitude", df["longitude"].cast(FloatType()))
    .withColumn("latitude", df["latitude"].cast(FloatType()))
    .withColumn("housingMedianAge", df["housingMedianAge"].cast(FloatType()))
    .withColumn("totalRooms", df["totalRooms"].cast(FloatType()))
    .withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType()))
    .withColumn("population", df["population"].cast(FloatType()))
    .withColumn("households", df["households"].cast(FloatType()))
    .withColumn("medianIncome", df["medianIncome"].cast(FloatType()))
    .withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))
)

In [20]:
df

DataFrame[households: float, housingMedianAge: float, latitude: float, longitude: float, medianHouseValue: float, medianIncome: float, population: float, totalBedRooms: float, totalRooms: float]
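
Writing out one withColumn() call per column becomes tedious quickly. As an alternative sketch, the casts can be applied in a loop. The helper name `cast_columns` is hypothetical (not a PySpark function); it relies only on `withColumn()` and on `cast()` accepting a type name such as `"float"`.

```python
def cast_columns(frame, names, dtype="float"):
    """Return a new DataFrame with every column in `names` cast to `dtype`."""
    for name in names:
        # withColumn() with an existing column name replaces that column.
        frame = frame.withColumn(name, frame[name].cast(dtype))
    return frame


numeric_columns = [
    "longitude", "latitude", "housingMedianAge", "totalRooms",
    "totalBedRooms", "population", "households",
    "medianIncome", "medianHouseValue",
]

# Usage, assuming `df` is the DataFrame of strings built earlier:
# df = cast_columns(df, numeric_columns)
```

The loop produces the same result as the chained calls; it simply keeps the list of columns in one place, which is easier to maintain.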

* Now that you’ve got that all sorted out, it’s time to really get started on the data exploration. 
* You have seen that columnar access and SQL queries were two advantages of using DataFrames. 
* Well, now it’s time to dig a little bit further into that. 
* Let’s start small and just select two columns from df of which you only want to see 10 rows:

In [21]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



You can also make your queries more complex, as you see in the following example:

In [22]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows



* Besides querying, you can also choose to describe your data and get some summary statistics.
* This will most definitely help you later!

In [23]:
df.describe().show()

+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|        households|  housingMedianAge|          latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+------------------+------------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             20640|             20640|             20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean| 499.5396802325581|28.639486434108527| 35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.32975283161136|12.585557612111613|2.1359523806029554| 2.0035317429328914|115395.61...
(output truncated)

* Look at the minimum and maximum values of all the (numerical) attributes.
* You see that multiple attributes have a wide range of values: you will need to normalize your dataset.

<a id='17'></a>
# Data Preprocessing

* With all this information that you gathered from your small exploratory data analysis, you know enough to preprocess your data to feed it to the model.
    * You don’t need to worry about missing values; all zero values have been excluded from the data set.
    * You should probably standardize your data, as you have seen that the range between minimum and maximum values is quite big.
    * There are possibly some additional attributes that you could add, such as a feature that registers the number of bedrooms per room or the rooms per household.
    * The values of your dependent variable are also quite big; to make your life easier, you’ll adjust them slightly.

<a id='18'></a>
## Preprocessing The Target Values

* First, let’s start with the medianHouseValue, your dependent variable.
* To facilitate your working with the target values, you will express the house values in units of 100,000.
* That means that a target such as 452600.000000 should become 4.526:

In [24]:
# Import `col` from `sql.functions` (a wildcard import would shadow built-ins such as `sum` and `max`)
from pyspark.sql.functions import col

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

* You can clearly see that the values have been adjusted correctly when you look at the result of the take() method.

<a id='19'></a>
## Feature Engineering

* Now that you have adjusted the values in medianHouseValue, you can also add the additional variables that you read about above.
* You’re going to add the following columns to the data set:

    * Rooms per household, which refers to the number of rooms in households per block group;
    * Population per household, which gives you an indication of how many people live in households per block group; and
    * Bedrooms per room, which gives you an idea of how many rooms are bedrooms per block group.

* As you’re working with DataFrames, you can use the select() method to select the columns that you’re going to be working with, namely totalRooms, totalBedRooms, households, and population. 
* Additionally, you have to indicate that you’re working with columns by wrapping the column names in the col() function. 
* Otherwise, you won’t be able to do element-wise operations like the divisions that you have in mind for these variables:

In [25]:
# Import `col` from `sql.functions` if you haven't yet
from pyspark.sql.functions import col

# Divide `totalRooms` by `households` (returns a new one-column DataFrame; `df` itself is unchanged)
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

* You see that, for the first row, there are about 6.98 rooms per household, the households in the block group consist of about 2.56 people, and the share of bedrooms is quite low at about 0.15 bedrooms per room.
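
* The ratios for the first row can be checked by hand. The following is a minimal sketch in plain Python, using the raw values from the first Row shown above; the variable names are only illustrative and are not part of the original code:

```python
# Raw values of the first row, copied from the output above
total_rooms = 880.0
total_bedrooms = 129.0
population = 322.0
households = 126.0

# The same element-wise divisions that the withColumn() calls perform
rooms_per_household = total_rooms / households
population_per_household = population / households
bedrooms_per_room = total_bedrooms / total_rooms

print(round(rooms_per_household, 2))       # 6.98
print(round(population_per_household, 2))  # 2.56
print(round(bedrooms_per_room, 2))         # 0.15
```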

* Next, anticipating an issue that you might run into when you standardize the values in your data set, you’ll also re-order the columns. 
* Since you don’t necessarily want to standardize your target values, you’ll want to isolate them in your data set.

* You do this by using the select() method and passing the column names in the appropriate order. 
* The target variable medianHouseValue is put first, so that it won’t be affected by the standardization.

* Let’s also leave out variables such as longitude, latitude, housingMedianAge and totalRooms.

In [26]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

<a id='20'></a>
## Standardization

* Now that you have re-ordered the data, you’re ready to standardize it. 
* Or almost, at least. 
* There is just one more step that you need to go through: separating the features from the target variable. 
* In essence, this boils down to isolating the first column in your DataFrame from the rest of the columns.

* To do this, you’ll use the map() function on the DataFrame’s underlying RDD. 
* You’ll also make use of the DenseVector() function. 
* A dense vector is a local vector that is backed by a double array that represents its entry values. 
* In other words, it's used to store arrays of values for use in PySpark.

* Next, you turn input_data back into a DataFrame with createDataFrame() and re-label the columns by passing a list as the second argument.
* This list consists of the column names "label" and "features":

In [27]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

In [28]:
df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|4.526|[129.0,322.0,126....|
|3.585|[1106.0,2401.0,11...|
|3.521|[190.0,496.0,177....|
|3.413|[235.0,558.0,219....|
|3.422|[280.0,565.0,259....|
|2.697|[213.0,413.0,193....|
|2.992|[489.0,1094.0,514...|
|2.414|[687.0,1157.0,647...|
|2.267|[665.0,1206.0,595...|
|2.611|[707.0,1551.0,714...|
|2.815|[434.0,910.0,402....|
|2.418|[752.0,1504.0,734...|
|2.135|[474.0,1098.0,468...|
|1.913|[191.0,345.0,174....|
|1.592|[626.0,1212.0,620...|
|  1.4|[283.0,697.0,264....|
|1.525|[347.0,793.0,331....|
|1.555|[293.0,648.0,303....|
|1.587|[455.0,990.0,419....|
|1.629|[298.0,690.0,275....|
+-----+--------------------+
only showing top 20 rows



* Next, you can finally scale the data. 
* You can use Spark ML to do this: this library will make machine learning on big data scalable and easy.
* You’ll find tools such as ML algorithms and everything you need to build practical ML pipelines.

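* The input column is features, and the output column with the rescaled values, which will be included in scaled_df, is named "features_scaled".
* With its default settings, StandardScaler divides each feature by its standard deviation (withStd=True) but does not subtract the mean (withMean=False):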

In [29]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the scaler to the DataFrame
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

* Let’s take a look at your DataFrame and the result. 
* You see that, indeed, a third column features_scaled was added to your DataFrame, which you can compare with features.

* Note that these lines of code are very similar to what you would be doing in Scikit-Learn.
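
* To make the rescaling concrete, here is a minimal sketch in plain Python of what scaling a single column to unit standard deviation looks like. It assumes StandardScaler's default settings (no centering) and the corrected sample standard deviation; the function name and the toy column are hypothetical and not part of Spark:

```python
from statistics import stdev

def scale_to_unit_std(column):
    """Divide every value by the column's sample standard deviation.

    Assumption: this mirrors StandardScaler's defaults
    (withStd=True, withMean=False), so the values are not centered.
    """
    s = stdev(column)  # corrected (n - 1) sample standard deviation
    return [value / s for value in column]

# Hypothetical toy column, not taken from the housing data
toy_column = [2.0, 4.0, 6.0, 8.0]
scaled = scale_to_unit_std(toy_column)

print(scaled)
print(stdev(scaled))  # close to 1.0 after scaling
```

* Because the mean is not subtracted, positive inputs stay positive, which matches the features_scaled values shown above.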

<a id='21'></a>
# Building A Machine Learning Model With Spark ML

* With all the preprocessing done, it’s finally time to start building your Linear Regression model! 
* Just like always, you first need to split the data into training and test sets. Luckily, this is no issue with the randomSplit() method:

In [30]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

* You pass in a list with two weights that represent the approximate fractions of the data that you want in your training and test sets, and a seed, which makes the split reproducible.
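
* The idea behind a seeded random split can be sketched in plain Python. This is a simplified illustration only, not Spark's implementation; the function random_split and the stand-in rows are hypothetical:

```python
import random

def random_split(rows, train_weight, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in the training set with probability `train_weight`
        if rng.random() < train_weight:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(1000))  # hypothetical stand-in for DataFrame rows
train, test = random_split(rows, train_weight=0.8, seed=1234)

# The split sizes are close to 800/200 but usually not exact
print(len(train), len(test))

# The same seed reproduces the same split
assert random_split(rows, 0.8, seed=1234) == (train, test)
```

* Since every row is assigned independently, the resulting sizes only approximate the requested 80/20 proportions.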

* Then, without further ado, you can make your model!

* Note that the argument elasticNetParam corresponds to α, the elastic net mixing parameter (0 gives a pure L2 penalty, 1 gives a pure L1 penalty), and that regParam, the regularization parameter, corresponds to λ. 

In [31]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

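# Initialize `lr`
# Note: `featuresCol` defaults to "features", the unscaled column;
# pass featuresCol="features_scaled" to train on the standardized values instead
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model to the training data
linearModel = lr.fit(train_data)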
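
* For reference, the objective that this kind of elastic net regression minimizes can be sketched as follows. Here $n$ is the number of training rows, $x_i$ and $y_i$ are the features and label of row $i$, $w$ is the coefficient vector and $b$ the intercept; apart from λ and α, these symbols are notation assumed for this sketch, and details such as internal standardization are left out:

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2 + \lambda\left[\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right]
$$

* With regParam=0.3 and elasticNetParam=0.8, you have λ = 0.3 and α = 0.8, so the penalty is mostly L1, which tends to push coefficients to exactly zero.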

* With your model in place, you can generate predictions for your test data: use the transform() method to predict the labels for your test_data. 
* Then, you can use RDD operations to extract the predictions as well as the true labels from the DataFrame and zip these two values together in a list called predictionAndLabel.

* Lastly, you can inspect the predicted and real values by slicing the list with square brackets [ ]:

In [32]:
# Generate predictions
predicted = linearModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.4491508524918457, 0.14999),
 (1.5705029404692372, 0.14999),
 (2.148727956912464, 0.14999),
 (1.5831547768979277, 0.344),
 (1.5182107797955968, 0.398)]

<a id='22'></a>
# Evaluating the Model

* Looking at predicted values is one thing, but looking at some metrics gives you a better idea of how good your model actually is.
* You can first start by printing out the coefficients and the intercept of the model:

In [33]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0])

In [34]:
# Intercept for the model
linearModel.intercept

0.9903995774620005
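
* With only one non-zero coefficient, a prediction reduces to the intercept plus that coefficient times the fourth feature (medianIncome). The following is a minimal sketch in plain Python, assuming the model was fit on the unscaled features column (the default featuresCol) and using the rounded coefficient as displayed; the input vector is the unscaled first row shown earlier, chosen only for illustration:

```python
# Values copied from the outputs above (the coefficient is the rounded, displayed value)
intercept = 0.9903995774620005
coefficients = [0.0, 0.0, 0.0, 0.2762, 0.0, 0.0, 0.0]

# Illustrative input: the unscaled feature vector of the first row shown earlier
features = [129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]

# A linear model predicts intercept + sum(coefficient * feature)
prediction = intercept + sum(c * x for c, x in zip(coefficients, features))

# The result is in units of 100,000, like the label
print(round(prediction, 2))
```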

* Next, you can also use the summary attribute to pull up the rootMeanSquaredError and the r2. Note that summary reports these metrics on the training data:

In [35]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError

0.8692118678997669

In [36]:
# Get the R2
linearModel.summary.r2

0.4240895287218379

* The **RMSE** (root mean squared error) measures the typical size of the error between the predicted values and the observed or known values. The smaller the RMSE, the closer the predicted and observed values are.

* The **R2** (“R squared”), or the coefficient of determination, is a measure that shows how close the data are to the fitted regression line. For a model like this one, evaluated on its own training data, the score typically lies between 0 and 100% (or 0 and 1 in this case), where 0% indicates that the model explains none of the variability of the response data around its mean, and 100% indicates the opposite: it explains all the variability. On held-out data, R2 can even be negative when the model fits worse than simply predicting the mean. In general, the higher the R-squared, the better the model fits your data.
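
* Both metrics are easy to compute by hand from a list of (prediction, label) pairs. The following is a minimal sketch in plain Python using the standard definitions; the function names and the toy pairs are hypothetical, and the pairs only mimic the layout of predictionAndLabel:

```python
import math

def rmse(pairs):
    """Root mean squared error of (prediction, label) pairs."""
    return math.sqrt(sum((p - y) ** 2 for p, y in pairs) / len(pairs))

def r2(pairs):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    labels = [y for _, y in pairs]
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in pairs)          # unexplained variation
    ss_tot = sum((y - mean_label) ** 2 for y in labels)   # total variation around the mean
    return 1 - ss_res / ss_tot

# Hypothetical toy pairs in the same (prediction, label) layout as predictionAndLabel
toy_pairs = [(1.5, 1.0), (2.0, 2.0), (2.5, 3.0), (4.0, 4.0)]

print(rmse(toy_pairs))
print(r2(toy_pairs))
```

* Passing pairs computed from test data to functions like these gives you the metrics on held-out data rather than on the training set.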

* There are definitely some improvements needed to your model! 
* If you want to continue with this model, you can play around with the parameters that you passed to your model or with the variables that you included in your original DataFrame.

**Before you go, make sure to stop the SparkSession with the following line of code:**

In [37]:
spark.stop()

<a id='23'></a>
<h1 align='center'>Difference between SparkML and SciKit-ML</h1>

# References

* https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning
* https://towardsdatascience.com/apache-spark-101-3f961c89b8c5
* https://towardsdatascience.com/deep-learning-with-apache-spark-part-1-6d397c16abd