# RDD 연산 - 1
- Transformation 과 Action 으로 구분됨
    - Transformation : RDD는 immutable (불변) 특성을 가짐.  변겨을 위해서는 새로운 RDD를 생성하고 변경
    - Action : RDD의 내용을 조회, 저장
- Sparse 연산 : RDD의 개별 레코드를 변경할 수 없음. 모든 레코드에 대해서 동일하게 적용되는 연산. 리니지를 단순화하기 위해. 
- Lazy Evaluation 또는 Lazy Execution : 
    - Transformation 연산은 바로 실행되지 않고, parsing 만 됨. 
    - Action이 수행될때 Action에 필요한 RDD를 만들기 위한 Transformation 이 수행됨. 
    - Driver는 Action 연산을 실행하기 위해 RDD 생성 DAG를 만들고 최적화 <- 작업양 최소화 및 Shuffling시 데이터 이동 최소화


In [1]:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.master("local").appName("rdd-op-test1").getOrCreate()

In [2]:
rdd1 = spark_session.sparkContext.parallelize([0,1,2,3,4,5,6,7,8,9])
rdd2 = rdd1.filter(lambda x: x%2)

In [3]:
rdd2.collect()

[1, 3, 5, 7, 9]

# RDD 지속성 및 재사용
    - RDD는 익스큐터(executor)의 메모리에 생성됨
    - 생성된 RDD는 더아상 참조되지 않으면 (사용되지 않으면) 삭제됨.
    - 이후 Action연산이 수행되기 위해 다시 RDD를 생성해야 함. 
    - Persist()를 이용하면 메모리에 지속됨

In [4]:
prdd1 = spark_session.sparkContext.parallelize([0,1,2,3,4,5,6,7,8,9])
prdd2 = rdd1.filter(lambda x: x%2)
prdd2.persist()

PythonRDD[3] at RDD at PythonRDD.scala:56

In [5]:
rdd2.collect()

[1, 3, 5, 7, 9]

# RDD의 유형
- **PairRDD**: (Key, Value) 쌍으로 구성된 RDD. `reduceByKey`, `join` 등 키 기반 연산에 특화.
- **DoubleRDD**: 숫자(Double) 데이터로 구성된 RDD. `mean`, `sum`, `stdev` 등 통계 연산 제공.
- **DataFrame**: 테이블 형식의 구조화된 데이터 처리를 위한 RDD 기반 API. 스키마와 Catalyst 옵티마이저를 통해 성능 최적화.
- **SequenceFileRDD**: 하둡의 (Key, Value) 이진 파일 형식인 SequenceFile을 다루는 RDD.
- **HadoopRDD / NewHadoopRDD**: HDFS 등 하둡 데이터 소스를 읽을 때 생성. (New는 개선된 MapReduce API 사용)
- **CoGroupRDD**: 여러 PairRDD를 동일 키 기준으로 그룹화할 때 생성. `cogroup` 연산의 결과.
- **JdbcRDD**: JDBC를 통해 관계형 데이터베이스에서 데이터를 병렬로 읽을 때 생성.
- **PartitioningPruningRDD**: 데이터 파티션 정보를 이용, 불필요한 파티션 스캔을 건너뛰어 성능을 최적화하는 RDD.
- **ShuffledRDD**: `groupByKey` 등 데이터 재분배(셔플)가 발생하는 연산의 결과로 생성되는 중간 RDD.
- **UnionRDD**: 둘 이상의 RDD를 하나로 통합할 때 생성. 각 부모 RDD의 파티션을 유지하며 논리적으로 연결.

# 기본 RDD Transportation 연산

- `map(func)`: 각 요소에 함수를 적용.
- `flatMap(func)`: 각 요소에 함수 적용 후 결과를 펼침(flatten).
- `filter(func)`: 함수 결과가 True인 요소만 필터링.
- `distinct()`: 중복 요소 제거.
- `groupBy(func)`: 함수를 기준으로 요소 그룹화.
- `sortBy(keyfunc)`: 주어진 함수(keyfunc)가 반환하는 키를 기준으로 RDD의 요소를 정렬


# 기본 RDD Action 연산
- `collect()`: RDD의 모든 요소를 드라이버(Driver)의 메모리로 가져와 리스트로 반환 (주의: 대용량 데이터에 사용 시 메모리 부족 발생 가능)
- `count()`: RDD에 포함된 요소의 총 개수를 반환
- `take(n)`: RDD에서 `n`개의 요소를 가져와 리스트로 반환
- `first()`: RDD의 첫 번째 요소를 반환 (`take(1)`과 동일한 결과를 내지만, 더 간결한 표현)
- `reduce(func)`: 주어진 함수(`func`)를 사용하여 RDD의 모든 요소를 집계(aggregate)하여 최종 결과값 하나를 반환
- `foreach(func)`: RDD의 각 요소에 주어진 함수를 적용 (반환값이 없으며, 각 요소에 특정 작업을 수행할 때 사용)
- `saveAsTextFile(path)`: RDD의 내용을 지정된 경로(`path`)에 텍스트 파일로 저장

In [25]:
shakespeare = spark_session.sparkContext.textFile("./data/shakespeare.txt")
shakespeare.take(5)

["A MIDSUMMER-NIGHT'S DREAM",
 '',
 'Now , fair Hippolyta , our nuptial hour ',
 'Draws on apace : four happy days bring in ',
 'Another moon ; but O ! methinks how slow ']

In [26]:
words = shakespeare.flatMap(lambda x: x.split(' '))
words.take(5)

['A', "MIDSUMMER-NIGHT'S", 'DREAM', '', 'Now']

In [27]:
lowercase = words.map(lambda x: x.lower())
lowercase.take(5)

['a', "midsummer-night's", 'dream', '', 'now']

In [28]:
longwords = lowercase.filter(lambda x: len(x) > 2)
longwords.take(5)

["midsummer-night's", 'dream', 'now', 'fair', 'hippolyta']

In [29]:
longwords_cnt = longwords.count()
longwords_cnt

632856

In [30]:
distinctwords = longwords.distinct()
distinctwords_cnt = distinctwords.count()
distinctwords_cnt

28734

In [31]:
groupby_firstletter = distinctwords.groupBy(lambda x: x[0].lower())
groupby_firstletter.count()

35

In [32]:
groupby_firstletter.take(3)

[('m', <pyspark.resultiterable.ResultIterable at 0x7114ae0263f0>),
 ('d', <pyspark.resultiterable.ResultIterable at 0x7114ae026d70>),
 ('n', <pyspark.resultiterable.ResultIterable at 0x7114ae05cef0>)]

In [40]:
grouped_as_list = groupby_firstletter.map(lambda kv: (kv[0], list(kv[1])))
result = grouped_as_list.take(3)
for key, values in result:
    print(f"Key: {key}, Values: {values[:5]}...") # 값은 일부만 출력

Key: m, Values: ["midsummer-night's", 'moon', 'methinks', "man's", 'merriments']...
Key: d, Values: ['dream', 'draws', 'days', 'desires', 'dame']...
Key: n, Values: ['now', 'nuptial', 'night', 'nights', 'new-bent']...


In [33]:
distinctwords_sort = distinctwords.sortBy(lambda x: x, ascending=False)
distinctwords_sort.take(10)

['zwaggered',
 'zur',
 'zounds',
 'zone',
 'zodiacs',
 'zodiac',
 'zephyrs',
 'zenith',
 'zenelophon',
 'zed']

In [34]:
distinctwords_sort = distinctwords.sortBy(lambda x: x[1], ascending=False)
distinctwords_sort.take(10)

['azure',
 "azur'd",
 "'zounds",
 'lysander',
 'eyes',
 'aye',
 'hymns',
 'eye',
 'sympathy',
 'myself']

In [35]:
#distinctwords.collect()
distinctwords.take(10)

["midsummer-night's",
 'dream',
 'now',
 'fair',
 'hippolyta',
 'our',
 'nuptial',
 'hour',
 'draws',
 'apace']

In [36]:
distinctwords.top(10)

['zwaggered',
 'zur',
 'zounds',
 'zone',
 'zodiacs',
 'zodiac',
 'zephyrs',
 'zenith',
 'zenelophon',
 'zed']

In [37]:
distinctwords.first()

"midsummer-night's"

In [38]:
numbers = spark_session.sparkContext.parallelize([0,1,2,3,4,5,6,7,8,9])
numbers.reduce(lambda x, y: x+y)


45