[View in Colaboratory](https://colab.research.google.com/github/UDICatNCHU/SparkTutorial/blob/master/RDD_basic_tutorial.ipynb)

In [0]:
# Environment initialization (takes roughly three to five minutes).
# Downloads init_env.sh and runs it with bash; the script's contents are not shown in this notebook.
! wget -O init_env.sh https://www.dropbox.com/s/6bnwn8u2hz19s59/init_env.sh && \
bash init_env.sh

In [0]:
import os, sys
# Tell PySpark where Spark is installed and which Python interpreter to use.
os.environ.update({
    'SPARK_HOME': "/usr/local/spark",
    'PYSPARK_PYTHON': "/usr/bin/python",
})
# Make the pyspark and py4j sources bundled with Spark importable from this kernel.
sys.path.extend([
    "/usr/local/spark/python/",
    "/usr/local/spark/python/lib/pyspark.zip",
    "/usr/local/spark/python/lib/py4j-0.10.4-src.zip",
])

In [0]:
from pyspark import SparkContext
from pyspark import SparkConf
# Reuse the already-running SparkContext when this cell is executed again.
# Calling SparkContext() a second time in the same kernel raises an error,
# because only one context may be active at a time.
sc = SparkContext.getOrCreate()

# Apache Spark 基本練習：

Apache Spark 是一個開源叢集運算框架，最初是由加州大學柏克萊分校AMPLab所開發。相對於Hadoop的MapReduce會在執行完工作後將中介資料存放到磁碟中，Spark使用了記憶體內運算技術，能在資料尚未寫入硬碟時即在記憶體內分析運算。Spark在記憶體內執行程式的運算速度能做到比Hadoop MapReduce的運算速度快上100倍。

Some References :

1. <http://www.mccarroll.net/blog/pyspark/index.html>
2. <https://www.codementor.io/spark/tutorial/spark-python-rdd-basics>
3. <http://backtobazics.com/big-data/spark/apache-spark-map-example/>
4. <http://datascience-enthusiast.com/Python/Apache_Spark1.html>

## RDD 基本操作練習：



### 產生一個整數隨機陣列： python語法：

In [0]:
import numpy as np
# Seed NumPy's global generator so that "Restart & Run All" always produces
# the same 1000 integers in [0, 1000) and the results below are reproducible.
np.random.seed(42)
random_array = np.random.randint(1000, size=1000)
print (random_array)

# 將資料轉成RDD，分別擺放於各spark executors上

<img src="https://www.dropbox.com/s/br94ete5q3rj9w3/spark%20data%20model.png?dl=1" width="500" align="left">




___
<img src="https://www.dropbox.com/s/l2gohpmn53jkv1b/%20spark%20system%20overview.png?dl=1" width="500" align="left">

In [0]:
# Turn the local NumPy array into an RDD through the SparkContext.
rdd = sc.parallelize(random_array)

In [10]:
# Compare the types of the local NumPy array, the RDD, and the result of collect().
print type(random_array)
print type(rdd)
print type(rdd.collect())

<type 'numpy.ndarray'>
<class 'pyspark.rdd.RDD'>
<type 'list'>


### RDD為Apache Spark最核心之概念。有別於MapReduce僅提供Map()與Reduce()兩項操作，RDD提供Transformation與Action兩大類操作。



> <img src="https://www.dropbox.com/s/omfoi3uzcgapcm4/rdd%20transformation%20concept.png?dl=1" width="500" align="left">  

> <img src="https://www.dropbox.com/s/3u8gt5376qq5vjy/spark%20core.png?dl=1" width="500" align="left">

## 最基本之Action操作  
### 使用 collect( ) 將分散於各機器之資料，收集成為單機資料集
<img src="https://www.dropbox.com/s/pjv20pl5wkevjf6/collect.png?dl=1" width="500" align="left">


In [11]:
rdd.first()

16

## Transformation 觀念介紹

In [12]:
def doubling(x):
    """Return ``x`` multiplied by two."""
    doubled = x * 2
    return doubled

print doubling(10)

20


In [15]:
rdd.take(5)

[16, 55, 700, 749, 238]

In [16]:
rdd.map(doubling).take(5)

[32, 110, 1400, 1498, 476]

In [0]:
def minusone(y):
  """Return ``y`` decreased by one."""
  decremented = y - 1
  return decremented

In [18]:
rdd.map(minusone).take(5)

[15, 54, 699, 748, 237]

#### 注意：匿名函式的使用  
    rdd.map(doubling) = rdd.map(lambda x: x*2)

In [20]:
rdd.map(lambda x: x-1).take(5)

[15, 54, 699, 748, 237]

In [21]:
rdd.map(lambda yy:0).take(2)

[0, 0]

In [23]:
rdd.map(lambda x: x-1).map(lambda x:x-2).map(lambda x:x-3).take(2)

[10, 49]

464176

#### 練習：使用map()，將所有數字開平方根 
    import math
    print math.sqrt(5)

In [19]:
import math
rdd.map(lambda x: math.sqrt(x)).take(2)

[20.83266665599966, 19.467922333931785]

#### 匿名函式的差異

In [0]:
import math
def sqrt(x):
    """Return the square root of ``x`` as computed by ``math.sqrt``."""
    root = math.sqrt(x)
    return root

In [20]:
rdd.map(lambda y: math.sqrt(y)).take(2)

[7.211102550927978, 26.814175355583846]

In [23]:
rdd.map(sqrt).take(2)

[7.211102550927978, 26.814175355583846]

# Transformation Operators: map(), filter(), sample(), groupBy(), etc 
## (完整Transformation Operator請參考下列網址) http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD 

rdd資料中有多少筆?

In [0]:
rdd.map(lambda x:1).reduce(lambda x,y:x+y) 

In [0]:
rdd.count()

In [0]:
rdd.map(lambda x: x if x%2==0 else 0).reduce(lambda x,y:x+y)

#### Transformation Operator 使用filter()，將所有偶數留下，奇數刪除。

In [30]:
rdd.filter(lambda x: x%2==0).reduce(lambda x,y:x+y)

504

#### Transformation Operator 使用sample( ) 抽樣給定比例之RDD子集合

In [21]:
subsetrdd = rdd.sample(False, 0.01)
subsetrdd.collect()

[379, 925, 307, 337, 532, 973, 408, 541, 12, 699, 550, 394, 456, 415]

#### Transformation Operator: 使用 groupBy( ) 將資料分組

In [23]:
# Group the sampled values by parity: the key is True for even numbers and False for odd ones.
result = subsetrdd.groupBy(lambda x: x%2==0).collect()
# Each group is a ResultIterable object, so printing the result directly only shows its repr.
print result
# Convert every group to a list to display the grouped values themselves.
print [(x, list(y)) for (x, y) in result]

[(False, <pyspark.resultiterable.ResultIterable object at 0x7f4286a82350>), (True, <pyspark.resultiterable.ResultIterable object at 0x7f4286a82f50>)]
[(False, [379, 925, 307, 337, 973, 541, 699, 415]), (True, [532, 408, 12, 550, 394, 456])]


[1, 0, 1, 0, 1]

#### Transformation Operator 使用map ( ) 產生 key value pair

In [0]:
keyValueRdd = rdd.map(lambda x: ('even', x) if x%2==0 else ('odd',x))


In [30]:
rdd.map(lambda x: ('even', x) if x%2==0 else ('odd',x))\
   .reduceByKey(lambda x,y: x+y)\
   .take(2)

[('even', 252194), ('odd', 243733)]

In [0]:
def evennumber(x):
  """Return ``x`` unchanged when it is even; return 0 for odd values."""
  return x if x % 2 == 0 else 0

In [27]:
evennumber(101)

0

In [0]:
# NOTE(review): this reducer squares the running result on every step, so it does
# NOT compute the sum of squares. It is also not associative, which RDD.reduce
# requires, so the value can vary with how the data is partitioned.
# For the sum of squares, map each element to its square first and reduce with addition.
rdd.reduce(lambda x,y: x*x+y*y)

In [28]:
rdd.map(evennumber).reduce(lambda x,y:x+y)

252194

In [24]:
rdd.map(lambda x: x if x%2==0 else 0).reduce(lambda x,y:x+y)


252194

#### Transformation Operator: 使用 groupByKey( ) 根據key將資料分組

In [29]:

# Tag every number with its parity ('even' or 'odd') so it can be grouped by key.
keyValueRdd = rdd.map(lambda x: ('even', x) if x%2==0 else ('odd',x))

# groupByKey() gathers the values that share a key; print each key followed by its values.
for x in keyValueRdd.groupByKey().collect():
    print x[0], list(x[1])

even [386, 886, 606, 212, 374, 434, 464, 226, 528, 212, 220, 868, 730, 742, 506, 784, 834, 284, 620, 364, 652, 134, 482, 426, 92, 492, 180, 842, 894, 246, 970, 792, 150, 780, 10, 194, 44, 958, 588, 576, 878, 152, 150, 42, 412, 986, 798, 960, 950, 388, 550, 292, 738, 440, 552, 314, 0, 678, 200, 680, 528, 582, 774, 528, 22, 986, 88, 754, 294, 888, 30, 986, 496, 138, 724, 22, 242, 630, 120, 416, 6, 712, 756, 60, 670, 984, 944, 344, 674, 178, 650, 118, 700, 880, 154, 204, 860, 706, 564, 14, 226, 788, 664, 934, 110, 954, 114, 572, 760, 566, 402, 430, 782, 750, 416, 746, 304, 720, 802, 140, 726, 338, 834, 34, 516, 990, 348, 870, 970, 90, 452, 82, 60, 498, 710, 108, 74, 112, 6, 108, 176, 632, 498, 704, 756, 482, 344, 626, 744, 646, 864, 918, 664, 442, 224, 54, 850, 814, 672, 634, 768, 456, 636, 278, 234, 346, 710, 594, 566, 386, 352, 232, 600, 430, 390, 392, 600, 194, 636, 838, 960, 846, 214, 536, 562, 962, 700, 646, 390, 814, 198, 640, 330, 640, 60, 662, 388, 978, 108, 688, 918, 404, 574, 17


### 練習：計算奇數加總值與偶數加總值
#### groupByKey(), reduceByKey()

In [26]:
keyValueRdd.reduceByKey(lambda x,y: x+y).take(2)

[('even', 240850), ('odd', 229326)]

In [29]:
rdd.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b).take(5)

[(0, 1), (512, 1), (514, 1), (4, 1), (518, 3)]

___
___
___


# RDD Action Operation
#### 使用reduce()，計算所有數之加總值。

In [24]:
rdd.reduce(lambda x,y: x+y )

508189

In [0]:
# NOTE(review): this reducer squares the running result on every step, so it does
# NOT compute the sum of squares. It is also not associative, which RDD.reduce
# requires, so the value can vary with how the data is partitioned.
# For the sum of squares, map each element to its square first and reduce with addition.
rdd.reduce(lambda x,y: x*x+y*y)

In [25]:
rdd.map(lambda x:1).reduce(lambda a,b:a+b)

1000

[1, 1, 1, 1, 1]

______

## 觀念
### 練習：計算所有數字平方和。

In [28]:
rdd.map(lambda x: x*x).reduce(lambda x,y: x+y)

306397002

## 練習：使用map(), reduceByKey()，計算所有數字出現頻率。

In [29]:
rdd.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b)

PythonRDD[37] at RDD at PythonRDD.scala:48

_____
_____
_____

# WordCount

In [0]:
import urllib
# Alternative input (disabled): the King James Bible text from ccel.org.
#f=urllib.urlretrieve("https://www.ccel.org/ccel/bible/kjv.txt","bible")
# Download the speech text and save it as the local file "speech".
f=urllib.urlretrieve("https://www.dropbox.com/s/28ljfwb1aeuyi37/speech.txt?dl=1","speech")


In [0]:
data_file = "./speech"

In [0]:
text_file = sc.textFile(data_file)

In [48]:
print(type(text_file))

<class 'pyspark.rdd.RDD'>


In [50]:
text_file.map(lambda line: line.split(" ")).count()

89

In [51]:
text_file.flatMap(lambda line: line.split(" ")).count()

3776

In [52]:
# Word count: split each line on single spaces, emit a (word, 1) pair per token,
# then add up the counts per word.
# Splitting on " " produces empty-string tokens (e.g. for consecutive or trailing
# spaces), and those are counted like any other word.
wordcountsRDD = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
wordcountsRDD.take(10)


[(u'', 228),
 (u'\u8981', 25),
 (u'\u627e\u51fa', 1),
 (u'\u65c5\u7a0b', 1),
 (u'\u8b1d\u8b1d', 2),
 (u'\u5341\u516d\u65e5', 1),
 (u'\u7b2c\u4e8c', 2),
 (u'\u4ee3\u5de5', 1),
 (u'\u7b2c\u4e09\u4ef6', 1),
 (u'\u6b4c\u66f2', 1)]

In [53]:
for i in wordcountsRDD.take(10):
  print i[0], i[1]

 228
要 25
找出 1
旅程 1
謝謝 2
十六日 1
第二 2
代工 1
第三件 1
歌曲 1


#### 根據字元符號順序做排序

In [41]:
text_file = sc.textFile(data_file)
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b).sortByKey()
counts.take(5)

[(u'', 228), (u'1992', 2), (u'1996', 1), (u'20', 4), (u'2016', 1)]

#### 出現頻率做排序

In [59]:
text_file = sc.textFile(data_file)

counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)\
             .sortBy(lambda x: x[1], ascending=False)

s = counts.take(20)

for i in s:
  print i[0],i[1]

， 336
的 292
 228
。 159
我們 86
、 59
台灣 39
與 37
在 33
國家 32
和 31
一個 29
是 27
新政府 27
要 25
經濟 25
這個 25
會 24
讓 24
也 22


#### map 與 flatMap的差異

In [54]:
text_file = sc.textFile(data_file)
counts = text_file.map(lambda line: line.split(" "))
counts.take(1)


[[u'\u5404\u4f4d',
  u'\u53cb\u90a6',
  u'\u7684',
  u'\u5143\u9996',
  u'\u8207',
  u'\u8cb4\u8cd3',
  u'\u3001',
  u'\u5404\u570b',
  u'\u99d0\u53f0',
  u'\u4f7f\u7bc0',
  u'\u53ca',
  u'\u4ee3\u8868',
  u'\u3001',
  u'\u73fe\u5834',
  u'\u7684',
  u'\u597d',
  u'\u670b\u53cb',
  u'\uff0c',
  u'\u5168\u9ad4',
  u'\u570b\u4eba',
  u'\u540c\u80de',
  u'\uff0c',
  u'\u5927\u5bb6',
  u'\u597d',
  u'\u3002',
  u'',
  u'',
  u'']]

In [56]:
text_file = sc.textFile(data_file)
counts = text_file.flatMap(lambda line: line.split(" "))
counts.take(1)

[u'\u5404\u4f4d']

In [39]:
rdd.map(lambda x: x if x>500 else 0).reduce(lambda x,y:x+y)

366120


____
____
____
# pi-estimation 

In [30]:
import random

def sample(p):
    """Draw one random point in the unit square and test it against the unit circle.

    ``p`` is ignored; it exists only so the function can be passed to ``map``.
    Returns 1 when x*x + y*y is strictly less than 1, otherwise 0.
    """
    x = random.random()
    y = random.random()
    inside = x*x + y*y < 1
    if inside:
        return 1
    return 0

count = sc.parallelize(xrange(0, 10000000)).map(sample) \
             .reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / 10000000)

Pi is roughly 3.143342


In [31]:
count = sc.parallelize(xrange(0, 1000000))\
        .map(lambda p: 1 if (random.random()**2 + random.random()**2)<1 else 0) \
        .reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / 1000000)

Pi is roughly 3.146328


___
___
___

# Text Search Example

In [46]:
import urllib
# Download the King James Bible text and save it as the local file "bible".
f=urllib.urlretrieve("https://www.ccel.org/ccel/bible/kjv.txt","bible")
# Point data_file at the file just downloaded. Without this, a top-to-bottom run
# still has data_file == "./speech" from the WordCount section and reads the wrong file.
data_file = "./bible"
text_file = sc.textFile(data_file)
# Identity map: every line is passed through unchanged.
lines = text_file.map(lambda line: line)
lines.take(20)

[u'     __________________________________________________________________',
 u'',
 u'           Title: The King James Version of the Holy Bible',
 u'      Creator(s): Anonymous',
 u'          Rights: Public Domain',
 u'   CCEL Subjects: All; Bible; Old Testament; New Testament; Apocrypha',
 u'      LC Call no: BS185',
 u'     LC Subjects:',
 u'',
 u'                  The Bible',
 u'',
 u'                  Modern texts and versions',
 u'',
 u'                  English',
 u'     __________________________________________________________________',
 u'',
 u'Holy Bible',
 u'',
 u'                               King James Version',
 u'     __________________________________________________________________']


___
___
___

# Filter Example

In [61]:
import urllib
# Download the King James Bible text and save it as the local file "bible".
urllib.urlretrieve("https://www.ccel.org/ccel/bible/kjv.txt","bible")
data_file = "./bible"
text_file = sc.textFile(data_file)
# Keep only the lines that contain 'and'. This is a substring test, so lines with
# words such as "land" match as well.
lines = text_file.filter(lambda line: 'and' in line) 
lines.take(20)

[u'                  Modern texts and versions',
 u'   Great and manifold were the blessings, most dread Sovereign, which',
 u"   England, when first he sent Your Majesty's Royal Person to rule and",
 u'   Occindental Star, Queen Elizabeth, of most happy memory, some thick and',
 u'   palpable clouds of darkness would so have overshadowed this land, that',
 u'   men should have been in doubt which way they were to walk, and that it',
 u'   dispelled those supposed and surmised mists, and gave unto all that',
 u'   beheld the Government established in Your Highness, and Your hopeful',
 u'   Seed, by an undoubted Title; and this also accompanied with peace and',
 u'   tranquillity at home and abroad.',
 u'   only to the time spent in this transitory world, but directeth and',
 u'   and to continue it in that state wherein the famous Predecessor of Your',
 u'   Highness did leave it; nay, to go forward with the confidence and',
 u'   resolution of a man, in maintaining the truth of Christ