In [1]:
import pyspark
myConf = pyspark.SparkConf()
spark = pyspark.sql.SparkSession\
    .builder\
    .master('local')\
    .appName('myApp')\
    .config(conf = myConf)\
    .getOrCreate()



# RDD

## Creation

### Creating from a list
- sparkContext.parallelize()

In [2]:
myList = [x for x in range(1, 8)]
rdd1 = spark.sparkContext.parallelize(myList)


### Creating from a file
- sparkContext.textFile()

In [3]:
import os
rdd2 = spark.sparkContext.textFile(os.path.join('data', 'spark_wiki.txt'))

In [4]:
rdd3 = spark.sparkContext.textFile(os.path.join('data', 'spark_2cols.csv'))

## API
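- Transformations (e.g. map, filter, flatMap) return a new RDD; the source RDD is left unchanged.
- Actions (e.g. collect, count, reduce) run the computation and return a plain Python value, such as a list or a number, to the driver.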

### transformations
- Transformations are lazy: Spark only records them and runs the whole chain at once when an action is called.
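To make the lazy/eager split concrete, here is a plain-Python sketch (not Spark's implementation). The `LazyPipeline` class is hypothetical: its `map`/`filter` methods only record steps, and nothing runs until the action `collect()` is called.

```python
class LazyPipeline:
    """Hypothetical toy that mimics RDD laziness: transformations are recorded, actions execute."""

    def __init__(self, data, steps=None):
        self._data = data
        self._steps = steps or []  # recorded (kind, function) pairs

    def map(self, fn):
        # Transformation: return a new pipeline with one more recorded step; nothing runs yet
        return LazyPipeline(self._data, self._steps + [("map", fn)])

    def filter(self, pred):
        # Transformation: also only recorded
        return LazyPipeline(self._data, self._steps + [("filter", pred)])

    def collect(self):
        # Action: apply every recorded step, in order, to each element
        result = []
        for item in self._data:
            keep = True
            for kind, fn in self._steps:
                if kind == "map":
                    item = fn(item)
                elif not fn(item):
                    keep = False
                    break
            if keep:
                result.append(item)
        return result


calls = []

def square(x):
    calls.append(x)  # record when the function actually runs
    return x * x

pipeline = LazyPipeline(range(1, 8)).map(square).filter(lambda x: x % 2 == 1)
print(calls)               # [] : nothing has executed yet
print(pipeline.collect())  # [1, 9, 25, 49]
print(len(calls))          # 7
```

Like an RDD, the pipeline is recomputed from scratch on every action, which is why Spark offers `cache()` for results that are reused.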

In [5]:
# map
squared = rdd1.map(lambda x: x * x)
squared.collect()



[1, 4, 9, 16, 25, 36, 49]

In [6]:
# map
splitted = rdd3.map(lambda line : line.split(','))
splitted.take(5)


[['35', ' 2'], ['40', ' 27'], ['12', ' 38'], ['15', ' 31'], ['21', ' 1']]

In [7]:
# map
stripped = splitted.map(lambda x: [int(i) for i in x])  # int() also ignores the leading space, e.g. ' 2' -> 2
stripped.take(5)

[[35, 2], [40, 27], [12, 38], [15, 31], [21, 1]]

In [8]:
sentences = rdd2.map(lambda x : x.split())
sentences.take(3)

[['Wikipedia'],
 ['Apache',
  'Spark',
  'is',
  'an',
  'open',
  'source',
  'cluster',
  'computing',
  'framework.'],
 ['아파치', '스파크는', '오픈', '소스', '클러스터', '컴퓨팅', '프레임워크이다.']]

In [9]:
for line in sentences.collect():
    for word in line:
        print(word, end = ' ')
    print("\n-----")

Wikipedia 
-----
Apache Spark is an open source cluster computing framework. 
-----
아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다. 
-----
Apache Spark Apache Spark Apache Spark 
-----
아파치 스파크 아파치 스파크 아파치 스파크 
-----
Originally developed at the University of California, Berkeley's AMPLab, 
-----
the Spark codebase was later donated to the Apache Software Foundation, 
-----
which has maintained it since. 
-----
Spark provides an interface for programming entire clusters with 
-----
implicit data parallelism and fault-tolerance. 
-----


In [10]:
rdd2.map(lambda x : len(x)).collect()

[9, 59, 32, 38, 23, 72, 71, 30, 64, 46]

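#### reduce
- reduce is an action, not a transformation: it combines all elements with a two-argument function and returns the result to the driver.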

In [11]:
rdd_ztoh = spark.sparkContext.parallelize(range(1, 101))
rdd_ztoh.reduce(lambda subtotal, x: subtotal + x)

5050
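Spark's reduce first reduces each partition and then combines the per-partition results, which is why the function should be associative and commutative. The plain-Python sketch below imitates that two-stage idea; `split_into_partitions` and `two_stage_reduce` are hypothetical helpers, and the contiguous split is an assumption (Spark's actual partitioning may differ).

```python
from functools import reduce

def split_into_partitions(data, num_partitions):
    """Hypothetical helper: cut a list into num_partitions contiguous chunks."""
    size, extra = divmod(len(data), num_partitions)
    chunks, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        chunks.append(data[start:end])
        start = end
    return chunks

def two_stage_reduce(data, fn, num_partitions):
    # Stage 1: reduce each non-empty partition independently
    partials = [reduce(fn, part) for part in split_into_partitions(data, num_partitions) if part]
    # Stage 2: combine the per-partition results
    return reduce(fn, partials)

numbers = list(range(1, 101))
add = lambda subtotal, x: subtotal + x
print(two_stage_reduce(numbers, add, 1))  # 5050
print(two_stage_reduce(numbers, add, 4))  # 5050

# Subtraction is not associative, so its result depends on how the data is partitioned
sub = lambda a, b: a - b
print(two_stage_reduce(numbers, sub, 1), two_stage_reduce(numbers, sub, 4))
```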

#### filter

In [12]:
# In Python 3 every str is Unicode, so Korean text can be filtered directly (the u'' prefix was only needed in Python 2)
rdd_filtered = rdd2.filter(lambda line : 'Spark' in line)
print("How many lines having 'Spark':", rdd_filtered.count())

How many lines having 'Spark': 4


In [13]:
# Remove stopwords with filter (flatMap first splits every line into words)
stopwords = ['is', 'am', 'are', 'the', 'for', 'a', 'an', 'at']
stop_filtered = rdd2.flatMap(lambda x : x.split())\
    .filter(lambda x: x not in stopwords)\
    .collect()
for words in stop_filtered:
    print(words, end = ' ')

Wikipedia Apache Spark open source cluster computing framework. 아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다. Apache Spark Apache Spark Apache Spark 아파치 스파크 아파치 스파크 아파치 스파크 Originally developed University of California, Berkeley's AMPLab, Spark codebase was later donated to Apache Software Foundation, which has maintained it since. Spark provides interface programming entire clusters with implicit data parallelism and fault-tolerance. 
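The cell above uses flatMap rather than map. As a plain-Python analogy (not Spark code, with a hypothetical two-line sample), map keeps one output per input line, giving a list of word lists, while flatMap flattens each line's words into one sequence that filter can then test word by word:

```python
from itertools import chain

lines = ["Apache Spark is an open source", "아파치 스파크는 오픈 소스"]
stopwords = ['is', 'am', 'are', 'the', 'for', 'a', 'an', 'at']

# map: one output element per input line -> a list of lists
mapped = [line.split() for line in lines]

# flatMap: each line's list of words is flattened into a single sequence of words
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

# filter then keeps only the words that are not stopwords
filtered = [w for w in flat_mapped if w not in stopwords]

print(mapped)
print(filtered)  # ['Apache', 'Spark', 'open', 'source', '아파치', '스파크는', '오픈', '소스']
```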

#### foreach
-   foreach is an action: it applies a function to each element for its side effects and returns nothing (None).

In [14]:
# foreach returns None, so chaining .collect() after it raises an error
spark.sparkContext.parallelize(range(1, 6)).foreach(lambda x : x + 1).collect()

AttributeError: 'NoneType' object has no attribute 'collect'

In [15]:
def f(x):
    print(x, end = ' ')

spark.sparkContext.parallelize(range(1, 6)).foreach(f)

1 2 3 4 5 

#### groupBy
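- Mostly used on unpaired RDDs.
- You pass a function that computes the key for each element.
- Internally it maps each element to a (key, element) pair and then calls groupByKey(), so it is no faster than groupByKey() and shuffles whole elements.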
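Because groupBy amounts to mapping each element to a (key, element) pair and then grouping by key, its result can be imitated in plain Python. This sketch uses a hypothetical `group_by` helper, with a `defaultdict` standing in for Spark's shuffle:

```python
from collections import defaultdict

def group_by(elements, key_fn):
    """Hypothetical stand-in for rdd.groupBy(key_fn): map to (key, element), then group."""
    pairs = [(key_fn(x), x) for x in elements]  # the implicit map step
    groups = defaultdict(list)
    for key, value in pairs:                     # the grouping (shuffle) step
        groups[key].append(value)
    return list(groups.items())

lines = ["Apache Spark is fast", "Apache Spark Apache Spark", "Spark provides an interface"]
for key, values in group_by(lines, lambda x: x[:2]):
    print(key, values)
```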

In [16]:
# Group lines by their first two characters
grouping = rdd2.groupBy(lambda x : x[:2]) # the function passed in computes each line's key
for (k, v) in grouping.collect():
    print(k, ':', v)



Wi : <pyspark.resultiterable.ResultIterable object at 0x7f9bc7355a30>
Ap : <pyspark.resultiterable.ResultIterable object at 0x7f9bc7355f10>
아파 : <pyspark.resultiterable.ResultIterable object at 0x7f9bc7355640>
Or : <pyspark.resultiterable.ResultIterable object at 0x7f9bc750cd90>
th : <pyspark.resultiterable.ResultIterable object at 0x7f9bc750cca0>
wh : <pyspark.resultiterable.ResultIterable object at 0x7f9bc750c220>
Sp : <pyspark.resultiterable.ResultIterable object at 0x7f9bc750c2e0>
im : <pyspark.resultiterable.ResultIterable object at 0x7f9bc750c250>


In [17]:
grouping = rdd2.groupBy(lambda x: x[:2])
for k, v in grouping.collect():
    for eachValue in v:
        print("{}: {}".format(k, eachValue))
    print('----')

Wi: Wikipedia
----
Ap: Apache Spark is an open source cluster computing framework.
Ap: Apache Spark Apache Spark Apache Spark
----
아파: 아파치 스파크는 오픈 소스 클러스터 컴퓨팅 프레임워크이다.
아파: 아파치 스파크 아파치 스파크 아파치 스파크
----
Or: Originally developed at the University of California, Berkeley's AMPLab,
----
th: the Spark codebase was later donated to the Apache Software Foundation,
----
wh: which has maintained it since.
----
Sp: Spark provides an interface for programming entire clusters with
----
im: implicit data parallelism and fault-tolerance.
----


#### groupBy on a paired RDD

In [18]:
_testList = [('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1),
             ('a', 1), ('b', 1),
             ('a', 1), ('a', 1), ('b', 1), ('b', 1),]

In [19]:
_testRdd = spark.sparkContext.parallelize(_testList)
_testRdd.groupBy(lambda x: x[0]).collect()
# groupBy returns (key, ResultIterable) pairs; each value holds the original tuples

[('a', <pyspark.resultiterable.ResultIterable at 0x7f9bc75081f0>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x7f9bc754c430>)]

#### Viewing the contents of a ResultIterable

In [20]:
_testRdd.groupBy(lambda x : x[0]).mapValues(list).collect()
# mapValues: apply a function (here list) to each value, leaving the key unchanged

[('a', [('a', 1), ('a', 1), ('a', 1), ('a', 1), ('a', 1), ('a', 1)]),
 ('b', [('b', 1), ('b', 1), ('b', 1), ('b', 1), ('b', 1)])]
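mapValues transforms each value while leaving keys (and, in Spark, the partitioning) untouched. In plain Python the equivalent is a comprehension over (key, value) pairs; `map_values` below is a hypothetical helper, not a Spark API:

```python
def map_values(pairs, fn):
    """Hypothetical stand-in for rdd.mapValues(fn): transform values, keep keys."""
    return [(key, fn(value)) for key, value in pairs]

# Materialize iterables (like ResultIterable) with list, as in the cell above
grouped = [('a', iter([('a', 1), ('a', 1)])), ('b', iter([('b', 1)]))]
print(map_values(grouped, list))  # [('a', [('a', 1), ('a', 1)]), ('b', [('b', 1)])]

# Or reduce each group to a count
print(map_values([('a', [1, 1, 1]), ('b', [1, 1])], len))  # [('a', 3), ('b', 2)]
```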

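### Operations on pair RDDs
- Avoid groupByKey for aggregations: it shuffles every (key, value) pair and keeps all values for a key in memory, so it can use a lot of memory.
- Prefer reduceByKey or combineByKey, which combine values within each partition before the shuffle and so move less data.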
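A plain-Python sketch (not Spark's internals) of why this advice holds. The two-partition split of `_testList` is an assumption for illustration: with groupByKey every pair crosses the shuffle, while with reduceByKey each partition is reduced locally first, so at most one record per key per partition is shuffled.

```python
from collections import defaultdict

# Hypothetical: the _testList pairs split across two partitions
partitions = [
    [('a', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)],
    [('b', 1), ('a', 1), ('a', 1), ('b', 1), ('b', 1)],
]

def local_reduce(partition, fn):
    # Map-side combine: reduce values per key inside one partition
    acc = {}
    for key, value in partition:
        acc[key] = fn(acc[key], value) if key in acc else value
    return list(acc.items())

add = lambda x, y: x + y

# groupByKey-style: every pair is shuffled, then all values for a key are held together
shuffled_group = [pair for part in partitions for pair in part]
grouped = defaultdict(list)
for key, value in shuffled_group:
    grouped[key].append(value)
group_result = {key: sum(values) for key, values in grouped.items()}

# reduceByKey-style: only the locally reduced pairs are shuffled, then merged
shuffled_reduce = [pair for part in partitions for pair in local_reduce(part, add)]
reduce_result = {}
for key, value in shuffled_reduce:
    reduce_result[key] = add(reduce_result[key], value) if key in reduce_result else value

print(group_result, len(shuffled_group))    # {'a': 6, 'b': 5} 11
print(reduce_result, len(shuffled_reduce))  # {'a': 6, 'b': 5} 4
```

Both approaches give the same totals; the difference is how many records travel through the shuffle (11 versus 4 here), and the gap grows with the number of repeated keys per partition.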

In [21]:
rdd4 = _testRdd
rdd4.getNumPartitions()

1

In [22]:
rdd4.keys().collect()

['a', 'a', 'a', 'b', 'b', 'a', 'b', 'a', 'a', 'b', 'b']

#### reduceByKey

In [23]:
rdd4.reduceByKey(lambda x, y: x + y).collect()

[('a', 6), ('b', 5)]

In [24]:
# Word count with groupByKey
rdd2\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))\
    .groupByKey()\
    .mapValues(sum)\
    .take(3)   

[('Wikipedia', 1), ('Apache', 5), ('Spark', 6)]

In [25]:
# Word count with reduceByKey
rdd2\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))\
    .reduceByKey(lambda x, y : x + y)\
    .take(3)

[('Wikipedia', 1), ('Apache', 5), ('Spark', 6)]

In [26]:
# Word count with countByKey (an action that returns a dict of counts per key to the driver)
rdd2\
    .flatMap(lambda x : x.split())\
    .map(lambda x : (x, 1))\
    .countByKey()\
    .items()
    

dict_items([('Wikipedia', 1), ('Apache', 5), ('Spark', 6), ('is', 1), ('an', 2), ('open', 1), ('source', 1), ('cluster', 1), ('computing', 1), ('framework.', 1), ('아파치', 4), ('스파크는', 1), ('오픈', 1), ('소스', 1), ('클러스터', 1), ('컴퓨팅', 1), ('프레임워크이다.', 1), ('스파크', 3), ('Originally', 1), ('developed', 1), ('at', 1), ('the', 3), ('University', 1), ('of', 1), ('California,', 1), ("Berkeley's", 1), ('AMPLab,', 1), ('codebase', 1), ('was', 1), ('later', 1), ('donated', 1), ('to', 1), ('Software', 1), ('Foundation,', 1), ('which', 1), ('has', 1), ('maintained', 1), ('it', 1), ('since.', 1), ('provides', 1), ('interface', 1), ('for', 1), ('programming', 1), ('entire', 1), ('clusters', 1), ('with', 1), ('implicit', 1), ('data', 1), ('parallelism', 1), ('and', 1), ('fault-tolerance.', 1)])
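countByKey counts how many pairs share each key (it ignores the values) and returns the whole dictionary to the driver, so it suits only a modest number of distinct keys. Its result matches a `collections.Counter` over the keys, as this plain-Python sketch on a hypothetical two-line sample shows:

```python
from collections import Counter

lines = ["Apache Spark is an open source cluster computing framework.",
         "Apache Spark Apache Spark Apache Spark"]

pairs = [(word, 1) for line in lines for word in line.split()]  # flatMap + map
counts = Counter(key for key, _ in pairs)                       # countByKey equivalent

print(counts['Apache'], counts['Spark'], counts['open'])  # 4 4 1
```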

#### combineByKey
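rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)
- createCombiner : turns the first value seen for a key in a partition into an accumulator, the form the computation needs.
- mergeValue : takes (accumulator, new value) and merges the value into the accumulator within a partition.
- mergeCombiners : takes two accumulators and merges them across partitions.

1. Use it when reduceByKey is not enough, typically because the accumulator has a different type from the values.
2. For example, it can compute sums and averages together by accumulating (sum, count) pairs.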
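A plain-Python sketch (not Spark's implementation) of how the three functions cooperate, computing per-key averages from (sum, count) accumulators. `combine_by_key` is a hypothetical helper and the two-partition split is an assumption; with the single partition that getNumPartitions() reported for rdd4, mergeCombiners would never be called.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Hypothetical stand-in for rdd.combineByKey over pre-split partitions."""
    per_partition = []
    for partition in partitions:
        acc = {}
        for key, value in partition:
            if key not in acc:
                acc[key] = create_combiner(value)        # first value for this key in this partition
            else:
                acc[key] = merge_value(acc[key], value)  # fold later values into the accumulator
        per_partition.append(acc)

    result = {}
    for acc in per_partition:                            # merge accumulators across partitions
        for key, combiner in acc.items():
            result[key] = merge_combiners(result[key], combiner) if key in result else combiner
    return result

partitions = [
    [('a', 2), ('b', 10), ('a', 4)],
    [('a', 6), ('b', 20)],
]

sums_counts = combine_by_key(
    partitions,
    lambda value: (value, 1),                         # createCombiner
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # mergeValue
    lambda x, y: (x[0] + y[0], x[1] + y[1]),          # mergeCombiners
)
averages = {key: total / count for key, (total, count) in sums_counts.items()}
print(sums_counts)  # {'a': (12, 3), 'b': (30, 2)}
print(averages)     # {'a': 4.0, 'b': 15.0}
```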

In [27]:
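# Accumulate (sum, count) per key; the third function is mergeCombiners
rdd4.combineByKey(lambda value : (value, 1),                   # createCombiner
                  lambda x, value : (x[0] + value, x[1] + 1),  # mergeValue
                  lambda x, y : (x[0] + y[0], x[1] + y[1]))\
    .collect()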

[('a', (6, 6)), ('b', (5, 5))]

