# Spark RDD Lab

## Khởi tạo SparkContext

In [1]:
import findspark
findspark.init()

import pyspark
findspark.find()

from pyspark import SparkContext
sc = SparkContext()

## 1. Tạo một RDD từ một collection
### 1.1 Tạo một Python list gồm các số từ 1 đến 100 

In [16]:
numb = range(1,101)

### 1.2. Tạo một RDD từ Python list

In [17]:
numbRDD = sc.parallelize(numb)


### 1.3. In ra kiểu của `numbRDD`

In [18]:
print("The type of RDD is", type(numbRDD))


The type of RDD is <class 'pyspark.rdd.PipelinedRDD'>


## 2. Tạo một RDD từ một external dataset
### 2.1. Tạo một RDD từ một file

In [19]:
file_path = './README.md' # File readme của thư mục cài đặt Spark
fileRDD = sc.textFile(file_path)

### 2.2. Kiểm tra kiểu của `fileRDD`

In [20]:
print("The file type of fileRDD is", type(fileRDD))

The file type of fileRDD is <class 'pyspark.rdd.RDD'>


## 3. Phân hoạch dữ liệu
### 3.1. Kiểm tra số partition của `fileRDD`

In [21]:
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

Number of partitions in fileRDD is 2


### 3.2. Tạo `fileRDD_part` từ `file_path` với 5 partitions

In [22]:
fileRDD_part = sc.textFile(file_path, minPartitions=5)

### 3.3. Kiểm tra số partition của `fileRDD_part`

In [23]:
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())


Number of partitions in fileRDD_part is 5


## 4. Map and Collect
### 4.1. Tạo một map() transformation để tính lũy thừa bậc 3 của các phần tử trong `numbRDD`


In [24]:
cubedRDD = numbRDD.map(lambda x: x ** 3)


### 4.2. Collect kết quả

In [25]:
numbers_all = cubedRDD.collect()


### 4.3. In ra các phần tử từ `numbers_all`

In [26]:
for numb in numbers_all:
    print(numb)

1
8
27
64
125
216
343
512
729
1000
1331
1728
2197
2744
3375
4096
4913
5832
6859
8000
9261
10648
12167
13824
15625
17576
19683
21952
24389
27000
29791
32768
35937
39304
42875
46656
50653
54872
59319
64000
68921
74088
79507
85184
91125
97336
103823
110592
117649
125000
132651
140608
148877
157464
166375
175616
185193
195112
205379
216000
226981
238328
250047
262144
274625
287496
300763
314432
328509
343000
357911
373248
389017
405224
421875
438976
456533
474552
493039
512000
531441
551368
571787
592704
614125
636056
658503
681472
704969
729000
753571
778688
804357
830584
857375
884736
912673
941192
970299
1000000


## 5. Filter and Count
### 5.1 Áp dụng filter trên `fileRDD` để chọn ra những dòng có từ `Spark`

In [27]:
fileRDD_filter = fileRDD.filter(lambda line: 'Spark' in line)

### 5.2. Cho biết có bao nhiêu dòng từ `fileRDD`?

In [28]:
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

The total number of lines with the keyword Spark is 20


### 5.3. In 5 dòng đầu của `fileRDD`

In [29]:
for line in fileRDD_filter.take(5):
    print(line)


# Apache Spark
Spark is a unified analytics engine for large-scale data processing. It provides
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
pandas API on Spark for pandas workloads, MLlib for machine learning, GraphX for graph processing,
[![PySpark Coverage](https://codecov.io/gh/apache/spark/branch/master/graph/badge.svg)](https://codecov.io/gh/apache/spark)


## 6. ReduceBykey and Collect  
### 6.1. Tạo một `PairRDD` tên `Rdd` như sau:

In [30]:
Rdd = sc.parallelize([(1,2),(3,4),(3,6),(4,5)])

### 6.2. Áp dụng `reduceByKey()` operation trên Rdd

In [31]:
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)


### 6.3. Duyệt và in ra kết quả

In [32]:
for num in Rdd_Reduced.collect():
    print("Key {} has {} Counts".format(num[0], num[1]))


Key 1 has 2 Counts
Key 3 has 10 Counts
Key 4 has 5 Counts


## 7. SortByKey and Collect
### 7.1. Sắp xếp `Rdd_Reduced` theo khóa giảm dần

In [33]:
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)


### 7.2. Duyệt và in ra kết quả

In [34]:
for num in Rdd_Reduced_Sort.collect():
    print("Key {} has {} Counts".format(num[0], num[1]))


Key 4 has 5 Counts
Key 3 has 10 Counts
Key 1 has 2 Counts


## 8. CountingBykeys
### 8.1. Đếm số key khác nhau

In [35]:
total = Rdd.keys().distinct().count()

### 8.2 Kiểm tra kiểu của biến `total`?

In [36]:
print("The type of total is", type(total))


The type of total is <class 'int'>


### 8.2. Duyệt và in ra kết quả

In [37]:
for k, v in Rdd_Reduced.collect():
    print("key", k, "has", v, "counts")

key 1 has 2 counts
key 3 has 10 counts
key 4 has 5 counts


## 9. Tạo biến `baseRDD` từ file và thực hiện các transform
### 9.1 Tạo biến `baseRDD` từ `file_path`

In [38]:
file_path = './README.md'
baseRDD = sc.textFile(file_path)


### 9.2. Biến đổi các dòng trong `baseRDD` thành các từ

In [41]:
splitRDD = baseRDD.flatMap(lambda x: x.split())


### 9.3. Đếm tổng số từ

In [42]:
print("Total number of words in splitRDD:", splitRDD.count())


Total number of words in splitRDD: 516


## 10. Loại bỏ stop word và áp dụng reduce opperator trên tập dữ liệu
### 10.1. Chuyển các từ sang dạng chữ thường (lower case) và loại bỏ các stop words từ biến `stop_words`

In [43]:
stop_words = ['a', 'an', 'the', 'is', 'be', 'was', 'were', 'it', 'this', 'that', 'but', 'if', 'or', 'not']
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)


### 10.2. Tạo các tuple có dạng (word, 1) 

In [44]:
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))


### 10.3. Đếm tần số của mỗi từ

In [45]:
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)


## 11. In tần số của mỗi từ
### 11.1. Hiển thị 10 từ đầu và tần số của nó trong `resultRDD`

In [46]:
for word in resultRDD.take(10):
    print(word)


('#', 1)
('Apache', 1)
('Spark', 15)
('unified', 1)
('analytics', 1)
('engine', 2)
('provides', 1)
('high-level', 1)
('APIs', 1)
('in', 5)


### 11.2 Hoán đổi giữa key và value trong `resultRDD`

In [47]:
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))


### 11.3. Sắp xếp các key theo thứ tự giảm dần

In [48]:
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)


### 11.4. Hiển thị 10 từ xuất hiện nhiều nhất trong `resultRDD_swap_sort`

In [49]:
for word in resultRDD_swap_sort.take(10):
    print("{}, {}".format(word[0], word[1]))


16, to
15, Spark
13, for
9, and
9, ##
8, on
8, ```
7, run
6, can
6, ```bash
