# `hadoop`

*soon we will need the following file, and it takes a considerable amount of time to download. start downloading it now*

https://resources.oreilly.com/examples/0636920035275/raw/master/hfpd3.vmdk.gz

## the problem(s)

reach all the way back in your memory to two lectures ago, when we talked about `aws` `dynamodb`. the proposed use case for `dynamodb` was an ambiguous "webby" one:

*we're reading and writing way too much data way too fast for our one machine*

this sentiment is reflective of a modern data reality that often goes by the buzziest of buzzwords:

**big data**

*mandatory caveat: big data $\neq$ data science*

traditional data analyses were optimized for super-powerful single machines, such as monolithic `sql` servers

*note: this is not to say that cluster computing didn't exist; indeed it was one of the main computational frameworks in the early days of computers*

our exponential growth in disk space and memory space per dollar spent fueled a lot of this work and innovation.

basically, for a long while our ability to *compute* data grew faster than our ability to *create* or *acquire* data. in the modern world, though, that notion is absolute history.

take, for example, a relatively trivial process for modern computation: word counts for a document of several MBs:

In [None]:
%%bash
rm -f /tmp/shakespeare.txt
wget --quiet -O /tmp/shakespeare.txt.zip https://github.com/bbengfort/hadoop-fundamentals/raw/master/data/shakespeare.txt.zip
unzip /tmp/shakespeare.txt.zip -d /tmp
ls -alh /tmp/shak*

In [None]:
%%bash
head /tmp/shakespeare.txt

In [None]:
with open('/tmp/shakespeare.txt', 'r') as f:
    s = f.read()

print(s[:100])

In [None]:
import collections

wordct = collections.Counter(
    word.lower()
    for line in s.split('\n')
    for word in line.strip().split()  # split on any whitespace so we count words, not tab-delimited fields
    if word
)

wordct.most_common(10)

that was easy, but it relied on some important features:

1. I had enough disk space to have that 8.5MB file stored locally
2. I had enough memory to load that 8.5MB file's contents directly into memory

obviously, that isn't always the case. It's not even hard to think of counter-examples

1. a larger text corpus (e.g. all of wikipedia, 10TB as of 2015, or publicly available SEC EDGAR filings)
2. any reasonably large image recognition project
3. the logs of web traffic for any modestly sized website or service
4. IoT information (usage records of your smartphone or headphones, e.g.)

so, back to `dynamodb`. when we (theoretically) started to run into resource issues for our single-machine architecture, we decided to change the way we were doing things and start "scaling horizontally" -- choose an architecture and software that can spread the storage and computation burden across multiple machines

for `dynamodb` we were attempting to distribute out our database writes and reads as actions, but the test scenario I laid out above was one of

+ data storage
+ resource availability

`hadoop` is the *de facto* operating system for distributed computing

it is a software solution that abstracts out all the "hard stuff" (the complicated networking and resource marshalling) that needs to happen to get multiple computers on the same page, and instead provides the user (you) with a single api for

+ accessing distributed files (`hdfs`)
+ securing computational resources and memory (`yarn`)

## `hadoop` nuts and bolts

let's dig into the details of distributed computing a bit

### terminology

+ **node**: a single machine (real or virtual)
+ **cluster**: a collection of *nodes* which can communicate with each other
+ **master**: a *node* which can request information from or delegate tasks to other *nodes*
+ **worker**: a *node* which merely receives, processes, and responds to requests

### basic concepts

in the database world we had certain requirements a `dbms` needed to meet to ensure that all clients of that database service could share those resources (the `ACID` principles)

similarly, for distributed computing to be well defined and robust, we have four requirements:

1. *fault tolerance*: if one computer goes down, we're good. if it comes back, we're even gooder
2. *recoverability*: we don't lose data when things fail
3. *consistency*: results shouldn't depend on jobs failing or succeeding
4. *scalability*: more data means longer time, not failure; we can increase resources if desired

`hadoop` addresses these requirements by making the following decisions:

+ data is distributed across many nodes in the cluster; each node prefers its local data
+ all data is chunked into blocks (say, 128 MB) and is *replicated* (copied to other nodes)
+ jobs (computations) are broken into tasks applied to single blocks
+ jobs are completely unaware that they are distributed
+ worker nodes don't care about each other
+ tasks are redundant and repeatable
+ master nodes handle allocation of all resources (storage, cpus, memory)

### `hadoop` architecture

`hadoop` as an operating system is basically just two pieces of software:

1. `hdfs` (a program for handling distributed file storage)
2. `yarn` (a program for handling distributed resource allocation)

together, these two processes conspire to enforce some of those design decisions above: namely, to make sure that all data is robustly distributed and that all distributed tasks are working on local data

`hdfs` and `yarn` are the defaults and they were built to work in tandem, but either can be replaced:

+ you could change storage methods (e.g. `hdfs` replaced by `s3`)
+ you could change resource managers or computational layers on top of storage (e.g. `yarn` replaced by `mesos`)

#### a `hadoop` cluster

`hadoop` is software. the hardware is a cluster of computers. the benefit you get from using `hdfs` and `yarn` is a set of abstracted `api`s that hide cluster administration details and tasks from you.

to put it another way: `hadoop` lets someone else do the hard task of distribution so you can do what you came here to do (analysis)

when we've talked before about databases or `aws` `REST` `api`s, we've often called them *services* and the programs we wrote to utilize those services *clients*

both `hdfs` and `yarn` have several *services* that our tools (*clients*) use

`hdfs` services:

+ `NameNode` (master): stores the directory tree, file metadata, file cluster locations. this is the access point for `hdfs` usage
+ `Secondary NameNode` (master): performs housekeeping, checkpointing. not a backup `NameNode`
+ `DataNode` (worker): local `io` for `hdfs` blocks

the basic interaction with `hdfs`:

1. client asks `NameNode` where data lives.
2. `NameNode` tells client
3. client is responsible for going and getting data from `DataNode`
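the three steps above can be sketched as a toy, in-memory model. this is only an illustration of the division of labor (metadata on the master, bytes on the workers): `ToyNameNode`, `ToyDataNode`, `read_file`, the 4-byte block size, and the round-robin placement are all made up for this sketch and are not part of any `hadoop` api.

```python
# toy model of the hdfs read path; every class and name here is hypothetical
class ToyDataNode:
    """worker: holds the raw bytes of the blocks assigned to it"""
    def __init__(self, name):
        self.name = name
        self.blocks = {}  # block id -> bytes


class ToyNameNode:
    """master: knows which blocks make up a file and where each block lives"""
    def __init__(self, datanodes, block_size=4, replication=2):
        self.datanodes = datanodes    # datanode name -> ToyDataNode
        self.block_size = block_size  # tiny on purpose; the text uses 128 MB
        self.replication = replication
        self.file_blocks = {}         # path -> ordered list of block ids
        self.block_locations = {}     # block id -> list of datanode names

    def write(self, path, data):
        names = sorted(self.datanodes)
        self.file_blocks[path] = []
        for offset in range(0, len(data), self.block_size):
            index = offset // self.block_size
            block_id = '{}#{}'.format(path, index)
            # naive round-robin placement (an assumption made for this sketch)
            holders = [names[(index + r) % len(names)]
                       for r in range(self.replication)]
            for holder in holders:
                chunk = data[offset:offset + self.block_size]
                self.datanodes[holder].blocks[block_id] = chunk
            self.file_blocks[path].append(block_id)
            self.block_locations[block_id] = holders

    def locate(self, path):
        """steps 1 and 2: answer with metadata only, never with file contents"""
        return [(b, self.block_locations[b]) for b in self.file_blocks[path]]


def read_file(namenode, datanodes, path):
    """step 3: the client fetches each block from a datanode itself"""
    chunks = []
    for block_id, holders in namenode.locate(path):
        chunks.append(datanodes[holders[0]].blocks[block_id])
    return b''.join(chunks)


datanodes = {name: ToyDataNode(name) for name in ('dn1', 'dn2', 'dn3')}
nn = ToyNameNode(datanodes)
nn.write('/user/student/hello.txt', b'hello hdfs')
print(nn.locate('/user/student/hello.txt'))
print(read_file(nn, datanodes, '/user/student/hello.txt'))
```

note that the name node never touches file contents on the read path; it only hands out locations, which is why it can serve as a single access point for the whole cluster.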

`yarn` services:

+ `ResourceManager` (master): allocates and monitors resources (memory, cores), schedules jobs
+ `ApplicationMaster` (master): coordinates a particular app after `ResourceManager` has scheduled it
+ `NodeManager` (worker): runs tasks and reports on task status

the basic interaction with `yarn` is very similar:

1. client asks `ResourceManager` for resources
2. `ResourceManager` assigns `ApplicationMaster` instance to manage the individual application
3. `ApplicationMaster` submits a job to a single `NodeManager`, tracks all submitted jobs
4. `NodeManager` executes incoming assigned tasks

to give you a sense of scale for a typical `hadoop` cluster

+ 20 - 30 workers and one master can handle 10s of terabytes of data in simultaneous workflows
+ single servers (resource absolutism) are needed once you have hundreds of nodes
+ multiple masters are needed when you start talking about thousands of nodes

#### details on `hdfs`

`hdfs` is a file system on top of another filesystem. in many respects, it behaves like you're used to the `linux` filesystem behaving (with slightly different commands). there are a few nuances worth discussing, however.

##### blocks

files are blocked into large (e.g. 128MB) chunks. this means that a file larger than that will be separated up into different blocks. it's worth noting: this is effectively the *only* sense in which the block size matters

a small file will not wastefully take up the remainder of its block on disk: a 1 MB file stored in a 128 MB block only occupies 1 MB of the underlying file system.

that's not to say there isn't a problem with small files, though -- there is. It's not wasteful disk usage, it's wasteful *resource* usage. we will discuss `mappers` and `reducers` later, but for now it suffices to say: when we distributed tasks, we already said we distribute them to blocks.

if one of those blocks contains a small amount of information, that will be pretty wasteful.

better a million files of 100 MB than a billion files of 0.1 MB
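to put rough numbers on that comparison, here is a back-of-the-envelope sketch. it assumes the 128 MB example block size from above and that each file is split into blocks independently; `blocks_needed` is a hypothetical helper written for this note, not a `hadoop` function.

```python
import math

BLOCK_SIZE_MB = 128  # the example block size used above


def blocks_needed(n_files, file_size_mb, block_size_mb=BLOCK_SIZE_MB):
    """number of hdfs blocks needed to store n equally sized files"""
    # a file never shares a block with another file, so every file costs
    # at least one block no matter how small it is
    blocks_per_file = max(1, math.ceil(file_size_mb / block_size_mb))
    return n_files * blocks_per_file


big_files = blocks_needed(10**6, 100)    # a million files of 100 MB
small_files = blocks_needed(10**9, 0.1)  # a billion files of 0.1 MB

print(big_files, small_files, small_files // big_files)
```

both layouts hold the same total amount of data, but the small-file layout needs a thousand times as many blocks, and tasks are handed out per block.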

## a demo `hadoop` environment

a single `hadoop` (or, related, `aws` `emr`) environment is often a large, complicated, expensive, and unruly engineering project.

to avoid the hassle of constantly building up complicated development environments, many developers will create a *virtual execution environment* in a *virtual machine*.

we are going to build one such virtual environment right now using oracle's `virtualbox` and the Ubuntu 14.04 `vmdk` provided by the authors of our text book

<div align="center">**walkthrough: installing `virtualbox` and a `hadoop` virtual machine**</div>

1. if you didn't start the download of https://resources.oreilly.com/examples/0636920035275/raw/master/hfpd3.vmdk.gz at the beginning of class, do so now
2. download `virtualbox` for your os and follow instructions: https://www.virtualbox.org/wiki/Downloads
3. unzip the `vmdk` file once it is downloaded
    1. in a terminal, `gunzip -k hfpd3.vmdk.gz`
4. create the VM
    1. open `virtualbox` and click the "new" button
    2. name it whatever you want, change the type to `linux`, and make the version "ubuntu 64-bit"
    3. set the memory however you want (I'll go high because yolo)
    4. select "Use an existing virtual hard disk file" and navigate to the `vmdk` file
    5. start up the VM. password is `password`
    6. click "Devices > Insert guest additions cd" and then run it (again, password is `password`)
    7. restart the VM when finished (now you can resize!)
    8. back in the `virtualbox` program, navigate to "Settings", and on the "General > Advanced" tab make "Shared Clipboard" "Bidirectional"

note: [the `cloudera` vm](https://www.cloudera.com/downloads/quickstart_vms/5-12.html) is actually pretty excellent to use and I highly recommend it for your general development and hacking.

I opted for the course-specific `vmdk` so that we would avoid configuration and implementation discrepancies as much as is possible, and also because the `cloudera` download requires you provide a lot of identifying information and I am attempting to respect privacy when possible

<div align="center">**starting `hadoop`**</div>

1. log in to your `hadoop` vm
2. execute the following:

```bash
sudo -H -u hadoop $HADOOP_HOME/sbin/start-dfs.sh
sudo -H -u hadoop $HADOOP_HOME/sbin/start-yarn.sh
sudo -H -u hadoop $HADOOP_HOME/bin/hadoop fs -chown -R hadoop:hadoop /
sudo -H -u hadoop $HADOOP_HOME/bin/hadoop fs -chmod g+w /
sudo chmod g+w /var/app/hadoop/data

# demonstrate it worked
hadoop fs -mkdir -p /user/student
hadoop fs -ls -h /
```

### working with a distributed file system

#### basic file system operations

many of the common `linux` command line file system tools are available with the same names in `hadoop`. try

```bash
hadoop fs -help
```

(note the single-dash parameters and curse the `java` gods)

tired of reading those 4000 lines? try any one subcommand too:

```bash
hadoop fs -help ls
hadoop fs -help chmod
```

out on the etherwebs, you may see floating around commands such as

```bash
hdfs dfs -ls /
```

`hdfs dfs` is *related to* `hadoop fs`, but is not exactly the same. `hadoop fs` defaults to looking at `hdfs` files, but is actually file-system agnostic(ish), and supports local files (via the `file://` scheme), `s3` files, `ftp` services, and any others that people have been kind enough to implement.

`hdfs dfs` *only* works with `hdfs`.

to demonstrate how `hadoop fs` can be used with local files as well, try out

```bash
hadoop fs -ls file:///tmp/
```

none of this is to say *prefer* `hadoop fs` or *avoid* `hdfs dfs`. just knowing what the difference is may help you avoid some confusion when you try the subcommands or flags of one and don't experience the same result as you would with the other.

let's prepare our `hadoop` cluster to actually do some `hadoop`-y stuff. on the master node (your virtual machine), run:

```bash
mkdir -p ~/code && cd ~/code
git clone https://github.com/bbengfort/hadoop-fundamentals.git
cd hadoop-fundamentals/data
unzip shakespeare.txt.zip
hadoop fs -copyFromLocal shakespeare.txt shakespeare.txt
```

the files we save in `hadoop` are generally enormous. it's good to know right away how to read portions of such large files:

+ `hadoop fs -cat shakespeare.txt | less`
+ `hadoop fs -cat shakespeare.txt | head` (this aborts the streaming when `head` has had enough)
+ `hadoop fs -tail shakespeare.txt`

#### other `hdfs` interfaces

finally, there are `http` integrations. in particular, check out the web interface for the `DataNode`s found at `datanode_url:50075` (for our `hadoop` vm, try `127.0.0.1:50075`)

### working with distributed computing

as we said above, `yarn` is the main resource manager and one of the main access points for computation. in the original instance of `hadoop`, however, the computational framework was software called `mapreduce`

knowing what `mapreduce` is helps illuminate the engineering paradigm at play in `hadoop` programs

#### `mapreduce`: a functional programming model

`mapreduce` was [first proposed](https://research.google.com/archive/mapreduce.html) by google developers as a way of performing easily distributable computations

the name comes from the two "pieces":

+ a `map` function takes input as a series of key-value pairs ("kvps") and performs the same computation on each pair, generating a (possibly empty) sequence of intermediate kvps
    + this is where analysis happens (usually)
    + e.g. filter: take a key, check if it belongs in a list of acceptable keys, emit the kvp if yes, pass silently if no
+ a `reduce` function takes a key and an iterator of values and processes the values, usually to determine some aggregate statistic

these functions ought to be stateless functional programming functions

[this simple diagram](https://www.tutorialspoint.com/map_reduce/images/mapreduce_work.jpg) shows a simple prototype mapreduce process 

#### `mapreduce`: implemented on a cluster

the `mapreduce` framework is great for a distributed computation environment because it assumes many of the central tenets of the distribution framework. specifically, because mappers and reducers are stateless functions, they can be executed by a worker node to independently work on any number of blocks and emit their responses back to the master node.

mappers are already set: individual blocks are key-value pairs where the keys are file or line metadata and the values are the contents of the file / line. we can distribute the mapper function to any number of workers and let them process blocks at their own pace without any outside information

reducers need all the output values for a single key across all processed blocks, so we have to wait until all mappers are done to "reduce".

the output keys are split among a configurable number of reducers, which are distributed among the workers; each reducer handles every key assigned to it

because reducers expect to get the keys emitted by mappers and **all** values for those keys, we need to perform a shuffle and sort of those intermediate kvps before we can reduce. this stage is called exactly that: *shuffle and sort*

so, in the end, we have a general framework:

+ input: `hdfs` kvps
+ mapping: input kvps are processed by mappers and generate intermediate kvps
+ shuffle and sort: take the generated key, partition the key space, and assign keys to reducers
+ reduce: take the keys and the iterated list of values and reduce them to aggregate kvps

it's kvps all the way down!
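the "partition the key space" step is the least obvious of the four, so here is a minimal sketch of one way it can work: hash each key and take the remainder modulo the number of reducers. `partition` and `shuffle_and_sort` are hypothetical names for this illustration, and the choice of `crc32` is an assumption made only so the result is stable between runs; it is not what `hadoop` itself uses.

```python
import zlib


def partition(key, n_reducers):
    """assign a key to one of n_reducers buckets using a stable hash"""
    # crc32 is used because python's built-in hash() of a str can differ
    # from one interpreter run to the next
    return zlib.crc32(key.encode('utf-8')) % n_reducers


def shuffle_and_sort(intermediate_kvps, n_reducers):
    """group mapper output into one list per reducer, each sorted by key"""
    buckets = [[] for _ in range(n_reducers)]
    for key, value in intermediate_kvps:
        buckets[partition(key, n_reducers)].append((key, value))
    return [sorted(bucket) for bucket in buckets]


kvps = [('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]
for i, bucket in enumerate(shuffle_and_sort(kvps, n_reducers=2)):
    print(i, bucket)
```

because the bucket depends only on the key, every record with the same key lands at the same reducer, which is exactly the guarantee the reduce step relies on.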

##### mapreduce examples

we already counted words in the shakespeare corpus, in memory in plain `python`, and the pseudo-code which can fit this wordcount problem into `mapreduce` is not that different:

```python
def mapper(documentkey, line):
    for word in line.split():
        emit(word, 1)
        
def reducer(word, values):
    emit(word, sum(val for val in values))
```
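to see the whole map, shuffle and sort, reduce pipeline run end to end, here is a small single-process simulation of the pseudo-code above. it is a sketch under assumptions: `emit` is replaced with `yield`, `run_local` is a hypothetical stand-in for the framework, and nothing here is distributed.

```python
import itertools
import operator


def mapper(documentkey, line):
    # same logic as the pseudo-code, with `yield` standing in for `emit`
    for word in line.split():
        yield word, 1


def reducer(word, values):
    yield word, sum(values)


def run_local(records):
    """hypothetical stand-in for the framework: map, shuffle and sort, reduce"""
    first = operator.itemgetter(0)
    # map: every input kvp produces zero or more intermediate kvps
    intermediate = [kvp for key, line in records for kvp in mapper(key, line)]
    # shuffle and sort: bring all values for the same key next to each other
    intermediate.sort(key=first)
    # reduce: one call per key, with an iterator over that key's values
    output = []
    for word, group in itertools.groupby(intermediate, key=first):
        output.extend(reducer(word, (value for _, value in group)))
    return output


records = [(0, 'to be or not to be'), (1, 'that is the question')]
print(run_local(records))
```

the mapper and reducer never see each other or any shared state, which is what lets a real cluster run many copies of each on different nodes.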

### submitting a mapreduce job to `yarn`

`yarn` is responsible for scheduling tasks, so if we would like to perform some task we need to give it to `yarn`.

one way (and the most basic) is to create a `jar` file (compiled `java` code) and to pass that directly to `yarn` using the `hadoop jar` command.

in [the github repo](https://github.com/bbengfort/hadoop-fundamentals) for the "Data Analytics with Hadoop" O'Reilly book, we have been provided with a couple `java` files to implement a simple `mapreduce` word count job

+ [`WordCount.java`](https://github.com/bbengfort/hadoop-fundamentals/blob/master/wordcount/WordCount/WordCount.java)
+ [`WordMapper.java`](https://github.com/bbengfort/hadoop-fundamentals/blob/master/wordcount/WordCount/WordMapper.java)
+ [`SumReducer.java`](https://github.com/bbengfort/hadoop-fundamentals/blob/master/wordcount/WordCount/SumReducer.java)

let's compile and run that code on the shakespeare corpus.

first thing's first, let's compile our `java` code into a `jar` file

```bash
export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
cd ~/code/hadoop-fundamentals/wordcount/WordCount/
hadoop com.sun.tools.javac.Main *.java
jar cf wc.jar WordCount.class WordMapper.class SumReducer.class
```

second thing's second, let's fix a simple permission problem on our local machine and then another one on our `hdfs`.

```bash
sudo chmod g+w /var/app/hadoop/data
sudo su hadoop
hadoop fs -chmod g+w /
```

final thing's final, we can submit the `jar` file to `yarn` by calling

```bash
hadoop jar wc.jar WordCount shakespeare.txt wordcounts
```

we can track the results of that job via a web interface at 127.0.0.1:8088

## not using `java` with `hadoop` streaming

so we were able to write `java` code to create `mapreduce` jobs. super.

I mean... not knowing `java` is a bit of a problem though. not to be ungrateful.

this is just what we get out of the box with `hadoop` `mapreduce`.

+ java api with input, output, map and reduce functions, job params exposed as *job configuration*
+ jobs get packaged into a jar which is passed to the `ResourceManager` by the *job client*
+ `ResourceManager` handles the rest

but what if you don't want to write `java` code that implements this same workflow over and over and over again?

or just don't want to write `java` code *at all*, because you already did everything you needed to do in `python`?

*hadoop streaming* is here to help.

### `hadoop` streaming

hadoop streaming is a `java` util which can take any executable (in *any* language!) and use that as a mapper or reducer or combiner.

really, this is just a super hacky `jar` file that is submitted in the same way as our `wc.jar` in our example above. for this `hadoop`-specific `jar` file, you pass executable scripts or commands as parameters to this `jar` file

note: the word "streaming" is used because the input and output method is unix streams (`stdin`, `stdout`), not in reference to streaming data.

this is actually pretty cool, because we know how to access those streams:

+ `python`: `sys` module
+ `R`: `file("stdin")` (I think? who even knows. does anyone?)

when we develop a `mapper.py` script, know the following:

+ *each* mapper launches the executable. spin-up time sucks for obvious reasons
+ `hadoop streaming` parses input data into lines of text and pipes them through `stdin`
+ `python` streaming script parses those lines of text and prints (to `stdout`) kvps delimited in some way (default is `\t`)
+ these intermediate kvps are scooped up by `hadoop streaming` again and passed on to the reducer
+ the mapper gets an entire block via `sys.stdin`. so it doesn't receive a *file*, or a *line number*, it receives a file handle to a block. that's important.

the `reducer.py` script follows much of the same logic, but in addition:

+ the reducer doesn't receive a key and an iterable, it reads shuffled and sorted kvp records (like a table) from stdin (they are in the `a\tb` format)
+ a single reducer task will always get *all* records for a given key, but *may* get more than one key (so your reducer isn't handed a single key; it needs logic to detect when the key changes)

for both files (and for any file in any language being used as a `hadoop streaming` script), the shebang (`#!`) declaration at the top of the file is important -- it tells the streaming process (a bash shell) how to execute the script (e.g. in `python`)
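to make those points concrete, here is a minimal word-count sketch in the streaming style. it is not the code from the course repo: `run_mapper` and `run_reducer` are hypothetical names, and the demo feeds them in-memory text with `io.StringIO` where a real script would read `sys.stdin`.

```python
#!/usr/bin/env python
import io
import sys


def run_mapper(lines, out=sys.stdout):
    """emit one tab-delimited `word<TAB>1` record per word"""
    for line in lines:
        for word in line.split():
            out.write('{}\t{}\n'.format(word, 1))


def run_reducer(lines, out=sys.stdout):
    """sum the counts of consecutive records that share a key"""
    current_key, total = None, 0
    for line in lines:
        key, value = line.rstrip('\n').split('\t', 1)
        if key != current_key:
            # the key changed, so the previous key is finished: write it out
            if current_key is not None:
                out.write('{}\t{}\n'.format(current_key, total))
            current_key, total = key, 0
        total += int(value)
    # don't forget the final key
    if current_key is not None:
        out.write('{}\t{}\n'.format(current_key, total))


# local dry run: in-memory text stands in for stdin / stdout
mapped = io.StringIO()
run_mapper(io.StringIO('to be or not to be\n'), out=mapped)
shuffled = sorted(mapped.getvalue().splitlines(True))  # stand-in for the shuffle
reduced = io.StringIO()
run_reducer(shuffled, out=reduced)
print(reduced.getvalue())
```

in a real streaming job the mapper and the reducer would live in two separate executable files, each calling its function on `sys.stdin`; the key-change bookkeeping in `run_reducer` is the "logic" the second bullet above refers to.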

#### real example: flight data

the [bureau of transportation statistics](https://transtats.bts.gov/) makes [on-time flight data](https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time) publicly available.

let's download some and use a pre-written `mapper.py` and `reducer.py` to calculate the average delay per airport

first, download the airline data:

```bash
cd /tmp
wget --no-check-certificate https://transtats.bts.gov/PREZIP/On_Time_On_Time_Performance_2017_1.zip
unzip On_Time_On_Time_Performance_2017_1.zip
```

the code we will use is in

```bash
cd ~/code/hadoop-fundamentals/avgdelay
```

let's take a look at `mapper.py` and `reducer.py` in that repo.

one nice thing about this simple framework is that we can test our functions in a simple series of pipes:

```bash
head -n100 /tmp/On_Time_On_Time_Performance_2017_1.csv | ./mapper.py | sort | ./reducer.py
```

so, we have seen that the `avgdelay` code is able to `map` and `reduce` the records in the `csv` of airport delays we downloaded. let's ship that over to `hdfs` and run a `streaming` job with these files

```bash
hadoop fs -put /tmp/On_Time_On_Time_Performance_2017_1.csv .
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.5.2.jar \
    -files mapper.py,reducer.py \
    -input On_Time_On_Time_Performance_2017_1.csv \
    -output average_delay \
    -mapper mapper.py \
    -reducer reducer.py
    
# trust, but verify
hadoop fs -cat average_delay/part* | head -n25
```

note the `-files` params -- what's going on there?

in a cluster environment, the materials of the executed code will generally have to be either

1. pre-installed on the cluster, so that every worker node can find it locally
2. shipped along with the request to execute.

## `spark`

while it *is* actually still somewhat common to write `hadoop streaming` `python` scripts for doing etl work, it's really not great for data science.

as you've seen, it's pretty orchestrated and low-level. we're thinking hard about the simple things we want to do (like count words or calculate averages), and when we've figured them out, we're only doing them *once*

think about how that looks when we want to move on to something more complicated, like a gradient descent algorithm.

or anything iterative, for that matter.

1. take parameters and apply a model defined by those parameters to every record in our `hdfs` dataset -- map records of features to predicted `y` values, calculate individual `y` error term and gradients for each record, and `emit` those
2. reduce those partial gradients to determine the parameter update
3. update parameters

each time we move between steps we are reading and writing to `hdfs` and that can be crazy wasteful
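here is what that loop looks like as a single-process sketch, to show why it maps so awkwardly onto one-shot jobs. everything in it is an assumption made for illustration: a one-feature linear model `y = w * x + b`, squared error loss, made-up data, and hypothetical function names. in `hadoop` terms, every call to `step` would be a full `mapreduce` job that re-reads the dataset.

```python
from functools import reduce


def gradient_mapper(record, params):
    """map: one record -> its partial gradient for 0.5 * error**2, plus a count"""
    x, y = record
    w, b = params
    error = (w * x + b) - y
    return (error * x, error, 1)


def gradient_reducer(left, right):
    """reduce: add partial gradients (and counts) elementwise"""
    return tuple(l + r for l, r in zip(left, right))


def step(records, params, learning_rate=0.05):
    """one iteration: map every record, reduce to one gradient, update params"""
    partials = (gradient_mapper(record, params) for record in records)
    grad_w, grad_b, n = reduce(gradient_reducer, partials)
    w, b = params
    return (w - learning_rate * grad_w / n, b - learning_rate * grad_b / n)


# made-up data lying exactly on the line y = 2x + 1
records = [(x, 2.0 * x + 1.0) for x in range(-5, 6)]

params = (0.0, 0.0)
for _ in range(500):  # 500 iterations would be 500 separate mapreduce jobs
    params = step(records, params)

print(params)
```

the arithmetic in each step is trivial; the cost in a `mapreduce` setting is that nothing stays in memory between iterations.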

also, this is 2017. we should expect that someone has just done this for us. that's fair.

the name of the game in data science applications is `spark`.

`spark` is an application written in `scala` (an OO functional programming language which runs on the `jvm`); it is a fast platform for queries and iterative algorithms. it was built as a replacement for `mapreduce` for calculation workflows just like the ones in which we are most interested.

perhaps most importantly for us: there are libraries for the `spark` `api` in `python` and `R`, and this has led to pretty wide adoption in the communities.

### the `spark` stack

`spark` ignores data management and focuses exclusively on computation.

the primary programmer interface is the `spark` core api module (i.e. the standard library of `spark`), and this library is entirely focused on implementing common computation tasks (file `io`, `mapping`, `reducing`, `filtering`) in as efficient a way as possible.

just like in `python` and `R`, after the core language has taken care of the low-level stuff, specialized tools are built on top of this. in `spark`, some of the most important are:

+ `spark sql`: an implementation of the `sql` standard against `rdd`s. dirty.
+ `spark streaming`: unbounded data stream processing
+ `mllib` and `mahout`: common machine learning algorithms implemented
    + `spark-sklearn` is an implementation of the ubiquitous `sklearn` `api` with `mllib` implementation under the hood
+ `graphx`: manipulate and calculate graphs
+ `zeppelin`: the `jupyter notebook` of the `spark` world

#### resilient distributed datasets

the fundamental data structure of `spark` is a resilient distributed dataset (`rdd`)

previously we cited the requirements of distributed computing frameworks to be *fault tolerance, recoverability, consistency, and scalability*

`rdd`s are `spark`'s way of performing distributed computation while hitting those requirements. at the simplest level, `spark` takes a functional plan of attack (a sequence of functions) and figures out how to distribute the data to many different nodes (in memory) to optimize that plan

some important facts about `rdd`s

+ `rdd`s are immutable, read-only collections of objects
+ they can be built from a lineage (a series of `fpl` function calls)
    + this makes them *fault tolerant, recoverable, consistent*
+ they work in parallel, so *scalable*
+ they are operated on by `scala`, and `fpl`, so *consistent*
+ they are immutable, so *recoverable*

when you're using the `spark` api, then, your basic abilities are to create, transform, or export these `rdd`s.

you need to shift paradigms into a functional mindset: think of things you can do.

`spark` breaks these things down into basically two types

1. *transformations*: `rdd` $\rightarrow$ new `rdd`
    + `map`: take a big `rdd`, apply something, create a new `rdd` as a result
2. *actions*: return something back to the client (aggregation, e.g.)
    + `reduce`: repartition `rdd` by key, aggregate (sum, mean)
    

#### programming with `rdd`s

the way we actually deploy programs in `spark` is similar to how we deployed `mapreduce` jobs in `hadoop streaming`: we write some code and send it to a local machine, which distributes the computation elsewhere

what changes in `spark` is that a master program (the "driver") creates `rdd`s by *parallelizing* a `hadoop` dataset (that is, it partitions a given dataset and pushes those partitions to nodes that perform local computations in memory).

an `rdd` is a structure that manages this partitioning / parallelizing.

from the point of view of the `spark` program, the order of operations is

1. build `rdd`s
    + access data from `hdfs` or local disk storage
    + parallelize that collection of data
    + transform it as necessary
    + cache everything we can
2. pass *closures* (stateless functions, ignorant of the rest of the world) to each element of the `rdd`
    + *closures* are then locally applied in-memory and the outputs are also cached
3. output `rdd`s are *acted on* (aggregated)
    + this is the only place we actually have an eval step.

one quick note on some common terms: *variables* and *closures*

+ *closures* do not rely in any way on external data
    + if they have variables within, they are copied to the nodes with them, but kept in local scope
+ external data, if needed, is passed through shared variables
    + *broadcast* variables: read only, distributed (e.g. lookup tables / stopword lists)
    + *accumulators*: meant to be associatively updated (e.g. counters)

### interactive `spark` using `pyspark`

`pyspark` is a `repl` for the `spark` api bindings in `python`

let's do a quick walkthrough / demo of using `pyspark` to count the words in the `shakespeare.txt` file. you'll see that the syntax is pretty familiar from traditional `python`, with a few twists.

first, let's open the `repl`

```bash
cd ~/code/hadoop-fundamentals
$SPARK_HOME/bin/pyspark
```

and then, load the shakespeare text

```python
# sc is the "spark context", and it's pre-built by the repl
text = sc.textFile('data/shakespeare.txt')
print(text)
help(text)

# look at flatMap
help(text.flatMap)
```

we can use `flatMap` to apply a tokenization function to every text string:

```python
# make a tokenize function
def tokenize(text):
    return text.split()

# looks good. let's tokenize our text into words
words = text.flatMap(tokenize)
```

we can take that flat list of words and do our standard map and reduce. before we move on to the reduce, though, look at the lineage (the defining sequence of functions) of the mapped `rdd` via `wc.toDebugString()`

```python
# let's apply a map function for word counts
wc = words.map(lambda x: (x, 1))

# you can see the lineage:
print(wc.toDebugString())
```

next, we'll use built-in functions to do our summation `reduce` step

```python
# include a reduce step to sort/shuffle/partition by key and add the values
from operator import add
cts = wc.reduceByKey(add)
```

finally, save the result to file. note that this is when something actually *happens*. up until this point, we were merely defining a *lineage*; now we're actually asking that some *action* take place, and `spark` springs into action

```python

# finally do something with it all
cts.saveAsTextFile('wc')

# exit so we can see the results
exit()
```

```bash
# look at them beautiful results
less wc/part-00000
```

## `aws emr`

we stressed earlier that `hadoop` is software, not hardware. one implementation of that software is the `aws` version of `hadoop`, which is called Elastic MapReduce, or `emr`.

`emr` is both a simplification and a generalization of the sort of structure we've been talking about above.

for example, the types of nodes are generalized from master / worker to include

1. master (same as before)
2. core nodes: workers which run core services and directly interface with `hdfs`
3. task nodes: workers which do *nothing* but execute tasks (not even interface with `hdfs`)

additionally, being an `aws` product, there is immediate integration with other `aws` services, and in particular `s3`.

`emr` can use `hdfs` (in `aws` land, that is ephemeral storage that is destroyed when a cluster is taken down), or it can use `s3`, or it can use the local file systems on the `aws` `ec2` servers which are actually running the cluster.

generally, input and output are done in `s3` and intermediate steps are retained in the ephemeral `hdfs` storage

<div align="center">**pseudo-walkthrough: setting up an `aws emr` cluster**</div>

because of the length of time involved, we will not spin up an `emr` cluster live. I have done that prior to today and have one up and running already. however, we can walk through the steps briefly

the previous walkthrough showed the following

1. navigate to [the `aws` `emr` service page](https://console.aws.amazon.com/elasticmapreduce/home)
2. click the "create cluster" button
    1. you could go through with this simple version, but we will go straight into "advanced options" (link at the top)
    2. software and steps
        1. leave the `emr` release at the latest version
        2. choose your software! I will pick `hadoop`, `pig`, `hive`, `oozie`, `hue`, and `spark`
    3. hardware
        1. I increased the number of task servers to 3
        2. for both core and task servers, I selected spot pricing and picked a price of 0.065
        3. the "green" numbers are the regions where the current spot price is lowest -- nothing more.
            1. look at [spot pricing history](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-instances-history.html) for more info
    4. I *did not* add any bootstrap actions, but if I wanted to, say, install conda everywhere, this is where I would have done it.
3. submit and wait!
    1. it takes a *long* time, 20+ minutes for me.
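the same choices could be expressed programmatically. below is a minimal sketch that only *builds* the request dictionary in the shape that `boto3`'s `emr` client expects for `run_job_flow`; it never contacts `aws`. the cluster name, release label, instance type, and core node count are assumptions for illustration (the walkthrough does not state them), and the two role names are the `aws` defaults.

```python
APPLICATIONS = ["Hadoop", "Pig", "Hive", "Oozie", "Hue", "Spark"]


def build_cluster_request(release_label, bid_price="0.065", n_task=3,
                          n_core=2, instance_type="m4.large"):
    """build keyword arguments for boto3's emr run_job_flow call.

    n_core and instance_type are assumed values, not from the walkthrough.
    """
    def group(name, role, count, spot):
        g = {
            "Name": name,
            "InstanceRole": role,
            "InstanceType": instance_type,
            "InstanceCount": count,
            "Market": "SPOT" if spot else "ON_DEMAND",
        }
        if spot:
            g["BidPrice"] = bid_price  # the api takes the price as a string
        return g

    return {
        "Name": "gu511-demo",  # hypothetical cluster name
        "ReleaseLabel": release_label,
        "Applications": [{"Name": name} for name in APPLICATIONS],
        "Instances": {
            "InstanceGroups": [
                group("master", "MASTER", 1, spot=False),
                group("core", "CORE", n_core, spot=True),
                group("task", "TASK", n_task, spot=True),
            ],
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }


# "emr-5.12.0" is a placeholder; use whatever release is current
request = build_cluster_request(release_label="emr-5.12.0")

# to actually launch (costs money, takes 20+ minutes):
#   import boto3
#   boto3.client("emr").run_job_flow(**request)
for g in request["Instances"]["InstanceGroups"]:
    print(g["InstanceRole"], g["InstanceCount"], g["Market"])
```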

at this point the most pressing matter is accessing `hue` (`h`adoop `u`ser `e`xperience).

`hue` is a web application running on the master node at port 8888. it'd be great if we could just navigate there, but that port is not open to the world (and probably shouldn't be -- anyone who can access that port can run any `hadoop` command they want).

this is the same setup as with any web app running on `aws` behind several firewalls.

I recommend what `aws` recommends: `ssh` port forwarding. basically, we open an `ssh` tunnel to the master node and declare that whenever we go to a *local* port of a certain number (say 8888), our request is forwarded over that `ssh` connection to a certain port (say, also 8888) on that remote machine, and the response is brought back to us.

effectively, we are just replacing a port on our local machine with a single port on the remote machine.

```bash
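# -N: run no remote command, -f: go to the background, -L: forward local port 8888 to port 8888 on the master
ssh -i ~/.ssh/gu511_ubuntu_1.pem -N -f -L localhost:8888:localhost:8888 hadoop@ec2-54-152-19-53.compute-1.amazonaws.com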
```

with this in place, we should be able to go to http://localhost:8888/ and start kicking around our new cluster
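since the master node's hostname changes with every new cluster, it can be handy to assemble that tunnel command from its parts. this is a minimal sketch: the helper name `build_tunnel_command` is made up, and it only builds the argument list (running it is left as a comment).

```python
import os
import shlex


def build_tunnel_command(key_path, host, local_port=8888, remote_port=8888,
                         user="hadoop"):
    """assemble the ssh port-forwarding command as an argument list."""
    return [
        "ssh",
        # expand "~" ourselves: there is no shell to do it for us
        # when the list is handed to subprocess
        "-i", os.path.expanduser(key_path),
        "-N",  # run no remote command
        "-f",  # go to the background once connected
        "-L", f"localhost:{local_port}:localhost:{remote_port}",
        f"{user}@{host}",
    ]


cmd = build_tunnel_command(
    "~/.ssh/gu511_ubuntu_1.pem",
    "ec2-54-152-19-53.compute-1.amazonaws.com",
)
print(shlex.join(cmd))

# to actually open the tunnel:
#   import subprocess
#   subprocess.run(cmd, check=True)
```

changing `local_port` is the useful knob here: if 8888 is already taken locally (by `jupyter`, say), forward some other local port to the remote 8888 instead.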

<div align="center">**`hue`-based `emr`-walkthrough**</div>

+ let's put some data out there. go to the `hue` file browser
    + textcorpus
        + locally: `wget -O textcorpus.zip 'https://github.com/bbengfort/hadoop-fundamentals/blob/master/data/textcorpus.zip?raw=true'`
        + upload that via "Upload > zip"
    + shakespeare
        + locally: `wget -O shakespeare.txt.zip 'https://github.com/bbengfort/hadoop-fundamentals/blob/master/data/shakespeare.txt.zip?raw=true'`
        + upload it the same way
    + python data
        + download https://s3.amazonaws.com/hadoop.rzl.gu511.com/pyq_stripped.tar.gz
        + possibly load it in directly

+ hive (via hue)
    + create new database `stack_overflow_python`
    + point to the `s3` upload in `hadoop.rzl.gu511.com`
    + did three simple queries, all saved and available for review

### software lightning round

#### `hadoop` and `spark`

the current leaders in the distributed data and distributed computation sphere; we've talked about them enough

#### `pig`

[`pig`](https://pig.apache.org/) is a procedural, data-flow querying / scripting language (not like `sql`, more like `spark`). basically this is a higher-order abstraction of `mapreduce` functions that can be extremely fast while also being much easier to read and write

#### `hive`

[`hive`](https://hive.apache.org/) is the `sql` of the `hadoop` ecosystem. it also doubles as a data warehousing platform for many of the other applications. it implements batch querying (not interactive querying), and is not *itself* a database, just a means of interacting with other data structures

I personally find that `hive` is the easiest of the `hadoop` ecosystem tools to get up to speed in (especially if you have a background in `sql`), so you may want to consider starting here.

#### `hcatalog`
[`hcatalog`](https://cwiki.apache.org/confluence/display/Hive/HCatalog) is a sub-component of `hive` that sees wide use in other applications. it is the table and storage management layer, and is used by disparate apps to provide a normalized, gridded view of data of many different formats. it acts as a normalized layer between application and data format

#### `oozie`

[`oozie`](http://oozie.apache.org/) is the scheduling and workflow management tool for `hadoop` jobs. it uses `xml` configuration files to define chained DAGs of jobs, executes them, reports on results and logs, handles concurrency and timing issues, and provides for a complex set of control flows.

personally I think it's pretty janky, but it's the best you've got.
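the "chained DAGs of jobs" idea is worth seeing in miniature. this is *not* `oozie`'s `xml` format, just a sketch in plain python (3.9+, for `graphlib`) of what a workflow engine has to do first: take jobs with dependencies and find an order that respects them. the job names are hypothetical.

```python
from graphlib import TopologicalSorter

# hypothetical workflow: each job maps to the set of jobs it depends on
workflow = {
    "load_to_hdfs": set(),
    "clean": {"load_to_hdfs"},
    "word_count": {"clean"},
    "line_count": {"clean"},
    "report": {"word_count", "line_count"},
}

# any valid order runs every job after all of its dependencies;
# word_count and line_count are independent, so either may come first
order = list(TopologicalSorter(workflow).static_order())
print(order)
```

a real scheduler goes further (running the two independent counts concurrently, retrying failures, collecting logs), which is where the complexity comes from.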

#### `hue`

[`hue`](http://gethue.com/) (short for `h`adoop `u`ser `e`xperience) aims to be the way that users experience `hadoop`, and that's a great thing -- almost everything you might want to do in the `hadoop` ecosystem will be easier (at first) to do in `hue`.

over time, of course, complications and bugs will push you down (up?) into the command line and the programs themselves, but when you are beginning it is *essential* that you try using `hue`.

#### `mahout`

[`mahout`](http://mahout.apache.org/) is a framework for building machine learning applications. it is not immediately clear to me the extent to which `mahout` and `spark`'s `mllib` are *competitors* or *complements*, and I haven't been able to clear that up.

#### `sqoop`

[`sqoop`](http://sqoop.apache.org/) (`sq`l-to-had`oop`) is a bulk data transfer tool. under the hood it implements `mapreduce` jobs to perform data transfer between traditional databases, `hdfs`, and other data sources.

#### `tez`
[`tez`](https://tez.apache.org/) is similar to `spark` in that it is positioning itself as a replacement for `mapreduce` for simpler application frameworks. however, it is trying to be much closer to the `hadoop` / `yarn` infrastructure, implementing a normalized `api` so that other tools (the `pig`s and `hive`s of the world) can run directly off of `tez` in a normalized way.

#### `hbase`
[`hbase`](https://hbase.apache.org/) is a big data database and data store. it is actually a `nosql` key-value store on `hdfs`, and it has its own (non-`sql`) interface for querying (like many `nosql` dbs)

#### `phoenix`
[`phoenix`](https://phoenix.apache.org/) is on-line transaction processing software for performing data transactions in a `sql` syntax, but using `hbase` as its backing store (so schemaless).

#### `presto`
[`presto`](https://prestodb.io/) is a `sql` query engine for interactive queries against big data platforms. it was developed at facebook so it has good integration with `cassandra` in addition to other data stores.

its main goal is to be interactive and very fast. I haven't used it myself but the reviews are pretty positive.

#### `zeppelin`
[`zeppelin`](https://zeppelin.apache.org/) is the `jupyter notebook` of the `spark` world, allowing for collaborative and exploratory work to be done in a web-based notebook. current interpreters include `spark`, `sql`, `python`, `hive`, `pig`, `sparksql`, `markdown`, and others

#### `flink`
[`flink`](https://flink.apache.org) is a stream processing framework. many of the big data sources of import are streams of data (log files, e.g.) and `flink` aims to be the standard means of persisting data streams into distributed environments

#### `zookeeper`
[`zookeeper`](https://zookeeper.apache.org/) is a centralized configuration engine. with so many moving parts in the distributed cluster environment, this can be pretty essential

#### `livy`
[`livy`](https://livy.incubator.apache.org/) is a `REST` interface for `spark`, allowing users to submit `spark` jobs from anywhere (not just on the cluster, in a `repl`)
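to make "submit `spark` jobs from anywhere" concrete: submitting a batch job to `livy` amounts to an http `POST` of a small `json` document to its `/batches` endpoint. the sketch below only *constructs* that request with the standard library and does not send it; the `livy` url (8998 is `livy`'s default port) and the script location in `s3` are assumptions.

```python
import json
from urllib.request import Request


def build_livy_batch_request(livy_url, script_uri, args=()):
    """build (but do not send) a POST to livy's /batches endpoint."""
    payload = {"file": script_uri, "args": list(args)}
    return Request(
        url=f"{livy_url.rstrip('/')}/batches",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# hypothetical livy address and script location
req = build_livy_batch_request(
    "http://localhost:8998",
    "s3://my-bucket/jobs/wordcount.py",
    args=["s3://my-bucket/input/shakespeare.txt"],
)
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))

# to actually submit:
#   from urllib.request import urlopen
#   with urlopen(req) as resp:
#       print(resp.read())
```

on `emr` the `livy` port is no more open to the world than `hue`'s is, so the same `ssh` port forwarding trick would apply.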

#### `ganglia`
[`ganglia`](http://ganglia.sourceforge.net/) is the primary distributed monitoring system for `hadoop` clusters and grid computing

#### `mxnet`
[`mxnet`](https://mxnet.incubator.apache.org/) is an apache incubator project for developing distributed deep learning algorithms

<!--div align="center">***DROP joke WHERE is_bad***</div>
<img align="middle" src=""></img-->

# END OF LECTURE