# What is Spark?
To quote the Spark website:
"Apache Spark™ is a unified analytics engine for large-scale data processing." But what does this mean? We can think of Spark as a powerful engine that can perform a wide array of data manipulation and analytical tasks while also scaling to truly big data sets.


So how are we going to use this engine? While Spark itself is written in Scala, PySpark gives us Python programmers a library for running Spark. To install the PySpark module, run `pip install pyspark` in your Python environment.

## SparkContext: how you interact with your data
Now that we have PySpark installed, let's restart the kernel for this notebook and figure out how to actually use it. Our first goal is to load our data. Because Spark is designed to work with data in HDFS and other big data storage systems as well as with local data, it has its own set of objects for interfacing with data. A SparkContext is the most basic of these: it is the entry point that connects our program to Spark. In larger-scale deployments, a SparkContext can be configured to run across a cluster or another cloud computing setup, but for now we're just using a tiny local "cluster": the machine running this notebook. To do this, we create a SparkContext with the master URL `'local[*]'`. The `[*]` tells Spark to use as many worker threads as there are logical cores on this machine; you can set a specific number instead (for example, `'local[2]'`) if you would like to limit the context.

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

## Resilient Distributed Datasets (RDD)
Now that we have a SparkContext ready to interact with our data, what happens to that data? The main data abstraction in Spark is the Resilient Distributed Dataset, or RDD. The distributed portion of the name means Spark splits the data into partitions that can be spread across machines and processed in parallel. The resiliency comes from Spark recording how each RDD was built (its lineage), so a lost partition can be recomputed; this plays a fault-tolerance role similar to the replication we saw in HDFS. In our small local example we won't be leveraging all of this power, but we can start by taking a simple Python data structure and parallelizing it into an RDD with the following code:

In [2]:
list_of_arrivals = [
    ("PDX", 1),
    ("LAX", 5),
    ("DEN", 3),
    ("PDX", 2),
    ("JFK", 9),
    ("DEN", 5),
    ("PDX", 7),
    ("JFK", 10),
]
arrivals_rdd = sc.parallelize(list_of_arrivals)
print(arrivals_rdd)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262


Great! We have an RDD object! But how do we see what is in our RDD? To avoid unnecessary computation and memory usage, Spark uses lazy evaluation: nothing is computed until it is needed. Operations like `filter()` are transformations that only describe a new RDD, while operations like `collect()` and `count()` are actions that make Spark actually run the work. To view the data, we need to tell Spark to gather the distributed data, and luckily the `.collect()` method does just that. It is worth noting that `collect()` does exactly what it sounds like: it pulls every element of the RDD from across the nodes back to the driver. This means it is not a good way to view our data once we are working with truly big data that will not fit on one machine. In those cases, we can use actions like `count()` to see how much data we have without gathering that data together:

In [3]:
print("This is our RDD", arrivals_rdd.collect())
print("It has {} elements".format(arrivals_rdd.count()))

This is our RDD [('PDX', 1), ('LAX', 5), ('DEN', 3), ('PDX', 2), ('JFK', 9), ('DEN', 5), ('PDX', 7), ('JFK', 10)]
It has 8 elements


## Filter data
Now that we've created our RDD, let's go over some basic operations. First, let's look at filtering data based on a condition. `filter()` takes a function that receives one element of the RDD and returns `True` for the elements to keep. If you're not familiar with Python lambda functions, here is a quick overview: [LINK TO ACTUAL OVERVIEW THAT I DON'T HAVE RIGHT NOW] In short, a lambda is a small anonymous function written inline. The `x` in our lambda function is one element of our RDD; for this RDD, that is a tuple of (airport code, arrival count). Let's filter the data down to just Portland (PDX) arrivals:

In [4]:
pdx_arrivals = arrivals_rdd.filter(lambda x: x[0] == "PDX")
print(pdx_arrivals.collect())

[('PDX', 1), ('PDX', 2), ('PDX', 7)]


## Group by key
In addition to filtering our data, we may also want to group the data by a key. `groupByKey()` treats each element as a (key, value) pair; in our tuples, the first element, the airport code, serves as the key:

In [5]:
grouped_arrivals = arrivals_rdd.groupByKey()
print(grouped_arrivals.collect())

[('JFK', <pyspark.resultiterable.ResultIterable object at 0x7fa94bd61760>), ('LAX', <pyspark.resultiterable.ResultIterable object at 0x7fa94bd617f0>), ('DEN', <pyspark.resultiterable.ResultIterable object at 0x7fa94bd618b0>), ('PDX', <pyspark.resultiterable.ResultIterable object at 0x7fa94bd61850>)]


As you can see above, this creates an iterable `ResultIterable` object for each airport code, holding all of that airport's arrival counts. Python prints each object's default representation rather than its contents, which is why we only see memory addresses. If we would like to combine the arrival counts in each group, we can use the `mapValues()` method. Its argument is the function to apply to each group's values. In our case, let's get the total number of arrivals per airport code, so we can use Python's built-in `sum` function as the argument:

In [6]:
grouped_arrivals_count = grouped_arrivals.mapValues(sum)
print(grouped_arrivals_count.collect())

[('JFK', 19), ('LAX', 5), ('DEN', 8), ('PDX', 10)]


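## Sort by key
We can also sort a key/value RDD by its keys with `sortByKey()`, which sorts in ascending order by default. The `first()` action returns only the first element of the result, while `collect()` returns all of it:

In [7]: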
sorted_arrivals = arrivals_rdd.sortByKey()
print(sorted_arrivals.first())
print(sorted_arrivals.collect())

('DEN', 3)
[('DEN', 3), ('DEN', 5), ('JFK', 9), ('JFK', 10), ('LAX', 5), ('PDX', 1), ('PDX', 2), ('PDX', 7)]


## External data
In addition to parallelizing existing data into an RDD, our SparkContext can read external data. For our case, we can start with a simple local CSV file. `textFile()` creates an RDD in which each element is one line of the file as a string:

In [8]:
flight_file = '../data/flights.csv'
txt = sc.textFile(flight_file)
print("We have {} flights!".format(txt.count()))

# We can apply the same sort of functions to this RDD as well
pdx_lines = txt.filter(lambda line: 'pdx' in line.lower())
print("Of these flights, {} involved PDX".format(pdx_lines.count()))

We have 12716 flights!
Of these flights, 270 involved PDX


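However, as you can probably guess, this is not the most efficient way to work with CSV data: each element is just a raw line of text, so we would have to split and parse every field ourselves, and a substring check like `'pdx' in line.lower()` matches "pdx" anywhere in the line. Luckily, the PySpark SQL module (`pyspark.sql`) gives us a much better toolset for tabular data, which we will explore in the next notebook.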