In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container {width : 99% !important;}</style>"))
display(HTML("<style>.output_result {width : 99% !important;}</style>"))
display(HTML("<style>.jp-Notebook {--jp-notebook-max-width: 99%;}</style>"))

# Introduction

## Machine Learning & Spark
1. Machine Learning & Spark

Hi! Welcome to the course on Machine Learning with Apache Spark, in which you will learn how to build Machine Learning models on large data sets using distributed computing techniques. Let's start with some fundamental concepts.

2. Building the perfect waffle (an analogy)

Suppose you wanted to teach a computer how to make waffles. You could find a good recipe and then give the computer explicit instructions about ingredients and proportions. Alternatively, you could present the computer with a selection of different waffle recipes and let it figure out the ingredients and proportions for the best recipe. The second approach is how Machine Learning works: the computer literally learns from examples.

3. Regression & classification

Machine Learning problems are generally less esoteric than finding the perfect waffle recipe. The most common problems apply either Regression or Classification. A regression model learns to predict a number. For example, when making waffles, how much flour should be used for a particular amount of sugar? A classification model, on the other hand, predicts a discrete or categorical value. For example, is a recipe calling for a particular amount of sugar and salt more likely to be for waffles or cupcakes?
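
To make the distinction concrete, here is a minimal plain-Python sketch (it is not Spark code). All of the recipe numbers are invented for illustration, and `fit_line` and `classify` are hypothetical helpers: the first fits a least-squares line that predicts flour from sugar (regression), while the second assigns a recipe to whichever class has the closer average sugar amount (a very simple form of classification).

```python
from statistics import mean

# Invented example data: (sugar in grams, flour in grams) for waffle recipes.
waffles = [(20, 240), (30, 260), (40, 280), (50, 300)]

def fit_line(points):
    """Ordinary least squares for y = intercept + slope * x."""
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    x_bar, y_bar = mean(xs), mean(ys)
    slope = (sum((x - x_bar) * (y - y_bar) for x, y in points)
             / sum((x - x_bar) ** 2 for x in xs))
    intercept = y_bar - slope * x_bar
    return intercept, slope

# Regression: predict a number (flour) from a number (sugar).
intercept, slope = fit_line(waffles)
predicted_flour = intercept + slope * 35

# Classification: predict a category from a number.
# Invented sugar amounts (grams) for two kinds of recipe.
sugar_by_label = {"waffle": [20, 30, 40, 50], "cupcake": [150, 180, 200]}

def classify(sugar):
    """Pick the label whose mean sugar amount is closest to the input."""
    return min(sugar_by_label,
               key=lambda label: abs(mean(sugar_by_label[label]) - sugar))

predicted_label = classify(120)
```

The regression output is a number on a continuous scale, whereas the classifier can only ever return one of the labels it has seen.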

4. Data in RAM

The performance of a Machine Learning model depends on data. In general, more data is a good thing. If an algorithm is able to train on a larger set of data, then its ability to generalize to new data will usually improve. However, there are some practical constraints. If the data can fit entirely into RAM then the algorithm can operate efficiently. What happens when those data no longer fit into memory?

5. Data exceeds RAM

The computer will start to use *virtual memory* and data will be *paged* back and forth between RAM and disk. Relative to RAM access, retrieving data from disk is slow. As the size of the data grows, paging becomes more intense and the computer begins to spend more and more time waiting for data. Performance plummets.

6. Data distributed across a cluster

How then do we deal with truly large datasets? One option is to distribute the problem across multiple computers in a cluster. Rather than trying to handle a large dataset on a single machine, it's divided up into partitions which are processed separately. Ideally each data partition can fit into RAM on a single computer in the cluster. This is the approach used by Spark.
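
The idea of partitioning can be sketched in plain Python. This is only an illustration of the principle, not of how Spark is implemented: the data are invented, and `partition` and `partial_stats` are hypothetical helpers. Each partition is summarized independently (as it could be on a separate machine), and only the small summaries are combined.

```python
# Invented data standing in for a dataset too large for one machine.
data = list(range(1, 101))

def partition(records, n_parts):
    """Split records into n_parts roughly equal, contiguous chunks."""
    size, extra = divmod(len(records), n_parts)
    parts, start = [], 0
    for i in range(n_parts):
        end = start + size + (1 if i < extra else 0)
        parts.append(records[start:end])
        start = end
    return parts

def partial_stats(part):
    """Work done independently on one partition: a (sum, count) pair."""
    return sum(part), len(part)

# Each partition is summarized separately...
partials = [partial_stats(p) for p in partition(data, 4)]

# ...and only the small summaries are combined into the final answer.
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
overall_mean = total / count
```

The mean computed from the per-partition summaries matches the mean of the full dataset, even though no single step needed all of the data at once.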

7. What is Spark?

Spark is a general purpose framework for cluster computing. It is popular for two main reasons: 1. it's generally much faster than other Big Data technologies like Hadoop, because it does most processing in memory and 2. it has a developer-friendly interface which hides much of the complexity of distributed computing.

8. Components: nodes

Let's review the components of a Spark cluster. The cluster itself consists of one or more nodes. Each node is a computer with CPU, RAM and physical storage.

9. Components: cluster manager

A cluster manager allocates resources and coordinates activity across the cluster.

10. Components: driver

Every application running on the Spark cluster has a driver program. Using the Spark API, the driver communicates with the cluster manager, which in turn distributes work to the nodes.

11. Components: executors

On each node Spark launches an executor process which persists for the duration of the application. Work is divided up into tasks, which are simply units of computation. The executors run tasks in multiple threads across the cores in a node. When working with Spark you normally don't need to worry *too* much about the details of the cluster. Spark sets up all of that infrastructure for you and handles all interactions within the cluster. However, it's still useful to know how it works under the hood.

12. Onward!

You now have a basic understanding of the principles of Machine Learning and distributed computing with Spark. Next we'll learn how to connect to a Spark cluster.

### Characteristics of Spark
Spark is one of the most popular technologies for processing large quantities of data. Not only is it able to handle enormous data volumes, but it does so very efficiently too! Also, unlike some other distributed computing technologies, developing with Spark is a pleasure.

Which of these describes Spark?

- Spark is a framework for cluster computing.

- Spark does most processing in memory.

- Spark has a high-level API, which conceals a lot of complexity.

- **All of the above.**

### Components in a Spark Cluster
Spark is a distributed computing platform. It achieves efficiency by distributing data and computation across a cluster of computers.

A Spark cluster consists of a number of hardware and software components which work together.

Which of these is not part of a Spark cluster?

- One or more nodes

- A cluster manager

- **A load balancer**

- Executors

## Connecting to Spark
1. Connecting to Spark

The previous lesson gave a high-level overview of Machine Learning and Spark. In this lesson you'll review the process of connecting to Spark.

2. Interacting with Spark

The connection with Spark is established by the driver, which can be written in Java, Scala, Python or R. Each of these languages has advantages and disadvantages. Java is relatively verbose, requiring a lot of code to accomplish even simple tasks. By contrast, Scala, Python and R are high-level languages which can accomplish much with only a small amount of code. They also offer a REPL, or Read-Evaluate-Print Loop, which is crucial for interactive development. You'll be using Python.

3. Importing pyspark

Python doesn't talk natively to Spark, so we'll kick off by importing the pyspark module, which makes Spark functionality available in the Python interpreter. Spark is under vigorous development. Because the interface is evolving it's important to know what version you're working with. We'll be using version 2.4.1, which was released in March 2019.

4. Sub-modules

In addition to the main pyspark module, there are a few sub-modules which implement different aspects of the Spark interface. There are two versions of Spark Machine Learning: mllib, which uses an unstructured representation of data in RDDs and has been deprecated, and ml which is based on a structured, tabular representation of data in DataFrames. We'll be using the latter.

5. Spark URL

With the pyspark module loaded, you are able to connect to Spark. The next thing you need to do is tell Spark where the cluster is located. Here there are two options. The first is to connect to a remote cluster, in which case you need to specify a Spark URL, which gives the network location of the cluster's master node. The URL is composed of an IP address or DNS name and a port number. The default port for Spark is 7077, but this must still be explicitly specified. The second option is to create a local cluster, where everything happens on a single computer. This is useful when you're figuring out how Spark works, because the infrastructure of a distributed cluster can get in the way, and it is the setup that you're going to use throughout this course. For a local cluster, you need only specify "local" and, optionally, the number of cores to use. By default, a local cluster will run on a single core. Alternatively, you can give a specific number of cores or simply use the wildcard to choose all available cores.
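
The forms of the master string described above can be summarized with a small plain-Python check. This is a sketch under assumptions: `looks_like_master` is a hypothetical helper, not part of pyspark, and it covers only the forms mentioned in this lesson (Spark itself accepts further kinds of master that are ignored here).

```python
import re

# Patterns for the master strings described in the text (an illustrative subset).
_REMOTE = re.compile(r"^spark://[A-Za-z0-9.\-]+:\d+$")  # host plus explicit port
_LOCAL = re.compile(r"^local(\[(\d+|\*)\])?$")           # local, local[N], local[*]

def looks_like_master(master):
    """Return True if the string matches one of the forms covered in the text."""
    return bool(_REMOTE.match(master) or _LOCAL.match(master))

candidates = [
    "spark://13.59.151.161:7077",
    "spark://18.188.22.23",      # no port number: rejected by this sketch
    "local",
    "local[4]",
    "local[*]",
]
results = {m: looks_like_master(m) for m in candidates}
```

Only the remote URL without a port number is rejected, which reflects the rule that the port must always be given explicitly.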

6. Creating a SparkSession

You connect to Spark by creating a SparkSession object. The SparkSession class is found in the pyspark.sql sub-module. You specify the location of the cluster using the master() method. Optionally you can assign a name to the application using the appName() method. Finally you call the getOrCreate() method, which will either create a new session object or return an existing object. Once the session has been created you are able to interact with Spark. Although it's possible for multiple SparkSessions to co-exist, it's good practice to stop the SparkSession when you're done.

7. Let's connect to Spark!

Great! Let's connect to Spark!

### Location of Spark master
Which of the following is not a valid way to specify the location of a Spark cluster?

- spark://13.59.151.161:7077

- spark://ec2-18-188-22-23.us-east-2.compute.amazonaws.com:7077

- **spark://18.188.22.23**

- local

- local[4]

- local[*]

### Creating a SparkSession
In this exercise, you'll spin up a local Spark cluster using all available cores. The cluster will be accessible via a `SparkSession` object.

The `SparkSession` class has a builder attribute, which is an instance of the Builder class. The Builder class exposes three important methods that let you:

- specify the location of the master node;
- name the application (optional); and
- retrieve an existing `SparkSession` or, if there is none, create a new one.

The `SparkSession` class has a `version` attribute that gives the version of Spark. Note: The version can also be accessed via the `__version__` attribute on the `pyspark` module.

Find out more about `SparkSession` [here](https://spark.apache.org/docs/2.3.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

Once you are finished with the cluster, it's a good idea to shut it down, which will free up its resources, making them available for other processes.

**Note**: You might find it useful to review the slides from the lessons in the Slides panel next to the *IPython Shell*.

**Instructions**

- Import the `SparkSession` class from `pyspark.sql`.
- Create a `SparkSession` object connected to a local cluster. Use all available cores. Name the application `'test'`.
- Use the version attribute on the `SparkSession` object to retrieve the version of Spark running on the cluster. **Note**: The version might be different from the one that's used in the presentation (it gets updated from time to time).
- Shut down the cluster.

In [2]:
# Import the SparkSession class
from pyspark.sql import SparkSession

# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()

# What version of Spark?
print(spark.version)

# Terminate the cluster
spark.stop()

23/10/01 22:33:44 WARN Utils: Your hostname, Alashmony-Lenovo-Z51-70 resolves to a loopback address: 127.0.1.1; using 192.168.1.182 instead (on interface wlp3s0)
23/10/01 22:33:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/01 22:33:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3.4.0


## Loading Data
1. Loading Data

In this lesson you'll look at how to read data into Spark.

3. DataFrames: A refresher

Spark represents tabular data using the DataFrame class. The data are captured as rows (or "records"), each of which is broken down into one or more columns (or "fields"). Every column has a name and a specific data type. Some selected methods and attributes of the DataFrame class are listed here. The count() method gives the number of rows. The show() method will display a subset of rows. The printSchema() method and the dtypes attribute give different views on column types. This only scratches the surface of what's possible with a DataFrame. You can find out more by consulting the extensive documentation.

4. CSV data for cars

CSV is a common format for storing tabular data. For illustration we'll be using a CSV file with characteristics for a selection of motor vehicles. Each line in a CSV file is a new record and within each record, fields are separated by a delimiter character, which is normally a comma. The first line is an optional header record which gives column names.

5. Reading data from CSV

Our session object has a "read" attribute which, in turn, has a csv() method which reads data from a CSV file and returns a DataFrame. The csv() method has one mandatory argument, the path to the CSV file. There are a number of optional arguments. We'll take a quick look at some of the most important ones. The header argument specifies whether or not there is a header record. The sep argument gives the field separator, which is a comma by default. There are two arguments which pertain to column data types, schema and inferSchema. Finally, the nullValue argument gives the placeholder used to indicate missing data. Let's take a look at the data we've just loaded.

6. Peek at the data

Using the show() method we can take a look at a slice of the DataFrame. The csv() method has split the data into rows and columns and picked up the column names from the header record. Looks great, doesn't it? Unfortunately there's a small snag. Before we unravel that snag, it's important to note that the first value in the cylinder column is not a number. It's the string "NA" which indicates missing data.

7. Check column types

If you check the column data types then you'll find that they are all strings. That doesn't make sense since the last six columns are clearly numbers! However, this is the expected behavior: the csv() method treats all columns as strings by default. You need to do a little more work to get the correct column types. There are two ways that you can do this: infer the column types from the data or manually specify the types.

8. Inferring column types from data

It's possible to reasonably deduce the column types by setting the inferSchema argument to True. There is a price to pay though: Spark needs to make an extra pass over the data to figure out the column types before reading the data. If the data file is big then this will increase load time notably. Using this approach all of the column types are correctly identified except for cylinder. Why? The first value in this column is "NA", so Spark thinks that the column contains strings.

9. Dealing with missing data

Missing data in CSV files are normally represented by a placeholder like the "NA" string. You can use the nullValue argument to specify the placeholder. It's always a good idea to explicitly define the missing data placeholder. The nullValue argument is case sensitive, so it's important to provide it in exactly the same form as it appears in the data file.
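
The interaction between type inference and the missing-data placeholder can be illustrated without Spark. The following plain-Python sketch is an assumption-laden imitation, not Spark's actual inference logic: the CSV text is invented, and `infer_type` and `infer_schema` are hypothetical helpers that scan every value in a column and pick the narrowest type that fits.

```python
import csv
import io

# Invented CSV text in the style of the cars data; "NA" marks a missing value.
raw = """mfr,cyl,size
Chevrolet,NA,2.2
Ford,4,1.9
Honda,6,3.2
"""

def infer_type(values):
    """Return 'int', 'double' or 'string' for a list of non-null strings."""
    for name, cast in (("int", int), ("double", float)):
        try:
            for v in values:
                cast(v)
            return name
        except ValueError:
            continue
    return "string"

def infer_schema(text, null_value=None):
    """Scan every row once and guess a type for each column."""
    rows = list(csv.DictReader(io.StringIO(text)))
    schema = {}
    for col in rows[0].keys():
        # Values equal to the placeholder are treated as missing and skipped.
        values = [r[col] for r in rows if r[col] != null_value]
        schema[col] = infer_type(values)
    return schema

without_null = infer_schema(raw)                 # "NA" is treated as ordinary text
with_null = infer_schema(raw, null_value="NA")   # "NA" is treated as missing
```

Without the placeholder, the single "NA" forces the `cyl` column to be a string; once the placeholder is declared, the same column is recognized as an integer.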

10. Specify column types

If inferring column type is not successful then you have the option of specifying the type of each column in an explicit schema. This also makes it possible to choose alternative column names.

11. Final cars data

This is what the final cars data look like. Note that the missing value at the top of the cylinders column is indicated by the special null constant.

12. Let's load some data!

You're ready to use what you've learned to load data from CSV files!

In [3]:
from pyspark import SparkContext
sc = SparkContext()
sc

In [4]:
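from pyspark.sql import SparkSession

# Wrap the SparkContext created above in a session. Because that context is
# already running, the master and appName settings below do not reconfigure it
# (hence the warning in the output).
spark = SparkSession(sc).builder \
                        .master("local[2]") \
                        .appName("MLSpark") \
                        .getOrCreate()
spark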

23/10/01 22:33:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Loading flights data
In this exercise, you're going to load some airline flight data from a CSV file. To ensure that the exercise runs quickly these data have been trimmed down to only 50 000 records. You can get a larger dataset in the same format [here](https://assets.datacamp.com/production/repositories/3918/datasets/e1c1a03124fb2199743429e9b7927df18da3eacf/flights-larger.csv).

Notes on CSV format:

- fields are separated by a comma (this is the default separator) and
- missing data are denoted by the string 'NA'.

**Data dictionary**:

- `mon` — month (integer between 1 and 12)
- `dom` — day of month (integer between 1 and 31)
- `dow` — day of week (integer; 1 = Monday and 7 = Sunday)
- `carrier` — carrier (IATA code)
- `flight` — flight number
- `org` — origin airport (IATA code)
- `mile` — distance (miles)
- `depart` — departure time (decimal hour)
- `duration` — expected duration (minutes)
- `delay` — delay (minutes)

`pyspark` has been imported for you and the session has been initialized.

Note: The data have been aggressively down-sampled.

**Instructions**

- Read data from a CSV file called `'flights.csv'`. Assign data types to columns automatically. Deal with missing data.
- How many records are in the data?
- Take a look at the first five records.
- What data types have been assigned to the columns? Do these look correct?

In [5]:
# Read data from CSV file
flights = spark.read.csv('flights.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % flights.count())

# View the first five records
flights.show(5)

# Check column data types
print(flights.dtypes)

                                                                                

The data contain 50000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


In [6]:
# Read data from CSV file
flights_full = spark.read.csv('flights-larger.csv',
                         sep=',',
                         header=True,
                         inferSchema=True,
                         nullValue='NA')

# Get number of records
print("The data contain %d records." % flights_full.count())

# View the first five records
flights_full.show(5)

# Check column data types
print(flights_full.dtypes)

                                                                                

The data contain 275000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 10| 10|  1|     OO|  5836|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|  5866|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|  6016|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|   199|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|  1675|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


### Loading SMS spam data
You've seen that it's possible to infer data types directly from the data. Sometimes it's convenient to have direct control over the column types. You do this by defining an explicit schema.

The file `sms.csv` contains a selection of SMS messages which have been classified as either `'spam'` or `'ham'`. These data have been adapted from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/sms+spam+collection). There are a total of 5574 SMS, of which 747 have been labeled as spam.

**Notes on CSV format**:

- no header record and
- fields are separated by a semicolon (this is not the default separator).

**Data dictionary**:

- `id` — record identifier
- `text` — content of SMS message
- `label` — spam or ham (integer; 0 = ham and 1 = spam)

**Instructions**

- Specify the data schema, giving columns names (`"id"`, `"text"`, and `"label"`) and column types.
- Read data from a delimited file called `"sms.csv"`.
- Print the schema for the resulting DataFrame.

In [8]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Specify column names and types
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a delimited file
sms = spark.read.csv('sms.csv', sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



# Classification

## Data Preparation
1. Data Preparation

In this lesson you are going to learn how to prepare data for building a Machine Learning model.

2. Do you need all of those columns?

You'll be working with the cars data again. This is what the data look like at present. There are columns for the maker and model, the origin (either USA or non-USA), the type, number of cylinders, engine size, weight, length, RPM and fuel consumption. The models that you'll be building will depend on the physical characteristics of the cars rather than the model names or manufacturers, so you'll remove the corresponding columns from the data.

3. Dropping columns

There are two approaches to doing this: either you can drop() the columns that you don't want or you can select() the fields which you do want to retain. Either way, the resulting data does not include those columns.

4. Filtering out missing data

Earlier you saw that there is a missing value in the cylinders column. Let's check to see how many other missing values there are. You'll use the filter() method and provide a logical predicate using SQL syntax which identifies NULL values. Then the count() method tells you how many records there are remaining. Just one. In this case it makes sense to simply remove the record with the missing value. There are a couple of ways that you could do this. You could use the filter() method again with a different predicate. Or you could take a more aggressive approach and use the dropna() method to drop all records with missing values in any column. However, this should be done with care because it could result in the loss of a lot of otherwise useful data. You've now stripped down the data to what's needed to build a model.
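
The difference between the targeted and the aggressive approach can be seen in a plain-Python sketch. It uses invented records, with `None` playing the role of Spark's null, and list comprehensions standing in for the DataFrame methods.

```python
# Invented records; None plays the role of Spark's null.
cars = [
    {"type": "Midsize", "cyl": None, "size": 2.2},
    {"type": "Small", "cyl": 4, "size": 1.9},
    {"type": "Large", "cyl": 6, "size": None},
    {"type": "Small", "cyl": 4, "size": 1.6},
]

# Count the records where one particular column is missing.
missing_cyl = sum(1 for row in cars if row["cyl"] is None)

# Targeted approach: drop only the records missing that column.
has_cyl = [row for row in cars if row["cyl"] is not None]

# Aggressive approach: drop records with a missing value in any column.
complete = [row for row in cars if all(v is not None for v in row.values())]
```

In this made-up example the targeted filter keeps three records, while dropping every incomplete record keeps only two, which is why the aggressive approach calls for care.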

5. Mutating columns

At present the weight and length columns are in units of pounds and inches respectively. You'll use the withColumn() method to create a new mass column in units of kilograms. The round() function is used to limit the precision of the result. You can also use the withColumn() method to replace the existing length column with values in meters. You now have mass and length in metric units.
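
The arithmetic behind these two conversions is sketched below in plain Python. The rows are invented, and the rounding precision (whole kilograms, three decimals for meters) is an assumption made for this example rather than something fixed by the text.

```python
KG_PER_POUND = 0.45359237   # exact by definition
METERS_PER_INCH = 0.0254    # exact by definition

# Invented rows: weight in pounds, length in inches.
cars = [
    {"weight": 2705, "length": 177},
    {"weight": 3560, "length": 195},
]

converted = []
for row in cars:
    converted.append({
        # New column: mass in kilograms, rounded to the nearest whole number.
        "mass": round(row["weight"] * KG_PER_POUND, 0),
        # Replaced column: length in meters, rounded to three decimals.
        "length": round(row["length"] * METERS_PER_INCH, 3),
    })
```

Note that `mass` is added under a new name, whereas `length` reuses the existing name, so the original values in inches are overwritten.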

6. Indexing categorical data

The type column consists of strings which represent six categories of vehicle type. You'll need to transform those strings into numbers. You do this using an instance of the StringIndexer class. In the constructor you provide the name of the string input column and a name for the new output column to be created. The indexer is first fit to the data, creating a StringIndexerModel. During the fitting process the distinct string values are identified and an index is assigned to each value. The model is then used to transform the data, creating a new column with the index values. By default the index values are assigned according to the descending relative frequency of each of the string values. Midsize is most common, so it gets an index of zero. Small is next most common, so its index is one. And so on. It's possible to choose different strategies for assigning index values by specifying the stringOrderType argument. Rather than using frequency of occurrence, strings can be ordered alphabetically. It's also possible to choose between ascending and descending order.
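
The fit-then-transform pattern and the default frequency-based ordering can be imitated in a few lines of plain Python. This is a sketch of the idea only, not Spark's implementation: the values are invented, `fit_index` and `transform` are hypothetical helpers, and breaking ties alphabetically is a choice made for this example.

```python
from collections import Counter

# Invented vehicle types.
types = ["Midsize", "Small", "Midsize", "Large", "Small", "Midsize"]

def fit_index(values):
    """'Fit': map each distinct string to an index, most frequent first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (a choice made here).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def transform(values, mapping):
    """'Transform': replace each string by its index."""
    return [mapping[v] for v in values]

mapping = fit_index(types)
type_idx = transform(types, mapping)
```

Separating the two steps means the mapping learned from one dataset can be reused to transform another, so the same string always receives the same index.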

7. Indexing country of origin

You'll be building a classifier to predict whether or not a car was manufactured in the USA. So the origin column also needs to be converted from strings into numbers.

8. Assembling columns

The final step in preparing the cars data is to consolidate the various input columns into a single column. This is necessary because the Machine Learning algorithms in Spark operate on a single column of predictors, in which each element is a vector that may hold multiple values. To illustrate the process you'll start with just a pair of features, cylinders and size. First you create an instance of the VectorAssembler class, providing it with the names of the columns that you want to consolidate and the name of the new output column. The assembler is then used to transform the data. Taking a look at the relevant columns you see that the new "features" column consists of values from the cylinders and size columns consolidated into a vector. Ultimately you are going to assemble all of the predictors into a single column.
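
What the assembly step produces can be pictured with a plain-Python sketch. The rows and column names are invented, and `assemble` is a hypothetical helper that only imitates the result: each row gains one extra column holding the selected values as a single list.

```python
# Invented rows with two predictor columns.
cars = [
    {"cyl": 4, "size": 1.9},
    {"cyl": 6, "size": 3.2},
]

def assemble(rows, input_cols, output_col="features"):
    """Add a column holding the input columns' values as one list per row."""
    return [
        {**row, output_col: [float(row[c]) for c in input_cols]}
        for row in rows
    ]

assembled = assemble(cars, ["cyl", "size"])
```

The original columns are left in place, and the order of the values in each list follows the order of the column names passed in.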

9. Let's practice!

Let's try out what we have learned on the SMS and flights data.