# DAT202 - Introduction to Apache Spark

![SPARK](../../images/spark-logo2.001.jpeg)

# What is Spark and what is it good for?

[Apache Spark](https://spark.apache.org) maintainers define it as a "multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters."

While it is true that you can use R, Python, Scala or Java to write data engineering, data science and machine learning applications on Spark, we will use a different definition in this workshop. **Apache Spark is a user-friendly platform for Parallel Computing and Large-Scale Data Analytics**.

What we mean by that is that Spark's APIs in all aforementioned languages should allow you to write massively parallelized applications that run on large clusters without much hassle. And by "without much hassle" we mean that your code should not look or feel too different from code you would design to run on your own workstation or laptop.

There is a bit of a learning curve to get familiar with how Spark works, so you can make the most out of its large-scale capabilities. But the actual code-writing part will hopefully feel very familiar to you!

Bold claim? You will be the judge of that by the end of this workshop! 

The examples we will go through are all written in Python, and so the APIs we will cover are the ones available in PySpark: Spark Core (RDD), SparkSQL and Pandas on Spark. That said, we will also spend some time on the basics of Spark as a platform, which apply no matter what language you decide to use in your work later. Once you get the gist of it, Spark becomes a handy way to scale up computations of almost any kind, regardless of which language you choose.

Without further ado, let's dive deeper into the world of Apache Spark.

# Table of Contents

- [1 - Spark Under The Hood](#Spark-Under-The-Hood)

  - [1.1 - High Level Architecture](#High-Level-Architecture)
  
  - [1.2 - Resilient Distributed Datasets](#Resilient-Distributed-Datasets)

  - [1.3 - Directed Acyclic Graphs](#Directed-Acyclic-Graphs)

- [2 - Spark Core API (RDD API)](#Spark-Core-API-(RDD-API))

- [3 - SparkSQL API](#SparkSQL-API)

- [4 - Pandas On Spark API](#Pandas-on-Spark-API)

- [5 - Spark Streaming API (Optional)](#Spark-Streaming-API-(Optional))

- [6 - Running Spark on The Alliance's Clusters](#Running-Spark-on-The-Alliance's-Clusters)

# Spark Under The Hood

While it is certainly not the only one, we have mentioned user-friendliness as the main motivation for choosing Spark to perform large-scale data analytics. Again, by user-friendliness we mean that, by using Spark, you can pretty much take the same kind of code you would write to process a moderate amount of data on your local computer, and scale it up to process truly massive amounts of data on a cluster. You don't have to worry about moving data around the network, about how processes scattered across different nodes communicate with one another, or about managing multiple threads inside different processes. That's right - Spark's promise is to take care of all that stuff so you don't have to!

In other words, Spark makes most of the complexities of distributed/parallel computing **transparent** to the user. 

How Spark pulls that off is a pretty deep rabbit hole to dive into, but we will spare you the nitty-gritty details. In this section we will give you a very high-level view of how it's done and that should be more than enough to get you up and running!

## High-Level Architecture

There are a few names you need to get familiar with in order to understand how Spark works at a high level:

![spark-acrh](../../images/spark-arch.png)

- The **Driver Process**: Spark itself is written in Scala and Java. The Driver Process is a Scala program running inside of a Java Virtual Machine (JVM) and it fulfills a central role in Spark's user-friendly development experience. It reads your code, written in one of Spark's compatible languages, executes any parts that do not directly call any of Spark's APIs, then takes all lines that do call Spark's APIs and translates that into a special representation that we will cover later in the workshop: a Directed Acyclic Graph (DAG). Without going into too much detail for now, creating this DAG representation of the operations in your code entails two things:
  1. Spark will break your overall workload into pieces called **tasks**. 
  2. Spark will take these tasks and try to arrange them in a good way to carry them out. By "good" here we mean things like: Which of these tasks can be carried out in parallel? What groups of operations depend on the results of previous groups of operations? And many other lower-level things we will not delve into here... 

<br>

- The **Cluster Manager**: Responsible for allocating cluster resources to another type of Spark process that we will cover in the next bullet: the Executors. The Driver process asks the Cluster Manager for resources, the Cluster Manager arranges for Executors to be launched on the worker nodes, and the Driver then schedules tasks onto those Executors as it traverses the DAG. The following Cluster Manager options are available as of Spark release 3.3.0:
  1. Spark Standalone Cluster
  2. Apache Mesos
  3. Hadoop YARN
  4. Kubernetes

<br>

- The **Executor Processes**: Like the Driver, Executors are Scala programs running inside their own JVMs. Their role is, as their name suggests, to execute the tasks assigned to them. These processes are launched on worker nodes when a Spark job is submitted to the cluster. We will see how to reason about this later, but it is up to you to decide how many Executors you want to launch, and how many resources will be allocated to each one. One worker node can house multiple Executors, and each Executor gets allocated its own portion of RAM as well as a number of CPU cores. When an Executor has multiple CPU cores, it can execute multiple tasks in parallel. If a given task turns out to be multi-threaded, it can execute that task using thread-parallelism. The data on which tasks operate is loaded in the Executors' memory. We will talk more about this later, but each Executor will store only the portions of data it needs to accomplish its tasks, plus a cache area that facilitates moving data around the cluster to where it is needed.
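To make the Executor arithmetic a bit more concrete, here is a minimal plain-Python sketch (no Spark required). It assumes each task occupies exactly one CPU core, which is Spark's default (`spark.task.cpus` is 1). The function names and the numbers are hypothetical and only there for illustration.

```python
import math

def parallel_task_slots(num_executors, cores_per_executor):
    """Number of tasks that can run at the same time, assuming one core per task."""
    return num_executors * cores_per_executor

def waves_needed(num_tasks, num_executors, cores_per_executor):
    """How many rounds of tasks it takes to work through num_tasks tasks."""
    slots = parallel_task_slots(num_executors, cores_per_executor)
    return math.ceil(num_tasks / slots)

# Hypothetical job: 4 Executors with 8 cores each, 100 tasks to run.
slots = parallel_task_slots(num_executors=4, cores_per_executor=8)
waves = waves_needed(num_tasks=100, num_executors=4, cores_per_executor=8)
print(slots, waves)
```

With these made-up numbers, 32 tasks can run at once, so 100 tasks need four rounds, and the last round leaves most cores idle.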

## Resilient Distributed Datasets

A Resilient Distributed Dataset (RDD) is a special data structure that lies at the heart of Spark's distributed computing capabilities. Formally, an RDD is what is called a "distributed memory abstraction". In plain English, that means an RDD is an entity that exposes to you, the user, data that lives in a cluster, scattered across the RAM of multiple individual nodes, as well as the means to perform operations using that data. Concretely, an RDD makes it so you don't have to care about where the data is physically during a computation. You can treat any collection of data as a unified entity in your code. More concretely, an RDD will look, to you the user, like a kind of array of elements, where each element contains a subset of the whole collection of data.

![RDD_unified](../../images/RDD_unified.png)


Under the hood however, the Executor processes will be the ones performing the computations required by your code. Each executor will only ever operate over a number of subsets of the total collection of data at a time. These subsets are called **Partitions** and you, the user, see them in your code as the elements of your RDD! 


![RDD_split](../../images/RDD_split.png)

We will talk more in detail about RDDs and their properties later on during the hands-on part of the workshop, but the important point you need to keep in mind for now is that your code will operate over separate chunks of your overall dataset in parallel. That is to say, Spark implements a form of Data Parallelism through the concept of an RDD. In other words, Spark applies a "divide and conquer" strategy to parallelize work and hopefully crunch through very large datasets fast!
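If the idea of Data Parallelism feels abstract, this plain-Python sketch imitates it without Spark. The three hard-coded chunks stand in for Partitions and the `square` function is a hypothetical piece of work. On a real cluster each chunk could be processed by a different Executor at the same time; here they are simply processed one after the other.

```python
def square(x):
    """A stand-in for whatever work you want done on each data point."""
    return x * x

data = list(range(1, 11))

# The same data, cut into three chunks that play the role of Partitions.
partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# Each chunk is processed on its own, without looking at the other chunks.
processed = [[square(x) for x in chunk] for chunk in partitions]

# Flattening the per-chunk results gives the same answer as a serial loop.
combined = [x for chunk in processed for x in chunk]
serial = [square(x) for x in data]
print(combined == serial)
```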

Now let's look at what Spark is actually doing when Executors perform work on subsets of your dataset in parallel.

## Directed Acyclic Graphs

In the previous section we discussed how RDDs provide a way for the user to work with data scattered across nodes of a cluster transparently, without caring where data points are physically located. Executors then operate somewhat independently over separate chunks of your overall dataset, called Partitions. We had also previously mentioned that the role of the Driver Process is to translate your code into a representation that allows Spark to divide up tasks among multiple Executors. That representation was called a Directed Acyclic Graph (DAG) and we will discuss how that ties together with RDDs to enable Spark to fulfill its promise of providing a user-friendly distributed computing platform.

The first API we will show you how to use in this workshop is the Spark Core API. Also known as the RDD API, it is at the base of all other Spark APIs. Spark Core is essentially made up of a set of fundamental operations geared towards parallelizing or *vectorizing* arbitrary work that you want done over a given dataset. These fundamental operations can be stacked, chained or otherwise *composed* into very complex sequences of steps of computations to be done over a dataset. Unlike "regular programming" though, most of these fundamental operations are *higher-order functions*: they take other *functions* as their arguments. The RDD API is a *functional programming* framework.

We will dive deeper into the RDD API soon, but to illustrate these ideas, let's have a look at what a "Word Count" program looks like in Spark:

```Python
import pyspark

sc = pyspark.SparkContext()

input_file = sc.textFile("/path/to/input/file.txt")

word_counts = input_file.flatMap(lambda line: line.split()) \
                        .map(lambda word: (word, 1)) \
                        .reduceByKey(lambda a, b: a + b)
```

You could very naturally describe what the code is doing like this:

*Read in the text file -> Go through all lines of text and break them down into a list of all words in the file -> take each word on the list and put it in a tuple along with the number 1 -> group together the tuples that contain the same word and sum all the number 1s*
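To check that description against actual values, here is a plain-Python imitation of the three steps on two made-up lines of text. It does not use Spark: list comprehensions and a dictionary stand in for <code>flatMap</code>, <code>map</code> and <code>reduceByKey</code>, so treat it as a sketch of what each step produces rather than how Spark computes it.

```python
# Two hypothetical lines standing in for the contents of the input file.
lines = ["to be or not to be", "to do is to be"]

# flatMap: split every line and flatten the results into one list of words.
words = [word for line in lines for word in line.split()]

# map: pair every word with the number 1.
pairs = [(word, 1) for word in words]

# reduceByKey: add up the 1s of all pairs that share the same word.
word_counts = {}
for word, one in pairs:
    word_counts[word] = word_counts.get(word, 0) + one

print(word_counts)
```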

Here is a cleaner way of writing down the exact same description:

![Word Count DAG](../../images/WC_DAG.png)

When we visualize the sequence of steps in this way, a few things that were hidden come to the forefront. Notice how the computation is broken down into two **stages**. That simply means the operations on the second stage can only start being carried out once the last operation of the first stage is finished being computed. In other words, the second stage **depends** on the results of the first being completely computed for it to be able to start. 

But what about the operations inside the first stage? Remember Spark will apply these operations to separate partitions of your dataset in parallel. That means that as soon as one of these operations is done being computed over a partition of the data, the next operation can start on that same partition regardless of what is happening with the other partitions! You don't necessarily need to wait for <code>flatMap</code> to be applied to the entire dataset before you start computing <code>map</code>! So even though you see the operations in sequence on that diagram, in practice Spark may be running different operations on different partitions of a dataset, inside different Executors, at the same time!

This diagram that allows Spark to "decide" which operations can be carried out in parallel and which ones must wait for the results of preceding operations is called a Directed Acyclic Graph, and it is exactly the representation that the Driver Process will create based on your code!
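The difference between the two stages can be imitated in plain Python. In the sketch below (no Spark involved, made-up data, hypothetical function name), the first stage runs on each partition without needing anything from the others, while the second stage can only produce final counts once the pairs from every partition have been brought together.

```python
from collections import defaultdict

# Two hypothetical partitions, each holding one line of text.
partitions = [["to be or not to be"], ["to do is to be"]]

def stage_one(partition):
    """flatMap + map, applied to a single partition with no outside input."""
    return [(word, 1) for line in partition for word in line.split()]

# Stage 1: every partition is processed independently of the others.
stage_one_output = [stage_one(p) for p in partitions]

# Stage boundary: pairs with the same key must be brought together, so all
# partitions have to finish stage 1 before the final counts can be produced.
grouped = defaultdict(list)
for partition_output in stage_one_output:
    for word, one in partition_output:
        grouped[word].append(one)

# Stage 2: sum the grouped values for each word, like reduceByKey does.
word_counts = {word: sum(ones) for word, ones in grouped.items()}
print(word_counts)
```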


# Spark Core API (RDD API)

In this Section you will dive deeper into the Spark Core API. You will not only learn by example how to write code using this API, but you will also learn about important properties of RDDs and Spark itself.

Let's start by importing <code>pyspark</code> - a package that enables you to use Spark APIs in Python!

In [None]:
import pyspark

The following line, calling the <code>SparkContext()</code> constructor, will initialize a Spark session and return an object that encapsulates everything you need to "talk" to a Spark cluster. The convention is to name that object <code>sc</code>, and that is what you will find in examples in the Spark documentation and around the web.

In [None]:
sc = pyspark.SparkContext()

Now that we have our Spark session initialized on the <code>sc</code> object, we are ready to create RDDs. There are two ways to create an RDD and have Spark partition and distribute data across the cluster. Let's look at the first one - the <code>parallelize</code> method:

In [None]:
# Let's create an RDD containing a small list with integers for elements:

some_numbers = [1,2,3,4,5,6,7,8,9,10]

my_first_rdd = sc.parallelize(some_numbers)

In [None]:
my_first_rdd

What just happened here?

Spark took our list of integers and broke it down into several chunks, called **Partitions**. Each of these partitions can be operated on independently from each other by Executors, enabling Spark to "divide and conquer" and perform computations on your data in parallel!

In [None]:
# Let's see how many partitions Spark broke our list of numbers into
my_first_rdd.getNumPartitions()

In [None]:
# Let's see what's in these partitions:

my_first_rdd.glom().collect()

The number of Partitions is one of the important parameters of a Spark program that you need to be cognizant of. Split your data into too few partitions and Spark will not be able to do as much work in parallel as your Cluster hardware enables it to do; split it into too many and you may end up with empty partitions or not fully taking advantage of parallelism again, by forcing Executors to perform lots of very small tasks sequentially.
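To see how empty partitions come about, here is a small plain-Python sketch. The `slice_evenly` helper is hypothetical: it cuts a list into contiguous slices, which is in the spirit of what Spark does but not necessarily its exact slicing rule.

```python
def slice_evenly(data, num_partitions):
    """Cut a list into num_partitions contiguous slices (hypothetical helper,
    not necessarily the exact slicing Spark uses)."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

some_numbers = list(range(1, 11))

for num_partitions in (2, 10, 16):
    slices = slice_evenly(some_numbers, num_partitions)
    empty = sum(1 for s in slices if not s)
    print(num_partitions, "partitions,", empty, "empty:", slices)
```

With only 10 elements, asking for 16 slices necessarily leaves some of them with nothing to do.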

For now, let's set the number of partitions to 10:

In [None]:
my_first_rdd_repartitioned = my_first_rdd.repartition(10)
my_first_rdd_repartitioned.getNumPartitions()

The RDD API has two main types of methods: **Transformations** and **Actions**. In a nutshell, Transformations are operations carried out on RDDs that return other RDDs. Actions are operations carried out on RDDs that do not return other RDDs. On the line above, <code>repartition</code> is a Transformation and <code>getNumPartitions</code> is an Action. Let's look at a few more examples to see what that means in practice:

In [None]:
# Our first meaningful transformation to our RDD: add 1 to each element

my_first_rdd_repartitioned.map(lambda x: x + 1)

The <code>map</code> method applies a function to each element of each partition of an RDD. The output above tells us that this returned another RDD. Can we get its contents back from the cluster?

In [None]:
# The collect() method brings the contents of an RDD from the cluster back to the driver

my_first_rdd_repartitioned.collect()

The numbers may be shuffled, but this is still our list of integers from 1 to 10... we had applied a transformation to our RDD, which created another RDD, but we had no way to refer to this new RDD!

In [None]:
# RDDs are immutable! Our transformation actually created another RDD we had no way to refer to on the Driver!

my_second_rdd = my_first_rdd.map(lambda x: x + 1)

my_second_rdd.collect()

By creating new RDDs with each Transformation, Spark actually provides a type of fault-tolerance! It records these transformations in a DAG, so if ever an entire node or an Executor inside a node fails, Spark can immediately recompute your RDDs and your work isn't lost. 

In [None]:
# Spark preserves RDD lineage to automatically recompute them if they are lost!

my_second_rdd.toDebugString()
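The idea of recomputing lost data from recorded transformations can be imitated in a few lines of plain Python. This is only a sketch under simplifying assumptions: the source data is always available, the lineage is a flat list of per-element functions, and all names are hypothetical. Spark's real bookkeeping is considerably richer.

```python
# Source data, already split into three partitions.
source_partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]

# The lineage: the ordered list of transformations applied to the source data.
lineage = [
    lambda x: x + 1,
    lambda x: x * 2,
]

def compute_partition(index):
    """Rebuild one partition from the source data by replaying the lineage."""
    values = source_partitions[index]
    for transformation in lineage:
        values = [transformation(v) for v in values]
    return values

computed = [compute_partition(i) for i in range(len(source_partitions))]

# Simulate losing the partition held by a failed Executor...
computed[1] = None
# ...and recover it by recomputing only that partition.
computed[1] = compute_partition(1)
print(computed)
```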

Now wait a minute... if Spark creates RDDs at every Transformation and Spark keeps things in memory... won't you quickly run out of memory by applying Transformations to RDDs?

The answer is: no! Spark performs "Lazy-Evaluation". This means all Spark does is record your transformations in a DAG without actually computing anything or using up any extra memory until an **Action** is called on an RDD!
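Python itself offers a handy analogy for Lazy-Evaluation: the built-in `map` returns a lazy iterator, so chaining calls to it does no work until something consumes the result. The sketch below (plain Python, no Spark, hypothetical names) counts how often the function is actually called before and after the chain is consumed.

```python
calls = {"count": 0}

def add_one(x):
    """Adds 1 and keeps track of how many times it has been called."""
    calls["count"] += 1
    return x + 1

numbers = range(1, 11)

# Building the chain does no work: map() only records what to do.
chained = map(add_one, map(add_one, numbers))
calls_before = calls["count"]

# Consuming the iterator plays the role of an Action: now the work happens.
total = sum(chained)
calls_after = calls["count"]

print(calls_before, calls_after, total)
```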

Let's get a feeling of this concept by applying a long chain of Transformations to an RDD and timing it...

In [None]:
# Spark performs Lazy-Evaluation: No transformation actually gets computed until an "action" is called on an RDD

%time my_third_rdd = 

... it ran almost instantly! Now let's call an Action on this RDD and time it:

In [None]:
# The "reduce" method is an "action". For a complete list of actions see: https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions 

%time my_third_rdd.

And here is some good news for those of you who can't get used to the "lambda function" syntax. This also works just fine:

In [None]:


%time my_third_rdd.

RDDs are a pretty powerful concept and if you take anything home from this workshop let it be this: RDDs are a simple way of performing **Data Parallelism**.

In other words, you can write your code almost the exact same way you would in a serial program (i.e., not parallel) and the "parallel" part simply means your code will run against different chunks of your data at the same time. 

All you need to do most of the time is wrap your usual code with one or more RDD API methods and be aware of the nature of the elements in your Partitions so you pick the right method. Once you've done that, Spark takes care of performing Data Parallelism for you!

Here is a slightly more difficult example - let's use Spark to multiply each element of a numpy array by a random number!

What makes this more difficult? Now we are doing Data Parallelism not on a native Python object like before (a list), but on an object defined by a non-native library: numpy.

We start by creating this object: a 1-d array of 100 elements.

In [None]:
import numpy as np

an_object = np.linspace(0,1,100)

In [None]:
an_object

In [None]:
my_new_rdd = sc.parallelize(an_object)

Now you might be tempted to do like we did before and just do what you would do on your own workstation without Spark:

In [None]:
my_new_rdd.map(lambda x: x * np.random.random()).collect()

This should have failed if you are running on a Cluster (as opposed to running Spark on a single computer). Why? Well, you imported the <code>numpy</code> library on the Driver, but you are asking the Executors to use it... you need to tell the Executors to import numpy too!

In [None]:
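def multiply_by_random(x):
    import numpy as np  # imported on the Executor, where this function runs
    return x * np.random.random()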

In [None]:
my_new_rdd.map(multiply_by_random).collect()

Alright! This seems to have worked... but is it the best way to go about doing this? Remember, the <code>map</code> method applies whatever function you pass to it to every single element of each partition!

Does that mean we are importing <code>numpy</code> 100 times in this example? Yes it does.

This is a good segue into another very useful Transformation in the RDD API:

In [None]:
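def partition_multiply_by_random(x):
    import numpy as np  # imported once per partition
    for element in x:  # x is an iterator over the elements of one partition
        yield element * np.random.random()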

In [None]:
my_new_rdd.mapPartitions(partition_multiply_by_random).collect()

The <code>mapPartitions</code> method applies whatever function you pass to it to each **Partition**, but with one caveat: your function receives an iterator over the elements of the input Partition, so it must iterate through those elements itself and return (or yield) the results. So in practice, this method also applies your function to the elements of a Partition, but it allows you more flexibility to do things like importing libraries only once per partition... or anything else that you don't need done repeatedly for each element of a partition.
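The difference in how often the "setup" part of your function runs can be counted with a plain-Python imitation. Nothing here uses Spark: the two nested lists stand in for Partitions, and `expensive_setup` is a hypothetical stand-in for something costly like an import or opening a connection.

```python
setup_calls = {"per_element": 0, "per_partition": 0}

def expensive_setup(kind):
    """Stand-in for a costly step such as an import or opening a connection."""
    setup_calls[kind] += 1
    return 2  # a hypothetical multiplier produced by the setup

def per_element(x):
    factor = expensive_setup("per_element")  # runs for every element
    return x * factor

def per_partition(partition):
    factor = expensive_setup("per_partition")  # runs once per partition
    for x in partition:
        yield x * factor

partitions = [list(range(0, 50)), list(range(50, 100))]

# What map does: call the function on each element of each partition.
mapped = [[per_element(x) for x in p] for p in partitions]

# What mapPartitions does: call the function once per partition, passing it
# an iterator over that partition's elements.
mapped_partitions = [list(per_partition(iter(p))) for p in partitions]

print(setup_calls)
```

Both approaches produce the same values; only the number of setup calls differs.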

***An important thing to note here:*** if your code imports libraries, you need to make sure they are installed on every node of your cluster! Generally that means asking your system administrator to do it for you...

We will talk more about options for handling your code's dependencies later on!

## Hands-on Guided Example - NASA's Website Log Analysis

So far we've used toy examples to introduce the RDD API along with a few of its Transformations and Actions. Now let's look at a more real-life example: let's wrangle a fairly big "semi-structured" file and turn it into something a Data Scientist would be ready to work with. In fact, let's ask a few Data Science-y questions of this data and use Spark itself to answer them while we are at it!

This example file is a standard Apache webserver log. More specifically, it contains a month's worth of requests to NASA's website, from the distant year of 1995, combined into one fairly big file.

This log contains the following information:

1. The IP Address or the DNS name performing a request
2. A time stamp of the form: "dd/Mon/YYYY:hh:mm:ss Timezone"
3. The request type (HTTP verb), the resource being requested and the Protocol used
4. The code returned by the server (200 OK, 404 Not Found etc...)
5. The Size of the resource being requested

We will use the <code>textFile</code> method to read in this file. This, like the <code>parallelize</code> method, turns the data inside this file into an RDD. There are two **important things** you need to know about this method:

1. In a real-life Spark Cluster, the location of the file (the argument you will pass to <code>textFile</code>) must be visible/accessible to all nodes of the Cluster. In practice, a lot of the time this location will be a path on a Hadoop Distributed File System (HDFS), but this can be any Network File System, or a location mounted on all nodes, or Amazon S3... as long as it's visible and accessible on all nodes!

2. This method turns **each line** of the input file into an element in a Partition. So ***no matter what the format of the file is*** - when it gets turned into an RDD, **each line** (as delimited by a newline a.k.a. "\n") becomes an element.

Without further ado... let's dive into it!

In [None]:
nasa_logs = sc.textFile('../../data/NASA_access_log_Jul95.gz')

The first step in any data problem is to look at the data to get a sense of what we are dealing with. The RDD API has the <code>take</code> Action, which brings a number of elements (remember, an element here is a line of the original file) back to the Driver so we can see them. The important thing here is to be careful not to bring too many elements back to the Driver and blow up its memory capacity!

In [None]:
nasa_logs.take(5)

Another good practice is to find out how many elements we have to get a sense of what we are dealing with. The RDD API has the <code>count</code> method for that:

In [None]:
nasa_logs.count()

Now that we can see what the data looks like, a reasonable first step seems to be to split the data on the " " (space) character:

In [None]:
nasa_logs.

Next, for the sake of this example, let's say we are not interested in lines where there is data missing. In other words, we are only interested in lines that have all 10 elements. We will use the <code>filter</code> method to filter any lines that don't have all 10 elements out of our RDD:

In [None]:
nasa_logs.

Web server logs like this are called 'semi-structured' for a reason: we can be pretty sure that every line will be formatted the same way. This means every element in each of our Partitions looks pretty much the same after our first step. We can be confident that the same unwanted characters ended up inside the elements of all partitions of our RDD. So our next step takes care of removing them:

In [None]:
nasa_logs_structured = 

You might be asking yourself whether using the <code>take</code> method all the time to check if we are doing things right is the best practice... and the answer is no. Every time you call it, you are computing a new RDD and thus having the Spark Cluster do work for you. In real life you will rarely have a Cluster all to yourself, so you should expect your computations to get queued and to compete for resources with other users. In this scenario, minimizing the number of times you move things back and forth between the Driver and the Executors is a good idea.

So in practice, one approach would be to use the RDD API method <code>sample</code> to extract a sample of your data to examine in the driver and figure out what you need to do before farming out computations to the cluster. The <code>take</code> method also works here, but getting a random sample instead of the first N elements of your RDD is almost always a better plan.

In [None]:
# Make sure you know how much data 0.01% of your dataset is! It might look like a small fraction, but in the Big Data world even that might be too much for your local computer!

local_sample = nasa_logs.sample(withReplacement=False,fraction=0.0001).collect()

print(local_sample)

Ok, so now our RDD has the following elements: IP/NAME_OF_ORIGIN, DATE/TIME, TIMEZONE, REQUEST_METHOD, RESOURCE_REQUESTED, PROTOCOL, STATUS_CODE, SIZE_OF_RESOURCE

That looks pretty much like a CSV (or a Dataframe) a Data Scientist could work with!
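To see how one raw line can end up as those eight fields, here is a minimal plain-Python sketch of the three steps on a single line. The sample line is an illustrative entry in the log's format, and the cleaning step shown is just one possible way of doing it, not necessarily the one used in the cells above.

```python
# An illustrative log line in the same format as the NASA access log.
sample_line = ('199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] '
               '"GET /history/apollo/ HTTP/1.0" 200 6245')

# Step 1: split on the space character.
tokens = sample_line.split(" ")

# Step 2: keep the line only if it has all 10 tokens.
is_complete = len(tokens) == 10

# Step 3: drop the two "-" placeholders that follow the origin (positions 1
# and 2), then strip the leftover brackets and quotes from every token.
structured = [token.strip('[]"') for token in tokens[:1] + tokens[3:]]

print(is_complete, structured)
```

Dropping the placeholders by position, rather than by value, matters because the size field at the end of a line can itself be "-".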

We can now go ahead and save this data somewhere your Data Science team can go get it. For now, we will save this as a CSV file - we will talk about writing directly to a Relational DB or Data Warehouse on Day 2.

Unfortunately, the RDD API does not have a method to write CSVs directly: we will have to add the commas and make it look like a CSV before saving it: 

In [None]:
def CSVfy(rdd_element):
  

nasa_logs_structured.map(CSVfy).take(5)

In [None]:
csv_to_be_saved = nasa_logs_structured.map(CSVfy)

csv_to_be_saved.saveAsTextFile('nasa_logs.csv')

The <code>saveAsTextFile</code> method has the same caveats as its cousin <code>textFile</code>: the path where you save your data must be visible and accessible on all nodes of the cluster. As before, typically this will be a location on a Hadoop DFS. 

If you don't want to save this on whatever Distributed File System your Spark Cluster was configured to store things, you can always use the <code>collect</code> method of your RDD to bring your data over to the Driver, and then just save it to your local file system using your favourite library/function. Then again, the point of having a Spark Cluster is to deal with huge amounts of data that don't necessarily fit in your regular workstation...

You may also be thinking right now "how come Spark doesn't have something like a 'to_csv' method to write CSVs directly?", while pointing out that what we did above would certainly fail if there happened to be any commas **inside the elements** of our RDD. 

You would be right. 

It turns out Spark **does** have an easier method to create CSVs, one that handles escaping characters, quotes, commas and every other annoying thing we have to deal with when working with CSVs. This is part of the SparkSQL API though and we will talk about it on Day 2!
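The comma problem is easy to reproduce on your own workstation. The sketch below uses Python's standard `csv` module, not Spark's CSV writer, and a made-up row, to contrast naive comma-joining with a writer that quotes fields properly.

```python
import csv
import io

# A hypothetical row whose second field contains a comma.
row = ["burger.letters.com", "/images/a,b.gif", "200"]

# Naive approach: join with commas. The comma inside the second field makes
# the line look like it has four fields instead of three.
naive_line = ",".join(row)
naive_fields = naive_line.split(",")

# csv.writer quotes any field that contains the delimiter, so the row
# survives a round trip through text.
buffer = io.StringIO()
csv.writer(buffer).writerow(row)
safe_line = buffer.getvalue().strip()
round_trip = next(csv.reader(io.StringIO(safe_line)))

print(naive_fields)
print(safe_line)
```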

But enough about CSVs! Let's take advantage of our now-structured dataset and see if we can do a bit of Data Science using the RDD API directly! Let's find out where most requests to the NASA webserver came from on our dataset.

To do this, let's go full Hadoop and do a little bit of Map-Reduce: 

In [None]:
# Take each line of our structured log and return a Key-Value Pair

nasa_logs_structured.

In [None]:
# Unlike "reduce", "reduceByKey" is not an Action!

nasa_logs_structured.

## Exercise 1 - When Did NASA's Server Serve The Most Data?

Now you try! Take our structured log file RDD <code>nasa_logs_structured</code> and find out on which timestamp NASA's webserver registered the highest amount of data served. If you are looking for a challenge, try figuring out on which **day** there was the highest amount of data served!

HINT: Some requests don't return any data, so there is no amount on the logs, i.e., the amount is "-".

HINT2: All elements on our structured version of the log are Strings... 

In [None]:
nasa_logs_structured.persist()

In [None]:
nasa_logs_structured.is_cached

In [None]:
nasa_logs_structured.

## Exercise 2 - What is the Resource With the Most Unique Request Origins?

Can you find out what NASA resource had the most unique visitors/requestors in our dataset?

HINT: The <code>distinct</code> method does exactly what its name suggests


In [None]:
nasa_logs_structured.

## Exercise 3 - Word count

If we take the element containing NASA's website resource names and we replace the "/"s and "."s by " "s, we sort of get words. I wonder how many words we get and I wonder what are the most frequent words... write a word count program to find the most frequent words and how many unique words there are.

HINT: The DAG for the word count program is on the slide deck!
HINT2: Use the <code>count</code> method for the unique words part.

In [None]:
words = nasa_logs_structured.

In [None]:
words.

# SparkSQL API

Admittedly, the RDD API and its Functional Programming flavour are not for everyone. Most people dealing with heavy-duty data analytics problems are used to far more structured data types and an imperative programming style. Whether they're R users, Python users or Relational Database ninjas, data people love data that is in a *tabular* format - a table in a database or a DataFrame in R or Pandas. The SparkSQL API is tailor-made to cater to these needs while taking advantage of Spark's distributed/parallel computing capabilities under the hood!

Like before, let's start by importing <code>pyspark</code> and instantiating a <code>SparkContext</code>:

In [None]:
import pyspark

sc = pyspark.SparkContext()

Now we can initialize a SQL Context by passing the <code>sc</code> object to the <code>SQLContext</code> constructor. The convention when initializing a <code>SQLContext</code> is to assign it to a variable called <code>spark</code>, though you will also see many examples where this variable is called <code>sql</code> or <code>SQLContext</code>. Note that since Spark 2.0 the recommended entry point is <code>SparkSession</code>; <code>SQLContext</code> is kept for backward compatibility.


In [None]:
from pyspark.sql import SQLContext

spark = SQLContext(sc)

The Spark SQL API, unlike the RDD API, has a method to read CSV files directly. Let's use it to load our data. Those of you who took our DAT201 class will be familiar with this dataset!

In [None]:
surveys_df = spark.read.options(header='true').csv('../../data/surveys.csv')

An alternative syntax for the command above is <code>spark.read.format("csv").option("header", "true").load('../../data/surveys.csv')</code>. Reading this you might be asking yourself if the SparkSQL API has methods to read other formats directly... and the answer is yes, it does. We will mostly focus on CSV today, but you can find a complete list of supported data sources here: https://spark.apache.org/docs/latest/sql-data-sources.html

We have now loaded our CSV into a data structure called a <code>Spark DataFrame</code>. 

The <code>head</code> method shows you the first row of a DataFrame. Notice the word "Row" on the output: under the hood, a DataFrame is backed by an RDD whose elements are objects of the <code>Row</code> class!

In [None]:
surveys_df.head()

Another way of peeking into a DataFrame is the <code>show</code> method. This prints your DataFrame in a way analogous to how you'd get an output from a Relational Database Management System (RDBMS) on the command line.

In [None]:
surveys_df.show()

A third option is using the method toPandas. As the name says, this will bring your DataFrame back to the Driver and convert it into a Pandas DataFrame. Notice the limit method called just before toPandas: since this is bringing data back from the Cluster to the Driver, you need to make sure you are not bringing too much back!

In [None]:
import pandas as pd

surveys_df.limit(5).toPandas()

Now let's look into how to manipulate data with the SparkSQL API. We'll start with the DataFrames API component. It looks just like SQL, but instead of writing statements, you will call methods just like we did with the RDD API. For example, you can select a single column out of a DataFrame where a condition based on another column is satisfied, then count the number of rows returned:

In [None]:
surveys_df.select("record_id").where(surveys_df.weight > 40).count()

Also like the RDD API, you can use the <code>take</code> method to bring a few rows back to the Driver and print the results:

In [None]:
surveys_df.select("record_id","year").where(surveys_df.weight > 40).take(5)

Similar to how you would do it in Pandas, you can retrieve the names of a DataFrame's columns from the <code>columns</code> attribute:

In [None]:
surveys_df.columns

And you can drop columns you don't want to keep with the <code>drop</code> method.

In [None]:
surveys_df.drop("plot_id").take(5)

Now let's focus on querying our DataFrame. You can use the <code>distinct</code> method to return unique instances out of one or more columns:

In [None]:
surveys_df.select("species_id").distinct().count()

In [None]:
surveys_df.select("species_id").distinct().show()

The <code>groupBy</code> method works like the "GROUP BY" SQL clause: use it to apply aggregations to your data based on one or more columns:

In [None]:
surveys_df.groupBy("species_id").count().take(5)

The <code>orderBy</code> method works like the "ORDER BY" SQL clause: notice that its position in the chain of methods we call is analogous to where you'd place an ORDER BY clause in a SQL query!

In [None]:
surveys_df.groupBy("year","species_id").count().orderBy("year").take(5)

You can use the <code>where</code> method to return rows where a given condition is true:

In [None]:
surveys_df.where(surveys_df.species_id=="RF").take(5)

Or use the same method to return rows where a condition is NOT true. You can do this by using the <code>~</code> operator to negate a conditional expression:

In [None]:
surveys_df.where(~(surveys_df.species_id=="RF")).take(5)

Finally, you can perform pattern matching on strings using the <code>like</code> method. Here we want to return rows where the species id starts with a D. Notice how the percent sign <code>%</code> is the wildcard operator here (meaning, return a D followed by "whatever"):

In [None]:
surveys_df.where(surveys_df.species_id.like("D%")).take(5)

Now, the name of the API is SparkSQL and we compared a lot of the methods above to the actual SQL language. So let's get to it: here is how you can write actual SQL queries to work with Spark DataFrames as if they were tables in a relational database.

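The first thing you need to do is to "register" your DataFrame as a table using the method <code>registerTempTable</code> (deprecated since Spark 2.0 in favour of <code>createOrReplaceTempView</code>, which takes the same argument):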

In [None]:
surveys_df.registerTempTable("surveys")

Now you can use the <code>spark</code> object we created at the beginning of this notebook to run SQL queries against the table you just registered, using the <code>sql</code> method:

In [None]:
spark.sql("SELECT record_id FROM surveys WHERE weight > 40").take(5)

In [None]:
spark.sql("SELECT * FROM surveys WHERE species_id NOT IN ('RF')").take(5)

In [None]:
spark.sql("SELECT year, species_id, COUNT(record_id) FROM surveys GROUP BY year, \
species_id ORDER BY year").take(5)

In [None]:
spark.sql("SELECT * FROM surveys WHERE species_id LIKE 'D%'").take(5)

You can register multiple tables at a time and perform multi-table operations just like you would on a real relational database. For example, here we join our surveys table with a new table containing the full name of each species, based on the species id:

In [None]:
species_df = spark.read.options(header='true').csv('../../data/species.csv')

species_df.registerTempTable("species")

spark.sql("SELECT * FROM species").take(5)

In [None]:
spark.sql("SELECT * FROM surveys JOIN species ON surveys.species_id=species.species_id").\
show()

At any time you can check what tables are currently registered and present in your Spark data warehouse:

In [None]:
spark.tableNames()

To remove tables from the warehouse, use <code>dropTempTable</code>:

In [None]:
spark.dropTempTable("species")

If you are familiar with SQL, you must be feeling pretty at home right now with the SparkSQL API. However, you might be asking yourself... how do you handle column types? In other words, how can you work in a database where the tables have no schema?

It turns out Tables and DataFrames in Spark do have schemas! You can check the <code>schema</code> attribute:

In [None]:
surveys_df.schema

In our case, we did not specify what the schema of the DataFrame should be when we loaded the source CSV file, so Spark went ahead and read everything as Strings.

As it turns out, you can actually impose schemas on Spark DataFrames before you start populating them with data, in a way analogous to how you write a DDL statement to define a Table's schema in SQL.

Below you will see an example of what the schema of our survey data could look like. The following entities are worthy of note: <code>StructType</code>, <code>StructField</code> and the different data types. In SparkSQL, the <code>StructType</code> class defines what a Row will contain, and the <code>StructField</code> class defines what each column in that row will look like.

The order of the StructFields in a StructType must match what is in the dataset you want to import into your DataFrame.

In [None]:
from pyspark.sql.types import *

survey_schema = StructType([StructField('record_id',IntegerType()), 
                     StructField('month',     ByteType()), 
                     StructField('day',       ByteType()), 
                     StructField('year',      ShortType()),
                     StructField('plot_id',   IntegerType()),
                     StructField('species_id',StringType()),
                     StructField('sex',       StringType()),
                     StructField('hindfoot_length',   FloatType()),
                     StructField('weight',    FloatType())
           ])

Now you can enforce this schema when you load the survey dataset from a csv file by adding the parameter <code>schema</code> to the <code>read.csv</code> method: 

In [None]:
survey_df = spark.read.options(header='true').\
csv('../../data/surveys.csv', schema=survey_schema)

In general, enforcing schemas in SparkSQL is a good idea when you know you can trust the source to be **mostly** clean, i.e., that the number of columns and their types will mostly match the schema you intend to enforce. Having a well-defined schema in SparkSQL will not only make your computations and resource usage more efficient, but it will also enable you to use a number of typed built-in functions directly, without having to convert between types or wrangle data just so you can use a certain function. A notable example of when this comes in handy is whenever you are dealing with Dates or Timestamps.

See here for more information on data types supported by Spark: https://spark.apache.org/docs/latest/sql-ref-datatypes.html

Last, but not least, you can supplement the SparkSQL API with the RDD API. Remember that DataFrames are just RDDs where each element is an object of the special class <code>Row</code>? This means that, whenever doing something using SQL is too complicated, but it would be easy to do the same thing with Python, you can use the RDD API to bypass SQL and do what you need to do in Python!

You can extract the RDD behind any DataFrame by accessing the <code>rdd</code> attribute:

In [None]:
survey_df_rdd = survey_df.rdd

A DataFrame, however, is an RDD where the elements are objects of the class Row as we've seen before:

In [None]:
survey_df_rdd.take(1)

This isn't very useful outside of the SparkSQL API, so we convert all elements to good-old Python <code>lists</code> next: 

In [None]:
survey_df_rdd.map(list).take(1)

Now let's create a new column called <code>ratio</code> that contains hindfoot length divided by weight:

In [None]:
survey_df_rdd = survey_df_rdd.map(list)\
.map(lambda row : row + [None if not row[7] or not row[8] else row[7]/row[8]])
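
The row-level logic inside the lambda above can be tried out without Spark. Below is a minimal plain-Python sketch of it, applied to a few made-up rows laid out in the same column order as the survey schema. The helper name <code>add_ratio</code> and the sample values are illustrative only and do not come from the dataset. Note that <code>not value</code> is true for both <code>None</code> and <code>0</code>, so a zero in either column yields <code>None</code> instead of a division error.

```python
def add_ratio(row):
    """Append hindfoot_length / weight, or None if either value is missing or zero."""
    hindfoot_length, weight = row[7], row[8]
    ratio = None if not hindfoot_length or not weight else hindfoot_length / weight
    return row + [ratio]


# Made-up rows: record_id, month, day, year, plot_id, species_id, sex,
# hindfoot_length, weight
rows = [
    [1, 7, 16, 1977, 2, "NL", "M", 32.0, 40.0],
    [2, 7, 16, 1977, 3, "NL", "M", 33.0, None],
    [3, 7, 16, 1977, 2, "DM", "F", 0.0, 12.0],
]

with_ratio = [add_ratio(row) for row in rows]
print([row[-1] for row in with_ratio])
```

In the Spark version, the same function body runs once per element of the RDD, on whichever Executor holds that partition.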

Let's add a new column name to our list of columns before we convert this RDD back to a DataFrame:

In [None]:
new_columns = surveys_df.columns + ["ratio"]

Now we can convert this RDD back into a DataFrame that has one extra column using the method <code>toDF</code>:

In [None]:
surveys_df = survey_df_rdd.toDF(new_columns,sampleRatio=0.01)
surveys_df.limit(5).toPandas()

# Pandas on Spark API

The SparkSQL API is a powerful tool for operating on very large DataFrames. It is especially great in terms of expressivity if you are already familiar with the SQL language and Relational Database environments. SparkSQL also tries to mimic some functionality and design choices from the Pandas package, and while it succeeds in many aspects, Pandas users are left without some of the package's most useful features when they move into Spark.

To cater to this particular user base, Spark maintainers introduced a new API in Spark 3.2: **Pandas on Spark**. As the name suggests, the idea behind this API is to reproduce the user experience from the Pandas package with as many of its methods and operators as possible, but on very large scale distributed DataFrames.

To get started, first let's import the module <code>pyspark.pandas</code> and re-use our Spark DataFrame from the previous section by turning it into a Pandas on Spark DataFrame:

In [None]:
import pyspark.pandas as ps

survey_df_pandas = surveys_df.pandas_api()
survey_df_pandas.head()

Another handy way of using Pandas on Spark is by converting an actual **Pandas DataFrame** into a **Pandas on Spark DataFrame**. In this scenario, you would have a regular Pandas DataFrame, created without any calls to Spark, that you wish to work on in a parallelized or even distributed fashion.

In [None]:
import pandas as pd

survey_df_local = pd.read_csv("../../data/surveys.csv")

survey_df_distributed = ps.from_pandas(survey_df_local)
survey_df_distributed.head()

In both cases, the goal is the same - we want to have a parallelized / distributed DataFrame that looks and behaves just like a regular Pandas DataFrame.


**IMPORTANT NOTICE:** parallelizing a DataFrame does not necessarily mean any arbitrary operation will run faster. *In general*, you can expect Pandas on Spark to outperform Pandas as the size of a DataFrame grows, even if you are running pyspark on a single node. That being said, you should always reason about scalability before choosing to parallelize work over multiple cores, or multiple nodes. See this article for more about scalability: https://docs.alliancecan.ca/wiki/Scalability 

**IMPORTANT NOTICE 2:** Pandas on Spark is not a 100% perfect clone of Pandas - some Pandas functionalities have not yet been implemented, some probably never will be, and Pandas on Spark has a few features that do not exist in Pandas. See the complete API reference for more details: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html


Next, we will go through a few examples of how to use a Pandas on Spark DataFrame. Accessing columns and rows, as well as slicing a DataFrame, works just like in Pandas.

In [None]:
# Access columns by name with two different syntaxes:
survey_df_pandas['weight'].head()
#survey_df_pandas.weight.head()

In [None]:
# Use the .iloc indexer to access a row by its integer position
survey_df_pandas.iloc[10]

In [None]:
# Use conditionals to find subsets of a DataFrame that match a condition
survey_df_pandas[survey_df_pandas.weight > 40].head()

Pandas on Spark also includes Pandas' statistical functions.

In [None]:
# summary statistics
survey_df_pandas.describe()

In [None]:
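# location and dispersion measures
# (wrapped in print so that every value is displayed, not only the last one)
print(survey_df_pandas.weight.mean())
print(survey_df_pandas.weight.median())
print(survey_df_pandas.weight.quantile())
print(survey_df_pandas.weight.std())
print(survey_df_pandas.weight.var())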

Like in Pandas, you can use the <code>apply</code> method to invoke an arbitrary function on all rows of a given column of a DataFrame. Using <code>apply</code> is almost always a better idea than iterating over rows manually in Pandas, and this is especially true in Pandas on Spark.

In [None]:
# Scale all individual weights by 0.1 (weights are truncated to integers first, missing weights become 0)
import math
survey_df_pandas['weight'] = survey_df_pandas.weight.apply(lambda x : int(x) / 10 if not math.isnan(x) else 0)
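
To see exactly what the function passed to <code>apply</code> does to individual values, here is a small plain-Python sketch that needs neither Pandas nor Spark. The name <code>scale_weight</code> and the sample values are made up for illustration. It shows two side effects worth keeping in mind: <code>int(x)</code> truncates the decimal part before scaling, and missing values (NaN) are replaced by 0 rather than kept as missing.

```python
import math


def scale_weight(x):
    # Same expression as the lambda passed to apply in the cell above
    return int(x) / 10 if not math.isnan(x) else 0


# Made-up sample values: a whole number, a fractional number and a missing value
samples = [48.0, 7.9, float("nan")]
scaled = [scale_weight(x) for x in samples]
print(scaled)
```

If you would rather keep missing values as missing, returning <code>x</code> unchanged in the NaN branch is one possible alternative.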

Last, but not least, Pandas on Spark introduces the <code>plot</code> class along with its subclasses that allow you to easily create different types of plots. Before Pandas on Spark, it would have been necessary to bring data over from the cluster to the Driver in order to visualize it. Of course, this would have been impractical with very large datasets that do not fit in the Driver's memory. With this new API, plot objects are generated directly on the cluster and only then returned to the Driver for you to see!

In [None]:
# Density plot of the distribution of weights in the dataset
survey_df_pandas.weight.plot.kde(backend="matplotlib", bw_method=0.3)

# Spark Streaming API (Optional)

Spark Streaming is Spark's API for processing constant streams of data in near real-time. 

To demonstrate the Spark Streaming API in action, we will follow the wordCount example from Spark's official documentation. To get this working, first open a terminal window, or a terminal tab on JupyterHub, and run the following command:

<code>ncat -lk 9999</code>

Now, anything you type into that window will be visible at port 9999. The code block below sets up a Spark Streaming Context (convention is to name it <code>ssc</code>) and tells Spark to listen for anything that pops up on port 9999.

Spark will pick up whatever we type in the other window and run the WordCount program from the RDD section to... count the words in the text we just typed.

In [None]:
import pyspark
from pyspark.streaming import StreamingContext

#Uncomment the next line to run the code block on jupyter. Keep it commented if copy-pasting into the pyspark shell
#sc = pyspark.SparkContext()

# This tells Spark Streaming to batch up the contents of a data stream and "ingest" them every 10 seconds.
ssc = StreamingContext(sc,10)

# Tell spark to listen on port 9999 of our localhost.
lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda line : line.split(" "))

pairs = words.map(lambda word: (word, 1))
wordCount = pairs.reduceByKey(lambda a, b: a + b)

wordCount.pprint()

ssc.start()
ssc.awaitTermination()

In [None]:
ssc.stop()
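
To make it clearer what the chain of transformations in the streaming example computes for each 10-second batch, here is a plain-Python sketch of the same three steps applied to one batch of lines. The helper <code>word_count</code> and the sample lines are hypothetical; Spark Streaming performs the equivalent work in a distributed fashion on every batch it ingests.

```python
from collections import Counter


def word_count(lines):
    # flatMap: split every line into words and flatten the result into one list
    words = [word for line in lines for word in line.split(" ")]
    # map: pair every word with the number 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the 1s that belong to the same word
    counts = Counter()
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


# Made-up batch: two lines typed into the ncat window within one interval
batch = ["spark is fast", "spark is fun"]
print(word_count(batch))
```

Each new batch starts from scratch in this example, so counts are per batch and not cumulative across the whole stream.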

The methods available in the Spark Streaming API are very similar to the ones from the RDD API. The example below sets up a Spark Streaming Context and tells Spark to **poll** (or monitor) a specific directory in our filesystem. Whenever a new file gets moved into that directory, Spark will ingest its contents as Text, just like we did with the RDD API. 

In [None]:
import pyspark
from pyspark.streaming import StreamingContext

#sc = pyspark.SparkContext()
ssc = StreamingContext(sc,10)

records = ssc.textFileStream("/home/user74/scratch/")

rows = records.map(lambda line : line.split(","))
rows.pprint()

ssc.start()
ssc.awaitTermination()

In [None]:
ssc.stop()

As we've seen before, the RDD API is great, but the SparkSQL API really makes manipulating data feel more familiar for those who already know SQL or use R or Pandas regularly. You can leverage the SparkSQL API by making Spark Streaming populate a DataFrame as it reads the stream: 

In [None]:
import pyspark
from pyspark.sql import SQLContext

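# Reuse the existing SparkContext if there is one, since only one can be active at a time
sc = pyspark.SparkContext.getOrCreate()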

spark = SQLContext(sc)

my_dataframe = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

This tells Spark to listen on port 9999, just like we did before with <code>StreamingContext</code>. The difference here is that we are telling Spark we want the contents of the stream to go into a DataFrame! Next, we inform Spark of what operations we wish to perform on our Stream.

In this example, we will simply write the contents of the Stream to "memory" and call it "new_dataframe". We will also tell Spark to "append" new records to "new_dataframe" as they arrive. This is equivalent to registering a temp table like we did before, then populating it with the incoming contents of the stream!

In [None]:
my_dataframe.writeStream.format("memory").queryName("new_dataframe").outputMode("append").start()

You can verify that indeed we now have a registered table called "new_dataframe":

In [None]:
spark.tableNames()

And you can use SQL to query this table. Here, we assume the contents of the stream are comma separated values and we split them into columns:

In [None]:
spark.sql("SELECT SPLIT(value,',')[0] AS Col1, SPLIT(value,',')[1] as Col2 FROM new_dataframe").collect()

Here we have demonstrated how Spark Streaming works by simulating a message broker that passes Text content over to Spark. This is a fairly common use case, but you will also often come across messages that are in a specific format, like JSON for example.

The flow to set up a stream that handles files of a specific format is very similar to the examples above: it suffices to change the "format" option! See here for other supported formats: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#api-using-datasets-and-dataframes

## Exercise 1 - What is the Spread?

In finance, the term **spread** is often used to refer to the difference between two metrics of interest. In the stock market in particular, the **Bid-Ask spread** is one of the many tools used to help inform decision making around a trader's positions in the market. The Bid-Ask spread is simply the difference between the price traders selling a security are asking, **the ask**, and the price traders buying that security are offering to pay, **the bid**. 

A prized piece of information that many firms pay good money to acquire is known as a **BBO** - the **Best Bid-Offer** dataset. BBO datasets are simple tabular-formatted collections of data with 3 key columns: a timestamp, the best bid and the best offer for a given security at that exact timestamp. Stock brokers have a fiduciary duty to get the best bid for their clients selling a security and the best offer for their clients buying a security. Hence the usefulness of BBO data... but that is not all! If you have current BBO data, you can also keep an eye on the **spread** on a given security in real-time. Knowing this quantity in real-time can be used in many ways, the simplest of which is as a gauge of the **liquidity** of a security. In general, if the **spread** on a security is small, that suggests there is a hot market for that security (i.e. people are actively buying and selling the security). Conversely, if the spread is large, that suggests the market is not really interested in trading that security at that time.
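
To make these quantities concrete before moving to Spark, here is a plain-Python illustration on three made-up BBO records. The timestamps, prices and column order are invented for this sketch and do not describe the layout of the workshop's data file. The spread of each record is the best offer minus the best bid, and the per-minute average groups the spreads by the hour-and-minute part of the timestamp.

```python
from collections import defaultdict

# Made-up BBO records: (timestamp, best bid, best offer)
bbo = [
    ("09:30:01", 10.00, 10.25),
    ("09:30:02", 10.25, 10.75),
    ("09:31:01", 10.50, 10.75),
]

# Spread of every record: best offer (the ask) minus best bid
spreads = [(ts, ask - bid) for ts, bid, ask in bbo]

# Average spread per minute, using the HH:MM prefix of the timestamp as key
per_minute = defaultdict(list)
for ts, spread in spreads:
    per_minute[ts[:5]].append(spread)

average_spread = {minute: sum(values) / len(values)
                  for minute, values in per_minute.items()}

print(spreads)
print(average_spread)
```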

We will not show you how to create your own BBO dataset today, but we will use one to keep an eye on the spread of a certain stock. 

First, let's simulate a real-time feed from a BBO provider. To do this, run the following command on a terminal window:

<code>stdbuf -oL cat data/14081.csv | ncat localhost 9999</code>

Now, use the Structured Streaming approach we've seen before to read the BBO data into a DataFrame and get the **Spread** second-by-second. If you are looking for a challenge, try computing the **average spread** on a minute-by-minute basis.

## Exercise 2 - Anomaly Detection

Now let's look at another application of Spark: anomaly detection. In this problem, a fictional Utility Company started using Machine Learning to determine the price it charges its customers depending on the hour of the day. You believe this move on the part of the Utility Company is causing you to pay way too much for electricity and have decided to put together a dossier exposing how their algorithm is out of whack. One way of exposing the weakness of their algorithm would be to catch anomalies in the price they are charging. The company itself defines an anomaly as "**a 2 standard deviation or larger increase over the average price for the same hour over the past two weeks.**"

Use the SparkSQL API to read the <code>utility.csv</code> file and find instances of anomalous pricing.
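
The company's definition can be written down as a small rule. The plain-Python sketch below applies it to one hour of the day, under two assumptions that the definition does not settle: the history is simply a list of the 14 prices observed for that hour over the past two weeks, and the standard deviation is the population standard deviation. The function name <code>is_anomalous</code> and all prices are made up for illustration.

```python
from statistics import mean, pstdev


def is_anomalous(price, history, n_std=2.0):
    """True if price is at least n_std standard deviations above the mean of history."""
    threshold = mean(history) + n_std * pstdev(history)
    return price >= threshold


# Made-up prices charged at the same hour on each of the past 14 days
history = [10.0] * 7 + [12.0] * 7

print(mean(history), pstdev(history))
print(is_anomalous(13.5, history))
print(is_anomalous(12.5, history))
```

In the exercise, the same rule has to be evaluated for every hour in the dataset, which is where grouping and windowing in SparkSQL come in.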

# Running Spark on The Alliance's Clusters

There are three main ways to use Spark, each being the best choice in a different scenario:

- [Local Mode](#Local-Mode) 
- [Cluster Mode](#Cluster-Mode) 
- [Client Mode](#Client-Mode) 

In the sections that follow, you will see when and how you should use **Local Mode** and **Cluster Mode** in The Alliance's Clusters. We will briefly describe **Client Mode** as well. While this particular mode is generally not compatible with The Alliance's terms of use and you should avoid it on our Clusters, it can still be helpful to know about it so you can use it elsewhere - maybe on an on-prem cluster, or a commercial cloud.


### Local Mode

![Spark Local Mode](../../images/spark_local_mode.png)

This is the mode we've been running Spark in so far during the workshop. As depicted in the diagram above, in this mode, the Driver **is** the Executor and there is no cluster manager. By default, when you instantiate a <code>SparkContext</code>, the Driver/Executor program will be able to use all the cores and all the memory available in your environment. Using Spark in local mode on a computer equipped with many cores and a relatively large amount of RAM, like a node on one of the Alliance's clusters for example, is a simple way of parallelizing operations over large DataFrames. This allows you to immediately get a significant speed up over Pandas, for example, by using SparkSQL or Pandas on Spark. It is also a simple way of parallelizing work on unstructured data when you use the RDD API.

There are a few options for you to launch Spark in local mode on the Alliance's clusters. Next we will go through them in detail.

#### JupyterHub

The simplest way to use Spark in local mode is through the web-based [JupyterHub](https://docs.alliancecan.ca/wiki/JupyterHub#Compute_Canada_initiatives) interface of one of our clusters:

- [Narval](https://jupyterhub.narval.computecanada.ca)
- [Beluga](https://jupyterhub.beluga.computecanada.ca)
- [Cedar](https://jupyterhub.cedar.computecanada.ca)
- [Graham](https://jupyterhub.sharcnet.ca/)

When you access one of the links above, you will be asked to enter your Compute Canada username and password. Once you've logged in, you should see the following prompt, where you can select the number of cores, the amount of memory and the time you will require during your session:

![jupyter resources prompt](../../images/resources2.png)

After clicking on "Start", you might have to wait a few minutes, but you should eventually see this screen:

![jupyter home screen](../../images/jupyter_home.png)

Click on the highlighted icon that looks like a hypercube to see a list of Modules available in the Alliance's software stack. Then go to the search bar at the top on the left-hand side and type in "Spark":

![jupyter_search_bar](../../images/jupyter_search.png)

You should now see a filtered list of Modules that match your search. Select the latest version of Spark and click on "Load":

![jupyter_load_spark](../../images/jupyter_spark_load.png)

Now you can launch a Python Notebook:

![jupyter_launch_notebook](../../images/jupyter_launch_notebook.png)

Then import <code>pyspark</code>, no need to install it first:

![jupyter_import_pyspark](../../images/jupyter_import_pyspark.png)

And that is it! In the example above we've asked for 16 cores and approximately 80GB of memory for 2.5 hours. That is enough to efficiently load and crunch some fairly large datasets using Spark!

#### Interactive Job

The web-based JupyterHub option from the previous section is simple and straightforward to use. Under the hood however, the JupyterHub web interface is actually launching a job on its host cluster, then exposing that job allocation to you via the Jupyter user interface, which runs on a given port on a compute node in the cluster. The amount of resources you can request through this interface is limited, and these limits vary from one cluster to another. In cases where your workload is larger than the one from the JupyterHub example, but a single node on one of our clusters is still enough to accommodate it, running Spark inside your own [interactive job](https://docs.alliancecan.ca/wiki/Running_jobs#Interactive_jobs) becomes a viable option.

The first step is to launch an interactive job on one of the Alliance's clusters. In this example, we will request a whole node on Narval to run our Spark session for 2 hours:

```shell
[account@narval01 ~]$ salloc --account=my_account --nodes=1 --ntasks-per-node=1 --cpus-per-task=64 --mem=0 --time=2:00:00
```

You might need to wait a bit to get your job allocated on the cluster. Once the job starts, the next step is to load the Spark module:

```shell
[account@ngXxXx ~]$ module load spark/3.3.0
```

Next, you have three main options to start writing your Spark code. First, you can launch a Python shell with the command <code>python</code>, then import <code>pyspark</code>, start a <code>SparkContext</code> and use Spark just like we did in our Jupyter notebooks before. You can also load the module <code>ipykernel</code> and then launch an IPython shell instead of the plain Python shell:
    
```shell
[account@ngXxXx ~]$ module load python/3.8 ipykernel
[account@ngXxXx ~]$ ipython

Python 3.8.10 (default, Jun 16 2021, 14:19:02) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.31.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import pyspark

In [2]: sc = pyspark.SparkContext()

```

The second option is the Pyspark shell, which can be launched with the command <code>pyspark</code>. This will launch a console just like the IPython or Python shells above, the main difference being that you do not need to <code>import pyspark</code> or instantiate a <code>SparkContext()</code>. These two things will be done automatically when you launch the Pyspark shell.

The third option is starting your own JupyterHub interface inside of an interactive job. See our documentation for a [step-by-step guide](https://docs.alliancecan.ca/wiki/Advanced_Jupyter_configuration) on how to launch your own JupyterHub interface. Once you can connect to JupyterHub from your browser, follow the steps outlined in the [JupyterHub section](#JupyterHub) of this notebook!


### Cluster Mode

Whenever a single compute node is not enough for your workload, **cluster mode** is the way to go.

![cluster mode](../../images/spark_cluster_mode.png)

As shown in the diagram above, this is a non-interactive mode, where the Driver process runs inside the cluster. Concretely, you will use the command line to submit your pyspark code to the cluster, then wait until it finishes running. In the next few sections, you will see how to spawn a Spark cluster inside one of the Alliance's clusters using SLURM.

#### SLURM Job Submission Script

To spawn a Spark cluster and then submit your pyspark script to it, you will need to submit a SLURM batch job that does these two things. In this section, we will break down the steps to create such a job submission script, which we will call <code>launch_spark_job.sh</code>:

##### 1 - Request Resources

In this example we will spawn a Spark cluster consisting of 3 whole compute nodes on the Narval cluster. The first lines of our script <code>launch_spark_job.sh</code> contain <code>#SBATCH</code> directives telling SLURM how many resources our job will require:

```bash
#!/bin/bash

#SBATCH --nodes=3
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=64
#SBATCH --mem=0
#SBATCH --time=00:05:00
#SBATCH --account=your_account
```

As you will see later on, to spawn a Spark cluster we will need to execute certain commands once on each node. That is why we set <code>ntasks-per-node=1</code>. Once our Spark cluster is up and running, it is Spark's internals that will handle parallelizing our code across Executors, so asking for multiple tasks per node in your SLURM job submission script is not necessary.

##### 2 - Prepare the environment

Next, you will load the modules needed to spawn a Spark cluster and execute your Pyspark code, then you will set a few environment variables that will prove useful along the way:

```bash
module load spark/3.3.0 python/3.8

export MKL_NUM_THREADS=1
export SPARK_IDENT_STRING=$SLURM_JOBID
export SPARK_WORKER_DIR=$SLURM_TMPDIR
export SLURM_SPARK_MEM=$(printf "%.0f" $((${SLURM_MEM_PER_NODE} *95/100)))
```

Here's a breakdown of these environment variables:

- <code>MKL_NUM_THREADS=1</code>: This variable controls how many threads will be spawned by routines from the Math Kernel Library (MKL), a key library used by many programs that perform mathematical operations. Routines from this library are the workhorse behind functions in many popular numerical computing libraries, including <code>numpy</code> and <code>scipy</code>. Setting it to 1 means that any routines from this library that are called inside your Pyspark program will not spawn more than one thread. We do this to keep things simple: each Executor process will have multiple cores at its disposal, but these will be used to carry out multiple single-threaded tasks concurrently, as opposed to one multi-threaded task at a time. If you know, however, that the main performance bottleneck in your workload is the complexity of mathematical operations computed over a given partition of the data, as opposed to lots of simple operations being computed over a large amount of data, then it might be more advantageous to set this variable to a number greater than 1.

<br>

- <code>SPARK_IDENT_STRING=$SLURM_JOBID</code>: **SPARK_IDENT_STRING** is, as the name suggests, a string that we will use later to grab some information out of logs that will help us automate spawning the worker nodes of our Spark cluster. Here we set it to the unique JOB ID assigned by SLURM to our batch job.

<br>

- <code>SPARK_WORKER_DIR</code>: Spark derives a lot of its reputation for performance from its ability to execute complex DAGs over data stored in memory. However, it is not always possible for Spark to do absolutely everything in-memory. In fact, Spark often writes intermediate results to disk when performing certain operations, and so it needs a "scratch space" it can use to store temporary files. This variable sets that space to the special location <code>SLURM_TMPDIR</code>. This location is a temporary directory on each compute node's local storage that gets created when your job starts, and it remains available for reading and writing for the duration of the job.

<br>

- <code>SLURM_SPARK_MEM=$(printf "%.0f" $((${SLURM_MEM_PER_NODE} *95/100)))</code>: This variable looks intimidating, but all it does is store a number equal to 95% of the memory available to your batch job on each node. We will use this variable later on to allocate that amount of memory to each of our Spark cluster's worker nodes. Why don't we allocate the entire node memory to Spark? This is not mandatory, but it is good practice to leave some headroom for any non-Spark processes we might need to run alongside our Spark cluster in this batch job.
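
The arithmetic behind <code>SLURM_SPARK_MEM</code> can be reproduced in a couple of lines of Python. This is only an illustration of the calculation, not something the job script needs: the function name <code>spark_worker_memory_mb</code> and the node memory of 257000 MB are made-up values, and the integer division mirrors the integer arithmetic that the shell performs inside <code>$(( ))</code>.

```python
def spark_worker_memory_mb(mem_per_node_mb, percent=95):
    """Integer share of the node memory (in MB) handed to a Spark worker."""
    # Multiply first, then divide, exactly like $((MEM * 95 / 100)) in the shell
    return mem_per_node_mb * percent // 100


# Hypothetical node with 257000 MB of memory available to the job
node_memory_mb = 257000
worker_memory_mb = spark_worker_memory_mb(node_memory_mb)
print(worker_memory_mb)
print(node_memory_mb - worker_memory_mb)  # headroom left for non-Spark processes
```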

##### 3 - Start Spark Main Node

Next you will start Spark's main node, where the Driver and the Cluster Manager will run, on the so-called "head-node" of your SLURM batch job:

```shell
start-master.sh
sleep 5
MASTER_URL=$(grep -Po '(?=spark://).*' $SPARK_LOG_DIR/spark-${SPARK_IDENT_STRING}-org.apache.spark.deploy.master*.out)
```

The script <code>start-master.sh</code> ships with Spark and was added to your <code>PATH</code> automatically when you loaded the Spark module. You give it a 5 second <code>sleep</code> interval to make sure the main node of your Spark cluster is up and running before the next steps. You then run that intimidating regex to extract the address of your Spark cluster's main node from Spark's logs.
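
If the regular expression passed to <code>grep</code> looks opaque, the Python sketch below applies the same pattern to a single log line. The log line and the host name in it are made up for illustration, and the real master log may be worded differently. The pattern uses a lookahead to find the position where <code>spark://</code> begins, then captures everything from there to the end of the line.

```python
import re

# Made-up log line; the real log file may be formatted differently
log_line = ("23/01/10 10:00:00 INFO Master: "
            "Starting Spark master at spark://node1.example.org:7077")

# (?=spark://) asserts that "spark://" starts at the current position,
# and .* then consumes the rest of the line from that position onwards
match = re.search(r"(?=spark://).*", log_line)
master_url = match.group(0) if match else None
print(master_url)
```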

##### 4 - Start Spark Worker Nodes

Next, you will start one Spark worker node, where the Executors will run, on each node of your SLURM batch job:

```shell
NWORKERS=$((SLURM_NTASKS - 1))
SPARK_NO_DAEMONIZE=1 srun -n ${NWORKERS} -N ${NWORKERS} --label --output=$SPARK_LOG_DIR/spark-%j-workers.out start-worker.sh -m ${SLURM_SPARK_MEM}M -c ${SLURM_CPUS_PER_TASK} ${MASTER_URL} &
workers_pid=$!
```

The first thing you did here was to store the number of workers in a shell variable, computed as the total number of tasks in your batch job minus 1. This way, no matter how many nodes you request, with one task per node, you will always get one node as the main node and the remainder as workers.

The second thing you did was to tell Spark to run **without** daemonizing any processes, since daemonizing would not have been possible on the Alliance's clusters. Then you called the <code>start-worker.sh</code> script once per worker node through <code>srun</code>. In that same command, you passed the number of CPUs, as well as the amount of memory available to each worker node. The third thing you did was to store the process ID of that background <code>srun</code> command in a variable, so you can kill the workers later on.
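To make the structure of that long command easier to see, the sketch below rebuilds it piece by piece in Python. It is an illustration only: <code>worker_launch_command</code> is a hypothetical helper, the sample values are made up, and the <code>--label</code> and <code>--output</code> options are left out for brevity.

```python
from typing import List


def worker_launch_command(ntasks: int, cpus_per_task: int,
                          mem_mb: int, master_url: str) -> List[str]:
    """Build the srun command line that starts one Spark worker per remaining node."""
    nworkers = ntasks - 1  # one task (node) is reserved for the main node
    if nworkers < 1:
        raise ValueError("need at least 2 tasks: one main node plus one worker")
    return [
        "srun", "-n", str(nworkers), "-N", str(nworkers),
        "start-worker.sh",
        "-m", f"{mem_mb}M",        # memory handed to each worker
        "-c", str(cpus_per_task),  # cores handed to each worker
        master_url,                # address of the main node
    ]


# Hypothetical 3-node job with 64 CPUs and 243200 MB per worker
cmd = worker_launch_command(3, 64, 243200, "spark://node1.example:7077")
print(" ".join(cmd))
```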

Now your Spark cluster should be up and running! 

##### 5 - Submit Your Pyspark Code

The last step now is to submit your Pyspark code to the Spark cluster and wait for it to finish running. Here you will run a simple example that ships with Spark:

```shell
srun -n 1 -N 1 spark-submit --master ${MASTER_URL} --executor-memory ${SLURM_SPARK_MEM}M $SPARK_HOME/examples/src/main/python/pi.py

kill $workers_pid
stop-master.sh
```

The script <code>spark-submit</code> has a number of parameters you can use to tune your Spark job. In this small example we pass only two of them: <code>--master</code>, which points to the main node, and <code>--executor-memory</code>, which in this case assigns 95% of the node's memory to each Executor. We will not go any deeper into <code>spark-submit</code> parameters, but here are some things you should reason about when submitting a Spark job:

- How many Executors per worker should you spawn? Just one big Executor with all CPUs and memory? Or multiple smaller Executors?

- Should you allow Executors to run multi-threaded tasks? Or only multiple single-threaded tasks concurrently?

For insights on these, and many more considerations when tuning a Spark job, this post is a good place to start:

https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2/
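As a starting point for the first question, the arithmetic of splitting one worker into several equally sized Executors can be sketched as below. This is a simplified sketch, not a tuning recipe: <code>executor_layout</code> is a hypothetical helper, the numbers are made up, and it ignores memory needed outside the Executor's heap (for example by Python worker processes). <code>spark.executor.cores</code> and <code>spark.executor.memory</code> are Spark configuration properties that can be passed to <code>spark-submit</code> with <code>--conf</code>.

```python
from typing import Dict


def executor_layout(worker_cores: int, worker_mem_mb: int,
                    cores_per_executor: int) -> Dict[str, str]:
    """Split one worker's resources evenly among equally sized Executors."""
    if cores_per_executor < 1 or cores_per_executor > worker_cores:
        raise ValueError("cores_per_executor must be between 1 and worker_cores")
    executors_per_worker = worker_cores // cores_per_executor
    mem_per_executor_mb = worker_mem_mb // executors_per_worker
    return {
        "spark.executor.cores": str(cores_per_executor),
        "spark.executor.memory": f"{mem_per_executor_mb}m",
    }


# Hypothetical worker with 64 cores and 243200 MB, split into 8-core Executors
layout = executor_layout(worker_cores=64, worker_mem_mb=243200, cores_per_executor=8)
for key, value in layout.items():
    print(f"--conf {key}={value}")
```

With these numbers each worker hosts 8 Executors of 8 cores and 30400 MB each. Choosing <code>cores_per_executor=64</code> instead gives the "one big Executor" layout.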

To wrap things up, here is the complete job submission script:

```bash
#!/bin/bash

#SBATCH --nodes=3
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=64
#SBATCH --mem=0
#SBATCH --time=00:05:00
#SBATCH --account=your_account

module load spark/3.3.0 python/3.8

export MKL_NUM_THREADS=1
export SPARK_IDENT_STRING=$SLURM_JOBID
export SPARK_WORKER_DIR=$SLURM_TMPDIR
export SLURM_SPARK_MEM=$(printf "%.0f" $((${SLURM_MEM_PER_NODE} *95/100)))

start-master.sh
sleep 5
MASTER_URL=$(grep -Po '(?=spark://).*' $SPARK_LOG_DIR/spark-${SPARK_IDENT_STRING}-org.apache.spark.deploy.master*.out)

NWORKERS=$((SLURM_NTASKS - 1))
SPARK_NO_DAEMONIZE=1 srun -n ${NWORKERS} -N ${NWORKERS} --label --output=$SPARK_LOG_DIR/spark-%j-workers.out start-worker.sh -m ${SLURM_SPARK_MEM}M -c ${SLURM_CPUS_PER_TASK} ${MASTER_URL} &
workers_pid=$!

srun -n 1 -N 1 spark-submit --master ${MASTER_URL} --executor-memory ${SLURM_SPARK_MEM}M $SPARK_HOME/examples/src/main/python/pi.py

kill $workers_pid
stop-master.sh
```

### Handling Dependencies

As briefly mentioned in the RDD section, if your program calls non-native Python packages, it is not enough to import these packages at the top of your code, as is usually done in Python. When running in **Cluster Mode**, the Driver and the Executors run in completely separate runtimes, and we have seen that importing dependencies inside the functions passed to RDD methods is one way of making Executors use non-native libraries. This leads to the following question: *How can I make my dependencies available inside an Executor's runtime in the first place, so they can be imported?* In this section you will learn two ways of doing this.

#### Using a Virtualenv

The Alliance recommends always installing Python packages inside Virtualenvs. If you don't know what a virtualenv is, read this before continuing: https://docs.alliancecan.ca/wiki/Python#Creating_and_using_a_virtual_environment

Once you have created a virtualenv and all your dependencies have been installed inside it, you can instruct both Driver and Executors to use it to run your Pyspark code. All you have to do is set these two environment variables in your SLURM job submission script *before* calling <code>spark-submit</code>:

```bash
export PYSPARK_DRIVER_PYTHON=/path/to/virtualenv/bin/python
export PYSPARK_PYTHON=/path/to/virtualenv/bin/python
```

Note that you can even assign different virtualenvs to the Driver and the Executors! For example, you might not want to install any visualization libraries in the Executors' virtualenv and keep them all only in the Driver's virtualenv.
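For illustration, the same two variables can be assembled in Python, for instance if you launch <code>spark-submit</code> from a Python wrapper rather than directly from the job script. This is a minimal sketch: <code>pyspark_python_env</code> is a hypothetical helper and the virtualenv paths are placeholders.

```python
import os
from pathlib import PurePosixPath
from typing import Dict, Optional


def pyspark_python_env(driver_venv: str,
                       executor_venv: Optional[str] = None) -> Dict[str, str]:
    """Return the two environment variables that select the Python interpreters.

    If no separate virtualenv is given for the Executors, they share the Driver's.
    """
    executor_venv = executor_venv or driver_venv
    return {
        "PYSPARK_DRIVER_PYTHON": str(PurePosixPath(driver_venv) / "bin" / "python"),
        "PYSPARK_PYTHON": str(PurePosixPath(executor_venv) / "bin" / "python"),
    }


# Placeholder virtualenv locations: one for the Driver, one for the Executors
env = pyspark_python_env("/path/to/driver_env", "/path/to/executor_env")

# Merged copy of the current environment, suitable for the env= argument
# of subprocess.run when calling spark-submit
job_env = {**os.environ, **env}
print(env)
```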

#### Using addPyFile

You should always use Virtualenvs to handle Python packages as dependencies. However, Virtualenvs won't be much help if your dependencies are **.py** files, for example if you organized your project's Python classes and helper functions in separate files. To enable code running on Executors to import code from these **.py** files, use the <code>addPyFile</code> method from <code>SparkContext</code>:

```python
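import pyspark

sc = pyspark.SparkContext()

# Ship the file to every Executor so that it can be imported there
sc.addPyFile("/path/to/my_file.py")

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# my_file.py is importable once it has been added to the SparkContext
from my_file import my_function

rdd.map(my_function).collect()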
```
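Besides single **.py** files, <code>addPyFile</code> also accepts **.zip** archives, which is convenient when your helper code spans several files. The sketch below shows one possible way to build such an archive with the standard library. The helper name <code>bundle_py_files</code> and the paths in the usage comment are hypothetical.

```python
import zipfile
from pathlib import Path
from typing import List


def bundle_py_files(source_dir: str, zip_path: str) -> List[str]:
    """Zip every .py file under source_dir, keeping relative paths.

    Returns the list of archive member names that were written.
    """
    source = Path(source_dir)
    names = []
    with zipfile.ZipFile(zip_path, "w") as archive:
        for py_file in sorted(source.rglob("*.py")):
            # Store paths relative to source_dir so imports resolve from the zip root
            arcname = py_file.relative_to(source).as_posix()
            archive.write(py_file, arcname)
            names.append(arcname)
    return names


# Hypothetical usage inside a Pyspark program:
# bundle_py_files("/path/to/project", "/path/to/deps.zip")
# sc.addPyFile("/path/to/deps.zip")
```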


### Client Mode

Last, we will briefly cover **Client Mode**. As previously mentioned, this mode is generally not compatible with the Alliance's terms of use, and so we do not encourage its use on our clusters. 

![spark_client_mode](../../images/spark_client_mode.png)

Unlike Cluster Mode, **Client Mode** is designed to be used interactively. In this mode, the Driver runs on a client computer and dispatches tasks to a remote Spark Cluster. A great setup for this mode is to launch a Spark Cluster on a set of Cloud VMs, or a pod of containers, and then connect to it from a Jupyter Notebook interface running Pyspark:

```python
import pyspark

sc = pyspark.SparkContext(master="spark://<address of your cluster's master node>:<port>")
```

Congratulations! You made it to the end of this material! We hope Spark will become a useful member of your toolkit of technologies. If you have any difficulties using Spark on the Alliance's clusters, please write to our help desk at **support@alliancecan.ca**