In [17]:
pip install pyspark



In [18]:
#spark Properties
from pyspark.sql import SparkSession
spark= SparkSession.builder.master("local[*]").appName('Customers').getOrCreate()
from pyspark import SparkConf, SparkContext
import collections

In [19]:
conf= SparkConf().setMaster("local").setAppName("word couting")
sc = SparkContext.getOrCreate(conf=conf)

In [20]:
#Chuyển RDD thành list bằng parallelize
textRDD = sc.parallelize([
    "tui có tờ vé số là 18 12 00 98", 
    "tớ 21 tuổi rồi.",
    "đi phía trước rẽ trái rồi đi thêm 200 mét."
])
#chúng ta có thể chuyển ngược lại bằng collect() tuy nhiên nên chú ý kích thước
textRDD.collect()

['tui có tờ vé số là 18 12 00 98',
 'tớ 21 tuổi rồi.',
 'đi phía trước rẽ trái rồi đi thêm 200 mét.']

In [21]:
#Một số chức năng khác:
#count() trả về số phần tử trong RDD
print("RDD size: %s" % textRDD.count())

RDD size: 3


In [22]:
#first Lấy phần tử đầu tiên
print("RDD first element: %s" % textRDD.first())

RDD first element: tui có tờ vé số là 18 12 00 98


In [23]:
#take() lấy n phần tử đầu tiên dưới dang list
print("RDD first 2 elements:  %s" % textRDD.take(2))

RDD first 2 elements:  ['tui có tờ vé số là 18 12 00 98', 'tớ 21 tuổi rồi.']


In [24]:
#foreach áp dụng hàm cho từng phần tử RDD
from __future__ import print_function
textRDD.foreach(print)

In [25]:
#save RDD thành file text
textRDD.saveAsTextFile('samples.txt')

In [26]:
# đây là RDD ban đầu của chúng tôi - mỗi phần tử là một dòng văn bản
textRDD.collect()


['tui có tờ vé số là 18 12 00 98',
 'tớ 21 tuổi rồi.',
 'đi phía trước rẽ trái rồi đi thêm 200 mét.']

In [27]:
#đầu tiên chia từng dòng thành các token và kết hợp các token thành 1 RDD duy nhất (list)
textRDD.flatMap(lambda line:line.split())

PythonRDD[23] at RDD at PythonRDD.scala:53

In [28]:

textRDD \
    .flatMap(lambda line:line.split()) \
    .collect()

['tui',
 'có',
 'tờ',
 'vé',
 'số',
 'là',
 '18',
 '12',
 '00',
 '98',
 'tớ',
 '21',
 'tuổi',
 'rồi.',
 'đi',
 'phía',
 'trước',
 'rẽ',
 'trái',
 'rồi',
 'đi',
 'thêm',
 '200',
 'mét.']

In [29]:
import re
# tạo rdd các phần tử là số
def is_number(s):
    return re.match(r'[0-9]+', s)
textRDD.flatMap(lambda line:line.split()) \
    .filter(is_number) \
    .collect()

['18', '12', '00', '98', '21', '200']

In [30]:
#chuyển các số thành số int
textRDD.flatMap(lambda line:line.split()) \
    .filter(is_number) \
    .map(int) \
    .collect()

[18, 12, 0, 98, 21, 200]

In [31]:
#cộng các phần tử lại vs nhau bằng reduce
import operator
textRDD.flatMap(lambda line:line.split()) \
    .filter(is_number) \
    .map(int) \
    .reduce(operator.add)

349