# DSCI 511: Data acquisition and pre-processing<br>Chapter 10: Scaling for Big Data processing

## 10.0 Big Data
There's a great micro-history of the term "Big Data" from a fellow at the University of Pennsylvania. In it, [Francis X. Diebold documented](http://www.ssc.upenn.edu/~fdiebold/papers/paper112/Diebold_Big_Data.pdf) a number of sources for the term, including:

> Weiss and Indurkhya (1998) note that “... very large collections of data ... are now being compiled into centralized data warehouses, allowing analysts to make use of powerful methods to examine data more comprehensively. In theory, ‘Big Data’ can lead to much stronger conclusions for data-mining applications, but in practice many difficulties arise.”

that highlights a core aspect of Big Data that might actually generalize to a definition: data are Big when they allow for "stronger conclusions" at the expense of "many difficulties". However, this is a relativistic definition: stronger than what conclusions, and difficult in practice how?

While there's no way to remove the relativistic nature of this definition, it can be refined to describe qualities that make data Big. These descriptors are helpful in conceptualizing the types of problems/opportunities that will emerge when presented with a dataset.

### 10.0.1 Intrinsic Qualities of Big Data and the Vs

In 2001, a different guy named Doug Laney [succinctly described](https://blogs.gartner.com/doug-laney/files/2012/01/ad949-3D-Data-Management-Controlling-Data-Volume-Velocity-and-Variety.pdf) Big Data with 3 Vs:

- __V__olume: The overall size of data
- __V__elocity: The rate at which new data emerges
- __V__ariety: The differences in forms of data

each of which highlighted a different intrinsic quality capable of posing exceptional processing challenges and/or opportunities.

### 10.0.2 WannaVs, huh?

As it turns out, there are some other words that start with __V__. So, over the years folks have suggested a slew of other descriptors for Big Data, including:

- __V__eracity: The uncertainties of data constitution
- __V__alue: The usefulness of data
- __V__alidity: The quality or trueness of data
- __V__ariability: The changing nature of data
- __V__isualization: The visually-descriptive power of data
- __V__agueness: Confusion over the meaning of big data
- __V__ocabulary: Structured metadata that provide context

So, which of these (if any) actually describe intrinsic qualities of data, while being distinct from the original three?

A common answer and personal bias includes only the first:

- __V__eracity: The uncertainties of data constitution

I figure that data scientists spend so much time inspecting data and getting data into shape that data reliability ought to have its own dimension, if anything. 

![The Vs](./images/4-Vs-of-big-data.jpg)

### 10.0.3 Distancing Ourselves From the Term

Regardless of how we distinguish the different features that make data Big, or of what mnemonics we use to keep track of how data can surprise us, it is valuable to have an intuition for what a dataset can do and how hard it will be to work with. The Vs only serve as a device for developing this intuition; otherwise, it's all about experience working with data. This is why it is so important as a student to get your hands dirty with data as soon as possible.

## 10.1 Let's Talk About How Computers Do Stuff

According to this relativistic notion of Big Data, a core aspect of Bigness is relative processing capacity. Old computers were the size of buildings, but had less processing capacity than your average cell phone these days. Similarly, we can do a lot more today than we ever could when Big Data was defined. So, in addition to having access to the problems and benefits of messy, real-world data, it's important to understand some basic aspects of computing hardware that impact processing capacity.

Altogether, the different components of modern computers force us to play a balancing act to process Big Data. It's all about synchronization and planning where and how you're going to do the work. For example, if lots of data have to be transferred across a network, the distance between where data are located and where they are processed can seriously slow down how quickly work can be done.

### 10.1.1 Processors Operate on Data
So, when your computer's spec sheet says

- 3.3 GHz Intel Core i7,

the __3.3 GHz__ refers to the processor's clock rate: roughly, how quickly each core can step through a task that is completely lined up for it. Processors now have multiple cores, and some cores can run multiple threads. However, cores and threads fall into a more general discussion of parallelization, which appears separately below.
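As a rough worked example, assume the clock rate is simply the number of cycles a core steps through per second. Real throughput also depends on how many instructions complete per cycle. Under that assumption, 3.3 GHz corresponds to a cycle time of roughly a third of a nanosecond:

$$3.3\ \text{GHz} = 3.3\times 10^{9}\ \tfrac{\text{cycles}}{\text{s}} \quad\Longrightarrow\quad \frac{1\ \text{s}}{3.3\times 10^{9}\ \text{cycles}} \approx 3.03\times 10^{-10}\ \tfrac{\text{s}}{\text{cycle}} \approx 0.3\ \tfrac{\text{ns}}{\text{cycle}}$$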

### 10.1.2 Disks Store Data
These days, hard drives are generally either _solid state_ or _spinning_. In either case, disks are measured by the number of bytes they can hold, e.g., a 1TB (terabyte) hard drive holds $10^{12}$ bytes.

- Spinning hard drives (HDDs) have been around for longer, are cheaper to produce, and theoretically last longer if kept in an ideal, stable environment. They are literally disk-shaped on the inside and spin to get from one bit to the next. This physical construction caps their speed at what their rotation rate (RPM) allows, but they're cheap and good for long-term storage. Thus, HDDs commonly appear in desktop computers and in large data storage centers.

- Solid state hard drives (SSDs) have no moving parts. This allows them to provide faster access to data, and it also makes them much more durable when carried around. Thus, SSDs appear in laptops, tablets, and as startup disks for performance. The primary limitations of SSDs are their relatively small capacities and high prices.

### 10.1.3 Memory is Data Suspended
Functionally, there is overlap between what _memory_ (a.k.a. _RAM_) and disks do. Both are measured by data capacity, e.g., "16 GB RAM". However, unlike disk storage, a computer's memory is not persistent, and it is generally much smaller. As soon as the machine turns off, anything in memory is purged. Why? Memory is all about fast, essentially immediate, access to data. When you load a csv file into a pandas dataframe, your computer's processor reads the data from a disk and stores it (temporarily) in memory for rapid interaction. 

Memory makes it very fast and convenient to work with data. However, because memory can't be scaled the same way as disks (which also have limits), large quantities of data (Volume) generally impose significant memory challenges. Modern Big Data processing deals with this by pulling all of these technologies together.

### 10.1.4 Networks Transmit Data

Whether it's DSL, wireless, or ethernet, a computer's network connection allows it to exchange data with other computers and disks. This kind of long-distance data transmission is generally slower than the speed at which your computer's own disks can transmit data to its processor. So, just as with the other component categories, networks have specialized value for transmission, along with their own limitations. 
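To get a feel for why the gap between disk and network speeds matters, here is a small arithmetic helper. The two transfer rates in it are hypothetical round numbers chosen only for illustration, not measurements of any particular hardware. It estimates how long moving 1 TB would take at each rate.

```python
def transfer_hours(n_bytes, bytes_per_second):
    """Hours needed to move n_bytes at a sustained rate of bytes_per_second."""
    return n_bytes / bytes_per_second / 3600

ONE_TB = 10**12  # 1 TB = 10^12 bytes

# Hypothetical sustained rates, for illustration only (not benchmarks):
DISK_RATE = 500 * 10**6      # assumed local-disk read rate: 500 MB/s
NETWORK_RATE = 100 * 10**6   # assumed network link: 100 MB/s (about 800 Mbit/s)

print(f"disk:    {transfer_hours(ONE_TB, DISK_RATE):.2f} h")     # disk:    0.56 h
print(f"network: {transfer_hours(ONE_TB, NETWORK_RATE):.2f} h")  # network: 2.78 h
```

Whatever the real rates are on a given system, the total time scales linearly with the amount of data moved. Big Data tools therefore try hard to avoid shipping data across the slowest link.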

## 10.2 Parallelization
One common concept across the different computational components is the limitations of any single machine. The basic fact of there now being lots of data (Volume) means that a database must often store its data across many disks. Similarly, a task might require processing so many zeros and ones that the service of many processors is necessary to complete the task in a reasonable amount of time. Thus, many of the existing Big Data tools coordinate many components of the same kind (e.g., many disks) working in parallel.

### 10.2.1 Distributed File Systems
A _distributed file system (DFS)_ allows a user to interact with data that is stored across many disks as though it were all stored in one place. This is generally done by a network that connects many disks together under the management of a single computer, sometimes called a _storage node_. Thus, storing and retrieving data on a DFS means uploading or downloading across a network, which forms a significant bottleneck for processing.

### 10.2.2 Parallel Processing
Just like a computer can have more than one hard drive, a multi-core (e.g., quad-core) computer can run multiple threads at once, each at its maximum processing speed. However, just like with distributed file systems, this requires a lot of coordination. We'll discuss this below in our conversation about map-reduce, but parallel processing pops up commonly in data science in a few different ways:

- Single-machine, multi-threading CPUs for general purpose programming
- Single-machine, multi-threading GPUs for vector-based programming
- Clusters of multi-threading GPUs or CPUs for extreme scaling

The first two contexts take place on a single machine. They are therefore limited by the size of that one machine, but they don't have to rely on slow network communication. The latter is limited mainly by how much data must be shared between the different machines and how fast the network they communicate on is.
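For the first context, a multi-core CPU on one machine, Python's standard-library `multiprocessing.Pool` is one way to spread independent work over several workers. The sketch below is minimal: the pool size of 4 is an assumption about your machine. It uses separate worker processes rather than threads because CPython's global interpreter lock keeps threads from executing Python bytecode in parallel.

```python
from multiprocessing import Pool

def square(x):
    """Independent work for one value; defined at module level so that
    worker processes can find it."""
    return x ** 2

def parallel_squares(values, processes=4):
    """Square every value using a pool of worker processes on one machine."""
    with Pool(processes=processes) as pool:
        # Pool.map splits `values` into chunks, hands them to the workers,
        # and reassembles the results in the original order.
        return pool.map(square, values)

if __name__ == "__main__":
    # The guard keeps worker processes from re-running this code on import.
    print(parallel_squares(range(1, 11)))
    # [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

For work this tiny, the cost of starting processes outweighs any speedup. The pattern pays off when each task is expensive and tasks share nothing.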

## 10.3 Map-Reduce

For heavy, Big Data processes that would just take too long to run and/or require operating on too much data, the question is ultimately 

- Can it be separated into tasks that have little or nothing to do with one another?

Why? Because it's costly (slow) to transmit data across a network!

### 10.3.1 From Shared Everything to Nothing 

Whether or not it's just the current state of computing capacity, there's more data to store and process than our computers can reasonably transmit across networks. Thus, we can't expect any one machine in a cluster to have access to all of our data. This applies not only to raw data, but often to the intermediate stages of processing as well.

The type of processing environment that you're probably used to by now is called _shared everything_. For example, multiple threads working with data on a laptop share the computer's entire memory capacity. This is a difficult environment to match on a cluster. Software implementations, e.g., of the Message Passing Interface (MPI), have been created to share in-memory data between computers, but they are very network heavy, which can result in serious bottlenecks.

The far other end of the spectrum is called a _shared nothing_ environment. In a shared nothing environment, each processor operates _only_ on a specific portion of a dataset, with no knowledge of the rest of the data.

### 10.3.2 Embarrassingly Parallelizable Algorithms
As it turns out, the distributed environments that are most common for Big Data processing are closest to being shared nothing. While this might be for cost, convenience, and/or simplicity, we'll stick with the shared nothing technologies in this class.

Not all algorithms are parallelizable in a shared nothing (or nearly shared nothing) environment. A terminology has therefore developed for the relatively simple types of algorithms that can be parallelized this way: such an algorithm is referred to as _embarrassingly parallel_. For example, incrementing each number in a list by $+1$ doesn't require any communication between tasks (shared nothing). Normalizing by the sum of all numbers, on the other hand, requires computing a global sum first and communicating it to all division tasks. So, while the latter (normalization) is not a shared nothing task, it is still embarrassingly parallel, since only one number must be shared with each of the parallel processors.

![Embarrassingly Parallel Algorithms](./images/emb_par.gif)
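The two examples above can be sketched in plain Python by simulating "processors" with contiguous chunks of a list, all within a single process. The helper names `chunk`, `increment`, and `normalize` are hypothetical and only illustrative. Each list comprehension over `pieces` stands in for independent workers. Only the normalization needs one shared number, the global total, which is built from one partial sum per piece.

```python
def chunk(values, n_chunks):
    """Split a list into at most n_chunks contiguous pieces, one per (simulated) processor."""
    size = max(1, -(-len(values) // n_chunks))  # ceiling division
    return [values[i:i + size] for i in range(0, len(values), size)]

def increment(piece):
    """Shared nothing: each piece is processed with no outside information."""
    return [v + 1 for v in piece]

def normalize(piece, total):
    """Needs exactly one number from outside the piece: the global total."""
    return [v / total for v in piece]

values = [1, 2, 3, 4, 5, 6, 7, 8]
pieces = chunk(values, 4)                     # [[1, 2], [3, 4], [5, 6], [7, 8]]

incremented = [increment(p) for p in pieces]  # no communication between pieces

partial_sums = [sum(p) for p in pieces]       # step 1: each piece reports one number
total = sum(partial_sums)                     # step 2: combine into the global sum
normalized = [normalize(p, total) for p in pieces]  # step 3: share `total` with every piece

print(incremented)  # [[2, 3], [4, 5], [6, 7], [8, 9]]
print(total)        # 36
```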

### 10.3.3 Map-Reduce is a Programming Pattern

So, how is embarrassingly parallel programming done? The primary programming pattern is referred to as _map-reduce_, which has two core functions, indicated by the name. Map-reduce was originally formalized in proprietary software developed and used internally by Google. Still, it's really just an abstraction of a pattern, and not necessarily a piece of software. As discussed below, we could even do map-reduce programming using basic command-line utilities (if we wanted to make life hard).

So what are _map_ and _reduce_? Let's start out by thinking of a dataset as an ordered array of $n$ records. We'll refer to the data records as values: $v_1,\cdots, v_n$.

### 10.3.4 Map Functions

`map()` is referred to as a higher-order function, because it takes a function as part of its input and applies it to all values in an ordered array (the second input). Now, let's separately define a map function, $f_m$, that inputs a single value $v$ and outputs one transformed value, $f_m(v)$. For example, a squaring function, $f_m(v) = v^2$, outputs the square of a number. Mapping our squaring function over our list of $n$ values (numbers) produces an output list of squares:

$$map(f_m,[v_1,\cdots,v_n]) = [v_1^2,\cdots,v_n^2]$$

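Conceptually, it's ok to think of `map()` as the "apply function to all values" process, which is slightly less general/flexible than our list comprehensions. To play with the concept, we can use Python's built-in `map()`. Note that in Python 3, `map()` returns a lazy iterator rather than a list, so we wrap it in `list()` to see the results.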

```python
## define a squaring function to map
square = lambda x: x ** 2

## define a list of integers as values
values = range(1, 11)

## map() returns a lazy iterator in Python 3, so materialize it with list()
print(list(map(square, values)))
```

```
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```


### 10.3.5 Map example
Even though `map()` applies to all, it doesn't have to output the same type. For example, we could define a map function that takes a string and outputs a generator of `(word,count)` tuples giving the number of times each word appears in the string. Here's this example using several uninteresting, cooked-up sentences.

```python
import re
from collections import Counter

## make a word counting function that
## yields a generator of word-count tuples
def wordCountTuples(string):
    counts = Counter()
    words = re.split(" ", string)
    for word in words:
        counts[word] += 1
    for word in counts:
        yield (word, counts[word])

## create a list of strings as values
strings = [
    "This is a sentence with the word is in it.",
    "This is another sentence with is.",
    "What do you know, another sentence, but not with is?",
    "More sentences means more fun?",
    "Okay, this is the last sentence."
]

## loop over the generators produced as mapped output
## for each input string.
for generator in map(wordCountTuples, strings):
    ## loop over the word-count tuples 
    ## in the current generator to check output
    for word, count in generator:
        print(word, count)
    print("")
```

```
This 1
is 2
a 1
sentence 1
with 1
the 1
word 1
in 1
it. 1

This 1
is 1
another 1
sentence 1
with 1
is. 1

What 1
do 1
you 1
know, 1
another 1
sentence, 1
but 1
not 1
with 1
is? 1

More 1
sentences 1
means 1
more 1
fun? 1

Okay, 1
this 1
is 1
the 1
last 1
sentence. 1
```



### 10.3.6 Reduce Functions

`reduce()` is also a higher-order function; it applies a specified _reduce function_, $f_r$. Like `map()`, `reduce()` takes $f_r$ and a list of values, $v_1,\cdots,v_n$, but there are more restrictions on how $f_r$ must work. Specifically, $f_r(v_a,v_b)$ must take two values as input, operate both _commutatively_ and _associatively_, and produce a single, aggregated output of the same form as the inputs, i.e., $f_r(v_a,v_b) = v_c$.

So, why must a reduce function be commutative and associative? So that different processors can recursively operate on the values in any order or grouping: successive reduce outputs are cycled back into $f_r$ until one single value is left. If a reduce function were not associative and/or commutative, then the order in which pairs of values are reduced could change the final result. Canonical examples of reduce functions are addition and multiplication. Python provides `reduce()` in its `functools` module (in Python 2 it was a built-in), so we can similarly explore how it works. Note: because reduce is all about aggregation, it produces a single output from a list of input values.

```python
from functools import reduce

## make a function that sums a pair of numbers
PlusPair = lambda a, b: a + b

## make a list of values for reduce
values = range(1, 11)

print(reduce(PlusPair, values))
```

```
55
```
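To see why associativity matters, here is a small sketch that compares Python's left-to-right `reduce()` with a hypothetical `tree_reduce()`. The latter splits the list in half and combines the halves, roughly the way separate processors might each reduce their own share before merging. The grouping changes here but the order of values does not, so strictly this checks associativity; reordering the values would probe commutativity in the same way. Addition gives the same answer either way; subtraction does not.

```python
from functools import reduce
import operator

def tree_reduce(f_r, values):
    """Reduce a non-empty list by recursively reducing each half and then
    combining the two partial results (a different grouping than left-to-right)."""
    if len(values) == 1:
        return values[0]
    mid = len(values) // 2
    return f_r(tree_reduce(f_r, values[:mid]), tree_reduce(f_r, values[mid:]))

values = list(range(1, 11))

# Addition is associative (and commutative), so grouping doesn't matter:
print(reduce(operator.add, values), tree_reduce(operator.add, values))  # 55 55

# Subtraction is not: ((1-2)-3)-4 = -8, but (1-2)-(3-4) = 0
print(reduce(operator.sub, [1, 2, 3, 4]), tree_reduce(operator.sub, [1, 2, 3, 4]))  # -8 0
```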


### 10.3.7 Shuffling from Map to Reduce
Here, the goal might be to find our word counts totaled across all of the input strings. Totaling requires a plus operation, which is commutative and associative, but it should be done by word (unless a sum across all words is desired, e.g., for normalization). So, we first need to collect and then sort the word-count output generated by our map function. In other words, we need to manage our data between the map and reduce steps. This is referred to as the map-reduce _shuffle_.

Let's charge ahead with our example and see if we can make this work! We have everything we need except a neat way to group by word. That is provided by `itertools.groupby()`, which takes the values to be grouped as its first argument and a function to group by (the key function) as its second. This makes `groupby()` another higher-order function. Note: `groupby()` only groups consecutive items with equal keys, so its input must already be sorted by the grouping key to function properly.

```python
from functools import reduce
from itertools import groupby

## initialize a list for all word-count tuples
AllWordsList = []

## this function returns the first element of a tuple,
## which for us will be the key by which we group
TupleKey = lambda x: x[0]

for generator in map(wordCountTuples, strings):
    ## loop over the word-count tuples 
    ## and store in a master list to be grouped
    for tup in generator:
        AllWordsList.append(tup)

## sort our tuples by the first elements (keys)
AllWordsList = sorted(AllWordsList)

## group the tuples by word
for key, group in groupby(AllWordsList, TupleKey):
    ## make a list of numeric values for each word
    values = [x[1] for x in group]
    
    ## print out the total count across all sentences
    print(key, reduce(PlusPair, values))
```

```
More 1
Okay, 1
This 2
What 1
a 1
another 2
but 1
do 1
fun? 1
in 1
is 4
is. 1
is? 1
it. 1
know, 1
last 1
means 1
more 1
not 1
sentence 2
sentence, 1
sentence. 1
sentences 1
the 2
this 1
with 3
word 1
you 1
```


### 10.3.8 Thinking about Keys and Values

Map, shuffle, and reduce are the three primary stages of a canonical map-reduce job, and if it wasn't clear in the previous example, what differentiates map-reduce from disconnected applications of higher-order functions is the coordination that goes into a job. So, how can we generalize this coordination effort?

Coordination generalizes through _keys_. As framed above, map and reduce functions are simply applied to lists of values, but keys (e.g., the words we grouped by) are generally the coordination mechanism by which map-reduce works. So, from now on we'll talk about our values, $v_1,\cdots,v_n$, as having associated keys, $k_1,\cdots,k_n$, and think of a dataset as a list of key-value pairs: $(k_1,v_1), \cdots, (k_n,v_n)$.

So, getting from map to reduce is the critical step in which some amount of data is shared (transmitted) between processors. As observed in the above example, the map-reduce shuffle is actually a sort-and-partition (groupby) operation! However, the point of map-reduce programming is not to create a different, more confusing method for tasks we can do more easily in other ways (like word count). Rather, it is to create a pattern that guarantees our ability to _scale_ a process across many processors. 

Now, the primary communication in map-reduce occurs when data are output by a map function and shuffled into a reduce function. While this is not quite a shared nothing environment, it does allow us to control the amount of data being transmitted. Let's take a look now at how workload can be distributed in a map-reduce process to multiple machines. Here's a good picture that crystallizes the entire map-reduce process:

![Map-Reduce](./images/mr.jpg)

Note: circles represent processors, rectangles represent values (data), and the rectangles' colors represent their keys.
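The three stages in the picture can be sketched in a single Python process. In this simulation, "mappers" each see only their own split of the input and emit key-value pairs. The shuffle routes every pair to the one "reducer" that owns its key, and each reducer aggregates only its own keys. The function names are hypothetical. Choosing a reducer with a hash of the key modulo the number of reducers mirrors the common idea of hash partitioning, but real frameworks differ in the details.

```python
import zlib
from collections import defaultdict
from functools import reduce

def map_words(line):
    """Map function: emit a (word, 1) key-value pair for every word in a line."""
    return [(word, 1) for word in line.split()]

def partition(key, n_reducers):
    """Choose which reducer owns a key. zlib.crc32 gives the same answer in every
    process, unlike Python's built-in hash() for strings, which is salted per run."""
    return zlib.crc32(key.encode("utf-8")) % n_reducers

def simulated_map_reduce(lines, f_r, n_mappers=2, n_reducers=3):
    # Split the input among (simulated) map workers; each sees only its own split.
    splits = [lines[i::n_mappers] for i in range(n_mappers)]

    # Map stage: every worker emits key-value pairs from its own split.
    map_outputs = [[pair for line in split for pair in map_words(line)] for split in splits]

    # Shuffle stage: route each pair to the reducer that owns its key, so all
    # values for a given key end up together. This is where data gets "transmitted".
    reducer_inputs = [defaultdict(list) for _ in range(n_reducers)]
    for output in map_outputs:
        for key, value in output:
            reducer_inputs[partition(key, n_reducers)][key].append(value)

    # Reduce stage: each reducer aggregates the values for the keys it owns.
    result = {}
    for grouped in reducer_inputs:
        for key, values in grouped.items():
            result[key] = reduce(f_r, values)
    return result

lines = ["the cat sat", "the dog sat", "a cat ran"]
print(simulated_map_reduce(lines, lambda a, b: a + b))
```

Because a key always lands on the same reducer, no reducer needs to see any other reducer's data. That is what lets the reduce stage run in parallel.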

## 10.4 Distributed Programming Software
So, what was challenging about our word count map-reduce example was coordinating the values between the map and reduce operations. Our Python example is actually made very easy by the existence of the `map()`, `reduce()`, `sorted()`, and `groupby()` functions. Throughout the history of map-reduce, implementations certainly haven't always had the benefit of higher-order functions. However, even in our nice comfy Python environment there are some real challenges we haven't yet touched on. 

As discussed, map-reduce exists primarily to support processing across multiple threads and machines, but how is this done? Well, in addition to sorting and grouping keys and values, a map-reduce implementation is also responsible for managing data and code. That means sending data and code to different threads and processors, waiting for them to finish, and collecting their results to be shuffled and handed back out. This takes a lot of effort, and is where specific distributed programming software implementations come into play. We'll explore one specific tool called _Spark_, because it has a nice API for Python, but Spark is relatively new and it's good to have an idea of where this all started.

### 10.4.1 The Old Fashioned Way
As seen in our example above, we can set up a map-reduce process in Python, but a plain Python script only ever runs as a single process. However, we can execute a Python (or other-language) script from a command line to run as a separate process! Thus, the type of map-reduce work we've done above can be thought of more as a _control script_ that manages the map-reduce operation. The individual map and reduce functions would need to be written out as executable `map.py` and `reduce.py` scripts so that they could be run by multiple processes. 
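Here is a sketch of what such hypothetical `map.py` and `reduce.py` scripts might contain, with the shell's `sort` doing the shuffle in a pipeline like `cat input.txt | python map.py | sort | python reduce.py`. To keep it testable in one process, the two functions take file-like objects. In real scripts they would read `sys.stdin` and write `sys.stdout`. The tab-separated `word\t1` line format is an assumption, modeled on the convention Hadoop Streaming uses by default.

```python
import io
from itertools import groupby

def mapper(stdin, stdout):
    """Body of a hypothetical map.py: read raw text lines, write 'word\t1' lines."""
    for line in stdin:
        for word in line.split():
            stdout.write(f"{word}\t1\n")

def reducer(stdin, stdout):
    """Body of a hypothetical reduce.py: read key-sorted 'word\tcount' lines and
    total the counts for each run of identical words."""
    pairs = (line.rstrip("\n").split("\t") for line in stdin)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        stdout.write(f"{word}\t{sum(int(count) for _, count in group)}\n")

# Simulate:  cat input.txt | python map.py | sort | python reduce.py
text = "the cat sat\nthe dog sat\n"
mapped = io.StringIO()
mapper(io.StringIO(text), mapped)

# The `sort` step is the shuffle: it brings identical keys next to each other.
shuffled = io.StringIO("".join(sorted(mapped.getvalue().splitlines(keepends=True))))

reduced = io.StringIO()
reducer(shuffled, reduced)
print(reduced.getvalue(), end="")
# cat	1
# dog	1
# sat	2
# the	2
```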

Alternatively, our map and reduce functions could be sent directly over a network to other running workers as functions (which is what Spark does). Here, we would have to spend a lot of effort writing code that watches the various running workers, waits for them to finish and produce output, then shuffles the output and reduces it. Following this, subsequent map-reduce steps could be conducted to complete an overall task. However, keeping track of a map-reduce process' data is exactly the kind of thing we don't want to focus on as data science programmers. So, we'll want to use tools that have been specifically designed to let us focus on our job, and not the nitty-gritty details of execution.

### 10.4.2 Hadoop
You have probably heard the name Hadoop before now, as it was the primary map-reduce software framework until recently. Hadoop is open source and written in Java, which means that any application we might work on in Python uses an API that runs Java "under the hood". While it does do all of the map-reduce stuff we just talked about, the important thing that differentiates (and sustains) Hadoop as a Big Data processing tool is the way it handles data.

Hadoop is designed for high-__V__olume data, and relies heavily on the Hadoop Distributed File System (HDFS), which is a DFS specially designed for map-reduce jobs. While map-reduce is a scalable programming pattern, the question that HDFS answers is "how does data get to the different threads?" HDFS is a file system built to coordinate a cluster's data with its processing. The idea behind HDFS is to make sure that _data is transmitted (through the network) as little as possible_. This is done by spreading data out semi-permanently across _processing_ nodes. In other words, in an HDFS cluster, data "lives" at the place where it is generally processed. This means you generally don't have to move Big Data once it has been stored in a cluster, which makes processing more immediate. However, this does require that one master node (in HDFS, the _NameNode_) keeps track of where all of the data exists.

### 10.4.3 Python and Hadoop: MRJob
So, full disclosure: we won't be working with Hadoop in this class. There are a few reasons for this. Primarily, to actually see what Hadoop can do we would need to have an HDFS system set up, which we don't. This also means that we can't really see any parallelization value from Hadoop without a cluster. However, Python does have a very nice Hadoop API that was originally started by the folks at Yelp. It exists as a library called `mrjob` (its core class is `MRJob`), which you can install via `pip`. This is quite possibly the nicest way to interact with map-reduce in Python, but it requires creating Python classes, which we haven't covered in our programming primer. Still, if you want to see more about MRJob, you can easily get a feel for it by demoing code on your own in Python (MRJob can run jobs locally, without Hadoop). Here are a few documentation links to learn a bit more about MRJob:

- https://pythonhosted.org/mrjob/
- https://pythonhosted.org/mrjob/guides/why-mrjob.html#overview
- https://pythonhosted.org/mrjob/guides/quickstart.html#installation

Since any true map-reduce value coming out of Hadoop requires a cluster, a real introduction to Hadoop/MRJob would require a relatively challenging discussion about job configurations. Basically, that means creating a file on your computer that tells Hadoop where the cluster is, how to communicate with it, and what kind of and how many resources to request. Thus, the configuration for Hadoop depends entirely on the cluster being used, in addition to the job being done.

### 10.4.4 Spark
Ah, yes, finally, we made it. Spark is a newer distributed programming framework that is designed to do a few things that Hadoop and other tools don't. Most importantly for us, Spark is designed to support parallel processing on a single machine, so we can really see improvements on relatively small jobs that don't require a cluster. However, Spark also brings a few other things to the table that should be discussed.

Spark uses Resilient Distributed Datasets (RDDs). So, when you start a Spark job, the first thing that has to be done is the creation of an RDD. In Python, you can think of an RDD as another type of Python object, like a data frame. RDDs store data in an ordered way that is designed to be passed out to different threads, so we don't have to worry about how data are being partitioned and distributed! Moreover, RDDs are _resilient_. We haven't brought this up as an issue yet, but distributed processing is also challenging because every thread has to succeed for an entire process to succeed. In a naive setup, if one computer in a cluster fails, the whole job breaks! Well, RDDs are not only data objects. They also keep a record (the _lineage_) of the operations that have been applied to the data to produce them. So, if a thread fails at any point in a Spark job, Spark can automatically recompute just the lost pieces of data from their lineage and complete the overall task. The best part is that we don't have to worry about any of this!
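To make the lineage idea concrete, here is a toy, hypothetical `ToyRDD` class. It is not how Spark is implemented. It stores its source partitions plus the list of per-element functions applied so far. When one partition's results are "lost", only that partition is rebuilt by replaying the recorded functions on its source data.

```python
class ToyRDD:
    """A toy stand-in for an RDD: source partitions plus the lineage,
    i.e. the per-element functions applied so far (hypothetical, illustrative only)."""

    def __init__(self, partitions, lineage=()):
        self.partitions = partitions   # source data, already split into partitions
        self.lineage = list(lineage)   # recorded transformations, oldest first
        self.results = {}              # partition index -> computed output
        self.compute_log = []          # which partitions had to be (re)computed

    def map(self, func):
        # Record the transformation in a new ToyRDD; nothing is computed yet.
        return ToyRDD(self.partitions, self.lineage + [func])

    def _compute(self, i):
        # Rebuild one partition from its source data by replaying the lineage.
        data = self.partitions[i]
        for func in self.lineage:
            data = [func(x) for x in data]
        self.compute_log.append(i)
        return data

    def collect(self):
        # Compute only the partitions whose results are missing, then concatenate.
        out = []
        for i in range(len(self.partitions)):
            if i not in self.results:
                self.results[i] = self._compute(i)
            out.extend(self.results[i])
        return out

    def lose_partition(self, i):
        # Simulate a failed worker by throwing away one partition's results.
        self.results.pop(i, None)

rdd = ToyRDD([[0, 1, 2], [3, 4, 5], [6, 7, 8]]).map(lambda x: x + 1).map(lambda x: x * 10)
first = rdd.collect()        # computes partitions 0, 1, and 2
rdd.compute_log.clear()
rdd.lose_partition(1)        # the "worker" holding partition 1 fails
second = rdd.collect()       # only partition 1 is rebuilt from the lineage
print(first == second, rdd.compute_log)  # True [1]
```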

Spark manages system memory intelligently. As discussed in __Sec. 10.1__, access to data in memory is much faster than access to data stored on a disk. Hadoop solved an important issue: "where to store the data and processors?" However, Hadoop primarily stores all intermediate process data on the disks of the processing machines. This means that data are constantly being written and read back, which is slow. The advancement that Spark provided was similar management of data, but distributed across the memory of the processing computers. So, while Spark will still "spill" data to disk if it has to, it does its level best to keep intermediate data in memory. Since data are then read from and written to disk less often, overall processing is faster.

### 10.4.5 Hadoop and Spark are not Mutually Exclusive
Finally, before continuing, it's important to note that while Spark is the shiny new parallel processing software, it doesn't replace Hadoop! Spark is open source software supported by the same organization as Hadoop, the Apache Software Foundation, and it often works best when it works _with_ Hadoop. So, while we can use Spark easily in Python (below) on a single computer without Hadoop or HDFS, Hadoop and HDFS can be exceptionally beneficial to Spark on a cluster. Basically, here's the line between the two:

- Hadoop: deals with Big Volumes of data on a cluster and makes sure a job can get done, at the very least.
- Spark: speeds up large jobs either on a single machine or cluster, and by virtue of increased speed of Big Data processing, helps optimize for Big Velocity.

One more note about Spark, which we'll see next: Spark is designed to parallelize other regular, high-level tasks too. Thus, Spark comes with a number of RDD methods focused on grouping, filtering, etc., that allow more complex tasks (technically still map-reduce) to be completed. Moreover, whole libraries have since been created for Spark, specifically focusing on distributed machine learning (MLlib) and SQL-style structured data processing (Spark SQL). We won't get to these either, but they are growing in importance.

### 10.4.6 Python and Spark: PySpark
Okay, well, just like Hadoop, Spark is not written natively in Python, which means that Python must use an API to run Spark. Fortunately, there is a library designed just for this called `pyspark`, which can easily be installed with `pip`. However, for pyspark to actually work you may also have to install the native version of Spark, which is written in a language called Scala (and runs on the Java virtual machine). Download instructions for Spark are available in the documentation:

- https://spark.apache.org/downloads.html

but the process changes pretty rapidly with ongoing development, and it also varies across machines. Once you have Spark and PySpark set up, you can begin by initializing a Spark context. Note: only one Spark context can be active per Python session, so the cell below can only be run once unless you first shut the context down with `sc.stop()`.

```python
from pyspark import SparkContext

## initialize the Spark context to run locally with 4 worker threads,
## and give the application a name
sc = SparkContext("local[4]", "Week 10 Notes")
```

With a Spark context, we can initialize an RDD. Here's how we can create an RDD from an in-memory list.

```python
## create an RDD with 4 partitions,
## which is equal to the number of threads
## we would like to have running
RDD = sc.parallelize(range(25), 4)
```

You can use the `.take()` and `.collect()` methods to bring data back into Python. While `.take(n)` retrieves the first $n$ values in a dataset, the `.collect()` method takes all contents of an RDD and turns them into a list, which can be worked with using basic Python tools. Note: if your data is Big on Volume and doesn't fit in memory on a single machine, then `.collect()` will probably crash the computer controlling the process.

```python
print(RDD.take(5))

collectedRDD = RDD.collect()
print(collectedRDD)
```

```
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24]
```


### 10.4.7 Word count example in PySpark 
Now, you might be wondering where map and reduce actually fit in with Spark, and the truth is that they appear in many ways. Unlike Hadoop, Spark's API has built in a large number of higher-order functions meant for parallel processing, including `.groupBy()`. This makes it a lot easier to perform complex tasks in parallel, but it also means that we need to be choosy about which operations we use.

In general, Spark RDD operations are either _transformations_ or _actions_. These include RDD methods for `.map()` and `.reduce()`, but the two are executed in different ways! `.reduce()` is an _action_ that runs as soon as it is called. `.map()`, by contrast, is a _transformation_: a type of operation that only runs once an action is initiated. For a full list of actions and transformations see the docs:

- https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
- https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
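
To make the lazy/eager distinction concrete, here is a toy, pure-Python sketch. `LazyDataset` is a hypothetical class, not part of Spark; it only mimics the idea that a transformation like `.map()` records work to be done, while an action like `.reduce()` actually does it.

```python
import functools


class LazyDataset:
    """Hypothetical toy dataset: map() is lazy (a transformation), reduce() is eager (an action)."""

    def __init__(self, data, pending=()):
        self._data = list(data)
        self._pending = list(pending)  # functions queued by map(), not yet applied

    def map(self, func):
        # Transformation: queue the function and return a new dataset; nothing runs yet.
        return LazyDataset(self._data, self._pending + [func])

    def reduce(self, func):
        # Action: apply every queued function now, then aggregate the results.
        values = self._data
        for f in self._pending:
            values = [f(v) for v in values]
        return functools.reduce(func, values)


calls = []


def square(x):
    calls.append(x)  # record each time the function actually runs
    return x * x


ds = LazyDataset(range(5)).map(square)
print(len(calls))                     # 0: map() only queued square
print(ds.reduce(lambda a, b: a + b))  # 30
print(len(calls))                     # 5: reduce() triggered the queued map
```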

Here is how the Spark docs define the RDD `.map()` and `.reduce()` methods:

- `.map()`: Return a new distributed dataset formed by passing each element of the source through a function func.
- `.reduce()`: Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

Two important facts to parse out of these definitions are that

1. `.reduce()` operates only on a list of values (dataset), collapsing them into a single result, and doesn't take key-value pairs, and
2. `.map()` expects there to be exactly one output for each input.
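
The requirement that the reduce function be commutative and associative matters because each partition is reduced separately and the partial results are then combined. Here is a simplified serial model of that process; `parallel_style_reduce` is a hypothetical helper, not a Spark function. Addition gives the same answer either way, but subtraction does not.

```python
import operator
from functools import reduce


def parallel_style_reduce(partitions, func):
    """Hypothetical model: reduce each partition on its own, then combine the partial results."""
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)


data = list(range(1, 9))
partitions = [data[:4], data[4:]]

# Addition is commutative and associative: serial and partitioned results agree.
print(reduce(operator.add, data), parallel_style_reduce(partitions, operator.add))  # 36 36

# Subtraction is neither, so the answer depends on how the data was split.
print(reduce(operator.sub, data), parallel_style_reduce(partitions, operator.sub))  # -34 8
```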

Our example of counting the words in a list of strings actually isn't well supported by these basic operations: each line produces many words rather than one, and we need to shuffle the data by words as keys. What we actually need are `.flatMap()` and `.reduceByKey()`:

- `.flatMap()`: Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
- `.reduceByKey()`: When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. 
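
Here is a plain-Python sketch of the difference between `.map()` and `.flatMap()`, followed by a serial stand-in for `.reduceByKey()`. In Python, the "Seq" in the `flatMap` definition can be any iterable, such as a list. The `reduce_by_key` helper is hypothetical; Spark's real version also combines values within each partition before shuffling, which this single-machine sketch skips.

```python
import operator
from itertools import chain

lines = ["the whale", "the sea", ""]

# map: exactly one output per input, so each line becomes one (possibly empty) list
mapped = [line.split() for line in lines]

# flatMap: each input can yield 0 or more outputs, which are flattened together
flat = list(chain.from_iterable(line.split() for line in lines))


def reduce_by_key(pairs, func):
    """Hypothetical serial stand-in for reduceByKey: combine values that share a key."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals


counts = reduce_by_key(((word, 1) for word in flat), operator.add)

print(mapped)  # [['the', 'whale'], ['the', 'sea'], []]
print(flat)    # ['the', 'whale', 'the', 'sea']
print(counts)  # {'the': 2, 'whale': 1, 'sea': 1}
```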

Rather than work on our cooked-up example, let's do word count on a book. As it turns out, we don't have to do any data loading ourselves to create an RDD of a text file's lines. Instead, we can specify a file path and use the `sc.textFile()` method to create our RDD of text lines.

In [8]:
## create an RDD of a text file's lines
## with at least 4 partitions (the second argument is minPartitions)
RDD = sc.textFile("./data/books/2701.txt", 4)

print(RDD.take(25))

[u'', u'', u'Produced by Daniel Lazarus and Jonesey', u'', u'MOBY DICK; OR THE WHALE', u'', u'By Herman Melville', u'', u"Original Transcriber's Notes:", u'', u'This text is a combination of etexts, one from the now-defunct ERIS', u"project at Virginia Tech and one from Project Gutenberg's archives. The", u'proofreaders of this version are indebted to The University of Adelaide', u'Library for preserving the Virginia Tech version. The resulting etext', u'was compared with a public domain hard copy version of the text.', u'', u'In chapters 24, 89, and 90, we substituted a capital L for the symbol', u'for the British pound, a unit of currency.', u'', u'ETYMOLOGY.', u'', u'(Supplied by a Late Consumptive Usher to a Grammar School)', u'', u'The pale Usher--threadbare in coat, heart, body, and brain; I see him', u'now. He was ever dusting his old lexicons and grammars, with a queer']


Since transformations return new RDDs, we can chain RDD methods one after another and finish with an action. So, we can take our RDD and apply our map and reduce functions from before to complete our word counting task.

In [9]:
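## wordCountTuples (mapper) and PlusPair (reducer) are our helper functions from earlier;
## both steps are transformations, so nothing runs until the .take() action below
WordCountTuples = RDD.flatMap(wordCountTuples).reduceByKey(PlusPair)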

for word, count in WordCountTuples.take(10):
    print(word,count)

(u'', 3570)
(u'funereal', 1)
(u'shouted,', 1)
(u'troubledly', 1)
(u'cod-liver', 1)
(u'four', 63)
(u'prices', 1)
(u'ocean,', 16)
(u'harpoon-line', 2)
(u'slew.', 1)


What if we wanted to see the most common words? Well, if we look in our handy-dandy list of transformations and actions, we'll see that there's a `.sortByKey()` transformation, but it sorts by key! So, we'll have to run another map command that swaps the positions of keys and values. After this, we can `.sortByKey()` to get our data into the right order. Note: since `.sortByKey()` sorts in ascending order by default, we'll have to specify `.sortByKey(False)` to get the descending order we want.

In [10]:
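## swap each (word, count) pair to (count, word) so counts become keys,
## then sort by key in descending order
SortedWordCountTuples = WordCountTuples.map(lambda x: (x[1],x[0]))\
                                       .sortByKey(False)

for count, word in SortedWordCountTuples.take(10):
    print(count,word)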

(13609, u'the')
(6480, u'of')
(5884, u'and')
(4476, u'a')
(4440, u'to')
(3824, u'in')
(3570, u'')
(2680, u'that')
(2415, u'his')
(1724, u'I')


What if we want to group the words by how many times they appeared? Well, there's a `.groupByKey()` method, and now that we've made the _counts_ into our RDD's keys, this method will work perfectly for us!

In [11]:
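## group the words that share a count; grouping shuffles data between
## partitions and loses the earlier order, so we sort by key again
GroupedCountWordsTuples = SortedWordCountTuples.groupByKey()\
                                               .sortByKey(False)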

for count, words in GroupedCountWordsTuples.take(100):
    print(count,list(words))

(13609, [u'the'])
(6480, [u'of'])
(5884, [u'and'])
(4476, [u'a'])
(4440, [u'to'])
(3824, [u'in'])
(3570, [u''])
(2680, [u'that'])
(2415, [u'his'])
(1724, [u'I'])
(1646, [u'with'])
(1590, [u'as'])
(1565, [u'was'])
(1561, [u'is'])
(1506, [u'it'])
(1492, [u'he'])
(1360, [u'for'])
(1293, [u'all'])
(1212, [u'at'])
(1132, [u'this'])
(1093, [u'by'])
(1051, [u'from'])
(1030, [u'but'])
(1023, [u'not'])
(973, [u'be'])
(905, [u'on'])
(784, [u'so'])
(752, [u'one'])
(751, [u'had'])
(748, [u'have'])
(734, [u'you'])
(686, [u'or'])
(645, [u'were'])
(637, [u'But'])
(611, [u'their'])
(575, [u'an'])
(569, [u'some'])
(568, [u'they'])
(566, [u'are'])
(560, [u'my'])
(554, [u'him'])
(550, [u'which'])
(544, [u'like'])
(534, [u'The'])
(531, [u'upon'])
(516, [u'into'])
(501, [u'when'])
(457, [u'now'])
(439, [u'no'])
(436, [u'out'])
(427, [u'more'])
(414, [u'there'])
(413, [u'up'])
(412, [u'old'])
(406, [u'would'])
(401, [u'been'])
(396, [u'if'])
(392, [u'whale'])
(387, [u'we'])
(379, [u'what'])
(369, [u'its'])


So, where do we go from here? Well, Spark isn't about building more complex models or data queries; it's there to make the programs you _absolutely_ have to run actually work. Whether it's because you can't wait for your script to finish on a single thread, or because you don't have enough capacity on a single machine to store and process all of your data, the whole point of Spark is to make what was computationally intractable with _serial_ programming (everything we have done in Python until this point) tractable by using parallelism. Since Spark can be used on a single machine, some people may simply prefer to build their workflows in Spark. This may someday become standard practice, since it stages any work being _prototyped_ on a single machine for distribution on a cluster.