# Key-Values RDD

In [1]:
from pyspark import SparkConf, SparkContext


conf = SparkConf().setMaster("local").setAppName("category-review-average") # 로컬 환경과 앱 이름 지정
sc = SparkContext(conf=conf) # 컨텍스트 초기화
# 막간 에러: 자바 로컬 환경 변수를 변경해도 conda 가상환경 내 버전이 변경이 되지 않아 conda로 재인스톨하여 해결하였다.
# !conda install -c anaconda openjdk

21/12/11 16:56:52 WARN Utils: Your hostname, 6miniui-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.200.112 instead (on interface en0)
21/12/11 16:56:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/11 16:56:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
lines = sc.textFile("/Users/6mini/spark/res.csv")
lines.collect() # 간단한 액션



['id,item,category,reviews',
 '0,짜장면,중식,125',
 '1,짬뽕,중식,235',
 '2,김밥,분식,32',
 '3,떡볶이,분식,534',
 '4,라멘,일식,223',
 '5,돈가스,일식,52',
 '6,우동,일식,12',
 '7,쌀국수,아시안,312',
 '8,햄버거,패스트푸드,12',
 '9,치킨,패스트푸드,23']

In [3]:
header = lines.first() # 헤더 추출
filtered_lines = lines.filter(lambda row: row != header)
filtered_lines.collect()

['0,짜장면,중식,125',
 '1,짬뽕,중식,235',
 '2,김밥,분식,32',
 '3,떡볶이,분식,534',
 '4,라멘,일식,223',
 '5,돈가스,일식,52',
 '6,우동,일식,12',
 '7,쌀국수,아시안,312',
 '8,햄버거,패스트푸드,12',
 '9,치킨,패스트푸드,23']

In [4]:
def parse(row): # 카테고리와 리뷰 수 만을 파싱하는 함수
    fields = row.split(",")
    category = fields[2]
    reviews = int(fields[3])
    return (category, reviews) # KV RDD를 위해 튜플 형태로 두가지 리턴

category_reviews = filtered_lines.map(parse) # KV RDD 생성
category_reviews.collect()

[('중식', 125),
 ('중식', 235),
 ('분식', 32),
 ('분식', 534),
 ('일식', 223),
 ('일식', 52),
 ('일식', 12),
 ('아시안', 312),
 ('패스트푸드', 12),
 ('패스트푸드', 23)]

In [6]:
category_reviews_count = category_reviews.mapValues(lambda x: (x, 1)) # 각 카테고리마다 값 하나를 추가
category_reviews_count.collect()

[('중식', (125, 1)),
 ('중식', (235, 1)),
 ('분식', (32, 1)),
 ('분식', (534, 1)),
 ('일식', (223, 1)),
 ('일식', (52, 1)),
 ('일식', (12, 1)),
 ('아시안', (312, 1)),
 ('패스트푸드', (12, 1)),
 ('패스트푸드', (23, 1))]

In [7]:
reduced = category_reviews_count.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) # 카테고리의 수와 리뷰 수의 총 합
reduced.collect()



[('중식', (360, 2)),
 ('분식', (566, 2)),
 ('일식', (287, 3)),
 ('아시안', (312, 1)),
 ('패스트푸드', (35, 2))]

In [8]:
averages = reduced.mapValues(lambda x: x[0] / x[1]) # 평균
averages.collect()

[('중식', 180.0),
 ('분식', 283.0),
 ('일식', 95.66666666666667),
 ('아시안', 312.0),
 ('패스트푸드', 17.5)]

# RDD Transformatieons & Actions

In [2]:
from pyspark import SparkConf, SparkContext


# 스파크 컨텍스트 생성
conf = SparkConf().setMaster("local").setAppName("transformations_actions")
sc = SparkContext(conf = conf)
sc.getConf().getAll() # 설정 리스트 확인

21/12/11 17:17:04 WARN Utils: Your hostname, 6miniui-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.200.112 instead (on interface en0)
21/12/11 17:17:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/11 17:17:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[('spark.master', 'local'),
 ('spark.app.id', 'local-1639210625743'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '64503'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'transformations_actions'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.host', '192.168.200.112'),
 ('spark.app.startTime', '1639210624537')]

In [3]:
# sc.stop() # 스파크 컨텍스트 중단
# sc = SparkContext(conf=conf) # 다시 생성하여야 사용 가능

foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면",  "라면", "우동", "라면"]) # 텍스트파일 함수와 같이 RDD를 생성
foods

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [4]:
# RDD 데이터 확인
# 벨류를 모두 가져올 수 있는데 디버깅이나 개발 환경에서만 사용해야하고 실제 프로덕트 상황에서 지양해야한다.
# 데이터를 모두 가져오기 때문에 너무 낭비가 되기 때문에 스파크를 쓰는 의미가 없어진다.
foods.collect()



['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '짬뽕', '짜장면', '짜장면', '짜장면', '라면', '우동', '라면']

In [5]:
foods.countByValue() # 벨류 값 계산하여 키 벨류 형태로 전시

defaultdict(int,
            {'짜장면': 4,
             '마라탕': 1,
             '짬뽕': 2,
             '떡볶이': 1,
             '쌀국수': 1,
             '라면': 2,
             '우동': 1})

In [6]:
foods.take(3) # 3개의 엘리먼트만 전시

['짜장면', '마라탕', '짬뽕']

In [7]:
foods.first() # 첫 값 전시

'짜장면'

In [8]:
foods.count() # 엘리먼트 갯수 전시

12

In [9]:
foods.distinct().count() # 엘리먼트 갯수 유니크하여 전시



7

In [None]:
# foreach: 요소를 하나하나 꺼내어 하나의 함수를 작용시키는데 쓰인다.
foods.foreach(lambda x: print(x))
# 실행시켜도 리턴값은 없지만, 워커 노드에서 실행이 되기 때문에 현재 드라이버 프로그램에서는 보이지 않는다.
# RDD를 연산하거나 로그를 세이브할 때 유용하게 쓰인다.


 

# Transformation

In [10]:
sc.parallelize([1, 2, 3]).map(lambda x: x + 2).collect() # 새로운 RDD 생성

[3, 4, 5]

In [29]:
movies = [
    "그린 북",
    "매트릭스",
    "토이 스토리",
    "캐스트 어웨이",
    "포드 V 페라리",
    "보헤미안 랩소디",
    "빽 투 더 퓨처",
    "반지의 제왕",
    "죽은 시인의 사회"
]

movies_rdd = sc.parallelize(movies)
movies_rdd.collect()

['그린 북',
 '매트릭스',
 '토이 스토리',
 '캐스트 어웨이',
 '포드 V 페라리',
 '보헤미안 랩소디',
 '빽 투 더 퓨처',
 '반지의 제왕',
 '죽은 시인의 사회']

In [30]:
# 리스트를 펼쳐 보는 네로우 트랜스포메이션
flat_movies = movies_rdd.flatMap(lambda x: x.split(" ")) # 어떻게 나눌 지 지정
flat_movies.collect()

['그린',
 '북',
 '매트릭스',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

In [31]:
filtered_movies = flat_movies.filter(lambda x: x != "매트릭스") # 필터링
filtered_movies.collect()

['그린',
 '북',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

In [14]:
# 집합과 관련된 트랜스포메이션
num1 = sc.parallelize([1, 2, 3, 4])
num2 = sc.parallelize([4, 5, 6, 7, 8, 9, 10])
num1.intersection(num2).collect()



[4]

In [15]:
num1.union(num2).collect() # 모든 값이 합쳐진다.

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

In [16]:
num1.subtract(num2).collect() # 중복된 데이터가 사라진다.

[2, 1, 3]

In [32]:
num_union = num1.union(num2)
num_union.sample(True, .5, seed=6).collect() # True: 리샘플링 여부 / .5: 샘플링 할 비율

[2, 10]

In [34]:
# 와이드 트랜스포메이션
foods = sc.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면",  "라면", "우동", "라면", "치킨", "돈까스", "회", "햄버거", "피자"])
foods_group = foods.groupBy(lambda x: x[0]) # 그룹핑
res = foods_group.collect()
for (k, v) in res:
    print(k, list(v))

짜 ['짜장면', '짜장면', '짜장면', '짜장면']
마 ['마라탕']
짬 ['짬뽕', '짬뽕']
떡 ['떡볶이']
쌀 ['쌀국수']
라 ['라면', '라면']
우 ['우동']
치 ['치킨']
돈 ['돈까스']
회 ['회']
햄 ['햄버거']
피 ['피자']


In [35]:
nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
list(nums.groupBy(lambda x: x % 2).collect()[1][1])

[2, 4, 6, 8, 10]