# 1. Tuning Parallelism
* Change the partitioning to distribute the workload more evenly.
* Minimize shuffling and avoid memory problems.

### Set partitionSize
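* Set the number of partitions (called `partitionSize` in these notes) when creating an RDD.
```
# numSlices: number of partitions for the collection
rdd = sc.parallelize(data, partitionSize)
# minPartitions: a suggested minimum; Spark may create more partitions
rdd = sc.textFile(text, partitionSize)
```
* Set the number of partitions when transforming an RDD.
    <br>The transformations below accept a target number of partitions. All of them shuffle data, except `coalesce()`, which shuffles only with `shuffle=True`.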
```
sortBy(), sortByKey(), groupBy(), reduceByKey(), join(), leftOuterJoin(), rightOuterJoin(), partitionBy(), combineByKey(), aggregateByKey(), foldByKey(), groupByKey(), cogroup(), subtractByKey(), subtract(), repartition(), coalesce()
```

### Change partitionSize
#### Pair RDD
<br>Pair RDDs are partitioned by key.

* HashPartitioner : Assigns each pair to a partition using the hash value of its key.
<br>`partitionBy(N)` uses hash partitioning by default.
```
rdd1 = rdd.partitionBy(partitionSize)
```
* CustomPartitioner : User-defined partitioner.
```
# The partition function maps a key to an int; Spark takes the result modulo N.
# key + 10 assumes numeric keys.
def custom_partitioner(key):
    return hash(key + 10)
pair_rdd.partitionBy(N, custom_partitioner)
```
* RangePartitioner : Partitions sorted RDDs into roughly equal ranges.
<br>`pyspark` does not provide a RangePartitioner class.
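
How a partition function picks a partition can be modelled without Spark. The sketch below assumes the bucket index is `partition_func(key) % num_partitions`. The helper name `assign_partitions` is made up for illustration and is not part of PySpark.

```python
def assign_partitions(pairs, num_partitions, partition_func=hash):
    """Toy model of partitionBy: bucket = partition_func(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets

pairs = [(1, 2), (3, 4), (5, 1), (2, 3), (1, 5)]

# Default hashing: small ints hash to themselves, so key 1 goes to bucket 1.
default_layout = assign_partitions(pairs, 4)

# Custom function: shifting the key by 10 moves every key to another bucket.
shifted_layout = assign_partitions(pairs, 4, lambda key: hash(key + 10))

print(default_layout)
print(shifted_layout)
```

Pairs with the same key always land in the same bucket, whichever function is used.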

#### Normal RDD
* `repartition(numPartitions)`
<br>Shuffles data across the network to create a new set of partitions. Can increase or decrease the number of partitions.
<br>Expensive.
* `coalesce(numPartitions, shuffle=False)`
<br>Reduces the number of partitions by merging existing ones, which avoids a full shuffle.
<br>Matches locality as much as possible, but tries to balance partitions across the machines.
<br>`repartition(n)` is equivalent to `coalesce(n, shuffle=True)`.

### Check partitionSize
```
print rdd.getNumPartitions()
print rdd.glom().collect()
```

### Partition-specific Methods
* `foreachPartition()` : Apply a function to each partition of an RDD.
* `glom()` : Return all elements within each partition.
* `lookup(key)`: Return the values for the key, using the partitioner to narrow the search to the partition where the key would be present.
* `mapPartitions()`: Return a new RDD by applying a function to each partition of the RDD.
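
The difference between a per-element function (`map`) and a per-partition function (`mapPartitions`) can be sketched in plain Python. Partitions are modelled here as nested lists, like the output of `glom()`. The names `square` and `square_all` are illustrative only.

```python
# Two partitions, as glom().collect() might show them.
partitions = [[1, 2, 3, 4], [5, 6, 7, 8, 9]]

calls = {"per_element": 0, "per_partition": 0}

def square(x):
    # map-style function: receives one element at a time
    calls["per_element"] += 1
    return x * x

def square_all(iterator):
    # mapPartitions-style function: receives an iterator over a whole partition
    calls["per_partition"] += 1
    return [x * x for x in iterator]

mapped = [[square(x) for x in part] for part in partitions]
map_partitioned = [list(square_all(iter(part))) for part in partitions]

print(calls)
```

Both give the same values, but the per-partition function runs once per partition. That helps when each call has a fixed setup cost, such as opening a connection.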

# 2. RDD Dependencies


# 3. Using Shared Variables
* Accumulator : Aggregates information from executor nodes to the driver node.
    <br> Update accumulators only inside actions. Updates made inside a transformation may be applied more than once if the transformation is re-evaluated.
* Broadcast variable : Efficiently distributes large read-only values to executor nodes.

In [1]:
sc

### EX0

In [3]:
text = sc.textFile('2018-msan697-example/Data/USF_Mission.txt')

In [4]:
text.collect()

[u'At USF, excellence is the standard for teaching, scholarship, creative expression, and service. Individuals from all faiths or with no religious affiliations contribute to the diversity of perspectives and experiences that are essential to our truly global education.',
 u'',
 u'*Care of the Whole Person',
 u'Cura personalis \u2014 care of the whole person \u2014 is a Jesuit tradition that inspires our distinct style of education. One in which intellect is only one part of an individual\u2019s full development.',
 u'\u201cJesuit education offers a distinctive pedagogical framework for educating the whole person (cura personalis) in which this connection between interior wisdom and exterior knowledge is made explicit."',
 u'',
 u'',
 u'*Women and Men for Others',
 u'Fr. Pedro Arrupe, S.J., in his speech to the Tenth International Congress of Jesuit Alumni of Europe, called for action. He challenged Jesuits to reflect on what they believe in and how they act. And, in doing so, inspired

In [7]:
words = text.flatMap(lambda x:x.split(' '))

In [8]:
words.collect()

[u'At',
 u'USF,',
 u'excellence',
 u'is',
 u'the',
 u'standard',
 u'for',
 u'teaching,',
 u'scholarship,',
 u'creative',
 u'expression,',
 u'and',
 u'service.',
 u'Individuals',
 u'from',
 u'all',
 u'faiths',
 u'or',
 u'with',
 u'no',
 u'religious',
 u'affiliations',
 u'contribute',
 u'to',
 u'the',
 u'diversity',
 u'of',
 u'perspectives',
 u'and',
 u'experiences',
 u'that',
 u'are',
 u'essential',
 u'to',
 u'our',
 u'truly',
 u'global',
 u'education.',
 u'',
 u'*Care',
 u'of',
 u'the',
 u'Whole',
 u'Person',
 u'Cura',
 u'personalis',
 u'\u2014',
 u'care',
 u'of',
 u'the',
 u'whole',
 u'person',
 u'\u2014',
 u'is',
 u'a',
 u'Jesuit',
 u'tradition',
 u'that',
 u'inspires',
 u'our',
 u'distinct',
 u'style',
 u'of',
 u'education.',
 u'One',
 u'in',
 u'which',
 u'intellect',
 u'is',
 u'only',
 u'one',
 u'part',
 u'of',
 u'an',
 u'individual\u2019s',
 u'full',
 u'development.',
 u'\u201cJesuit',
 u'education',
 u'offers',
 u'a',
 u'distinctive',
 u'pedagogical',
 u'framework',
 u'for',
 u'e

In [20]:
freq = words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)

In [21]:
freq_sort = freq.sortBy(lambda (x,y):-y)

In [22]:
for (x,y) in freq_sort.take(5):
    print x + ' : ' + str(y)

and : 25
the : 22
of : 19
to : 18
in : 13


In [23]:
print freq_sort.toDebugString()

(2) PythonRDD[63] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[61] at mapPartitions at PythonRDD.scala:427 []
 |  ShuffledRDD[60] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[59] at sortBy at <ipython-input-21-960059633a6f>:1 []
    |  PythonRDD[58] at sortBy at <ipython-input-21-960059633a6f>:1 []
    |  MapPartitionsRDD[55] at mapPartitions at PythonRDD.scala:427 []
    |  ShuffledRDD[54] at partitionBy at NativeMethodAccessorImpl.java:0 []
    +-(2) PairwiseRDD[53] at reduceByKey at <ipython-input-20-3bd44d96c879>:1 []
       |  PythonRDD[52] at reduceByKey at <ipython-input-20-3bd44d96c879>:1 []
       |  2018-msan697-example/Data/USF_Mission.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0 []
       |  2018-msan697-example/Data/USF_Mission.txt HadoopRDD[2] at textFile at NativeMethodAccessorImpl.java:0 []


### EX1

In [51]:
import random
data = sc.parallelize(range(1,10), random.randint(1,5))

In [52]:
print data.getNumPartitions()
print data.glom().collect()

2
[[1, 2, 3, 4], [5, 6, 7, 8, 9]]


### EX2

In [39]:
data = [('a',3),('b',4),('a',5)]

In [40]:
rdd = sc.parallelize(data, 3)

In [41]:
rdd_reduce1 = rdd.reduceByKey(lambda x,y:x+y)

In [42]:
print rdd_reduce1.getNumPartitions()
print rdd_reduce1.glom().collect()

3
[[], [('b', 4)], [('a', 8)]]


In [45]:
rdd_reduce2 = rdd.reduceByKey(lambda x,y:x+y, 2)

In [46]:
print rdd_reduce2.getNumPartitions()
print rdd_reduce2.glom().collect()

2
[[('a', 8)], [('b', 4)]]


In [47]:
rdd_reduce3 = rdd.reduceByKey(lambda x,y:x+y, 1)

In [48]:
print rdd_reduce3.getNumPartitions()
print rdd_reduce3.glom().collect()

1
[[('a', 8), ('b', 4)]]
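
What `reduceByKey(func, numPartitions)` does can be sketched in plain Python: combine values inside each input partition first, then route each key to one of `numPartitions` buckets and merge. This is a simplified model under that assumption, and `reduce_by_key` is an illustrative name. Which bucket a string key lands in depends on the hash function, so only the merged values are checked.

```python
def reduce_by_key(partitions, func, num_partitions):
    # Step 1: combine values locally within each input partition.
    combined = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        combined.append(local)
    # Step 2: "shuffle" each key to bucket hash(key) % num_partitions and merge.
    buckets = [dict() for _ in range(num_partitions)]
    for local in combined:
        for key, value in local.items():
            bucket = buckets[hash(key) % num_partitions]
            bucket[key] = func(bucket[key], value) if key in bucket else value
    return [sorted(bucket.items()) for bucket in buckets]

# The EX2 data spread over 3 input partitions.
parts = [[('a', 3)], [('b', 4)], [('a', 5)]]
result = reduce_by_key(parts, lambda x, y: x + y, 2)
print(result)
```

The number of output partitions is set by the second argument, not by the number of input partitions.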


### EX3

In [55]:
rdd_change = rdd.partitionBy(2)

In [56]:
print rdd_change.getNumPartitions()
print rdd_change.glom().collect()

2
[[('a', 3), ('a', 5)], [('b', 4)]]


### EX4

In [129]:
lines1 = sc.textFile("2018-msan697-example/Data/filtered_registered_business_sf.csv") 
lines2 = sc.textFile("2018-msan697-example/Data/supervisor_sf.csv")

* without `partitionBy`

In [130]:
sf_business1 = lines1.map(lambda x: (x.split(",")[0],x)).persist()
supervisor1 = lines2.map(lambda x: (x.split(",")[0],x)).persist()

In [131]:
print sf_business1.getNumPartitions()
print supervisor1.getNumPartitions()

2
2


In [155]:
import time
start = time.time()
sf_business1.join(supervisor1).collect()
print time.time() - start

1.18877005577


In [158]:
sf_business1.join(supervisor1).glom().collect()

[[(u'94123',
   (u'94123,Tournahu George L,3301 Broderick St,San Francisco,CA',
    u'94123,2')),
  (u'94123',
   (u'94123,Amore Robert,1958 Union St,San Francisco,CA', u'94123,2')),
  (u'94123',
   (u'94123,Aunt Anns Corp Headquarters,2722 Gough St,San Francisco,CA',
    u'94123,2')),
  (u'94123',
   (u'94123,Barbagelata & Co Inc,2381 Chestnut St,San Francisco,CA',
    u'94123,2')),
  (u'94123',
   (u'94123,Boas International Motors Inc,2098 Lombard St,San Francisco,CA',
    u'94123,2')),
  (u'94123',
   (u'94123,Books Inc,2251 Chestnut St,San Francisco,CA', u'94123,2')),
  (u'94123',
   (u'94123,Boswell Alliance Const Co Inc,1686 Union St 306,San Francisco,CA',
    u'94123,2')),
  (u'94123',
   (u'94123,Lew Willie L & Po S,2102 Filbert St,San Francisco,CA',
    u'94123,2')),
  (u'94123',
   (u'94123,Gabriels Horn Inc,1901 Union St,San Francisco,CA', u'94123,2')),
  (u'94123',
   (u'94123,Joseph Caston Md,2710 Broderick St,San Francisco,CA', u'94123,2')),
  (u'94123',
   (u'94123,Del 

* with `partitionBy`

In [149]:
partitionSize = 4

In [150]:
sf_business2 = lines1.map(lambda x: (x.split(",")[0],x)).partitionBy(partitionSize).persist()
supervisor2 = lines2.map(lambda x: (x.split(",")[0],x)).partitionBy(partitionSize).persist()

In [151]:
print sf_business2.getNumPartitions()
print supervisor2.getNumPartitions()

4
4


In [152]:
import time
start = time.time()
sf_business2.join(supervisor2).collect()
print time.time() - start

1.44442105293


### EX5

In [163]:
nums = sc.parallelize([(1,2),(3,4),(5,1),(2,3),(1,5)])

In [164]:
nums.partitionBy(4).glom().collect()

[[], [(1, 2), (5, 1), (1, 5)], [(2, 3)], [(3, 4)]]

In [165]:
def hash_partition(key):
    return hash(key)
def custom_partition(key):
    return hash(key + 10)

In [166]:
nums.partitionBy(4, hash_partition).glom().collect()

[[], [(1, 2), (5, 1), (1, 5)], [(2, 3)], [(3, 4)]]

In [167]:
nums.partitionBy(4, custom_partition).glom().collect()

[[(2, 3)], [(3, 4)], [], [(1, 2), (5, 1), (1, 5)]]

### EX6

In [268]:
nums = sc.parallelize([1,1,3,3,11,13,14], 5)
partitionSize = 8
nums_re = nums.repartition(partitionSize)
nums_co = nums.coalesce(partitionSize)
print nums_re.toDebugString()
print nums_co.toDebugString()

(8) MapPartitionsRDD[908] at coalesce at <unknown>:0 []
 |  CoalescedRDD[907] at coalesce at <unknown>:0 []
 |  ShuffledRDD[906] at coalesce at <unknown>:0 []
 +-(5) MapPartitionsRDD[905] at coalesce at <unknown>:0 []
    |  PythonRDD[904] at RDD at PythonRDD.scala:48 []
    |  ParallelCollectionRDD[903] at parallelize at PythonRDD.scala:480 []
(5) CoalescedRDD[909] at coalesce at <unknown>:0 []
 |  ParallelCollectionRDD[903] at parallelize at PythonRDD.scala:480 []


In [269]:
nums = sc.parallelize([1,1,3,3,11,13,14], 5)
partitionSize = 3
print nums.glom().collect()
print nums.repartition(partitionSize).glom().collect()
print nums.coalesce(partitionSize).glom().collect()
print nums.coalesce(partitionSize, shuffle = True).glom().collect()

[[1], [1], [3, 3], [11], [13, 14]]
[[11, 13, 14], [1, 1], [3, 3]]
[[1], [1, 3, 3], [11, 13, 14]]
[[11, 13, 14], [1, 1], [3, 3]]
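
The third output line above (`coalesce` without shuffle) is consistent with merging neighbouring partitions. The sketch below is a simplified model that assumes no locality preferences: output partition `i` takes parent partitions from `i * parent // n` up to `(i + 1) * parent // n`. `coalesce_no_shuffle` is an illustrative name, not a Spark function.

```python
def coalesce_no_shuffle(partitions, n):
    """Toy model of coalesce(n) without shuffle: merge adjacent partitions."""
    parent = len(partitions)
    if n >= parent:
        # Without a shuffle, the partition count cannot grow.
        return [list(p) for p in partitions]
    result = []
    for i in range(n):
        start = (i * parent) // n
        end = ((i + 1) * parent) // n
        merged = []
        for part in partitions[start:end]:
            merged.extend(part)
        result.append(merged)
    return result

before = [[1], [1], [3, 3], [11], [13, 14]]
after = coalesce_no_shuffle(before, 3)
unchanged = coalesce_no_shuffle(before, 10)
print(after)
print(unchanged)
```

Elements never move between merged groups, which is why the original order is kept and no network transfer is needed in this model.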


In [267]:
nums = sc.parallelize([1,1,3,3,11,13,14], 5)
partitionSize = 10
print nums.glom().collect()
print nums.repartition(partitionSize).glom().collect()
# By default, `coalesce` can only reduce the number of RDD partitions
print nums.coalesce(partitionSize).glom().collect()
print nums.coalesce(partitionSize, shuffle = True).glom().collect()

[[1], [1], [3, 3], [11], [13, 14]]
[[], [1], [], [13, 14], [], [11], [1], [], [], [3, 3]]
[[1], [1], [3, 3], [11], [13, 14]]
[[], [1], [], [13, 14], [], [11], [1], [], [], [3, 3]]


### EX8

In [240]:
list = [random.randrange(10) for x in range(500)]

In [241]:
listrdd = sc.parallelize(list, 5)

In [242]:
pairs = listrdd.map(lambda x:(x,x*x))

In [251]:
print pairs.toDebugString()

(5) PythonRDD[828] at RDD at PythonRDD.scala:48 []
 |  ParallelCollectionRDD[819] at parallelize at PythonRDD.scala:480 []


In [252]:
reduced = pairs.reduceByKey(lambda x,y:x+y, 3)

In [253]:
reduced.glom().collect()

[[(0, 0), (9, 3888), (3, 495), (6, 1692)],
 [(1, 44), (4, 832), (7, 3038)],
 [(8, 2880), (2, 172), (5, 1300)]]

In [255]:
print reduced.toDebugString()

(3) PythonRDD[834] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[832] at mapPartitions at PythonRDD.scala:427 []
 |  ShuffledRDD[831] at partitionBy at <unknown>:0 []
 +-(5) PairwiseRDD[830] at reduceByKey at <ipython-input-252-fef1e9323331>:1 []
    |  PythonRDD[829] at reduceByKey at <ipython-input-252-fef1e9323331>:1 []
    |  ParallelCollectionRDD[819] at parallelize at PythonRDD.scala:480 []


In [256]:
finalrdd = reduced.mapPartitions(lambda itr:['K='+str(k)+',V='+str(v) for (k,v) in itr])

In [257]:
finalrdd.glom().collect()

[['K=0,V=0', 'K=9,V=3888', 'K=3,V=495', 'K=6,V=1692'],
 ['K=1,V=44', 'K=4,V=832', 'K=7,V=3038'],
 ['K=8,V=2880', 'K=2,V=172', 'K=5,V=1300']]

In [258]:
print finalrdd.toDebugString()

(3) PythonRDD[836] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[832] at mapPartitions at PythonRDD.scala:427 []
 |  ShuffledRDD[831] at partitionBy at <unknown>:0 []
 +-(5) PairwiseRDD[830] at reduceByKey at <ipython-input-252-fef1e9323331>:1 []
    |  PythonRDD[829] at reduceByKey at <ipython-input-252-fef1e9323331>:1 []
    |  ParallelCollectionRDD[819] at parallelize at PythonRDD.scala:480 []


### EX9

In [72]:
accum1 = sc.accumulator(0)
accum2 = sc.accumulator(0)

In [78]:
nums = sc.parallelize(range(1, 10000001))
nums.take(4)

[1, 2, 3, 4]

In [74]:
import time
start = time.time()
nums.foreach(lambda x:accum1.add(1))
print time.time()-start

1.334649086


In [77]:
import time
start = time.time()
for x in range(1, 10000001):
    accum2.add(1)
print time.time()-start

4.02622294426


In [76]:
print accum1.value
print accum2.value

10000000
10000000


### EX10

In [13]:
text = sc.textFile('2018-msan697-example/Data/bike_share/status.csv').map(lambda x:x.split(',')[0])

In [14]:
text.count()

36647622

In [15]:
text.take(10)

[u'10', u'10', u'10', u'10', u'10', u'10', u'10', u'10', u'10', u'10']

In [16]:
accum1 = sc.accumulator(0)
accum2 = sc.accumulator(0)
def myaccum(x):
    if x == '10':
        accum1.add(1)
        return x

text_filter = text.map(myaccum)
text_filter.foreach(lambda x:accum2.add(1))
print text_filter.count()
print accum1.value
print accum2.value

36647622
1047246
36647622


In [17]:
text.filter(lambda x:x== '10').count()

523623
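
`accum1` above is 1047246, exactly twice the real count of 523623. A likely reason is that `text_filter` is not persisted, so the `map` with `myaccum` runs once for `foreach` and again for `count`. The toy classes below (`Counter`, `LazyMap`) are stand-ins made up to model that behaviour in plain Python; they are not Spark classes.

```python
class Counter(object):
    """Stand-in for an accumulator."""
    def __init__(self):
        self.value = 0
    def add(self, n):
        self.value += n

class LazyMap(object):
    """Stand-in for an unpersisted mapped RDD: recomputed on every action."""
    def __init__(self, data, func):
        self.data = data
        self.func = func
    def _compute(self):
        return [self.func(x) for x in self.data]
    def foreach(self, f):
        for x in self._compute():
            f(x)
    def count(self):
        return len(self._compute())

hits = Counter()   # updated inside the transformation
seen = Counter()   # updated inside the action

def tag(x):
    if x == '10':
        hits.add(1)
    return x

data = ['10', '10', '11', '10', '12']
mapped = LazyMap(data, tag)
mapped.foreach(lambda x: seen.add(1))  # first action: tag runs on every element
total = mapped.count()                 # second action: tag runs again

print(hits.value)
print(seen.value)
```

The counter updated in the transformation is doubled, while the one updated in the action is exact. Persisting the mapped data before the first action would avoid the recomputation in this model.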

### EX11

In [39]:
list = sc.parallelize(range(10,100))

In [41]:
broadcastValue = sc.broadcast([1,2,3])

In [42]:
type(broadcastValue)

pyspark.broadcast.Broadcast

In [43]:
broadcastValue.value

[1, 2, 3]

In [48]:
broadcastValue.value[0]

1

In [49]:
add_broadcastValue = list.map(lambda x:x+broadcastValue.value[0])

In [51]:
add_broadcastValue.take(2)

[11, 12]

In [52]:
broadcastValue.unpersist()
broadcastValue.value

[1, 2, 3]

### EX12

In [53]:
text = sc.textFile('2018-msan697-example/Data/bike_share/status.csv').map(lambda x:x.split(','))

In [54]:
text.take(2)

[[u'10', u'7', u'8', u'"2014-12-30 15:37:02"'],
 [u'10', u'7', u'8', u'"2014-12-30 15:35:02"']]