#### History:
Developed by Matei Zaharia as part of his PhD thesis at UC Berkeley, starting in 2009. <br>
First version released in 2012.

In [1]:
# Create a SparkContext instance, the entry point for creating RDDs

from pyspark import SparkContext
sc = SparkContext()


### API: Data Structures 

#### RDD: 
The core data structure in Spark is the Resilient Distributed Dataset (RDD), an immutable, distributed collection of JVM (Java Virtual Machine) objects. In PySpark, the Python data is stored inside these JVM objects in serialized form, and the Python driver communicates with the JVM through Py4J.

In [8]:
raw_data = [('A', 85), ('B', 47), ('C',15), ('D', 2), ('A', 19)]
# parallelize() turns a local collection into a ParallelCollectionRDD
distributed_data = sc.parallelize(raw_data)

# textFile() would generate a MapPartitionsRDD

# text_file_address = 'textfile.txt' 
# dist_file = sc.textFile(text_file_address, 4) 

# The second argument sets the number of partitions to cut the dataset into
# (minPartitions for textFile, numSlices for parallelize).
# Spark runs one task for each partition. Typically you want 2-4 partitions for each CPU in your cluster.
# Normally, Spark tries to set the number of partitions automatically based on your cluster.
# However, you can also set it manually by passing that second argument.

# Each row from the file forms an element of an RDD.
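
The idea of cutting a dataset into partitions can be illustrated without a cluster. The following is a minimal plain-Python sketch, not Spark's implementation: `split_into_partitions` is a hypothetical helper, and the contiguous, near-equal slicing it uses is an assumption made for illustration.

```python
def split_into_partitions(data, num_partitions):
    """Split a list into num_partitions contiguous chunks of near-equal size.

    Hypothetical helper: Spark's own slicing logic may differ.
    """
    n = len(data)
    return [
        data[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


raw_data = [('A', 85), ('B', 47), ('C', 15), ('D', 2), ('A', 19)]
partitions = split_into_partitions(raw_data, 2)

# One task per partition: each chunk could be processed independently.
for index, chunk in enumerate(partitions):
    print("partition %d: %s" % (index, chunk))
```

On a real RDD, `distributed_data.getNumPartitions()` reports the partition count and `distributed_data.glom().collect()` returns the contents of each partition as a list of lists.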

### RDD Transformations

#### .map()
Returns a new RDD formed by applying the given function to each element of the source RDD.

In [24]:
list_data = [85, 44, 15, 2, 19]
distributed_list = sc.parallelize(list_data)
mapped_data = distributed_list.map(lambda x: x+2)

# take(n) returns the first n elements. Here it shows the original RDD, which map() left unchanged.
print(distributed_list.take(3))

# collect() fetches the entire RDD to a single machine (the driver), where it is returned as a list.
collected_data = mapped_data.collect()
print(collected_data)

[85, 44, 15]
[87, 46, 17, 4, 21]


#### .reduceByKey(func)
Merge the values for each key using an associative and commutative reduce function. <br>
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

In [25]:
dict_data = [('AA', 85), ('BA', 47), ('CA',15), ('DA', 2), ('AA', 19)]
distributed_dict = sc.parallelize(dict_data)
histogram_data = distributed_dict.map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
collected_data = histogram_data.collect()
print(collected_data)

[('AA', 2), ('BA', 1), ('CA', 1), ('DA', 1)]
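
Why the reduce function must be associative and commutative becomes clearer when the merge happens in two steps: first within each partition, then across partitions. Below is a minimal plain-Python sketch of that idea. It is not Spark's implementation; `reduce_by_key` is a hypothetical helper, and the split of the data into two partitions is an assumption.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Merge the values of each key with func (hypothetical local helper)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Assumed layout: the (key, value) pairs live in two partitions.
partition_1 = [('AA', 85), ('BA', 47), ('CA', 15)]
partition_2 = [('DA', 2), ('AA', 19)]

# Step 1: merge values locally inside each partition.
partials = [reduce_by_key(p, add) for p in (partition_1, partition_2)]

# Step 2: merge the per-partition results with the same function.
partial_pairs = [item for partial in partials for item in partial.items()]
totals = reduce_by_key(partial_pairs, add)

print(sorted(totals.items()))
```

Because addition is associative and commutative, the two-step result equals what a single pass over all pairs would give, regardless of how the pairs are distributed.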


#### .filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.

In [26]:
even_list = distributed_list.filter(lambda x: x % 2 == 0).collect()
print(even_list)

filtered_histogram_data = distributed_dict.filter(lambda x: 'B' in x[0]).map(lambda x: (x[0], 1)).reduceByKey(lambda x,y: x+y)
print(filtered_histogram_data.collect())

[44, 2]
[('BA', 1)]


#### .flatMap(f, preservesPartitioning=False)
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results

In [29]:
rdd = sc.parallelize([2, 3, 4])
print(rdd.flatMap(lambda x: range(1, x)).collect())
print(rdd.flatMap(lambda x: [(x, x+1), (x+10, x+20)]).collect())

# collectAsMap() returns the key-value pairs in this RDD to the driver as a dictionary
print(rdd.flatMap(lambda x: [(x, x+1), (x+10, x+20)]).collectAsMap())

[1, 1, 2, 1, 2, 3]
[(2, 3), (12, 22), (3, 4), (13, 23), (4, 5), (14, 24)]
{2: 3, 3: 4, 4: 5, 12: 22, 13: 23, 14: 24}
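
The difference between mapping and flat-mapping is easiest to see side by side. This is a minimal plain-Python sketch of the semantics on a local list. `local_map` and `local_flat_map` are hypothetical helpers, not part of the Spark API.

```python
from itertools import chain


def local_map(func, elements):
    """One output element per input element."""
    return [func(x) for x in elements]


def local_flat_map(func, elements):
    """func returns an iterable per element; the results are concatenated."""
    return list(chain.from_iterable(func(x) for x in elements))


data = [2, 3, 4]

# map keeps the nesting: one list per input element.
mapped = local_map(lambda x: list(range(1, x)), data)

# flatMap flattens one level: all results end up in a single list.
flattened = local_flat_map(lambda x: range(1, x), data)

print(mapped)
print(flattened)
```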


#### .distinct(numPartitions=None)
Return a new RDD containing the distinct elements in this RDD

In [30]:
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())

[1, 2, 3]

#### .sample(withReplacement, fraction, seed=None)
Return a sampled subset of this RDD.

Parameters:
-  withReplacement – whether elements can be sampled multiple times (replaced when sampled out)
-  fraction – expected size of the sample as a fraction of this RDD’s size
   -  without replacement: probability that each element is chosen; fraction must be in [0, 1]
   -  with replacement: expected number of times each element is chosen; fraction must be >= 0
-  seed – seed for the random number generator

Note: The sample is not guaranteed to contain exactly the specified fraction of the RDD's elements.

In [33]:
rdd = sc.parallelize(range(100), 4)
print(rdd.sample(False, 0.1, 81).collect())
print(rdd.sample(False, 0.1, 81).count())

[4, 27, 40, 42, 43, 60, 76, 80, 86, 97]
10
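
The sample size varies because, without replacement, each element is kept independently with probability `fraction`. The sketch below models that in plain Python. `bernoulli_sample` is a hypothetical helper that uses Python's `random` module, so the same seed will not reproduce Spark's output.

```python
import random


def bernoulli_sample(elements, fraction, seed=None):
    """Keep each element independently with probability `fraction`.

    Hypothetical local model of sampling without replacement.
    """
    rng = random.Random(seed)
    return [x for x in elements if rng.random() < fraction]


data = list(range(100))

# The sample size fluctuates around fraction * len(data) from seed to seed.
sizes = [len(bernoulli_sample(data, 0.1, seed)) for seed in range(5)]
print(sizes)
```

When an exact sample size is needed, the RDD method `takeSample(withReplacement, num, seed=None)` returns a fixed number of elements to the driver.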


#### .join(other, numPartitions=None)
Return an RDD containing all pairs of elements with matching keys in self and other. <br>
Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other. <br>
Performs a hash join across the cluster.

In [34]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))

[('a', (1, 2)), ('a', (1, 3))]


#### .leftOuterJoin(other, numPartitions=None)
Perform a left outer join of self and other. <br>
For each element (k, v) in self, the resulting RDD will either contain all pairs (k, (v, w)) for w in other, or the pair (k, (v, None)) if no elements in other have key k. <br>
Hash-partitions the resulting RDD into the given number of partitions.

In [35]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))

[('a', (1, 2)), ('b', (4, None))]
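
Both join variants can be pictured as a hash join: build a lookup table from one side, then probe it with each element of the other. The following single-machine sketch shows that idea, assuming both datasets fit in memory. `hash_join` and its `keep_unmatched_left` flag are hypothetical names, not part of the Spark API.

```python
from collections import defaultdict


def hash_join(left, right, keep_unmatched_left=False):
    """Join two lists of (key, value) pairs on their keys.

    With keep_unmatched_left=True, behaves like a left outer join.
    """
    # Build phase: index the right side by key.
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)

    # Probe phase: look up each left element in the index.
    result = []
    for key, value in left:
        if key in index:
            result.extend((key, (value, w)) for w in index[key])
        elif keep_unmatched_left:
            result.append((key, (value, None)))
    return result


x = [("a", 1), ("b", 4)]

inner = hash_join(x, [("a", 2), ("a", 3)])
left_outer = hash_join(x, [("a", 2)], keep_unmatched_left=True)

print(inner)
print(left_outer)
```

In a cluster, elements are first partitioned by key so that matching keys from both sides land in the same partition, and a join of this kind can then run within each partition.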


#### .intersection(other)
Return the intersection of this RDD and another one. <br>
The output will not contain any duplicate elements, even if the input RDDs did.

In [36]:
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()

[1, 2, 3]


#### DataFrame: 
Like an RDD, a DataFrame is an immutable distributed collection of data. Data is organized into named columns, like a table in a relational database.
#### Details: 
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

#### Spark SQL Catalyst Optimizer
Catalyst powers both SQL queries and the DataFrame API. It was designed with two purposes: first, to make it easy to add new optimization techniques and features to Spark SQL; second, to enable external developers to extend the optimizer, for example by adding data-source-specific rules that can push filtering or aggregation into external storage systems, or support for new data types.
Catalyst supports both rule-based and cost-based optimization.
https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
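
To give a feel for rule-based optimization, here is a toy plain-Python sketch of a single rewrite rule, constant folding, applied to a small expression tree. It is not Catalyst's API: the `Literal`, `Attribute`, and `Add` node types and the `fold_constants` function are hypothetical names chosen for this illustration.

```python
from collections import namedtuple

# Hypothetical expression-tree node types for this sketch.
Literal = namedtuple('Literal', ['value'])
Attribute = namedtuple('Attribute', ['name'])
Add = namedtuple('Add', ['left', 'right'])


def fold_constants(expr):
    """Rule: rewrite Add(Literal, Literal) into one Literal, bottom-up."""
    if isinstance(expr, Add):
        left = fold_constants(expr.left)
        right = fold_constants(expr.right)
        if isinstance(left, Literal) and isinstance(right, Literal):
            return Literal(left.value + right.value)
        return Add(left, right)
    return expr


# Represents the expression x + (1 + 2).
tree = Add(Attribute('x'), Add(Literal(1), Literal(2)))
optimized = fold_constants(tree)

print(optimized)
```

The rule replaces the constant subtree `1 + 2` with a single literal before execution, so the addition is not repeated for every row. A rule-based optimizer applies many such rewrites to a query plan.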