In [None]:
from __future__ import print_function, division
import os
import sys 

spark_home = os.environ['SPARK_HOME']
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.4-src.zip'))

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local") \
   .appName("test") \
   .enableHiveSupport() \
   .getOrCreate()

sc = spark.sparkContext

## Part1. RDD 的基本操作
RDD 由於採分散式架構，在計算以及操作是使用 Map 與 Reduce 的方式，與一般單執行緒程式邏輯不同．

本節將介紹基本的 Map 與 Reduce 等基本指令，讓各位同學能夠熟悉如何操作 RDD．

In [None]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print(type(wordsRDD))

In [None]:
wordsRDD.map(lambda x: (x,1)).collect()

## map (Transform)
map 能將 function 套用在 RDD 中的每個元素上

In [None]:
def makePlural(word):
    """Return ``word`` with an 's' appended.

    Args:
        word (str): The word to pluralize.

    Returns:
        str: ``word`` followed by 's'.
    """
    plural = word + 's'
    return plural

print(makePlural('dog'))

In [None]:
def yourfun(x, y):
    """Upper-case ``x`` and append ``y`` to it.

    Args:
        x (str): The string to convert to upper case.
        y (str): The suffix appended after the upper-cased ``x``.

    Returns:
        str: ``x`` in upper case followed by ``y``.
    """
    # str.upper is a method and has to be called; adding the bound method
    # object itself to a string raises TypeError.
    return x.upper() + y
    

In [None]:
# map() is a lazy transformation: with no action (such as collect()) after it,
# this cell only builds a new RDD object and does not run yourfun on the data.
wordsRDD.map(lambda x: yourfun(x, ' hi'))

In [None]:
# A named function can be handed to map() directly; no wrapping lambda is needed.
wordsRDD.map(makePlural).collect()

In [None]:
appliedRDD = wordsRDD.map(makePlural)

In [None]:
print(type(appliedRDD))

## Collect (Action)
將 RDD 元素送回 Master 並回傳為 List

In [None]:
# Run the collect() action once and reuse the resulting list; every collect()
# call launches a separate Spark job over the same data.
appliedWords = appliedRDD.collect()
print(appliedWords)
print(type(appliedWords))

## 使用 Lambda function
Lambda function 亦為匿名函數，並不另外定義函數，直接透過 lambda 來宣告函數邏輯．

In [None]:
lambdaRDD = wordsRDD.map(lambda word: word + 's')
print(lambdaRDD.collect())

### 小練習: 計算每個單字長度

In [None]:
# len already maps a word to its length, so it is passed to map() as-is.
lenofRDD = wordsRDD.map(len).collect()
print(lenofRDD)

## Part2. 使用 Pair RDD 來做計算

## Pair RDD
Pair RDD 是一種以 (key, value) 方式儲存的 RDD

In [None]:
pairRDD = wordsRDD.map(lambda word: (word, 1))

In [None]:
print(pairRDD.take(1))
print(pairRDD)

## groupByKey(Transform)
將資料依照 Key 值分組：相同 Key 的 value 會被收集在一起（結果不保證排序）

In [None]:
wordsGrouped = pairRDD.groupByKey()
# Each collected element is a (key, iterable-of-values) pair; the iterable is
# turned into a list so its contents show up when printed.
for word, ones in wordsGrouped.collect():
    print('{0}: {1}'.format(word, list(ones)))


### 依照 key 值加總

In [None]:
# Sum the grouped values of every key; the keys themselves are left untouched.
wordCountsGrouped = wordsGrouped.mapValues(sum)

In [None]:
wordCountsGrouped.collect()

## 更好的解決方案！
## reduceByKey (Transform)
reduceByKey 會先在各個分區內依 key 值聚合，再進行交換(shuffle)，因此能減少 shuffle 時需要傳輸的資料量

In [None]:
wordCounts = pairRDD.reduceByKey(lambda a,b: a+b)
print(wordCounts.collect())


### 小練習: 現在你也會寫 wordcount

In [None]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordRDD = sc.parallelize(wordsList)

In [None]:
wordCountRDD = (wordRDD
                 .map(lambda x: (x, 1))
                 .reduceByKey(lambda x ,y : x + y))

In [None]:
wordCountRDD.collect()

## Part3. 尋找不重複值
RDD 提供了多樣的現成 API 可供使用，建議先查找官方文件瞭解現有的 API，避免重複造輪子．

## distinct (Transform)

In [None]:
uniqueWords = wordsRDD.map(lambda word: (word, 1)).distinct()
print(uniqueWords.collect())


### count (action)

In [None]:
countUniqueWords = wordsRDD.map(lambda word: (word, 1)).distinct().count()
print(countUniqueWords)

### 小作業: 計算每個字平均出現幾次

In [None]:
wordsCount = [('cat',2),('elephant',1),('rat',2)]

In [None]:
wordCountRDD = sc.parallelize(wordsCount)

In [None]:
# Keep only the count of every (word, count) pair and add the counts together.
totalCount = wordCountRDD.values().reduce(lambda running, count: running + count)

In [None]:
average = totalCount / (wordCountRDD.distinct().count())

In [None]:
print(average)

## Part4. RDD 也可以當作函數的參數

In [None]:
def wordCount(wordListRDD):
    """Count how many times each word occurs in an RDD of words.

    Args:
        wordListRDD (RDD of str): The words to count.

    Returns:
        RDD of (str, int): One (word, count) tuple per distinct word.
    """
    # Pair every word with an initial count of 1 ...
    onesRDD = wordListRDD.map(lambda word: (word, 1))
    # ... then add up the counts that share the same word.
    return onesRDD.reduceByKey(lambda total, one: total + one)


In [None]:
wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
print(wordCount(wordsRDD).collect())

## Part5. 與 python library 互動
使用 pyspark 最方便的地方就是在於可以直接引用 python 的庫，並用在 map 或 reduce 中

In [None]:
from operator import add

# Take the count out of every (word, count) pair and sum the counts with the
# standard-library add function instead of a hand-written lambda.
totalCount = wordCounts.values().reduce(add)

print(totalCount)


## Part6. Text Mining
結合目前的課程，最後帶大家做個 Text Mining 的例子．
需要讀取一個外部 text 文字檔，清理文檔內容，計算使用頻次最高的幾個詞．

### 6.1 去除符號

In [None]:
import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers are retained.  Other characters are
        eliminated (e.g. it's becomes its).  Leading and trailing spaces are removed after
        punctuation is removed.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    # Lower-case first so the character class only needs a-z, then drop every
    # character that is not a letter, digit or space.
    cleaned = re.sub(r"[^a-z0-9 ]", "", text.lower())
    # Strip last: removing punctuation can expose new leading/trailing spaces
    # (e.g. ' * hi * '), which stripping before the substitution would miss.
    return cleaned.strip()
print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))

### 6.2 從 HDFS 讀取檔案

In [None]:
!hadoop fs -put ../data/shakespear.txt /tmp

In [None]:
!hadoop fs -tail /tmp/shakespear.txt

In [None]:
filepath = "hdfs:///tmp/shakespear.txt"
# Read the file as an RDD of lines and clean every line with removePunctuation.
shakespeareRDD = sc.textFile(filepath).map(removePunctuation)

In [None]:
# Preview only the first few lines: collect() would ship every line of the
# file to the driver and flood the notebook output.
shakespeareRDD.take(10)

In [None]:
# Print the first 15 cleaned lines, each prefixed with its line number.
print('\n'.join(
    shakespeareRDD
    .zipWithIndex()                                          # (line, lineNum)
    .map(lambda lineAndNum: '{1}: {0}'.format(*lineAndNum))  # 'lineNum: line'
    .take(15)
))

### 6.3 切詞

In [None]:
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda a: a.split(" "))
shakespeareWordCount = shakespeareWordsRDD.count()
print(shakespeareWordsRDD.top(5))
print(shakespeareWordCount)

### 6.4 移除空白值

In [None]:
# Splitting on single spaces leaves empty strings behind; keep only real words.
shakeWordsRDD = shakespeareWordsRDD.filter(lambda word: word != '')
shakeWordCount = shakeWordsRDD.count()
print(shakeWordCount)


### 6.5 計算字詞數並排序

In [None]:
# Negating the count makes takeOrdered (ascending by key) return the most
# frequent words first.
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda wc: -wc[1])
print('\n'.join('{0}: {1}'.format(word, count) for word, count in top15WordsAndCounts))