<a href="https://colab.research.google.com/github/GaGyeong-Kim/GaGyeong-Kim/blob/main/sparkRDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive under ./mount so files stored in Drive are reachable from the notebook.
# NOTE(review): no cell visible in this notebook reads from ./mount — confirm this cell is still needed.
from google.colab import drive

drive.mount(mountpoint='./mount')

Mounted at ./mount


In [2]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 51 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 61.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=d352948c1211e471a194667718f3a390793456af0f028047ec256e5d7a4b4c64
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [4]:
from pyspark import SparkConf, SparkContext

# Configure a local Spark driver ("local" = one worker thread) for this tutorial.
conf = SparkConf().setMaster("local").setAppName("transformations_actions")

# getOrCreate() returns the already-running SparkContext if there is one, instead of
# raising a ValueError (only one SparkContext may be active) when this cell is re-run.
sc = SparkContext.getOrCreate(conf=conf)

In [5]:
# Build an RDD of food names (it contains duplicates, used by the counting examples below)
foods = sc.parallelize([
    "짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕",
    "짜장면", "짜장면", "짜장면", "라면", "우동", "라면",
])

In [7]:
# collect() is an action: it returns all elements of the RDD to the driver as a Python list.
foods.collect()

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '짬뽕', '짜장면', '짜장면', '짜장면', '라면', '우동', '라면']

In [8]:
# countByValue() is an action: it returns a dict mapping each element to its number of occurrences.
foods.countByValue()

defaultdict(int,
            {'짜장면': 4,
             '마라탕': 1,
             '짬뽕': 2,
             '떡볶이': 1,
             '쌀국수': 1,
             '라면': 2,
             '우동': 1})

In [10]:
# take(n) is an action returning the first n elements as a list.
# Only a cell's last expression is displayed, so the first call is printed explicitly;
# without print() the take(3) result is computed and silently discarded.
print(foods.take(3))
foods.take(5)

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수']

In [11]:
# first() is an action returning the first element of the RDD.
foods.first()

'짜장면'

In [14]:
# count() is an action returning the number of elements, duplicates included.
foods.count()

12

In [15]:
# distinct() is a transformation that removes duplicates; count() (an action) then counts the unique elements.
foods.distinct().count()

7

In [16]:
# Actions return native Python values (an int here) ...
for action_result in (foods.count(), foods.distinct().count()):
    print(type(action_result))

# ... while transformations return a new (lazily evaluated) RDD object.
print(type(foods.distinct()))

<class 'int'>
<class 'int'>
<class 'pyspark.rdd.PipelinedRDD'>


In [17]:
# foreach() is an action that applies a function to every element for its side effects
# and returns None. The function DOES run, but in the Spark worker processes: print()
# writes to the workers' stdout rather than the driver, so nothing appears in this cell.
# To see the elements in the notebook, bring them to the driver first (collect()/take()).
foods.foreach(print)

In [18]:
# Movie titles (several are multi-word) for the flatMap / filter examples
movies = [
    "그린 북", "매트릭스", "토이 스토리",
    "캐스트 어웨이", "포드 V 페라리", "보헤미안 랩소디",
    "빽 투 더 퓨처", "반지의 제왕", "죽은 시인의 사회",
]

moviesRDD = sc.parallelize(movies)

In [19]:
# flatMap: split every title on " " and flatten all the words into one RDD
flatMovies = moviesRDD.flatMap(lambda title: title.split(" "))
flatMovies.collect()

['그린',
 '북',
 '매트릭스',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

In [20]:
# filter keeps only the elements for which the predicate is True (here: everything but "매트릭스")
filteredMovies = moviesRDD.filter(lambda title: title != "매트릭스")
filteredMovies.collect()

['그린 북',
 '토이 스토리',
 '캐스트 어웨이',
 '포드 V 페라리',
 '보헤미안 랩소디',
 '빽 투 더 퓨처',
 '반지의 제왕',
 '죽은 시인의 사회']

In [21]:
# Two overlapping integer RDDs (shared values: 4 and 5) for the set-style operations
num1 = sc.parallelize(list(range(1, 6)))    # [1, 2, 3, 4, 5]
num2 = sc.parallelize(list(range(4, 11)))   # [4, 5, 6, 7, 8, 9, 10]

In [22]:
# union() concatenates the two RDDs and keeps duplicates (4 and 5 appear twice).
num1.union(num2).collect()

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9, 10]

In [23]:
# subtract() keeps the elements of num1 that do not appear in num2.
# It goes through a shuffle, so the original element order is not preserved.
num1.subtract(num2).collect()

[2, 1, 3]

In [24]:
# Random sample WITH replacement (an element may be picked more than once);
# fraction=0.5 is the expected number of times each element is chosen; the seed fixes the result.
numUnion = num1.union(num2)
numUnion.sample(withReplacement=True, fraction=0.5, seed=2).collect()

[1, 1, 2, 4, 5, 5, 4, 5, 6, 7, 9]

In [25]:
# RDD of short phrases, some of which contain a space
rdd1 = sc.parallelize(['안녕 보아즈', 'BOAZ', '스파크', '엔지니어링 화이팅'])

# map: exactly one output per input, so each phrase becomes a list (nested result)
print(rdd1.map(lambda phrase: phrase.split(' ')).collect())

# flatMap: the per-phrase lists are flattened into a one-dimensional result
print(rdd1.flatMap(lambda phrase: phrase.split(' ')).collect())

[['안녕', '보아즈'], ['BOAZ'], ['스파크'], ['엔지니어링', '화이팅']]
['안녕', '보아즈', 'BOAZ', '스파크', '엔지니어링', '화이팅']


In [26]:
# Group the foods by their first character; collect() yields (key, grouped-values) pairs
foodsGroup = foods.groupBy(lambda name: name[0])
res = foodsGroup.collect()

In [27]:
# Each value from groupBy is an iterable of the grouped elements; list() materialises it for printing.
for (k,v) in res:
  print(k, list(v))

짜 ['짜장면', '짜장면', '짜장면', '짜장면']
마 ['마라탕']
짬 ['짬뽕', '짬뽕']
떡 ['떡볶이']
쌀 ['쌀국수']
라 ['라면', '라면']
우 ['우동']


In [28]:
# intersection() returns the elements present in both RDDs, without duplicates.
num1.intersection(num2).collect()

[4, 5]

In [29]:
from operator import add

# reduce() with an associative and commutative function (add): 1+2+3+4+5 = 15
print(sc.parallelize([1,2,3,4,5]).reduce(add))


# The result can depend on the partitioning: (x*2)+y is neither associative nor
# commutative, and reduce() first reduces within each partition, then merges the
# per-partition results with the same function.

# 1 partition: ((1*2+2)*2+3)*2+4 = 26
print(sc.parallelize([1,2,3,4],1).reduce(lambda x,y : (x*2)+y))
# 2 partitions ([1,2] and [3,4]): 1*2+2 = 4, 3*2+4 = 10, merged as 4*2+10 = 18
print(sc.parallelize([1,2,3,4],2).reduce(lambda x,y : (x*2)+y))

15
26
18


In [30]:
# RDD with 4 partitions but only 3 values -> one partition is empty
rdd = sc.parallelize([2,3,4],4)

# reduce: 24 = 2*3*4 (the empty partition contributes nothing)
print(rdd.reduce(lambda x,y : x*y))

# fold with zeroValue 0: the zero value takes part in the product, so 0 = 0*2*3*4
print(rdd.fold(0, lambda x,y : x*y))

# fold with zeroValue 1 (the identity element of *): 24 = 1*2*3*4
print(rdd.fold(1, lambda x,y:x*y))

24
0
24


In [31]:
rdd = sc.parallelize([1,1,2,3,5,8])
# Split the numbers by parity (key 0 = even, key 1 = odd)
result = rdd.groupBy(lambda n: n % 2).collect()

# Sort the keys and each group's members so the displayed result is deterministic
sorted((parity, sorted(members)) for parity, members in result)

[(0, [2, 8]), (1, [1, 1, 3, 5])]

In [32]:
# aggregate() computes (sum, count) in a single pass:
# seqOp folds one element into a partition-local (sum, count) accumulator,
# combOp merges two accumulators produced by different partitions.
def seqOp(acc, value):
    return (acc[0] + value, acc[1] + 1)

def combOp(left, right):
    return (left[0] + right[0], left[1] + right[1])

sc.parallelize([1,2,3,4]).aggregate((0,0), seqOp, combOp)

(10, 4)

In [33]:
# (subject, score) pairs split across 3 partitions.
# Fix: the second record used to be the 1-tuple ("MATH",) with no score. groupByKey
# unpacks every element as (key, value), so any action on `y` would have failed with
# "not enough values to unpack". The score 2 matches the otherwise identical dataset
# in the "예제 2" (Example 2) cell.
x = sc.parallelize([
    ("MATH", 7), ("MATH", 2), ("ENGLISH", 7), ("SCIENCE", 7),
    ("ENGLISH", 4), ("ENGLISH", 9), ("MATH", 8), ("MATH", 3),
    ("ENGLISH", 4), ("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5),
], 3)

# groupByKey() is a lazy transformation; getNumPartitions() does not run a Spark job.
y = x.groupByKey()

# Without an explicit numPartitions, the grouped RDD here has the parent's 3 partitions
print(y.getNumPartitions())

3


In [34]:
# Passing numPartitions to groupByKey sets the number of partitions of the grouped RDD
y = x.groupByKey(2)
print(y.getNumPartitions())

2


In [35]:
from operator import add, mul


rdd = sc.parallelize([("BOAZ",2),("ENGINEERING",3),("BOAZ",5)])
# reduceByKey merges the values that share a key: first with +, then with *
for combine in (add, mul):
    print(sorted(rdd.reduceByKey(combine).collect()))

[('BOAZ', 7), ('ENGINEERING', 3)]
[('BOAZ', 10), ('ENGINEERING', 3)]


In [36]:
# Pair RDD whose values are lists of words
x = sc.parallelize([("a", ["HELLO","BOAZ","WELCOME"]), ("b", ["ENGINEERING"])])

def f(x):
    """Return how many items ``x`` holds (used to turn each list value into its length)."""
    return len(x)

# mapValues applies f to each value only and keeps the keys: (key, length of the list)
x.mapValues(f).collect()

[('a', 3), ('b', 1)]

In [37]:
# countByKey() is an action returning a dict of key -> number of pairs with that key
x = sc.parallelize([("MEOW", 1), ("BOWWOW", 1), ("MEOW",1)])
sorted(x.countByKey().items())

[('BOWWOW', 1), ('MEOW', 2)]

In [38]:
# Example 1: keys() returns an RDD containing only the key of each (key, value) pair
m = sc.parallelize([(1,2),(3,4)]).keys()
m.collect()

[1, 3]

In [39]:
# Example 2: count the distinct subjects (keys) in the score dataset
x = sc.parallelize([
    ("MATH", 7), ("MATH", 2), ("ENGLISH", 7), ("SCIENCE", 7),
    ("ENGLISH", 4), ("ENGLISH", 9), ("MATH", 8), ("MATH", 3),
    ("ENGLISH", 4), ("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5),
], 3)
x.keys().distinct().count()

3

In [40]:
rdd1 = sc.parallelize([("fish",1), ("kitten", 2), ("puppy", 3)])
rdd2 = sc.parallelize([("fish",4), ("kitten", 5), ("kitten", 6), ("pet", 1)])

# inner join: only keys present in both RDDs, one output per matching (left, right) pair
print(rdd1.join(rdd2).collect())
# left outer join: every key of rdd1; a missing right-hand value becomes None
print(rdd1.leftOuterJoin(rdd2).collect())
# right outer join: every key of rdd2; a missing left-hand value becomes None
print(rdd1.rightOuterJoin(rdd2).collect())

[('fish', (1, 4)), ('kitten', (2, 5)), ('kitten', (2, 6))]
[('fish', (1, 4)), ('puppy', (3, None)), ('kitten', (2, 5)), ('kitten', (2, 6))]
[('fish', (1, 4)), ('kitten', (2, 5)), ('kitten', (2, 6)), ('pet', (None, 1))]


In [41]:
# Stop the SparkContext once you are done with it!

sc.stop()