### 1️⃣ Spark RDD 연산 구조

#### Spark RDD 연산은 크게 두 가지로 나뉩니다:
- Transformation (변환)
  -  예: map(), filter(), flatMap(), groupBy()

  -  지연 연산(lazy evaluation)
      - 결과를 즉시 계산하지 않고, **연산 계획(DAG)**만 기록

-  Action (액션)
  -   예: collect(), count(), reduce(), aggregate()

- 최종 결과를 계산해서 드라이버로 반환하거나 외부 저장소에 저장

### 2️⃣ 최종 결과(final result)의 목적

#### Spark에서 최종 결과를 구하는 목적은 다음과 같이 요약할 수 있습니다:

- 분산 연산의 집계/산출
  - 여러 파티션에 분산된 데이터를 합쳐서 단일 값 또는 리스트 형태로 계산
  - 예: 합계, 평균, 그룹별 집계 등

- 데이터 확인 및 추출
  - collect()나 take()를 통해 분산 데이터의 일부 또는 전체를 드라이버로 가져와 확인

- 외부 저장/보고
  - 최종 RDD 결과를 파일(HDFS, CSV 등) 또는 DB에 저장하여 다른 시스템에서 활용 가능

### 3️⃣ 특징

- 지연 연산과 달리 계산이 실제로 일어남 → 클러스터에서 모든 파티션을 처리
- 분산 연산 결과를 하나로 모으거나, 외부로 전달
- Spark의 장점: 대용량 데이터도 분산 처리 후 최종 결과만 가져옴 → 메모리 효율적

In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
# SparkConf : 스파크 실행 환경 설정 클래스
# SparkContext : DriverProgram 실행 환경 구성을 위한 클래스
conf = SparkConf().setMaster("local").setAppName("country-student-count")

# 변수명을 항상 sc로 놓을 것!
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/14 03:15:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
sc

In [5]:
filepath = '/home/hadoop/python_code/spark/xAPI-Edu-Data.csv'

In [6]:
lines = sc.textFile(f"file:///{filepath}")
lines

file:////home/hadoop/python_code/spark/xAPI-Edu-Data.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [7]:
header = lines.first()
header

Py4JJavaError: An error occurred while calling o25.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/hadoop/python_code/spark/xAPI-Edu-Data.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:210)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Input path does not exist: file:/home/hadoop/python_code/spark/xAPI-Edu-Data.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more


In [None]:
datas = lines.filter(lambda row : row != header)
datas

In [None]:
conturies = datas.map(lambda row : row.split(",")[2])
conturies

In [None]:
conturies.collect()[:3]

In [None]:
conturies.countByValue()

In [None]:
sc.stop()