# **Cluster Computing Challenges: How do we split work across machines? How do we deal with server failures?**

## **Ex1: Hash Table**
![img1](https://raw.githubusercontent.com/hetianch/IntroToBigDataApacheSpark/master/img1.png)

```python
import string

exclude = set(string.punctuation)
raw_string = "I am Sam Sam I am Do you like Green eggs and ham?"
# Remove punctuation, then split the sentence into words
trans_string = ''.join(ch for ch in raw_string if ch not in exclude)
list_string = trans_string.split()

# Hash table (dict) mapping each word to its count
dic = {}
for word in list_string:
    dic[word] = dic.get(word, 0) + 1
print(dic)
```

{'I': 2, 'am': 2, 'Sam': 2, 'Do': 1, 'you': 1, 'like': 1, 'Green': 1, 'eggs': 1, 'and': 1, 'ham': 1}


![img2](https://raw.githubusercontent.com/hetianch/IntroToBigDataApacheSpark/master/img2.png)
![img3](https://raw.githubusercontent.com/hetianch/IntroToBigDataApacheSpark/master/img3.png)
### **Challenges of divide and conquer** ###
When using divide and conquer, you have to consider the network and data locality because moving data between machines is expensive. Even with a low per-machine failure rate, using many machines means that several will fail per day. As machines age, they may fail in ways that cause slow performance (e.g., a failing disk drive that retries each read or write operation multiple times before successfully completing).
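To make "several will fail per day" concrete, here is a back-of-the-envelope sketch. It assumes, hypothetically, that each machine fails independently and on average once every three years (its mean time between failures, MTBF); the expected number of failures per day is then the number of machines divided by the MTBF in days. The cluster size and MTBF are illustrative assumptions, not measurements.

```python
def expected_failures_per_day(num_machines, mtbf_days):
    """Expected daily failures if each machine fails independently,
    on average once every mtbf_days days (hypothetical model)."""
    return num_machines / mtbf_days

# Hypothetical cluster: 10,000 machines, each failing about once every 3 years
daily = expected_failures_per_day(10_000, 3 * 365)
print(round(daily, 1))  # 9.1
```

Even a machine that fails only once every few years contributes to a steady stream of failures once there are thousands of them.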
![img4](https://raw.githubusercontent.com/hetianch/IntroToBigDataApacheSpark/master/img4.png)
![img5](https://raw.githubusercontent.com/hetianch/IntroToBigDataApacheSpark/master/img5.png)
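Instead of sending every partial count to a single machine, each word is sent to a designated machine chosen from the word itself, so all partial counts for the same word end up on one machine, where they are summed.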
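A minimal single-process sketch of this map, shuffle, and reduce pattern, assuming two hypothetical "machines" that each hold one chunk of the text: each chunk is counted locally (map), every partial `(word, count)` pair is routed to the reducer that owns that word (shuffle), and each reducer sums what it receives (reduce). The routing function `reducer_for` (sum of character codes modulo the number of reducers) is an arbitrary stand-in for a hash partitioner; Python's built-in `hash()` is avoided because it is randomized per process for strings.

```python
from collections import defaultdict

def count_chunk(words):
    """Map step: count the words in one chunk (one machine's share of the data)."""
    counts = defaultdict(int)
    for word in words:
        counts[word] += 1
    return counts

def reducer_for(word, num_reducers):
    """Hypothetical, deterministic stand-in for a hash partitioner."""
    return sum(ord(ch) for ch in word) % num_reducers

def word_count(chunks, num_reducers=2):
    # One "inbox" per reducer machine
    inboxes = [defaultdict(int) for _ in range(num_reducers)]
    for chunk in chunks:
        for word, count in count_chunk(chunk).items():
            # Shuffle: route the partial count to the reducer that owns the word,
            # which adds it to its running total (reduce)
            inboxes[reducer_for(word, num_reducers)][word] += count
    # Each word lives in exactly one inbox, so merging the inboxes is safe
    result = {}
    for inbox in inboxes:
        result.update(inbox)
    return result

chunks = [["I", "am", "Sam", "Sam"], ["I", "am", "Do", "you", "like"]]
print(sorted(word_count(chunks).items()))
# [('Do', 1), ('I', 2), ('Sam', 2), ('am', 2), ('like', 1), ('you', 1)]
```

Because no single reducer has to hold every word, the final result no longer needs to fit on one machine.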

## **How to deal with failures and slow tasks** ##
A machine that runs a very slow task may be about to fail. MapReduce deals with both failures and slow tasks by re-launching the affected tasks on other machines.
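A minimal sketch of re-launching, assuming hypothetical "machines" modeled as Python functions, one of which always fails. Re-running is safe because a task is a deterministic function of its input partition, so running it again elsewhere produces the same result. Only the failure case is modeled; the handling of slow tasks is not.

```python
def run_with_relaunch(task, partition, machines):
    """Try the task on each machine in turn until one of them succeeds."""
    last_error = None
    for machine in machines:
        try:
            return machine(task, partition)
        except RuntimeError as err:  # simulated machine failure
            last_error = err
    raise RuntimeError("task failed on every machine") from last_error

def healthy_machine(task, partition):
    return task(partition)

def failing_machine(task, partition):
    raise RuntimeError("disk failure")  # hypothetical failure

def count_words(partition):
    counts = {}
    for word in partition:
        counts[word] = counts.get(word, 0) + 1
    return counts

# The first machine fails, so the task is re-launched on the second one
result = run_with_relaunch(count_words, ["Sam", "I", "am", "Sam"],
                           [failing_machine, healthy_machine])
print(result)  # {'Sam': 2, 'I': 1, 'am': 1}
```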

# **Apache Spark Motivation** #
* The original MapReduce involves many disk I/O operations, which are slow.
* The cost of memory has dropped, so more data can be kept in memory instead of being slowly written to disk.
![img7](https://raw.githubusercontent.com/hetianch/IntroToBigDataApacheSpark/master/img7.png)
Spark reads the data from disk only for the first query, then keeps it in memory for later queries.
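A toy model of this idea, assuming a hypothetical loader function `load_lines` that stands in for reading a large file: the first query pays for the disk read, and later queries reuse the in-memory copy. This is only an analogy for the caching behavior, not how Spark implements it.

```python
class CachedDataset:
    """Toy in-memory cache: load from 'disk' once, then reuse the loaded data."""

    def __init__(self, load_from_disk):
        self._load_from_disk = load_from_disk
        self._cached = None
        self.disk_reads = 0

    def data(self):
        if self._cached is None:       # first query: read from disk
            self._cached = self._load_from_disk()
            self.disk_reads += 1
        return self._cached            # later queries: served from memory

def load_lines():
    # Hypothetical stand-in for reading a large file from disk
    return ["to be or not to be", "that is the question"]

ds = CachedDataset(load_lines)
num_lines = len(ds.data())                                  # query 1 reads from disk
num_words = sum(len(line.split()) for line in ds.data())    # query 2 uses memory
print(num_lines, num_words, ds.disk_reads)  # 2 10 1
```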

![img6](https://raw.githubusercontent.com/hetianch/IntroToBigDataApacheSpark/master/img6.png)

#### **Spark is often faster than traditional MapReduce because:** ####
* Results do not need to be written to disk
* Results do not need to be serialized (converted into a format that can be stored on disk)
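To illustrate what serialization means, here is a sketch using Python's standard `pickle` module (chosen for illustration only; this is not a claim about the exact format Spark uses internally): an in-memory result is converted to bytes that could be written to disk, and must be converted back before it can be used again. Keeping results as live objects in memory skips both conversions.

```python
import pickle

result = {"Sam": 2, "I": 2, "am": 2}

# Serialize: object -> bytes (needed before the result can be written to disk)
blob = pickle.dumps(result)
print(type(blob).__name__)  # bytes

# Deserialize: bytes -> object (needed after reading the bytes back)
restored = pickle.loads(blob)
print(restored == result)  # True
```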

# **Spark Example** #


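```python
# Create RDDs (sc is the SparkContext provided by the pyspark shell / notebook)
# 1. From a Python list
data = [1, 2, 3, 3, 4, 5]
rDD = sc.parallelize(data, 4)  # split the data into 4 partitions
rDD
# 2. From a text file (one element per line), also split into 4 partitions
distFile = sc.textFile("data/cs100/lab1/shakespeare.txt", 4)
distFile
```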

data/cs100/lab1/shakespeare.txt MapPartitionsRDD[121] at textFile at NativeMethodAccessorImpl.java:-2
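A rough pure-Python sketch of what creating 4 partitions from a list means conceptually: the list is cut into 4 contiguous slices of nearly equal size, one per partition. The slicing rule below (slice `i` covers indices `i*n//k` up to `(i+1)*n//k`) and the helper name `split_into_partitions` are assumptions for illustration; in a live session, `rDD.glom().collect()` shows the partitions Spark actually created.

```python
def split_into_partitions(data, num_partitions):
    """Cut a list into num_partitions contiguous, nearly equal slices (hypothetical helper)."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]

print(split_into_partitions([1, 2, 3, 3, 4, 5], 4))  # [[1], [2, 3], [3], [4, 5]]
```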

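```python
# Transform RDDs. Transformations are lazy: each call returns a new RDD,
# and nothing is computed until an action such as collect() runs.
rDD.map(lambda x: x * 2)           # [2, 4, 6, 6, 8, 10]
rDD.filter(lambda x: x % 2 == 0)   # [2, 4]
rDD.distinct()                     # [1, 2, 3, 4, 5] (order not guaranteed)

rDD.map(lambda x: [x, x + 5])      # [[1, 6], [2, 7], ...]
rDD.flatMap(lambda x: [x, x + 5])  # [1, 6, 2, 7, ...]

# isComment is not a Spark built-in; it must be defined before use.
# Hypothetical definition that treats lines starting with "#" as comments:
def isComment(line):
    return line.startswith("#")
comments = distFile.filter(isComment)

# Key-value transformations
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
# a + b combines the values that share a key: [(1, 2), (3, 10)] (order may vary)
rdd.reduceByKey(lambda a, b: a + b).collect()

rdd2 = sc.parallelize([(1, 'a'), (2, 'c'), (1, 'b')])
rdd2.sortByKey()   # sorted by key, e.g. [(1, 'a'), (1, 'b'), (2, 'c')]
rdd2.groupByKey()  # one (key, iterable of values) pair per key: 1 -> a, b and 2 -> c
```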


PythonRDD[267] at RDD at PythonRDD.scala:43
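To check the results in the comments above without a cluster, the same transformations can be mimicked on a plain Python list. This is only a local analogy (no partitions, no laziness), and the helpers `reduce_by_key` and `group_by_key` are hypothetical names, not Spark functions.

```python
from collections import defaultdict

data = [1, 2, 3, 3, 4, 5]

mapped = [x * 2 for x in data]                # like rDD.map(lambda x: x * 2)
filtered = [x for x in data if x % 2 == 0]    # like rDD.filter(lambda x: x % 2 == 0)
distinct = sorted(set(data))                  # like rDD.distinct(), sorted for display
flat = [y for x in data for y in (x, x + 5)]  # like rDD.flatMap(lambda x: [x, x + 5])

def reduce_by_key(pairs, func):
    """Combine all values of each key with func, like reduceByKey."""
    out = {}
    for key, value in pairs:
        out[key] = func(out[key], value) if key in out else value
    return sorted(out.items())

def group_by_key(pairs):
    """Collect all values of each key into a list, like groupByKey followed by list()."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

print(mapped, filtered, distinct, flat[:4])
# [2, 4, 6, 6, 8, 10] [2, 4] [1, 2, 3, 4, 5] [1, 6, 2, 7]
print(reduce_by_key([(1, 2), (3, 4), (3, 6)], lambda a, b: a + b))  # [(1, 2), (3, 10)]
print(group_by_key([(1, 'a'), (2, 'c'), (1, 'b')]))                  # [(1, ['a', 'b']), (2, ['c'])]
```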

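```python
# Actions: get data out of an RDD (actions trigger the actual computation)
rDD = sc.parallelize([5, 4, 1, 2, 3])
rDD.reduce(lambda a, b: a * b)        # 120 (product of all elements)
rDD.take(2)                           # [5, 4] (first 2 elements)
rDD.collect()                         # [5, 4, 1, 2, 3] (all elements, sent to the driver)
rDD.takeOrdered(3, lambda s: -1 * s)  # [5, 4, 3] (negated key gives descending order)
distFile.count()                      # number of lines in the file
```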


122395
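The actions above can be checked locally the same way. `takeOrdered(n, key)` returns the `n` smallest elements according to `key`, so negating the key yields the largest ones; for a local list, the standard library's `heapq.nsmallest` has the same semantics. This is a local analogy only, not how Spark computes these results across partitions.

```python
import heapq
from functools import reduce

data = [5, 4, 1, 2, 3]

product = reduce(lambda a, b: a * b, data)                   # like rDD.reduce(lambda a, b: a * b)
first_two = data[:2]                                         # like rDD.take(2)
top_three = heapq.nsmallest(3, data, key=lambda s: -1 * s)   # like rDD.takeOrdered(3, lambda s: -1 * s)

print(product, first_two, top_three)  # 120 [5, 4] [5, 4, 3]
```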

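```python
# Cache an RDD in memory
distFile = sc.textFile("data/cs100/lab1/shakespeare.txt", 4)
distFile.cache()  # lazy: only marks the RDD to be kept in memory
distFile.count()  # the first action reads from disk and fills the cache; later actions run from memory
```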


122395

```python
# Shared variables: broadcast variables (efficiently send a large, read-only value to all workers)
broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar.value  # read the broadcast value
```

[1, 2, 3]

```python
# Shared variables: accumulators (workers add to them; the driver reads the total)
accum = sc.accumulator(0)  # initialized to 0
rdd = sc.parallelize([1, 2, 3, 4])

def f(x):
    global accum
    accum += x  # runs on the workers; the updates are sent back to the driver

rdd.foreach(f)
accum.value  # 1 + 2 + 3 + 4
```


10

# **Data management** #
## **1. PySpark and pandas DataFrame** ##

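```python
# Spark DataFrame -> pandas DataFrame (collects all rows to the driver, so the data must fit in its memory)
pandas_df = spark_df.toPandas()
# pandas DataFrame -> Spark DataFrame (sqlContext is the SQLContext provided by the notebook)
spark_df = sqlContext.createDataFrame(pandas_df)
```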

## **2. PySpark and SQL** ##

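```python
# Join pair RDDs by key (similar to SQL joins)
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])

# x.join(y): inner join, keeps only keys present in both RDDs
print(x.join(y).collect())

# x.leftOuterJoin(y): keeps every key of x; missing y values become None
print(x.leftOuterJoin(y).collect())

# y.rightOuterJoin(x): keeps every key of x (the right side); missing y values become None
print(y.rightOuterJoin(x).collect())

# x.fullOuterJoin(y): keeps the keys of both RDDs
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 3)])
print(x.fullOuterJoin(y).collect())
```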


```
[('a', (1, 2)), ('a', (1, 3))]
[('a', (1, 2)), ('a', (1, 3)), ('b', (4, None))]
[('a', (2, 1)), ('a', (3, 1)), ('b', (None, 4))]
[('a', (1, 2)), ('c', (None, 3)), ('b', (4, None))]
```
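To make the join semantics explicit, here is a pure-Python sketch over plain lists of `(key, value)` pairs. The function names are hypothetical, and the nested-loop matching is chosen for clarity only; it says nothing about how Spark executes joins across machines. Output order can differ from Spark's, which is why the last case is compared after sorting.

```python
def inner_join(x, y):
    """Pair every x value with every y value that has the same key."""
    return [(k, (v, w)) for k, v in x for k2, w in y if k == k2]

def left_outer_join(x, y):
    """Inner join, plus keys found only in x with None on the right."""
    y_keys = {k for k, _ in y}
    return inner_join(x, y) + [(k, (v, None)) for k, v in x if k not in y_keys]

def right_outer_join(x, y):
    """Inner join, plus keys found only in y with None on the left."""
    x_keys = {k for k, _ in x}
    return inner_join(x, y) + [(k, (None, w)) for k, w in y if k not in x_keys]

def full_outer_join(x, y):
    """Left outer join, plus keys found only in y with None on the left."""
    x_keys = {k for k, _ in x}
    return left_outer_join(x, y) + [(k, (None, w)) for k, w in y if k not in x_keys]

x = [("a", 1), ("b", 4)]
y = [("a", 2), ("a", 3)]
print(inner_join(x, y))        # [('a', (1, 2)), ('a', (1, 3))]
print(left_outer_join(x, y))   # [('a', (1, 2)), ('a', (1, 3)), ('b', (4, None))]
print(right_outer_join(y, x))  # [('a', (2, 1)), ('a', (3, 1)), ('b', (None, 4))]
print(sorted(full_outer_join(x, [("a", 2), ("c", 3)])))
# [('a', (1, 2)), ('b', (4, None)), ('c', (None, 3))]
```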
