## Overview
1. Every Spark application consists of a driver program that runs
the user's main function and executes various parallel operations on
a cluster.
2. An RDD (resilient distributed dataset) is a collection of elements partitioned across the nodes of
the cluster that can be operated on in parallel.
3. Users can ask Spark to persist an RDD in memory, so that it can be reused efficiently across parallel operations.
4. RDDs automatically recover from node failures.
5. Shared variables can be used in parallel operations.
6. By default, when Spark runs a function in parallel as a set of tasks on
different nodes, it ships a copy of each variable used in the
function to each task.
7. Spark supports two types of shared variables: broadcast variables,
    which can be used to cache a value in memory on all nodes,
    and accumulators, which are variables that are only "added" to,
    such as counters and sums.

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
## The first thing a Spark program must do is to create a SparkContext
## which tells Spark how to access a cluster. To create a SparkContext
## you first need to build a SparkConf object that contains information
## about your application
conf = SparkConf().setAppName("RDD programming guide").setMaster("local")
sc = SparkContext(conf=conf)

## RDDs can be created in two ways
1. Parallelizing an existing iterable or collection in your driver
program with the parallelize method
2. Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat
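To get a feel for what parallelize does with a local collection, here is a minimal plain-Python sketch (no Spark needed) that cuts a list into contiguous, nearly equal slices, one per partition. The helper names are hypothetical, and the even-split rule is an approximation: PySpark first pickles the data in batches and slices those, so exact partition boundaries are an implementation detail. In PySpark itself, the number of partitions is the optional second argument, as in sc.parallelize(data, 2), and rdd.glom().collect() shows the contents of each partition.

```python
def slice_positions(length, num_slices):
    """(start, end) index pairs splitting `length` items into `num_slices`
    contiguous, nearly equal slices."""
    return [(i * length // num_slices, (i + 1) * length // num_slices)
            for i in range(num_slices)]


def parallelize_model(data, num_slices):
    """Toy stand-in for sc.parallelize(data, num_slices): returns a plain list
    of partitions instead of a distributed RDD."""
    data = list(data)
    return [data[start:end] for start, end in slice_positions(len(data), num_slices)]


print(parallelize_model([1, 2, 3, 4, 5], 2))   # [[1, 2], [3, 4, 5]]
print(parallelize_model(range(7), 3))          # [[0, 1], [2, 3], [4, 5, 6]]
```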

In [2]:
## parallelize collections
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
## once distributed, the collection can be operated on in parallel
distData.reduce(lambda a, b: a + b)

15
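reduce works partition by partition: each partition is reduced on its own, and the partial results are then combined. That is why the function passed to reduce should be commutative and associative. The plain-Python sketch below (a toy model with a hypothetical reduce_model helper, not Spark's implementation) uses the same five numbers split into one or two partitions: addition gives 15 either way, while subtraction gives a result that depends on the partitioning.

```python
from functools import reduce
from operator import add, sub


def reduce_model(partitions, func):
    """Toy RDD.reduce: reduce each non-empty partition, then reduce the partials."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


one_partition = [[1, 2, 3, 4, 5]]
two_partitions = [[1, 2], [3, 4, 5]]

print(reduce_model(one_partition, add), reduce_model(two_partitions, add))  # 15 15
print(reduce_model(one_partition, sub), reduce_model(two_partitions, sub))  # -13 5
```

Since Spark may also merge partition results in whatever order they arrive, commutativity matters as much as associativity.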

In [3]:
## External Datasets
## Spark supports text files, SequenceFiles, and any other Hadoop
## InputFormat. textFile() creates an RDD with one element per line.
distFile = sc.textFile("README.md")

In [4]:
## total number of characters over all lines (textFile drops the line terminators)
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

3706

## Writable Support
When reading an RDD of key-value pairs from a SequenceFile, PySpark's
SequenceFile support loads an RDD of key-value pairs within Java,
converts Writables to base Java types, and pickles the resulting Java
objects using Pyrolite.

In [5]:
rdd = sc.parallelize(range(1,4)).map(lambda x: (x, "a" * x))
## When saving an RDD of key-value pairs to a SequenceFile, PySpark
## does the reverse. It unpickles Python objects into Java objects
## and then converts them to Writables.
## note: saving fails if ./saveSequence already exists, so delete it before re-running
rdd.saveAsSequenceFile("./saveSequence")
sorted(sc.sequenceFile("./saveSequence").collect())

[(1, 'a'), (2, 'aa'), (3, 'aaa')]

## Saving and Loading Other Hadoop Input/Output Formats
PySpark can also read any Hadoop InputFormat or write any Hadoop
OutputFormat, for both "new" and "old" Hadoop MapReduce APIs. If
required, a Hadoop configuration can be passed in as a Python dict.

If you have custom serialized binary data (such as data loaded from
Cassandra or HBase), you will first need to transform that data
on the Scala/Java side into something that Pyrolite's pickler can handle.

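A Converter trait is provided for this. Simply extend this trait and
implement your transformation code in the convert method. Make sure this
class, along with any dependencies required to access your InputFormat,
is packaged into your Spark job jar and included on the PySpark classpath.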

## RDD Operations
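RDDs support two types of operations:
1. transformations
    create a new dataset from an existing one
2. actions
    return a value to the driver program after running a computation
    on the dataset.

Transformations are lazy: they do not compute their results right away.
They are computed only when an action requires a result to be returned to the driver program.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory (with persist() or cache()), on disk, or replicated across multiple nodes.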
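The laziness can be illustrated with a toy class in plain Python (LazyDataset and tracked_len are hypothetical names, not part of PySpark): map and filter only record a step in the pipeline, and nothing runs until an action such as collect or count walks the data. The calls list shows when the mapped function actually executes. Each action recomputes the pipeline from the source, which is the behaviour persisting is meant to avoid.

```python
class LazyDataset:
    """Toy model of RDD laziness: transformations record steps, actions run them."""

    def __init__(self, source, steps=()):
        self._source = source        # the input elements
        self._steps = tuple(steps)   # recorded ("map" | "filter", func) steps

    # Transformations: return a new dataset and compute nothing.
    def map(self, func):
        return LazyDataset(self._source, self._steps + (("map", func),))

    def filter(self, pred):
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def _iterate(self):
        for item in self._source:
            keep = True
            for kind, func in self._steps:
                if kind == "map":
                    item = func(item)
                elif not func(item):   # a "filter" step that rejects the item
                    keep = False
                    break
            if keep:
                yield item

    # Actions: run the whole recorded pipeline and return a value.
    def collect(self):
        return list(self._iterate())

    def count(self):
        return sum(1 for _ in self._iterate())


calls = []


def tracked_len(s):
    calls.append(s)   # records every time the mapped function really runs
    return len(s)


lines = LazyDataset(["spark", "rdd", "lazy"])
lengths = lines.map(tracked_len).filter(lambda n: n > 3)
print(len(calls))          # 0: nothing has run yet
print(lengths.collect())   # [5, 4]
print(len(calls))          # 3: the action mapped every line
```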

In [6]:
## Basics
lines = sc.textFile("README.md")
lineLengths = lines.map(lambda s: len(s))

## if we wanted to use lineLengths again later, we could call
## persist() before the first action, so that lineLengths is kept in
## memory (the default MEMORY_ONLY level) after it is first computed
lineLengths.persist()

totalLength = lineLengths.reduce(lambda a, b: a + b)
totalLength

3706
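A toy model of persisting in plain Python, using only the standard library (PersistableDataset and compute_line_lengths are hypothetical names): persist() only marks the dataset, the first action computes and stores the elements, and later actions reuse them. In PySpark, persist() with no argument uses the MEMORY_ONLY storage level, like cache(), and other levels such as StorageLevel.MEMORY_AND_DISK can be passed after from pyspark import StorageLevel. The toy ignores eviction: Spark may drop cached partitions under memory pressure and recompute them from lineage.

```python
import functools


class PersistableDataset:
    """Toy model of persist(): marking the dataset does nothing by itself; the
    first action computes and stores the elements, later actions reuse them."""

    def __init__(self, compute):
        self._compute = compute    # zero-argument function producing a list
        self._persisted = False
        self._cache = None

    def persist(self):
        self._persisted = True     # lazy, like RDD.persist()
        return self

    def _elements(self):
        if not self._persisted:
            return self._compute()           # recompute from "lineage"
        if self._cache is None:
            self._cache = self._compute()    # first action fills the cache
        return self._cache

    def reduce(self, func):
        return functools.reduce(func, self._elements())

    def count(self):
        return len(self._elements())


runs = 0


def compute_line_lengths():
    global runs
    runs += 1    # count how often the lengths are actually computed
    return [len(s) for s in ["a b", "spark", ""]]


plain = PersistableDataset(compute_line_lengths)
plain.reduce(lambda a, b: a + b)
plain.count()
print(runs)    # 2: each action recomputed the lengths

runs = 0
cached = PersistableDataset(compute_line_lengths).persist()
print(cached.reduce(lambda a, b: a + b))   # 8
print(cached.count())                      # 3
print(runs)    # 1: computed once, then served from the cache
```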

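## Passing Functions to Spark
Spark's API relies heavily on passing functions in the driver program
to run on the cluster. There are three recommended ways to do this in Python:
1. Lambda expressions, for simple functions that can be written as an expression
2. Local defs inside the function calling into Spark, for longer code
3. Top-level functions in a module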
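One practical consequence of passing functions: a lambda inside a method that reads self.field captures the whole object, so the whole object has to be serialized and shipped with each task. The plain-Python sketch below (MyClass and big_table are hypothetical) inspects each function's __closure__ cells to show what it captures. PySpark serializes functions with cloudpickle, which, roughly speaking, ships these captured values along with the code; copying the field into a local variable first keeps the closure small.

```python
class MyClass:
    def __init__(self):
        self.field = "Hello"
        self.big_table = list(range(100_000))   # hypothetical large attribute

    def mapper_capturing_self(self):
        # The lambda reads self.field, so its closure holds the whole object.
        return lambda s: self.field + s

    def mapper_capturing_field(self):
        # Copy the attribute into a local first; only the string is captured.
        field = self.field
        return lambda s: field + s


obj = MyClass()
bad = obj.mapper_capturing_self()
good = obj.mapper_capturing_field()

print(type(bad.__closure__[0].cell_contents).__name__)   # MyClass
print(good.__closure__[0].cell_contents)                 # Hello
print(bad(" world"), "|", good(" world"))                # Hello world | Hello world
```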

In [7]:
def myFunc(s):
    words = s.split(" ")
    return len(words)

sc.textFile("README.md").map(myFunc).reduce(lambda a,b: a + b)

566
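A note on myFunc: s.split(" ") splits on every single space, so an empty line counts as one word, and consecutive or leading spaces produce empty strings that are counted too. The 566 above may therefore differ from the number of actual words in README.md. The plain-Python comparison below (with hypothetical helper names) shows the difference; a function based on s.split() with no argument, which splits on runs of whitespace, is one alternative.

```python
def count_words_split_on_space(s):
    return len(s.split(" "))   # same splitting rule as myFunc above


def count_words_split_on_whitespace(s):
    return len(s.split())      # splits on runs of whitespace, ignores leading/trailing


lines = ["Apache Spark", "", "two  spaces", "  indented"]
print([count_words_split_on_space(s) for s in lines])       # [2, 1, 3, 3]
print([count_words_split_on_whitespace(s) for s in lines])  # [2, 0, 2, 1]
```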

## Understanding closures
One of the harder things about Spark is understanding the scope and
life cycle of variables and methods when executing code across a cluster.
RDD operations that modify variables outside of their scope can be
a frequent source of confusion. In the example below we'll look at code
that uses foreach() to increment a counter, but similar issues can
occur for other operations as well.

In [8]:
## Example
counter = 0
rdd = sc.parallelize([1, 2, 3, 4])

# Wrong: Don't do this !!
def increment_counter(x):
    global counter
    counter += x

rdd.foreach(increment_counter)
print("Counter value:", counter)

Counter value: 0


### Local vs. cluster modes
The behavior of the above code is undefined, and it may not work as intended.
To execute jobs, Spark breaks up the processing of RDD operations into
tasks, each of which is executed by an executor. Prior to execution,
Spark computes the task's closure. The closure is those variables
and methods which must be visible for the executor to perform its
computations on the RDD (in this case foreach()). This closure is
serialized and sent to each executor.

The variables within the closure sent to each executor are now copies, and thus, when counter is referenced within the foreach function, it's no longer the counter on the driver node. There is still a counter in the memory of the driver node, but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero, since all operations on counter were referencing the value within the serialized closure. In PySpark these functions run in separate Python worker processes, so even in local mode the driver's counter is not updated, as the output above shows.

To ensure well-defined behavior in scenarios like this, use an accumulator, which Spark provides specifically for safely updating a variable when execution is split across worker nodes.
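The copy semantics can be reproduced without a cluster. In this plain-Python sketch (foreach_model and sum_with_accumulator_model are hypothetical helpers, not PySpark APIs), each "task" receives a pickled and unpickled copy of the closure state, so the driver's counter never changes; the accumulator-style version instead has each task add into a local value and lets the driver merge the results. In PySpark itself, the supported pattern is acc = sc.accumulator(0), then rdd.foreach(lambda x: acc.add(x)), then reading acc.value on the driver.

```python
import pickle

driver_state = {"counter": 0}   # lives on the "driver"


def increment_counter(state, x):
    state["counter"] += x


def foreach_model(partitions, closure_state, func):
    """Toy rdd.foreach: each task works on its own deserialized copy of the
    closure state, and the copy is thrown away when the task ends."""
    for partition in partitions:
        task_state = pickle.loads(pickle.dumps(closure_state))  # "shipped" copy
        for x in partition:
            func(task_state, x)


def sum_with_accumulator_model(partitions):
    """Toy accumulator: each task adds into a local value starting at zero;
    the driver merges the per-task values after the tasks finish."""
    per_task = []
    for partition in partitions:
        local = 0
        for x in partition:
            local += x
        per_task.append(local)
    return sum(per_task)   # merge step on the driver


parts = [[1, 2], [3, 4]]
foreach_model(parts, driver_state, increment_counter)
print("Counter value:", driver_state["counter"])           # Counter value: 0
print("Accumulated:", sum_with_accumulator_model(parts))   # Accumulated: 10
```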

### Printing elements of an RDD
Another common idiom is attempting to print out the elements of an
RDD using rdd.foreach(print) (or rdd.map(print) followed by an action). On a single
machine, this may generate the expected output and print all the
RDD's elements. However, in cluster mode, the executors write to their own stdout,
not the driver's, so stdout on the driver won't
show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node: for x in rdd.collect(): print(x). This can cause the driver to run
out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use take(): for x in rdd.take(100): print(x).
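Why take is safer than collect can be sketched in plain Python (take_model and collect_model are hypothetical names): take reads partitions in order and stops once it has enough elements, while collect reads everything and ships it to the driver. This is a simplification; Spark's take scans one partition first and uses that result to estimate how many more partitions it needs.

```python
from itertools import islice


def take_model(partitions, n):
    """Toy take(n): read partitions in order and stop once n elements are found.

    Returns (elements, indexes of partitions that were read)."""
    touched = []

    def scan():
        for index, part in enumerate(partitions):
            touched.append(index)   # this partition had to be computed
            yield from part

    return list(islice(scan(), n)), touched


def collect_model(partitions):
    """Toy collect(): every partition is read and shipped to the driver."""
    return [x for part in partitions for x in part]


parts = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(take_model(parts, 2))        # ([1, 2], [0])
print(take_model(parts, 4))        # ([1, 2, 3, 4], [0, 1])
print(len(collect_model(parts)))   # 9
```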

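## Transformations

map(func): Return a new distributed dataset formed by passing each element of the source through a function func.

filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true.

flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return an iterable, such as a list, rather than a single item).

mapPartitions(func): Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T (in Python, a function that takes an iterator and returns an iterable).

mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

sample(withReplacement, fraction, seed): Sample a fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset): Return a new dataset that contains the intersection of elements in the source dataset and the argument.

distinct([numPartitions]): Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, reduceByKey or aggregateByKey will yield much better performance.

reduceByKey(func, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue, seqFunc, combFunc, [numPartitions]): When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value, so the aggregated type U can differ from the value type V. (The Scala signature is curried: aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]).)

sortByKey([ascending], [numPartitions]): When called on a dataset of (K, V) pairs where K is orderable, returns a dataset of (K, V) pairs sorted by key in ascending or descending order, as specified by the boolean ascending argument.

join(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numPartitions]): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples.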
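Several of the transformations above differ mainly in how they treat partitions. The plain-Python sketch below models a dataset as a list of partitions (all function names are hypothetical, not PySpark APIs) to show three points: mapPartitionsWithIndex calls its function once per partition with the partition index and an iterator; reduceByKey combines values inside each partition before the shuffle, so fewer records move than with groupByKey; and aggregateByKey can produce a result type (here a (sum, count) pair) different from the value type. Hash partitioning of the shuffle output is left out.

```python
from operator import add

# Two toy "partitions" of (key, value) records.
partitions = [[("a", 1), ("b", 1), ("a", 1)], [("a", 1), ("b", 1)]]


def map_partitions_with_index(parts, func):
    """Call func(index, iterator) once per partition, like mapPartitionsWithIndex."""
    return [list(func(i, iter(part))) for i, part in enumerate(parts)]


def tag_keys(index, records):
    # Receives a whole partition as an iterator and yields (partition index, key).
    for key, _value in records:
        yield (index, key)


def combine_per_key(records, func):
    """Merge values with the same key using func, keeping first-seen key order."""
    out = {}
    for key, value in records:
        out[key] = func(out[key], value) if key in out else value
    return out


def shuffled_by_group_by_key(parts):
    # groupByKey: every record crosses the shuffle unchanged.
    return [kv for part in parts for kv in part]


def shuffled_by_reduce_by_key(parts, func):
    # reduceByKey: combine inside each partition first (map-side combine),
    # so at most one record per key per partition is shuffled.
    return [kv for part in parts for kv in combine_per_key(part, func).items()]


def aggregate_by_key(parts, zero, seq_func, comb_func):
    # seq_func folds V values into a U accumulator that starts at zero within
    # a partition; comb_func merges the per-partition U accumulators.
    # zero is assumed immutable here (a tuple), so sharing it is safe.
    per_partition = []
    for part in parts:
        acc = {}
        for key, value in part:
            acc[key] = seq_func(acc.get(key, zero), value)
        per_partition.append(acc)
    return combine_per_key(
        [kv for acc in per_partition for kv in acc.items()], comb_func)


print(map_partitions_with_index(partitions, tag_keys))
# [[(0, 'a'), (0, 'b'), (0, 'a')], [(1, 'a'), (1, 'b')]]
print(len(shuffled_by_group_by_key(partitions)))         # 5
print(len(shuffled_by_reduce_by_key(partitions, add)))   # 4
print(combine_per_key(shuffled_by_reduce_by_key(partitions, add), add))  # {'a': 3, 'b': 2}

scores = [[("a", 3), ("b", 4)], [("a", 5)]]
sum_count = aggregate_by_key(
    scores, (0, 0),
    lambda acc, v: (acc[0] + v, acc[1] + 1),    # seq_func: add v to (sum, count)
    lambda x, y: (x[0] + y[0], x[1] + y[1]))    # comb_func: add the pairs
print({k: s / c for k, (s, c) in sum_count.items()})   # {'a': 4.0, 'b': 4.0}
```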

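## Actions
reduce(func): Aggregate the elements of the dataset using a function func, which takes two arguments and returns one. The function should be commutative and associative so that it can be computed correctly in parallel.

collect(): Return all the elements of the dataset as a list at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

count(): Return the number of elements in the dataset.

first(): Return the first element of the dataset (similar to take(1)).

take(n): Return a list with the first n elements of the dataset.

takeSample(withReplacement, num, [seed]): Return a list with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n, [ordering]): Return the first n elements of the RDD using either their natural order or a custom ordering (in PySpark, a key function: takeOrdered(num, key=None)).

saveAsTextFile(path): Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system, one line per element.

saveAsSequenceFile(path): Write the elements of a key-value dataset as a Hadoop SequenceFile in a given path.

saveAsObjectFile(path): Write the elements of the dataset using Java serialization (Scala and Java API); the PySpark counterpart is saveAsPickleFile(path).

countByKey(): Only available on datasets of (K, V) pairs. Returns a dictionary of (K, count) pairs with the count of each key.

foreach(func): Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator or interacting with external storage systems.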
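takeOrdered is another action that avoids pulling everything to the driver: conceptually, each partition only needs to contribute its own n smallest elements. A plain-Python sketch of that idea using heapq (take_ordered_model is a hypothetical name; in PySpark the call is rdd.takeOrdered(num, key=None)):

```python
import heapq


def take_ordered_model(partitions, n, key=None):
    """Toy takeOrdered(n, key): each partition contributes its n smallest
    elements, then the driver picks the n smallest of those candidates."""
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nsmallest(n, part, key=key))
    return heapq.nsmallest(n, candidates, key=key)


parts = [[10, 1, 7], [4, 9], [3, 8, 2]]
print(take_ordered_model(parts, 3))                    # [1, 2, 3]
print(take_ordered_model(parts, 3, key=lambda x: -x))  # [10, 9, 8]
```

At most n × (number of partitions) elements ever reach the driver, instead of the whole dataset as with collect().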

## Why use SequenceFile
First we should understand what problems SequenceFile tries to solve, and then how it helps solve them.

### In HDFS
SequenceFile is one of the solutions to the small file problem in Hadoop.
A small file is one significantly smaller than the HDFS block size (128 MB by default).
Every file, directory, and block in HDFS is represented as an object in the NameNode's memory, and each object occupies about 150 bytes.
So 10 million files, each using one block, would use about 3 gigabytes of NameNode memory.
Scaling up to a billion files is not feasible.
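The 3 GB figure follows from the 150-byte rule of thumb. A small Python calculation, assuming each small file uses exactly one block (one file object plus one block object) and ignoring directory objects; namenode_bytes is a hypothetical helper:

```python
BYTES_PER_OBJECT = 150   # rule of thumb quoted above


def namenode_bytes(num_files, blocks_per_file=1):
    """Approximate NameNode memory for file and block objects (directories ignored)."""
    objects = num_files * (1 + blocks_per_file)   # one file object plus its blocks
    return objects * BYTES_PER_OBJECT


print(namenode_bytes(10_000_000) / 1e9)      # 3.0 (GB)
print(namenode_bytes(1_000_000_000) / 1e9)   # 300.0 (GB)
```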
### In MapReduce
Map tasks usually process a block of input at a time (using the default FileInputFormat).

Because an input split never spans more than one file, each small file gets at least one map task of its own. The more files there are, the more map tasks are needed, and the job can become much slower.
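To make the map-task point concrete, here is a rough Python estimate, assuming the default 128 MB block size and one input split per block of each file (the default FileInputFormat's exact split rules have extra details this sketch ignores). The file sizes are the 10,000 × 100 KB example used later in this section, and estimated_map_tasks is a hypothetical helper.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024   # assumed default HDFS block size, in bytes


def estimated_map_tasks(file_sizes, block_size=BLOCK_SIZE):
    """Rough map task count: at least one split per file, one per block."""
    return sum(max(1, math.ceil(size / block_size)) for size in file_sizes)


small_files = [100 * 1024] * 10_000   # 10,000 files of 100 KB
one_big_file = [sum(small_files)]     # the same bytes stored as one file

print(estimated_map_tasks(small_files))    # 10000
print(estimated_map_tasks(one_big_file))   # 8
```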

### Small file scenarios
Small files usually arise in one of two ways:
1. The files are pieces of a larger logical file.
2. The files are inherently small, for example, images.

These two cases require different solutions.

For the first case, write a program to concatenate the small files together (see Nathan Marz’s post about a tool called the Consolidator, which does exactly this).
For the second case, some kind of container is needed to group the files in some way.
### Solutions in Hadoop
#### HAR files

HAR files (Hadoop Archives) were introduced to alleviate the problem of lots of files putting pressure on the NameNode’s memory.
HARs are probably best used purely for archival purposes.
#### SequenceFile

The idea of SequenceFile is to put many small files into a single larger file.
For example, suppose there are 10,000 files of 100 KB each; we can write a program to put them into a single SequenceFile, as in the layout below, using the filename as the key and the content as the value.

Figure: SequenceFile file layout (http://img.blog.csdn.net/20151213123516719)
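The "filename as key, content as value" idea can be sketched with a toy container in plain Python. This is not the Hadoop SequenceFile format, which also has a header, sync markers and Writable serialization; pack_files and unpack_files are hypothetical helpers that only show how many small files become one larger file of length-prefixed key-value records. In PySpark, one possible approach is sc.wholeTextFiles(input_dir).saveAsSequenceFile(output_dir), where input_dir and output_dir are placeholders, since wholeTextFiles returns (path, content) pairs.

```python
import io
import struct


def pack_files(files):
    """Write {name: bytes} as length-prefixed records:
    [key length][key][value length][value] ... (a toy format)."""
    buf = io.BytesIO()
    for name, content in files.items():
        key = name.encode("utf-8")
        buf.write(struct.pack(">I", len(key)))       # 4-byte big-endian length
        buf.write(key)
        buf.write(struct.pack(">I", len(content)))
        buf.write(content)
    return buf.getvalue()


def unpack_files(blob):
    """Read the toy records back into a {name: bytes} dict."""
    files, pos = {}, 0
    while pos < len(blob):
        (key_len,) = struct.unpack_from(">I", blob, pos)
        pos += 4
        key = blob[pos:pos + key_len].decode("utf-8")
        pos += key_len
        (value_len,) = struct.unpack_from(">I", blob, pos)
        pos += 4
        files[key] = blob[pos:pos + value_len]
        pos += value_len
    return files


small_files = {"a.txt": b"hello", "b.txt": b"spark", "img/c.png": b"\x89PNG"}
container = pack_files(small_files)
print(len(small_files), "files packed into one container of", len(container), "bytes")
print(unpack_files(container) == small_files)   # True
```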

Some benefits:

1. Less memory is needed on the NameNode. Continuing with the example of 10,000 files of 100 KB each:
    - Before using SequenceFile, the 10,000 files occupy about 4.5 MB of RAM in the NameNode.
    - After using SequenceFile, one 1 GB SequenceFile with 8 HDFS blocks occupies about 3.6 KB of RAM in the NameNode.
    - Both figures correspond to about 450 bytes (three 150-byte objects) per block: 10,000 × 450 B = 4.5 MB and 8 × 450 B = 3.6 KB.
2. SequenceFile is splittable, so it is suitable for MapReduce.
3. SequenceFile supports compression. The file structure depends on the compression type:
    - Uncompressed
    - Record-compressed: compresses each record as it is added to the file (figure: http://img.blog.csdn.net/20151213182753789).
    - Block-compressed: waits until the buffered data reaches the block size, then compresses it (figure: http://img.blog.csdn.net/20151213183017236).

Block compression provides a better compression ratio than record compression, and it is generally the preferred option when using SequenceFile.
A "block" here is unrelated to an HDFS or filesystem block.
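The reason block compression usually wins can be shown with zlib standing in for a Hadoop codec (an assumption for illustration only): compressing many small records together lets the compressor reuse repeated patterns across records and pays the per-stream overhead once. The records below are made-up, similar log-like lines. For reference, in Hadoop's record-compressed format only the values are compressed, while block compression compresses keys and values in separate buffered blocks.

```python
import zlib

# Made-up, similar records, like lines of a log.
records = [f"user{i:04d},page=/index.html,status=200".encode() for i in range(1000)]

# Record compression: every record is compressed on its own.
record_compressed = sum(len(zlib.compress(r)) for r in records)

# Block compression: records are buffered and compressed together.
block = b"".join(records)
block_compressed = len(zlib.compress(block))

print("raw bytes:", len(block))
print("record-compressed bytes:", record_compressed)
print("block-compressed bytes:", block_compressed)
```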