In [1]:
import org.apache.spark.sql.SparkSession
val sc = SparkSession.builder().getOrCreate()
sc.version

sc = org.apache.spark.sql.SparkSession@31154bd5


2.4.3

### 9. 데이터 소스

스파크에서 지원하는 핵심 데이터 소스
- CSV
- JSON
- 파케이
- ORC
- JDBC / ODBC 연결
- 일반 텍스트 파일

또한 수많은 데이터 소스들을 지원함
- 카산드라
- HBASE
- 몽고디비
- AWS Redshift
- XML
- 기타 수많은 데이터 소스

### 9.1.1 읽기 API 구조

```DataframReader.format(...).option("key", "value").schema(...).load()```

모든 데이터 소스를 읽을 때 위와 같은 형식을 사용함. <br>
format 메서드는 선택적으로 사용할 수 있으며, 기본값은 파케이 <br>
option 메서드를 사용해 데이터를 읽는 방법에 대한 파라미터를 키-값 쌍으로 설정할 수 있다. <br>
schema 메서드는 데이터 소스에서 스키마를 제공하거나, 추론 기능을 사용하려는 경우 선택적으로 사용함.

### 9.1.2 데이터 읽기의 기초

스파크 데이터를 읽을 때 기본적으로 DataFrameReader를 사용하며, 이는 SparkSession의 read 속성으로 접근할 수 있다.
```spark.read ```

이후 다음 값을 지정 해야함.
- 포맷
- 스키마
- 읽기 모드
- 옵션

```
spark.read.format("csv")
    .option("mode", "FAILFAST")
    .option("inferSchema", "true")
    .option("path", "./../../")
    .schema(someSchame)
    .load()
```

#### 읽기 모드
읽기 모드는 아래와 같은 종류가 있다.
- permissive: 오류 레코드의 모든 필드를 null로 설정하고, 모든 오류 레코드를 _corrupt_record라는 문자열 컬럼에 기록함.
- dropMalformed: 형식에 맞지 않는 레코드가 포함된 로우를 제거함.
- failFast: 형식에 맞지 않는 레코드를 만나면 즉시 종료함.

기본 값은 permissive 이다.

### 9.1.3  쓰기 API 구조
```DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBY(...).save()```

format 메서드는 선택적으로 사용할 수 있으며 기본값은 파케이 포맷.
option 메서드를 사용해 데이터 쓰기 방법을 설정함.
partitionBy, bucketBy, sortBY 메서드는 파일 기반의 데이터 소스에서만 동작하여, 최종 파일 배치 형태를 제어할 수 있음.

### 9.1.4 데이터 쓰기의 기초
읽기와 매우 유사하며, DataFrameWriter를 사용함.
```
dataframe.write.format("csv")
    .option("mode", "OVERWRITE")
    .option("dataFormat", "yyyy-MM-dd")
    .option("path", "../../../")
    .save()
```

#### 저장 모드
저장 모드는 아래와 같다.
- append: 해당 경로에 이미 존재하는 파일목록에 결과 파일을 추가함.
- overwrite: 이미 존재하는 모든 데이터를 완전히 덮어씀.
- errorIfExists: 해당 경로에 데이터나 파일이 존재하는 경우 오류를 발생시키며 쓰기 작업이 실패함.
- ignore: 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리도 하지 않음

기본 값은 errorIfExists 이다.

### 9.2 CSV 파일
콤파(,)로 구분된 값을 의미함. 각 줄이 단일 레코드가 되며 레코드의 각 필드를 콤마(,)로 구분하는 텍스트 파일 포맷. <br>
```
id1,name1,description1
id2,name2,description2
id3,name3,description3
```
CSV는 실제 운영환경에서는 어떤 내용이 들어있는지, 어떠한 구조로 되어있는지 등 다양한 전제를 만들어낼 수 없다. 그렇기에 수많은 옵션들을 제공함.

### 9.2.2 CSV 파일 읽기


In [4]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

val myManualSchema = new StructType(Array(
    new StructField("DEST_COUNTRY_NAME", StringType, true) ,
    new StructField("ORIGIN_COUNTRY_NAME", StringType, true) ,
    new StructField("count", LongType, true)
))

sc.read.format("csv")
    .option("header", "true")
    .option("mode", "FAILFAST")
    .schema(myManualSchema)
    .load("../data/flight-data/csv/2010-summary.csv")
    .show(5, false)
// 정상동작

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
+-----------------+-------------------+-----+
only showing top 5 rows



myManualSchema = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))


lastException: Throwable = null


StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,LongType,true))

In [None]:
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}

val myManualSchema = new StructType(Array(
    new StructField("DEST_COUNTRY_NAME", LongType, true) ,
    new StructField("ORIGIN_COUNTRY_NAME", LongType, true) ,
    new StructField("count", LongType, true)
))

sc.read.format("csv")
    .option("header", "true")
    .option("mode", "FAILFAST")
    .schema(myManualSchema)
    .load("../data/flight-data/csv/2010-summary.csv")
    .show(5, false)
// 비정상동작 

// 실제 스키마와 일치 하지 않지만, 스파크는 어떠한 문제를 찾지 못함.
//// 왜냐하면, 스파크가 실제로 데이터를 읽어들이는 시점에 문제가 발생함. 즉, 데이터가 지정된 스키마가 일치하지 않으므로 스키마 잡은 시작하자마자 종료됨.
//// 또한, 스파크는 지연 연산 특성이 있으므로 DataFrame 정의 시점이 아닌 잡 실행 시점에만 오류가 발생함.

### 9.2.3 CSV 파일 쓰기

In [13]:
// CSV --> TSV
val csvFile = sc.read.format("csv")
    .option("header", "true").option("mode", "FAILFAST").schema(myManualSchema)
    .load("../data/flight-data/csv/2010-summary.csv")

csvFile.write.format("csv").mode("overwrite").option("sep", "\t").save("../tmp/my-tsv-file.tsv")
// 실제로 쓰는 시점에 DataFrame의 파티션 수를 반영함. 만약 사전에 데이터를 분할했다면, 파일 수가 달라질 수 있을 것.
csvFile.write.format("csv").mode("overwrite").option("sep", "\t").partitionBy("DEST_COUNTRY_NAME").save("../tmp_5/my-tsv-file.tsv")

csvFile = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


lastException: Throwable = null


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

### 9.3 JSON 파일
자바스크립트에서 사용하는 파일 형식으로, 자바스크립트 객체 표기법. (JavaScript Object Notation) <br>
스파크에서는 줄로 구분된 JSON을 기본적으로 사용함. 이런 방식은 큰 JSON 객체나 배열을 하나씩 가지고 있는 파일을 다루는 것보다 대조적인 부분(?)

multiLine 옵션을 사용해 줄로 구분된 방식과, 여러 줄로 구성된 방식을 선택적으로 사용할 수 있다.<br>
이 옵션을 true로 설정하면 전체 파일을 하나의 JSON 객체로 읽을 수 있다. <br>
스파크는 JSON 파일을 파싱한 다음에 Dataframe을 생성한다. <br>

줄로 구분된 JSON은
- 전체 파일을 읽어 들인 다음 저장하는 방식이 아니기 때문에 새로운 레코드를 추가할 수 있다. 다른 포맷에 비해 안정적인 포맷이므로 이 방식을 사용하는 것이 좋다.
- 구조화 되어있고, 최소한의 기본 데이터 타입이 존재함. 즉, 스파크는 적합한 데이터 타입을 추정할 수 있어 원할하게 처리할 수 있다.

### 9.3.2 JSON 파일 읽기

In [15]:
val jsonFile = sc.read.format("json").option("mode", "FAILFAST").schema(myManualSchema)
    .load("../data/flight-data/json/2010-summary.json")
jsonFile.show(5, false)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
+-----------------+-------------------+-----+
only showing top 5 rows



jsonFile = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

### 9.3.3 JSON 파일 쓰기

In [17]:
csvFile.write.format("json").mode("overwrite").save("../tmp/my-json-file.json")

### 9.4 파케이 파일
파케이는 다양한 스토리지 최적화 기술을 제공하는 오픈소스로 만들어진 컬럼 기반의 데이터 저장 방식. 특히 분석 워크로드에 최적화되어 있음. <br>
저장소 공간을 절약할 수 있으며, 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며, 컬럼 기반의 압축 기능을 제공함. <br>
아파치 스파크와 잘 호환되기 때문에 **스파크의 기본 파일 포맷**이기도 함. 읽기 연산시 JSON, CSV 보다 훨씬 효율적으로 동작하기 때문에 장기 저장용 데이터는 파케이 포맷으로 저장하는 것이 좋다.

### 9.4.1 파케이 파일 읽기

옵션이 많이 없다. 자체 스키마를 사용해 데이터를 저장하기 때문이다. <br>
파케이 파일은 스키마가 파일 자체에 내장되어 있기 때문에 따로 추정할 필요가 없다. <br>

In [18]:
sc.read.format("parquet").load("../data/flight-data/parquet/2010-summary.parquet").show(5, false)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|United States    |Romania            |1    |
|United States    |Ireland            |264  |
|United States    |India              |69   |
|Egypt            |United States      |24   |
|Equatorial Guinea|United States      |1    |
+-----------------+-------------------+-----+
only showing top 5 rows



In [19]:
csvFile.write.format("parquet").mode("overwrite").save("../tmp/my-parquet-file.parquet")

### 9.5 ORC 파일
하둡 워크로드를 위해 설계된 자기 기술적이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷. <br>
대규모 스트리밍 읽기에 최적화되어 있을 뿐 아니라 필요한 로우를 신속하게 찾아낼 수 있는 기능이 통합되어 있음. <br>
파케이와 매우 유사하지만, 근본적인 차이점으로..
- 파케이는 스파크에 최적화
- ORC는 하이브에 최적화

In [20]:
// ORC 파일 읽기, 쓰기
sc.read.format("orc").load("../data/flight-data/orc/2010-summary.orc").show(5)
csvFile.write.format("orc").mode("overwrite").save("../tmp/my-org-file.orc")

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### 9.6 SQL 데이터베이스
다양한 SQL 시스템에 연결할 수 있다. (MySQL, postgreSQL, Oracle, SQLite 등)

### 9.8 고급 I/O 개념
쓰기 작업 전에 파티션 수를 조절함으로써 병렬로 처리할 파일 수를 제어할 수 있습니다. <br>
**버켓팅, 파티셔닝** 을 조절함으로써 데이터의 저장 구조를 제어할 수 있음.

#### 9.8.1 분할 가능한 파일 타입과 압축 방식
특정 파일 포맷은 기본적으로 분할을 지원하기 때문에, 전체 파일이 아닌 쿼리에 필요한 부분만 읽을 수 있다. (파케이, ORC 포맷)<br>
HDFS 시스템을 사용한다면, 여러 블록으로 나누어 저장되기 때문에 훨씬 더 최적화 할 수 있다. <br>
모든 압축 방식이 분할을 지원하지 않음. 기본적으로 GZIP 방식을 추천한다.


#### 9.8.2 병렬로 데이터 읽기
여러 익스큐터가 같은 파일을 동시에 읽지는 않지만, 여러 파일을 동시에 읽을 수 있다. <br> 
다수의 파일이 존재하는 폴더를 읽을 떄 개별 파일은 DataFrame의 파티션이 됨. 따라서 사용 가능한 익스큐터를 이용해 병렬로 파일을 읽음.

#### 9.8.3 병렬로 데이터 쓰기
Dataframe이 가진 파티션 수에 따라 달라질 수 있음. 데이터 파티션 당 하나의 파일이 저장됨. <br>
아래 예제는 폴더 안에 5개 파일을 생성한다.

```csvFile.repartition(5).write.format("csv").save("../tmp/multiple.csv")```

#### 파티셔닝
- 어떤 데이터를 어디에 저장할 것인지 제어할 수 있는 기능. 즉, 파티셔닝은 필터링을 자주 사용하는 테이블을 가진 경우에 사용할 수 있는 가장 손쉬운 최적화 방식.
- 파티셔닝된 디렉토리 또는 데이터에 파일을 쓸 때 **디렉토리 별로 컬럼 데이터를 인코딩**해 저장함.
- 즉, 데이터를 읽을 때는 전체 데이터셋을 스캔하지 않고 필요한 컬럼의 데이터만 읽음.

#### 버켓팅
- 각 **파일에 저장된 데이터를 제어**할 수 있는 파일 조직화 기법.
- 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여 있기 때문에 데이터를 읽을 떄 셔플을 피한다.
- 즉, 데이터가 이후에 사용 방식에 맞춰 사전에 파티셔닝이 되므로 나중에 조인, 집계를 위해 파일을 읽을 때 발생하는 고비용의 셔플을 피한다.
- ex) PageRank에서 Pair RDD를 미리 버켓팅하면, 같은 id를 가진 노드들은 같은 버켓에 저장됨으로, 이후 조인시 발생하는 고비용의 셔플을 피할 수 있다.

In [23]:
// 파티셔닝
csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME").save("../tmp/partitioned-files.parquet")

In [45]:
// 버켓팅
// count 컬럼을 기반으로 버켓팅을 하면 같은 count를 가진 row는 같은 버켓팅에 저장됨.
// 보통 버킷 수보다 count 종류의 수가 더 많기 때문에 버킷 당 여러 count들이 저장됨.
val numberBuckets = 10
val columnToBucketBy = "count"

csvFile.write.format("parquet").mode("overwrite")
    .bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

numberBuckets = 10
columnToBucketBy = count


count

In [43]:
sc.read.format("parquet").load("./spark-warehouse/bucketedFiles/part-00000-4d6946cc-ef1f-4eba-811e-85a9dbe86999_00000.c000.snappy.parquet").sort("count").show(100, false)


+---------------------------------+---------------------------------+-----+
|DEST_COUNTRY_NAME                |ORIGIN_COUNTRY_NAME              |count|
+---------------------------------+---------------------------------+-----+
|French Guiana                    |United States                    |4    |
|Nigeria                          |United States                    |16   |
|Morocco                          |United States                    |16   |
|Bonaire, Sint Eustatius, and Saba|United States                    |16   |
|United States                    |Saint Vincent and the Grenadines |16   |
|United States                    |Morocco                          |16   |
|United States                    |Bonaire, Sint Eustatius, and Saba|16   |
|Thailand                         |United States                    |16   |
|United States                    |Angola                           |18   |
|United States                    |Cape Verde                       |18   |
|United Stat

In [44]:
sc.read.format("parquet").load("./spark-warehouse/bucketedFiles/part-00000-4d6946cc-ef1f-4eba-811e-85a9dbe86999_00009.c000.snappy.parquet").sort("count").show(100, false)


+--------------------------------+----------------------+-----+
|DEST_COUNTRY_NAME               |ORIGIN_COUNTRY_NAME   |count|
+--------------------------------+----------------------+-----+
|United States                   |Cyprus                |1    |
|United States                   |Estonia               |1    |
|United States                   |Azerbaijan            |1    |
|Malta                           |United States         |1    |
|Saint Vincent and the Grenadines|United States         |1    |
|Slovakia                        |United States         |1    |
|United States                   |Vietnam               |1    |
|United States                   |Cameroon              |1    |
|United States                   |Slovakia              |1    |
|Bulgaria                        |United States         |1    |
|United States                   |French Guiana         |1    |
|United States                   |Uganda                |1    |
|United States                   |Serbia