In [2]:
from pyspark import SparkConf, SparkContext
# 스파크 환경 설정 객체 생성
conf = SparkConf().setMaster("local").setAppName("21204_01_RDD_API")
spark = SparkContext(conf=conf).getOrCreate()
spark

24/12/04 11:33:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
foods = spark.parallelize(["짜장면", "마라탕", "짬뽕", "떡볶이", "쌀국수", "짬뽕", "짜장면", "짜장면", "짜장면", "라면", "우동", "라면"])
foods

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [4]:
# 모두 메모리게 올리기
foods.collect()

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '짬뽕', '짜장면', '짜장면', '짜장면', '라면', '우동', '라면']

In [7]:
# 값을 기준으로 카운트 : 음식별 개수 세기
foods.countByValue()

defaultdict(int,
            {'짜장면': 4,
             '마라탕': 1,
             '짬뽕': 2,
             '떡볶이': 1,
             '쌀국수': 1,
             '라면': 2,
             '우동': 1})

In [8]:
# 상위 3개 호출
foods.take(3)

['짜장면', '마라탕', '짬뽕']

In [10]:
# 처음 1개 가져오기
foods.first()

'짜장면'

In [11]:
#RDD 개수
foods.count()

12

In [12]:
# 중복 제거한 목록
# Transformation 연산
fd_dist = foods.distinct()
fd_dist

PythonRDD[11] at RDD at PythonRDD.scala:53

In [15]:
# action 연산 : Result
fd_dist.collect()

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '라면', '우동']

In [20]:
# 워커노드에서 실행하는 기능
foods.foreach(lambda x : print(x))

짜장면
마라탕
짬뽕
떡볶이
쌀국수
짬뽕
짜장면
짜장면
짜장면
라면
우동
라면


# Narrow Operation 1:1 연산

* filter() : 데이터셋의 각 요소에 대해 주어진 조건을 만족하는 요소들만 필터링
* map() : 데이터셋의 각 요소에 대해 주어진 함수를 적용하여 새로운 요소로 반환
* flatMap() : 각 요소에 대해 주어진 함수를 적용하여 여러 개의 새로운 요소를 생성하고, 결과를 평면화(flatten) 함.
* sample() : 데이터셋에서 주어진 비율만큼 임의의 요소를 샘플링
* union() : 두 개의 RDD를 합쳐서 새로운 RDD 생성

> 데이터가 변환될 때, 하나의 파티션에서 다른 파티션으로 데이터가 이동되지 않는 연산  
> 단순하고 빠르게 처리됨

In [22]:
sample_rdd = spark.parallelize([1,2,3,4,5])

In [25]:
sample_rdd2 = sample_rdd.map(lambda x :x+2) # transformation

In [26]:
sample_rdd2.collect()

[3, 4, 5, 6, 7]

In [27]:
movies = [
    "그린 북",
    "매트릭스",
    "토이 스토리",
    "캐스트 어웨이",
    "포드 V 페라리",
    "보헤미안 랩소디",
    "빽 투 더 퓨처",
    "반지의 제왕",
    "죽은 시인의 사회"]

In [28]:
moviesRDD = spark.parallelize(movies)

In [29]:
mapMovies = moviesRDD.map(lambda x : x.split(" "))

In [30]:
mapMovies.collect()

[['그린', '북'],
 ['매트릭스'],
 ['토이', '스토리'],
 ['캐스트', '어웨이'],
 ['포드', 'V', '페라리'],
 ['보헤미안', '랩소디'],
 ['빽', '투', '더', '퓨처'],
 ['반지의', '제왕'],
 ['죽은', '시인의', '사회']]

In [33]:
# 단어 빈도 분석에서 많이 활용 (map보다 자주 활용 예정)
flatMapMovies = moviesRDD.flatMap(lambda x:x.split(" "))
flatMapMovies.collect()

['그린',
 '북',
 '매트릭스',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

In [37]:
filterdMovies = flatMapMovies.filter(lambda x: x!="매트릭스") # 여러개 지정하려면 x not in ["a", "b", ""c]
filterdMovies.collect()

['그린',
 '북',
 '토이',
 '스토리',
 '캐스트',
 '어웨이',
 '포드',
 'V',
 '페라리',
 '보헤미안',
 '랩소디',
 '빽',
 '투',
 '더',
 '퓨처',
 '반지의',
 '제왕',
 '죽은',
 '시인의',
 '사회']

### ➕ 집합 연산 용어로 표현된 RDD 연산

1. **합집합 (Union)**:
   - **RDD 연산**: `union()`
   - **설명**: 두 RDD의 모든 요소를 포함하는 새로운 RDD를 생성합니다.
   - **예시**:
     ```python
     from pyspark import SparkContext

     sc = SparkContext.getOrCreate()

     rdd1 = sc.parallelize([1, 2, 3])
     rdd2 = sc.parallelize([3, 4, 5])
     union_rdd = rdd1.union(rdd2).collect()
     print("Union:", union_rdd)  # 출력: [1, 2, 3, 3, 4, 5]
     ```

2. **교집합 (Intersection)**:
   - **RDD 연산**: `intersection()`
   - **설명**: 두 RDD에 공통으로 포함된 요소만을 추출하여 새로운 RDD를 생성합니다.
   - **예시**:
     ```python
     from pyspark import SparkContext

     sc = SparkContext.getOrCreate()

     rdd1 = sc.parallelize([1, 2, 3])
     rdd2 = sc.parallelize([3, 4, 5])
     intersection_rdd = rdd1.intersection(rdd2).collect()
     print("Intersection:", intersection_rdd)  # 출력: [3]
     ```

3. **차집합 (Difference/Subtraction)**:
   - **RDD 연산**: `subtract()`
   - **설명**: 첫 번째 RDD에 속하고 두 번째 RDD에는 속하지 않는 요소들을 추출하여 새로운 RDD를 생성합니다.
   - **예시**:
     ```python
     from pyspark import SparkContext

     sc = SparkContext.getOrCreate()

     rdd1 = sc.parallelize([1, 2, 3, 4, 5])
     rdd2 = sc.parallelize([4, 5, 6, 7, 8])
     subtract_rdd = rdd1.subtract(rdd2).collect()
     print("Subtract:", subtract_rdd)  # 출력: [1, 2, 3]
     ```

4. **중복 제거 (Distinct)**:
   - **RDD 연산**: `distinct()`
   - **설명**: RDD의 중복된 요소를 제거하여 유일한 요소들만 포함하는 새로운 RDD를 생성합니다.
   - **예시**:
     ```python
     from pyspark import SparkContext

     sc = SparkContext.getOrCreate()

     rdd = sc.parallelize([1, 2, 2, 3, 4, 4, 5])
     distinct_rdd = rdd.distinct().collect()
     print("Distinct:", distinct_rdd)  # 출력: [1, 2, 3, 4, 5]
     ```

### 추가적인 비슷한 연산

5. **데카르트 곱 (Cartesian Product)**:
   - **RDD 연산**: `cartesian()`
   - **설명**: 두 RDD의 모든 가능한 쌍의 조합을 생성합니다.
   - **예시**:
     ```python
     from pyspark import SparkContext

     sc = SparkContext.getOrCreate()

     rdd1 = sc.parallelize([1, 2])
     rdd2 = sc.parallelize(["a", "b"])
     cartesian_rdd = rdd1.cartesian(rdd2).collect()
     print("Cartesian Product:", cartesian_rdd)  # 출력: [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
     ```

6. **쌍으로 묶기 (Zip)**:
   - **RDD 연산**: `zip()`
   - **설명**: 두 RDD의 각 요소를 짝지어 튜플로 묶습니다. 두 RDD의 길이가 동일해야 합니다.
   - **예시**:
     ```python
     from pyspark import SparkContext

     sc = SparkContext.getOrCreate()

     rdd1 = sc.parallelize([1, 2, 3])
     rdd2 = sc.parallelize(["a", "b", "c"])
     zip_rdd = rdd1.zip(rdd2).collect()
     print("Zip:", zip_rdd)  # 출력: [(1, 'a'), (2, 'b'), (3, 'c')]
     ```


In [38]:
# 집합 연산
num1 = spark.parallelize([1,2,3,4,5])
num2 = spark.parallelize([4,5,6,7,8,9,10])

In [39]:
# intersection : 교집합
num1.intersection(num2).collect()

[4, 5]

In [43]:
# union : 합집합
num1.union(num2).collect()

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9, 10]

In [44]:
# subtract : 차집합
num1.subtract(num2).collect()

[2, 1, 3]

# sample(withRepLacement, fraction, seed)
일부를 샘플링해서 추출함.

```
* withReplacement : 비복원, 복원 추출 (True / False)
* fraction : 기대값 (샘플링할 비율 : 0~1 사이)
* seed : 난수추출을 위한 시드값-재현 가능 (옵션)
```

In [47]:
numlist = num1.union(num2)
numlist.collect()

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9, 10]

In [69]:
numlist.sample(True, 0.5).collect()

[1, 4, 6, 8, 9]

In [70]:
numlist.sample(True, 0.3).collect()

[5, 4, 8]

In [71]:
numlist.sample(False, 0.7).collect() # 재현이 안되는 샘플

[1, 2, 4, 5, 4, 5, 8, 9, 10]

In [65]:
numlist.sample(True, 0.5, seed=42).collect() # 재현이 가능함

[5, 6, 6]

### ➕ 원하는 개수만큼 샘플링하는 방법


#### 1) 반복적으로 샘플링하여 원하는 개수에 맞출 때까지 시도
```
sampled_data = data.sample(withReplacement=False, fraction=fraction, seed=seed).collect()  
while len(sampled_data) != desired_sample_size:  
    seed += 1 # 시드 값을 변경하여 다른 샘플을 추출  
    sampled_data = data.sample(withReplacement=False, fraction=fraction, seed=seed).collect()  
  
print("Sampled Data (Using While Loop):", sampled_data)  
print("Sampled Data Count:", len(sampled_data))  
```
#### 2) takeSample 매서드 사용

sampled_data_takeSample = data.takeSample(withReplacement=False, num=3, seed=42)  
print("Sampled Data (Using takeSample):", sampled_data_takeSample)


# wide transformation

- 데이터가 여러 파티션으로 이동하여 셔플링이 발생하는 연산
 > 데이터가 여러 파티션으로 이동하면서 네트워크 통신 발생  
 > 일반적으로 비용이 많이 듦

groupby,  
reduce

In [72]:
foods.collect() # 모든 요소를 수집하여 리스트로 반환

['짜장면', '마라탕', '짬뽕', '떡볶이', '쌀국수', '짬뽕', '짜장면', '짜장면', '짜장면', '라면', '우동', '라면']

In [75]:
foodsGroup = foods.groupBy(lambda x:x[0]) # 첫 글자를 기준으로 그룹화
res = foodsGroup.collect() # 그룹화된 결과를 수집하여 키와 리스트 형태로 반환

In [77]:
for (k,v) in res:
    print(k, list(v)) # 각 그룹의 키와 값을 반복문으로 출력

짜 ['짜장면', '짜장면', '짜장면', '짜장면']
마 ['마라탕']
짬 ['짬뽕', '짬뽕']
떡 ['떡볶이']
쌀 ['쌀국수']
라 ['라면', '라면']
우 ['우동']


In [78]:
spark.stop()