In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pandas as pd
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

# Basic RDD Transformations

# map()
Syntax: RDD.map(<function>, preservesPartitioning=False).

Trả về 1 RDD mới bằng cách truyền mỗi phần tử đầu vào (nguồn) qua hàm func.

# flatMap()
Syntax: RDD.flatMap(<function>, preservesPartitioning=False).

Tương tự map() nhưng khác map() ở chỗ, mỗi phần tử đầu vào qua flatMap sẽ trả về 0 hoặc nhiều phần tử đầu ra (có thể hiểu qua map sẽ là 1-1).

# filter()
Syntax: RDD.filter(<function>).

Trả về 1 RDD mới bằng cách chọn những phần tử đầu vào (nguồn) mà hàm func  trả về kết quả true.

In [10]:
licenses = sc.textFile('opt/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' '))
words.take(5)
# returns [u'The', u'MIT', u'License', u'(MIT)', u'']

['The', 'MIT', 'License', '(MIT)', '']

In [11]:
lowercase = words.map(lambda x: x.lower())
lowercase.take(5)
# returns [u'the', u'mit', u'license', u'(mit)', u'']

['the', 'mit', 'license', '(mit)', '']

In [12]:
longwords = lowercase.filter(lambda x: len(x) > 12)
longwords.take(5)
# returns [u'documentation', u'merchantability,']

['documentation',
 'merchantability,',
 'noninfringement.',
 'redistribution',
 'modification,']

# distinct()
Syntax: RDD.distinct(numPartitions=None).

Trả về 1 RDD mới chứa mỗi phần tử là duy nhất của tập dữ liệu nguồn (đầu vào).

In [14]:
allwords = lowercase.count()
distinctwords = lowercase.distinct().count()
print ("Total words: %s, Distinct Words: %s" % (allwords, distinctwords))
# returns "Total words: 32015, Distinct Words: 2529"

Total words: 32015, Distinct Words: 2529


# groupBy()
Syntax: RDD.groupBy(<function>, numPartitions=None).
    
Trả về 1 RDD của các phần tử được nhóm bởi một func xác định.

In [15]:
licenses = sc.textFile('opt/spark/licenses')
words = licenses.flatMap(lambda x: x.split(' ')) \
                .filter(lambda x: len(x) > 0)
words.take(5)

['The', 'MIT', 'License', '(MIT)', 'Copyright']

In [17]:
groupedbyfirstletter = words.groupBy(lambda x: x[0].lower())
groupedbyfirstletter.take(1)
# returns:
# [('l', <pyspark.resultiterable.ResultIterable object at 0x7f678e9cca20>)]

[('[', <pyspark.resultiterable.ResultIterable at 0x220ea026580>)]

# sortBy()
Syntax: RDD.sortBy(keyfunc , ascending=True, numPartitions=None).

Sắp xếp một RDD theo đối số keyfunc (hàm được đặt tên hoặc ẩn danh) đề cử khóa cho một tập dữ liệu nhất định. Nó sắp xếp theo thứ tự sắp xếp của loại đối tượng chính. 

In [24]:
readme = sc.textFile('opt/spark/README.md')
words = readme.flatMap(lambda x: x.split(' ')) \
              .filter(lambda x: len(x) > 0)
sortbyfirstletter = words.sortBy(lambda x: x[0].lower(), ascending=False)
sortbyfirstletter.take(5)
# returns ['You', 'you', 'you', 'you', 'you']

['You', 'you', 'you', 'you', 'You']

# Basic RDD Actions

# count()
Syntax:RDD.count()

Trả về số phần tử của tập dữ liệu.

In [25]:
words.count()
# returns 495

495

# collect()
Syntax: RDD.collect()

Trả về tất cả các phần tử của tập dữ liệu như 1 mảng ở driver Program. Hàm này hữu ích sau khi lọc hoặc thao tác khác mà trả về tập dữ liệu con đủ nhỏ.

In [26]:
words.collect()

['#',
 'Apache',
 'Spark',
 'Spark',
 'is',
 'a',
 'unified',
 'analytics',
 'engine',
 'for',
 'large-scale',
 'data',
 'processing.',
 'It',
 'provides',
 'high-level',
 'APIs',
 'in',
 'Scala,',
 'Java,',
 'Python,',
 'and',
 'R,',
 'and',
 'an',
 'optimized',
 'engine',
 'that',
 'supports',
 'general',
 'computation',
 'graphs',
 'for',
 'data',
 'analysis.',
 'It',
 'also',
 'supports',
 'a',
 'rich',
 'set',
 'of',
 'higher-level',
 'tools',
 'including',
 'Spark',
 'SQL',
 'for',
 'SQL',
 'and',
 'DataFrames,',
 'MLlib',
 'for',
 'machine',
 'learning,',
 'GraphX',
 'for',
 'graph',
 'processing,',
 'and',
 'Structured',
 'Streaming',
 'for',
 'stream',
 'processing.',
 '<https://spark.apache.org/>',
 '[![Jenkins',
 'Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7-hive-2.3/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7-hive-2.3)',
 '[![AppVeyor',
 'Build](https://img.shields.io/appveyor/ci/ApacheSoftwa

# take()
Syntax: RDD.take(n)

Trả về mảng gồm n phần tử đầu tiên của tập dữ liệu.

In [27]:
words.take(3)

['#', 'Apache', 'Spark']

# top()
Syntax: RDD.top(n, key=None)

Trả về n phần tử hàng đầu từ RDD, nhưng không giống như với take (), với top() các phần tử được sắp xếp và trả về theo thứ tự giảm dần.

In [28]:
words.distinct().top(3)

['your', 'you', 'with']

# first()
Syntax: RDD.first()

Trả về phần tử đầu tiên của tập dữ liệu (tương tự take(1)).

In [29]:
words.first()

'#'

# reduce()
Syntax: RDD.reduce('function')
    
Tổng hợp các phần tử của tập dữ liệu sử dụng hàm func (có 2 đối số và trả về 1 kết quả).

In [32]:
numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])
numbers.reduce(lambda x, y: x + y) 
# returns 45

45

# fold()
Syntax: RDD.fold(zeroValue, <'function'>)
    
Tổng hợp các phần tử của mỗi phân vùng của RDD và sau đó thực hiện phép toán tổng hợp dựa trên kết quả cho tất cả, bằng cách sử dụng chức năng và một zeroValue. Mặc dù reduce() và fold() tương tự nhau về chức năng, chúng khác nhau ở chỗ fold() không giao hoán và giá trị cuối cùng (zeroValue) là bắt buộc. 

In [31]:
numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])
numbers.fold(0, lambda x, y: x + y)
# returns 45

45

In [33]:
empty = sc.parallelize([])
empty.reduce(lambda x, y: x + y)
# returns:
# ValueError: Cannot reduce() empty RDD

ValueError: Can not reduce() empty RDD

In [None]:
empty.fold(0, lambda x, y: x + y)
# returns 0

# foreach()
Syntax: RDD.foreach(<'function'>)

Chạy hàm func cho mỗi phần tử của tập dữ liệu. Điều này có tác dụng khi thực hiện cập nhật 1 biến accumulator hoặc tương tác với hệ thống lưu trữ ngoài.

In [36]:
def printfunc(x): print(x)
licenses = sc.textFile('opt/spark/licenses')
longwords = licenses.flatMap(lambda x: x.split(' ')) \
                    .filter(lambda x: len(x) > 12)
longwords.foreach(lambda x: printfunc(x))
# returns:
# ...
# Redistributions
# documentation
# distribution.
# MERCHANTABILITY
# ...

# Transformations on PairRDDs

# keys()
Syntax: RDD.keys()

Trả về một RDD với các khóa từ một cặp key/value RDD hoặc phần tử đầu tiên từ mỗi bộ trong một cặp key/value RDD.

In [37]:
kvpairs = sc.parallelize([('city','Hayward'),
                          ('state','CA'),
                          ('zip',94541),
                          ('country','USA')])
kvpairs.keys().collect()
# returns ['city', 'state', 'zip', 'country']

['city', 'state', 'zip', 'country']

# values()
Syntax: RDD.values()

Trả về một RDD với các giá trị từ một cặp key/value RDD hoặc phần tử thứ hai từ mỗi bộ trong một cặp key/value RDD

In [38]:
kvpairs = sc.parallelize([('city','Hayward'),
                          ('state','CA'),
                          ('zip',94541),
                          ('country','USA')])
kvpairs.values().collect()
# returns ['Hayward', 'CA', 94541, 'USA']

['Hayward', 'CA', 94541, 'USA']

# keyBy()
Syntax: RDD.keyBy(<'func'>)

Tạo ra một bộ giá trị bao gồm một khóa và một giá trị từ các phần tử trong RDD bằng cách áp dụng một chức năng được chỉ định bởi đối số func. 

In [39]:
locations = sc.parallelize([('Hayward', 'USA', 1),
                            ('Baumholder','Germany', 2),
                            ('Alexandria','USA', 3),
                            ('Melbourne','Australia', 4)])
bylocno = locations.keyBy(lambda x: x[2])
bylocno.collect()
# returns:
#[(1, ('Hayward', 'USA', 1)), (2, ('Baumholder', 'Germany', 2)),
# (3, ('Alexandria', 'USA', 3)), (4, ('Melbourne', 'Australia', 4))]

[(1, ('Hayward', 'USA', 1)),
 (2, ('Baumholder', 'Germany', 2)),
 (3, ('Alexandria', 'USA', 3)),
 (4, ('Melbourne', 'Australia', 4))]

# mapValues()
Syntax: RDD.mapValues(<'function'>)

Chuyển từng giá trị trong một cặp  key/value RDD thông qua một chức năng (một chức năng được đặt tên hoặc ẩn danh được chỉ định bởi đối số <'function'>) mà không cần thay đổi các key. 

# flatMapValues()
Syntax: RDD.flatMapValues(<'function'>)

Chuyển từng giá trị trong một cặp key/value RDD thông qua một chức năng mà không cần thay đổi các key và tạo ra một danh sách phẳng.

In [42]:
locwtemps = sc.parallelize(['Hayward,71|69|71|71|72',
                            'Baumholder,46|42|40|37|39',
                            'Alexandria,50|48|51|53|44',
                            'Melbourne,88|101|85|77|74'])
kvpairs = locwtemps.map(lambda x: x.split(','))
kvpairs.collect()
# returns :
# [['Hayward', '71|69|71|71|72'],
# ['Baumholder', '46|42|40|37|39'],
# ['Alexandria', '50|48|51|53|44'],
# ['Melbourne', '88|101|85|77|74']]

[['Hayward', '71|69|71|71|72'],
 ['Baumholder', '46|42|40|37|39'],
 ['Alexandria', '50|48|51|53|44'],
 ['Melbourne', '88|101|85|77|74']]

In [43]:
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')) \
                      .mapValues(lambda x: [int(s) for s in x])
locwtemplist.collect()
# returns :
# [('Hayward', [71, 69, 71, 71, 72]),
# ('Baumholder', [46, 42, 40, 37, 39]),
# ('Alexandria', [50, 48, 51, 53, 44]),
# ('Melbourne', [88, 101, 85, 77, 74])]

[('Hayward', [71, 69, 71, 71, 72]),
 ('Baumholder', [46, 42, 40, 37, 39]),
 ('Alexandria', [50, 48, 51, 53, 44]),
 ('Melbourne', [88, 101, 85, 77, 74])]

In [44]:
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')) \
                   .map(lambda x: (x[0], int(x[1])))
locwtemps.collect()
# returns :
# [('Hayward', 71), ('Hayward', 69), ('Hayward', 71)]...

[('Hayward', 71),
 ('Hayward', 69),
 ('Hayward', 71),
 ('Hayward', 71),
 ('Hayward', 72),
 ('Baumholder', 46),
 ('Baumholder', 42),
 ('Baumholder', 40),
 ('Baumholder', 37),
 ('Baumholder', 39),
 ('Alexandria', 50),
 ('Alexandria', 48),
 ('Alexandria', 51),
 ('Alexandria', 53),
 ('Alexandria', 44),
 ('Melbourne', 88),
 ('Melbourne', 101),
 ('Melbourne', 85),
 ('Melbourne', 77),
 ('Melbourne', 74)]

# groupByKey()
Syntax: RDD.groupByKey(numPartitions=None, partitionFunc= <'hash_fn>)

Khi gọi đến 1 tập dữ liệu (K,V) sẽ trả về 1 tập là cặp (K,Seq(V))(Tức là nhóm tập các phần tử cùng Key). 
Chú ý: mặc định chỉ có 8 task song song khi grouping. Có thể thay đổi số task song song này bằng việc truyền vào tham số đầu vào.

In [45]:
grouped = locwtemps.groupByKey()
grouped.take(1)
# returns:
# [('Melbourne', <pyspark.resultiterable.ResultIterable object at 0x7f121ce11390>)]

[('Hayward', <pyspark.resultiterable.ResultIterable at 0x220ec79df70>)]

In [46]:
avgtemps = grouped.mapValues(lambda x: sum(x)/len(x))
avgtemps.collect()
# returns:
# [('Melbourne', 85), ('Baumholder', 40), ('Alexandria', 49), ('Hayward', 70)]

[('Hayward', 70.8),
 ('Baumholder', 40.8),
 ('Melbourne', 85.0),
 ('Alexandria', 49.2)]

# reduceByKey()
Syntax: RDD.reduceByKey(<'function'>, numPartitions=None, partitionFunc=<'hash_fn'>)
    
Khi gọi tập dữ liệu (K,V), trả về 1 tập (K,V) mà giá trị của key được tổng hợp sử dụng hàm reduce func.

In [47]:
temptups = locwtemps.mapValues(lambda x: (x, 1))
# creates tuples (city, (temp, 1))
inputstoavg = temptups.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
# sums temperatures by city
averages = inputstoavg.map(lambda x: (x[0], x[1][0]/x[1][1]))
# divides the sum of temperatures by key by the number of readings
averages.collect()
# returns :
# [('Baumholder', 40.8),
# ('Melbourne', 85.0),
# ('Alexandria', 49.2),
# ('Hayward', 70.8)]

[('Hayward', 70.8),
 ('Baumholder', 40.8),
 ('Melbourne', 85.0),
 ('Alexandria', 49.2)]

# foldByKey()
Syntax: RDD.foldByKey(zeroValue, <'function'>, numPartitions=None, partitionFunc=<'hash_fn'>)

Tương tự như hàm fold(). Tuy nhiên, foldByKey() là mộtchuyển đổi hoạt động với các phần tử key/value được xác định trước. Cả foldByKey() và fold() đều cung cấp đối số zeroValue của cùng loại sẽ được sử dụng nếu RDD trống.

In [48]:
maxbycity = locwtemps.foldByKey(0, lambda x, y: x if x > y else y)
maxbycity.collect()
# returns :
# [('Baumholder', 46), ('Melbourne', 101), ('Alexandria', 53), ('Hayward', 72)]

[('Hayward', 72), ('Baumholder', 46), ('Melbourne', 101), ('Alexandria', 53)]

# sortByKey()
Syntax:RDD.sortByKey(ascending=True, numPartitions=None, keyfunc=<'function'>)

Khi gọi tập dữ liệu (K,V) với K có thể thực hiện sắp thứ tự được. Khi đó, nó sẽ trả về tập dữ liệu (K,V) được sắp sếp tăng dần hoặc giảm dần theo key. 

In [49]:
sortedbykey = locwtemps.sortByKey()
sortedbykey.take(4)
# returns:
# [('Alexandria', 50), ('Alexandria', 48), ('Alexandria', 51), ('Alexandria', 53)]

[('Alexandria', 50),
 ('Alexandria', 48),
 ('Alexandria', 51),
 ('Alexandria', 53)]

In [50]:
sortedbyval = locwtemps.map(lambda x: (x[1],x[0])) \
                       .sortByKey(ascending=False)
sortedbyval.take(4)
# returns:# [(101, 'Melbourne'), (88, 'Melbourne'), (85, 'Melbourne'), (77, 'Melbourne')]

[(101, 'Melbourne'), (88, 'Melbourne'), (85, 'Melbourne'), (77, 'Melbourne')]