In [1]:
from pyspark import SparkConf, SparkContext

In [2]:
conf = SparkConf().setMaster("local").setAppName("transformations_actions")

In [3]:
sc = SparkContext(conf = conf)

In [4]:
sc.getConf().getAll()

[('spark.master', 'local'),
 ('spark.driver.port', '51402'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.app.startTime', '1636081026534'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'transformations_actions'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '172.30.1.34'),
 ('spark.app.id', 'local-1636081027829'),
 ('spark.ui.showConsoleProgress', 'true')]

In [5]:
sc.stop()

In [None]:
# sc를 stop하면 Nonetype
type(sc)

In [9]:
# sc는 하나만 띄울 수 있다 (여러 개를 동시에 띄우려 하면 에러 발생)
sc = SparkContext(conf=conf)

In [10]:
# parallelize : list로부터 RDD를 만든다 (textfile은 텍스트 파일로부터 RDD를 만든다)
foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면",  "라면", "우동", "라면"])

In [11]:
foods

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [12]:
# collect : RDD 안에 있는 values를 모두 가져올 수 있는데 list를 그대로 가져오기 때문에 개발환경이나 디버깅하면서 사용
# 실제 production 환경에서는 사용하지 않아야 함(data를 모두 가져오기 때문에 낭비가 심하고, spark를 사용하는 이유가 없어지는 것)
foods.collect()

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '짬뽕', '짜장면', '짜장면', '짜장면', '라면', '우동', '라면']

In [13]:
foods.countByValue()

defaultdict(int,
            {'짜장면': 4,
             '마라탕': 1,
             '짬뽕': 2,
             '떡볶이': 1,
             '쌀국수': 1,
             '라면': 2,
             '우동': 1})

In [14]:
foods.take(3)

['짜장면', '마라탕', '짬뽕']

In [15]:
foods.take(5)

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수']

In [16]:
foods.first()

'짜장면'

In [17]:
# numbers of total elements
foods.count()

12

In [20]:
# distinct는 중복된 데이터를 제거하는 transformation
# foods.distinct().collect()
foods.distinct().count()

7

In [21]:
# foreach : 요소들을 하나씩 꺼내 하나의 함수를 적용시키는 action

In [24]:
# 현재는 sc가 있는 diver program
# foreach는 action이기 때문에 worker node에서 실행된다 -> RDD에 연산을 하고 LOG는 어딘가에 저장되어 있다.
foods.foreach(lambda x: print(x))

In [None]:
# 파일을 어딘가에 저장
foods.foreach(lambda x: save(x))

In [47]:
# transformation으로 바로 RDD가 만들어지는 게 아니라, action할 때 한꺼번에 만들어짐 (DAG : transformation이 순서대로 수행되도록 하는 순환되지 않는 그래프)
sc.parallelize([1, 2, 3]).map(lambda x: x + 2).collect()

[3, 4, 5]

In [48]:
sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()

[2, 4, 6]

## narrow transformation 
- 1:1 변환
- 1열을 조작하기 위해 다른 열/파티션의 데이터를 쓸 필요 없음
- 정렬(비교를 위해 다른 열을 참고해 많은 통신이 발생)이 필요하지 않은 경우

In [49]:
movies = [
    "그린 북",
    "매트릭스",
    "토이 스토리",
    "캐스트 어웨이",
    "포드 V 페라리",
    "보헤미안 랩소디",
    "빽 투 더 퓨처",
    "반지의 제왕",
    "죽은 시인의 사회"
]

In [50]:
moviesRDD = sc.parallelize(movies)

In [51]:
moviesRDD.collect()

['그린 북',
 '매트릭스',
 '토이 스토리',
 '캐스트 어웨이',
 '포드 V 페라리',
 '보헤미안 랩소디',
 '빽 투 더 퓨처',
 '반지의 제왕',
 '죽은 시인의 사회']

In [52]:
flatMovies = moviesRDD.flatMap(lambda x: x.split(" "))

In [54]:
flatMovies.collect()

['그린',
 '북',
 '매트릭스',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

In [55]:
filteredMovies = flatMovies.filter(lambda x: x != "매트릭스")

In [56]:
filteredMovies.collect()

['그린',
 '북',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

In [57]:
num1 = sc.parallelize([1, 2, 3, 4])
num2 = sc.parallelize([4, 5, 6, 7, 8, 9, 10])

In [58]:

num1.intersection(num2).collect()

[4]

In [59]:
num1.union(num2).collect()

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

In [60]:
num1.subtract(num2).collect()

[2, 1, 3]

In [61]:
numUnion = num1.union(num2)

In [62]:
numUnion.collect()

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

In [63]:
# numUnion.sample(T)
# [1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

# 1st parameter : with replacement=True : 복원 추출
# 2nd parameter : 몇 % sampling할 것인가? : list elements의 50%를 sampling
numUnion.sample(True, .5).collect()


[3, 4, 5, 6, 8, 9]

In [76]:
# 3rd parameter : 확률적으로 random sampling 되는 list를 고정하고 싶을 때 seed 사용
numUnion.sample(True, .5, seed=5).collect()


[1, 4, 6, 9, 9, 10]

## wide transformation 
- shuffling(output RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있음)
- 통신 발생으로 많은 리소스 요구 -> 최소화 OR 최적화 해서 좋은 성능을 끌어내야 함

In [77]:
foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면",  "라면", "우동", "라면", "치킨", "돈까스", "회", "햄버거", "피자"])

In [78]:
# groupBy Transformation

In [79]:
foodsGroup = foods.groupBy(lambda x: x[0])

In [81]:
res = foodsGroup.collect()

In [84]:
# groupby 활용 1 : filtering
for (k, v) in res:
    print(k, list(v))

짜 ['짜장면', '짜장면', '짜장면', '짜장면']
마 ['마라탕']
짬 ['짬뽕', '짬뽕']
떡 ['떡볶이']
쌀 ['쌀국수']
라 ['라면', '라면']
우 ['우동']
치 ['치킨']
돈 ['돈까스']
회 ['회']
햄 ['햄버거']
피 ['피자']


In [85]:
nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

In [None]:
# groupby 활용 2 : 수학적 계산
nums.groupBy(lambda x: x % 2).collect()

In [89]:
list(nums.groupBy(lambda x: x % 2).collect()[1][1])

[2, 4, 6, 8, 10]