In [10]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('transform_action')
sc = SparkContext(conf = conf)

In [3]:
# 지금 쓰는 세팅
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.app.startTime', '1690546919066'),
 ('spark.app.id', 'local-1690546921789'),
 ('spark.driver.host', '192.168.124.100'),
 ('spark.driver.port', '63927'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'transform_action'),
 ('spark.ui.showConsoleProgress', 'true')]

In [7]:
sc.stop()

In [12]:
sc

In [14]:
# make RDD .parallelize : list -> RDD, .textFile : text -> RDD

sample = sc.parallelize(['a','b','c','d','e','f','a1','b2','c3','d4','e5','f6'])

In [24]:
# Action part - eager execution

sample.collect() 

# 개발, 디버깅 단계에서 사용. 

# PRODUCTION 단계에서는 RDD의 리스트 전부를 가져오기 때문에 낭비가 심해 SPARK를 쓰는 의미가 없어짐 지양

# 기본적으로 spark 는 규모가 매우 큰 데이터를 효율적으로 다루기 위해 사용하는 것인데,

# .collect()의 결과를 보면 N 의 시간복잡도를 가지는 메소드를 production 단계에서 사용하는 것은 지양해야한다고 생각. 

['a', 'b', 'c', 'd', 'e', 'f', 'a1', 'b2', 'c3', 'd4', 'e5', 'f6']

In [25]:
sample.countByValue()

# key 와 value 상태로 개수를 count

defaultdict(int,
            {'a': 1,
             'b': 1,
             'c': 1,
             'd': 1,
             'e': 1,
             'f': 1,
             'a1': 1,
             'b2': 1,
             'c3': 1,
             'd4': 1,
             'e5': 1,
             'f6': 1})

In [18]:
sample.take(5)

# index 순으로 element를 보여줌

['a', 'b', 'c', 'd', 'e']

In [19]:
sample.first() 

# RDD 내 첫 번째 element 보여줌

'a'

In [20]:
sample.count()

# element의 총 개수를 알려줌

12

In [21]:
sample.distinct()

# distinct 는 transformation 이기에 바로 확인할 수 없고 action을 섞어 써야보임

PythonRDD[11] at RDD at PythonRDD.scala:53

In [22]:
sample.distinct().collect()

# sql의 distinct 와 같은 기능

['a', 'b', 'c', 'd', 'e', 'f', 'a1', 'b2', 'c3', 'd4', 'e5', 'f6']

In [23]:
sample.foreach(lambda x: print(x))

# foreach 는 worker node 에서 실행됨

# spark context가 있는 driver 에서는 볼 수 없음

# log 저장 등에 유리

In [None]:
# Transformations part - lazy execution

# Transformations = narrow + wide

# 1. narrow transformation

# 1:1 변환, .filter(), .map(), .flatmap(), .sample(), .union()

# 열을 조작하기 위해 다른 열 / 파티션의 데이터를 사용하지 않음

# 정렬이 필요하지 않은 경우 <정렬은 다른 열의 데이터를 사용>

# 2. wide transformation

# resource를 비교적 많이 필요로 함.

# shuffling, intersection, join, distinct, cartesian, .reduceBykey(), groupBykey()

# 아웃풋 RDD의 파티션에 조건,동작시킨 행위에 따라 다른 데이터가 들어갈 수 있게됨.

# 위에 action part 에서 말했던 것처럼, action 에 해당하는 메소드들은 client로 가져오기 때문에

# python object 로 처리할 수 있지만, transformation 에 해당하는 메소드들은 RDD를 반환한다.

In [28]:
# narrow t 1. .map()

sample.map(lambda x: x+'e')

PythonRDD[21] at RDD at PythonRDD.scala:53

In [29]:
sample.map(lambda x: x+'e').take(3)

['ae', 'be', 'ce']

In [30]:
# narrow t 2. .flatmap()

corpus = ["yes yes", 'No No', "Yes No", 'No yes']

corpusRDD = sc.parallelize(corpus)

corpusRDD.flatMap(lambda x:x.split(" "))

PythonRDD[24] at RDD at PythonRDD.scala:53

In [32]:
corpusRDD.flatMap(lambda x:x.split(" ")).collect()

# 리스트 내부의 값을 어떤 기준에 따라 element를 늘릴 수 있도록 함

['yes', 'yes', 'No', 'No', 'Yes', 'No', 'No', 'yes']

In [38]:
data = [[1, 2, 3], [4, 5], [6, 7, 8]]
dataRDD = sc.parallelize(data)

result = dataRDD.flatMap(lambda x: [i * 2 for i in x])
result.collect()

# 아래의 결과와 같이 몇 겹의 리스트여도, 한 개의 리스트에 한 개의 element 화 하는 것을 알 수 있음.

[2, 4, 6, 8, 10, 12, 14, 16]

In [39]:
# narrow t 2. .filter()


doubled_corpusRDD = corpusRDD.flatMap(lambda x:x.split(" "))

doubled_corpusRDD.filter(lambda x:x != 'yes')

PythonRDD[33] at RDD at PythonRDD.scala:53

In [40]:
doubled_corpusRDD.filter(lambda x:x != 'yes').collect()

# 설명 생략

['No', 'No', 'Yes', 'No', 'No']

In [46]:
# wide t 1. .intersection() 

# 입력 RDD들 간에 데이터의 재분배를 수행하기 때문.

# resource 가 많이 필요함.

a = [1,2,3,4,5,6]
b = [5,6,7,8,9,10]

RDD1 = sc.parallelize(a)
RDD2 = sc.parallelize(b)

RDD1, RDD2

(ParallelCollectionRDD[53] at readRDDFromFile at PythonRDD.scala:274,
 ParallelCollectionRDD[54] at readRDDFromFile at PythonRDD.scala:274)

In [49]:
RDD1.intersection(RDD2).collect() # 교집합

[6, 5]

In [50]:
# wide t 2. .union() 

# 입력 RDD들 간에 데이터의 재분배를 수행하기 때문.

RDD1.union(RDD2).collect() # 합집합

[1, 2, 3, 4, 5, 6, 5, 6, 7, 8, 9, 10]

In [52]:
# wide t 3. .substract() 

RDD1.subtract(RDD2).collect() #RDD1 과 RDD2의 intersection 값을 제외한 나머지

[2, 4, 1, 3]

In [55]:
# narrow t 3. .sample() 

# sample(1. True or False<복원, 비복원 추출>, 2. %<얼마나>, 3. seed 값 설정)

RDD12 = RDD1.union(RDD2)

RDD12.sample(True, .5).collect()

[5, 6, 6, 10]

In [58]:
# wide t 3. .groupBy()

# 데이터를 새롭게 분배시켜 만든 RDD 

corpusRDD.groupBy(lambda x:x[0])

PythonRDD[108] at RDD at PythonRDD.scala:53

In [60]:
test = corpusRDD.groupBy(lambda x:x[0]).collect()
test

[('y', <pyspark.resultiterable.ResultIterable at 0x16e40b5a910>),
 ('N', <pyspark.resultiterable.ResultIterable at 0x16e40b5a9d0>),
 ('Y', <pyspark.resultiterable.ResultIterable at 0x16e40b5aa60>)]

In [65]:
for (k, v) in test:
    print(f"key is :{k}, value is : {v}")

# 그냥 v 를 쓰면 iterable한 pyspark result 기 때문에 python object로 바꿔주자

key is :y, value is : <pyspark.resultiterable.ResultIterable object at 0x0000016E40B5A910>
key is :N, value is : <pyspark.resultiterable.ResultIterable object at 0x0000016E40B5A9D0>
key is :Y, value is : <pyspark.resultiterable.ResultIterable object at 0x0000016E40B5AA60>


In [66]:
for (k, v) in test:
    print(f"key is :{k}, value is : {list(v)}")

key is :y, value is : ['yes yes']
key is :N, value is : ['No No', 'No yes']
key is :Y, value is : ['Yes No']


In [71]:
RDD12.groupBy(lambda x:x%2).collect()

# 나누어 떨어진 것과 1이 남은 것을 알 수 있음.

[(0, <pyspark.resultiterable.ResultIterable at 0x16e40b5b9d0>),
 (1, <pyspark.resultiterable.ResultIterable at 0x16e40b6e160>)]

In [74]:
list(RDD12.groupBy(lambda x:x%2).collect()[0][1])

# 위에서 말했던 iterable 한 RDD result 를 python object 로

# 나누어 떨어진 애들

[2, 4, 6, 6, 8, 10]

In [75]:
list(RDD12.groupBy(lambda x:x%2).collect()[1][1])

# 안되는 애들

[1, 3, 5, 5, 7, 9]