# Chapter 9. 데이터소스

## 9.1 데이터소스 API의 구조

### 9.1.1 읽기 API의 구조

[ 데이터 읽기의 핵심 구조 ]

DataFrameReader.format(...).option("key", "value").schema(...).load()

데이터소스를 읽을 때는 위와 같은 형식을 사용. 

* format 메서드는 선택적으로 사용할 수 있으며 기본값은 파케이 포맷
* option 메서드를 사용해 데이터를 읽는 방법에 대한 파라미터를 키-값 쌍으로 설정 가능
* schema 메서드는 데이터 소스에서 스키마를 제공하거나 스키마 추론 기능을 사용하려는 경웨 선택적으로 사용 가능

### 9.1.2 데이터 읽기의 기초

스파크에서 데이터를 읽을 때는 기본적으로 DataFrameReader 사용. (DataFrameReader는 SparkSession의 read 속성으로 접근)

spark.read

[ DataFrameReader를 얻은 다음에는 다음과 같은 값 지정 ]
* 포맷
* 스키마
* 읽기 모드
* 옵션

포맷, 스키마, 옵션은 트랜스포메이션을 추가로 정의할 수 있는 DataFrameReader를 반환. 

읽기 모드를 제외한 세 가지 항목은 필요한 경우에만 선택적으로 지정 가능.

데이터소스마다 데이터를 읽은 방식을 결정할 수 있는 옵션 제공.

DataFrameReader에 반드시 데이터를 읽을 경로 지정.

[ 전반적인 코드 구성 ]

spark.read.format("csv")\
  .option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .option("path", "path/to.file(s)")\
  .schema(someSchema)\
  .load()
  
  
**읽기 모드**

외부 데이터소스에서 데이터를 읽다 보면 자연스럽게 형식에 맞지 않는 데이터를 만나게 되는데, 특히 반정형 데이터소스를 다룰 때 많이 발생. 

읽기 모드는 스파크가 형식에 맞지 않는 데이터를 만났을 때 동작 방식을 지정하는 옵션

[ 스파크의 읽기 모드 ]
* permissive : 오류 레코드의 모든 필드를 null로 설정하고 모든 오류 레코드를 _corrupt_record라는 문자열 컬럼에 기록
* dropMalformed : 형식에 맞지 않는 레코드가 포함된 로우 제거
* failFast : 형식에 맞지 않는 레코드를 만나면 즉시 종료

단, 읽기 모드의 기본값은 permissive

### 9.1.3 쓰기 API 구조

[ 데이터 쓰기의 핵심 구조 ]
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

* format 메서드는 선택적으로 사용할 수 있으며 기본값은 파케이 포맷
* option 메서드를 사용해 데이터의 쓰기 방법 설정 가능
* partitionBy, bucketBy, sortBy 메서드는 파일 기반의 데이터소스에서만 동작하며 이 기능으로 최종 파일 배치 형태를 제어 가능

### 9.1.4 데이터 쓰기의 기초

데이터 쓰기는 데이터 읽기와 매우 유사하며 DataFrameReader 대신 DataFrrameWriter를 사용. 

데이터소스에 항상 데이터를 기록해야 하기 때문에 DataFrame의 write 속성을 이용해 DataFrame 별로 DataFrameWriter에 접근 해야 함.

**저장 모드**

저장 모드는 스파크가 지정된 위치에서 동일한 파일이 발견했을 때의 동작 방식을 지정하는 옵션.

[ 스파크의 저장 모드 ]

* append : 해당 경로에 이미 존재하는 파일 목록에 결과 파일 추가
* overwrite : 이미 존재하는 모든 데이터를 완전히 덮어씀
* errorIfExists : 해당 경로에 데이터나 파일이 존재하는 경우 올유를 발생시키면서 쓰기 작업이 실패
* ignore : 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리도 하지 않음

단, 기본값은 errorIfExists 즉 스파크가 파일을 저장할 경로에 데이터나 파일이 이미 존재하면 쓰기 작업은 즉시 실패.

## 9.2 CSV 파일

CSV 파일은 콤마(,)로 구분된 값을 의미함. 

CSV는 각 줄이 단일 레코드가 되며 레코드의 각 필드를 콤마로 구분하는 일반적인 텍스트 파일 포맷

CSV 파일은 구조적으로 보이지만 매우 까다로운 파일 포맷 중 하나 - 운영 환경에서는 어떤 내용이 들어 있는지, 어떠한 구조로 되어 있는지 등 다양한 전제를 만들어낼 수 없기 때문.

때문에 CSV reader는 많은 수의 옵션 제공. 

(ex. CSV 파일 내 컬럼 내용에 콤마가 들어 있거나 비표준적인 방식으로 null 값이 기록된 경우 특정 문자를 이스케이프 처리하는 옵션을 사용해 문제 해결 가능)

### 9.2.1 CSV 옵션

* sep : 각 필드와 값을 구분하는데 사용되는 단일 문자 (기본값 : ,)
* header : 첫 번째 줄이 컬럼명인지 나타내는 불리언 값 (기본값 : false)
* escape : 스파크가 파일에서 이스케이프 처리할 문자 (기본값 : \)
* inferSchema : 스파크가 파일을 읽을 때 컬럼의 데이터 타입을 추론할지 정의 (기본값 : false)
* ignoreLeadingWhiteSpace : 값을 읽을 때 값의 선행 공백을 무시할지 정의 (기본값 : false)
* ignoreTrailingWhiteSpace : 값을 읽을 때 값의 후행 공백을 무시할지 정의 (기본값 : false)
* nullValue : 파일에서 null 값을 나타내는 문자 (기본값 : "")
* nanValue : CSV 파일에서 NaN이나 값 없음을 나타내는 문자 선언 (기본값 : NaN)
* positiveInf : 양의 무한 값을 나타내는 문자(열)를 선언 (기본값 : Inf)
* negativeInf : 음의 무한 값을 나타내는 문자(열)를 선언 (기본값 : -Inf)
* compression 또는 codec : 스파크가 파일을 읽고 쓸 때 사용하는 압축 코덱 정의 (기본값 : none)
* dataFormat : 날짜 데이터 타입인 모든 필드에서 사용할 날짜 형식 (기본값 : yyyy-MM-dd)
* timestampFormat : 타임스탬프 데이터 타입인 모든 필드에서 사용할 날짜 형식 (기본값 : yyyy-MM-dd'T'HH:mm:ss.SSSZZ)
* maxColumns : 파일을 구성하는 최대 컬럼 수를 선언 (기본값 : 20480)
* maxCharsPerColumn : 컬럼의 문자 최대 길이를 선언 (기본값 : 1000000)
* escapeQuotes : 스파크가 파일의 라인에 포함된 인용부호를 이스케이프할지 선언 (기본값 : true)
* maxMalformedLogPerPartition : 스파크가 각 파티션별로 비정상적인 레코드를 발견했을 때 기록할 최대 수. 이 숫자를 초과하는 비정상적인 레코드는 무시된 (기본값 : 10)
* quoteAll : 인용부호 문자가 있는 값을 이스케이프 처리하지 않고, 전체 값을 인용부호로 묶을지 여부 (기본값 : false)
* multiLline : 하나의 논리적 레코드가 여러 줄로 이루어진 CSV 파일 읽기를 허용할지 여부 (기본값 : false)

### 9.2.2 CSV 파일 읽기

csv 파일을 읽으려면 먼저 CSV용 DataFrameReader를 생성하고 스키마와 읽기 모드 지정.

스파크는 지연 연산 특성이 있으므로 DataFrame 정의 시점이 아닌 잡 실행 시점에만 오류가 발생.

### 9.2.3 CSV 파일 쓰기

In [0]:
csvFile = spark.read.format("csv")\
  .option("header", "true")\
  .option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("dbfs:/FileStore/shared_uploads/hyjeong0815@gmail.com/2010_summary-1.csv")

CSV 파일을 읽어 TSV 파일로 내보내는 처리

In [0]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
  .save("/tmp/my-tsv-file.tsv")

## 9.3 JSON 파일 

스파크에서는 JSON 파일을 사용할 때 줄로 구분된 JSON을 기본적으로 사용. 

multiLine 옵션을 사용해 줄로 구분된 방식과 여러 줄로 구성된 방식을 선택적으로 사용 가능하다. 이 옵션을 true로 설정하면 전체 파일을 하나의 JSON 객체로 읽을 수 있다. 

스파크는 JSON 파일을 파싱한 다음에 DataFrame을 생성. 

줄로 구분된 JSON은 전체 파일을 읽어 들인 다음 저장하는 방식이 아니므로 새로운 레코드 추가 가능. 

줄로 구분된 JSON이 인기 있는 이유는 구조화되어 있고, 최소한의 기본 데이터 타입이 존재하기 때문.

### 9.3.1 JSON 옵션

* compression 또는 codec : 스파크가 파일을 읽고 쓸 때 사용하는 압축 코덱 정의 (기본값 : none)
* dateFormat : 날짜 데이터 타입인 모든 필드에서 사용할 날짜 형식을 정의 (기본값 : yyyy-MM-dd)
* timestampFormat : 타임스탬프 데이터 타입인 모든 필드에서 사용할 날짜 형식을 정의 (기본값 : yyyy-MM-dd'T'HH:mm:ss.SSSZZ)
* priomitiveAsString : 모든 프리미티브값을 문자열로 추정할지 정의 (기본값 : false)
* allowComments : JSON 레코드에서 자바나 C++ 스타일로 된 코멘트를 무시할지 정의 (기본값 : false)
* allowUnquotedFieldNames : 인용부호로 감싸여 있지 않은 JSON 필드명을 허용할지 정의 (기본값 : false)
* allowSingleQuotes : 인용부호로 큰따옴표 대신 작은따옴표를 허용할지 정의 (기본값 : true)
* allowNumericLeadingZeros : 숫자 앞에 0을 허용할지 정의 (기본값 : false)
* allowBackslashEscapingAnyCharacter : 백슬래시 인용부호 매커니즘을 사용한 인용부호를 허용할지 정의 (기본값 : false)
* columnNameOfCorruptRecord : permissive 모드에서 생성된 비정상 문자열을 가진 새로운 필드명을 변경 가능 (기본값 : spark.sql.columnNameOfCorruptRecord 속성의 설정값)
* multiLine : 줄로 구분되지 않은 JSON 파일의 읽기를 허용할지 정의 (기본값 : false)

### 9.3.2 JSON 파일 읽기

In [0]:
spark.read.format("json").option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("dbfs:/FileStore/shared_uploads/hyjeong0815@gmail.com/2010_summary-1.json").show(5)

### 9.3.3 JSON 파일 쓰기

파티션당 하나의 파일을 만들며 전체 DataFrame을 단일 폴더에 저장.

JSON 객체는 한 줄에 하나씩 기록.

In [0]:
csvFile.write.format("json").mode("overwrite").save("/tmp/my-json-file.json")

## 9.4 파케이 파일

파케이는 다양한 스토리지 최적화 기술을 제공하는 오픈소스로 만들어진 컬럼 기반의 데이터 저장 방식. 

저장 공간을 절약할 수 있고 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며 컬럼 기반의 압축 기능 제공.

### 9.4.1 파케이 파일 읽기

파케이는 옵션이 거의 없다. 데이터를 저장할 때 자체 스키마를 사용해 데이터를 저장하기 때문. 

**파케이 옵션**

* compression 또는 codec : 스파크가 파일을 읽고 쓸 때 사용하는 압축 코덱 정의 (기본값 : none)
* mergeSchema : 동일한 테이블이나 폴더에 신규 추가된 파케이 파일에 컬럼을 점진적으로 추가할 수 있음. 이러한 기능을 활성화하거나 비활성화하기 위해 옵션 사용 (기본값 : spark.sql.parquet.mergeSchema 속성의 설정값)

### 9.4.2 파케이 파일 쓰기

파케이 파일 쓰기도 파일의 경로만 명시하면 가능.

분할 규칙은 달든 포맷과 동일하게 적용.

## 9.5 ORC 파일

ORC는 하둡 워크로드를 위해 설계된 자기 기술적이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷. 

대규모 스트리밍 처리에 최적화되어 있을 뿐만 아니라 필요한 로울르 신속하게 찾아낼 수 있는 기능이 통합되어 있음. 

스파크는 ORC 파일 포맷을 효율적으로 사용할 수 있으므로 별도의 옵션 지정 없이 데이터를 읽을 수 있음.

### 9.5.1 ORC 파일 읽기

spark.read.foramt("orc").load("/data/flight-data.orc/2010-summary.orc")

### 9.5.2 ORC 파일 쓰기

csvFile.write.format("orc").mode("overwrite").save("/tmp/my-orc-file.orc")

## 9.6 SQL 데이터베이스

사용자는 SQL을 지원하는 다양한 시스템에 SQL 데이터소스를 연결 가능. 

데이터베이스의 데이터를 읽고 쓰기 위해서는 스파크 클래스패스에 데이터베이스의 JDBC 드라이버를 추가하고 적절한 JDBC 드라이버 jar 파일을 제공해야 함. 

ex. PostgreSQL 데이터베이스에 데이터를 읽거나 쓰기 위한 실행식

./bin/spark-shell\
--driver-class-path postgresql-9.4.1207.jar\
--jars postgresql-9.4.1207.jar

[ JDBC 데이터소스 옵션 ]
* url : 접속을 위한 JDBC URL. 소스 시스템에 특화된 설정은 URL에 지정 가능. 
* dtable : 읽을 JDBC 테이블 설정
* driver : 지정한 URL에 접속할 때 사용할 JDBC 드라이버 클래스명 지정
* partitionColumn, lowerBound, upperBound : 세가지 옵션은 항상 같이 지정. numPartitions도 반드시 지정. partitionColumn은 반드시 해당 테이블의 수치형 컬럼. lowerBound와 upperBound는 테이블의 로우를 필터링하는 데 사용되는 것이 아니라 각 파티션의 범위를 결정하는데 사용. 따라서 테이블의 로우는 분할되어 반환 (읽기에만 적용)
* numPartitions : 테이블의 데이터를 병렬로 읽거나 쓰기 작업에 사용할 수 있는 최대 파티션 수를 결정. 이 속성은 최대 동시 JDBC 연결 수를 결정. 
* fetchsize : 한 번에 얼마나 많은 로우를 가져올지 결정하는 JDBC의 패치 크기 설정. 이 옵션은 기본적으로 패치 크기가 작게 설정된 JDBC 드라이버의 성능을 올리는데 도움. (읽기에만 적용)
* betchsize : 한 번에 얼마나 많은 로우를 저장할지 결정하는 JDBC의 배치 크기 설정. JDBC 드라이버의 성능을 향상시킬 수 있음. 쓰기에만 적용되며 기본값은 1000
* isolationLevel : 현재 연결에 적용되는 트랜잭션 격리 수준 정의. 
* truncate : JDBC writer 관련 옵션
* createTableOptions : JDBC writer 관련 옵션 - 테이블 생성 시 특정 테이블의 데이터베이스와 파티션 옵션 설정 가능
* createTableColumnTypes : 테이블을 생성할 때 기본값 대신 사용할 데이터베이스 컬럼 데이터 타입을 정의

### 9.6.1 SQL 데이터베이스 읽기

접속 관련 속성 정의.

In [0]:
driver = "org.sqlite.JDBC"
path = "dbfs:/FileStore/shared_uploads/hyjeong0815@gmail.com/my_sqlite-2.db"
url = "jdbc:sqlite:" + path
tablename = "flight_info"

접속 테스트


import java.sql.DriverManager

connection = DriverManager.getConnection(url)

connection.isClosed()

connection.Close()

접속 관련 속성 정의 후 정상적으로 데이터베이스에 접속되는지 테스트해 해당 연결이 유효한지 확인 가능. 

코드) SQL 테이블을 읽어 DataFrame 생성

In [0]:
dbDataFrame = spark.read.format("jdbc").option("url", url)\
  .option("dbtable", tablename).option("driver", driver).load()

코드) PostgreSQL을 이용해 동일한 데이터 읽기 작업 수행

pgDF = spark.read.format("jdbc")\

  .option("driver", "org.postgresql.Driver")\

  .option("url", "jdbc:postgresql://database_server")\

  .option("dbtable", "schema.tablename")\
  
  .option("user", "username").option("password", "my-secret-password").load()

### 9.6.2 쿼리 푸시다운

스파크는 DataFrame을 만들기 전에 데이터베이스 자체에서 데이터를 필터링하도록 만들 수 있음.

dbDataFrame.select("DEST_COUNTRY_NAME").distinct().explain

**데이터베이스 병렬로 읽기**

numPartitions 옵션을 사용해 읽기 및 쓰기용 동시 작업 수를 제한할 수 있는 최대 파티션 수 설정 가능.

dbDataFrame = spark.format("jdbc")\

  .option("url", url).option("dbtable", tablename).option("driver", driver)\
  
  .option("numPartitions", 10).load()

데이터소스 생성 시 조건절 목록을 정의해 스파크 자체 파티션에 결과 데이터를 저장 가능.

props = {"driver" : "org.sqlite.JDBC"}

predicates = [

  "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
  
  
  "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"]

spark.read.jdbc(url, tablename, predicates-predicates, properties=props).show()

spark.read.jdbc(url, tablename, predicates=predicates, properties=pops)\

  .rdd.getNumPartitions()

연관성 없는 조건절을 중복 로우가 많이 발생할 수 있음. 중복 로우를 발생시키는 조건절 예제

pops = {"driver" : "org.sqlite.JDBC"}

predicates = [
  
  "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
  
  "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'"]
  
spark.read.jdbc(url, tablename, predicates=predicates, properties=pops).count()

**슬라이딩 윈도우 기반의 파티셔닝**

조건절을 기반으로 분할.

처음과 마지막 파티션 사이의 최솟값과 최댓값을 사용해 이 범위 밖의 모든 값은 첫 번째 또는 마지막 파티션에 속함. 

그 다음 전체 파티션 수를 설정. 

이 값은 병렬 처리 수준 의미. 

스파크는 데이터베이스에 병렬로 쿼리를 요청하며 numPartitions에 설정된 값만큼 파티션 반환. 

그리고 파티션에 값을 할당하기 위해 상한값과 하한값을 수정.

### 9.6.3 SQL 데이터베이스 쓰기

SQL 데이터베이스에 데이터를 쓰는 것은 URI를 지정하고 지정한 쓰기 모드에 따라 데이터를 쓰면 된다. 

newPath = "jdbc:sqlite://tmp/my-sqlite.db"

csvFile.write.jdbc(newPath, tablename, mode="overwrite", properties=props)

spark.read.jdbc(newPath, tablename, properties=props).count()

## 9.7 텍스트 파일

스파크는 일반 텍스트 파일을 읽을 수 있음.

파일의 각 줄은 DataFrame의 레코드.

### 9.7.1 텍스트 파일 읽기

textFile 메서드에 텍스트 파일을 지정하면 된다. 

단, textFile 메서드는 파티션 수행 결과로 만들어진 디렉터리명을 무시하기 때문에 파티션된 텍스트 파일을 읽거나 쓰려면 읽기 및 쓰기 시 파티션 수행 결과로 만들어진 디렉터리를 인식할 수 있도록 text 메서드 사용

spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/hyjeong0815@gmail.com/2010_summary-3.csv")\

  .selectExpr("split(value, ',') as rows").show()

### 9.7.2 텍스트 파일 쓰기

텍스트 파일을 쓸 때는 문자열 컬럼이 하나만 존재해야 함. 그렇지 않으면 실패.

csvFile.select("DEST_COUNTRY_NAME").write.text("/tmp/simple-text-file.txt")

텍스트 파일에 데이터를 저장할 때 파티셔닝 작업을 수행하면 더 많은 컬럼 지정 가능. 

하지만 모든 파일에 컬럼을 추가하는 것이 아니라 텍스트 파일이 저장되는 디렉터리에 폴더별로 컬럼 저장. 

csvFile.limit(10).select("DEST_COUNTRY_NAME", "count")\

  .write.partitionBy("count").text(".tmp/five-csv-file2py.csv")

## 9.8 고급 I/O 개념

### 9.8.1 분할 가능한 파일 타입과 압축 방식

특정 파일 포맷은 기본적으로 분할을 지원. 

스파크에서 전체 파일이 아닌 쿼리에 필요한 부분ㄴ만 읽을 수 있으므로 성능 향상에 도움. 

하둡 분산 파일 시스템 같은 시스템을 사용하면 분할된 파일을 여러 블록으로 나누어 분산 저장하기 때문에 훨씬 더 최적화 가능.

### 9.8.2 병렬로 데이터 읽기

여러 익스큐터가 같은 파일을 동시에 읽을 수는 없지만 여러 파일을 동시에 읽을 수는 있음. 

다수의 파일이 존재하는 폴더를 읽을 때 폴더의 개별 파일은 DataFrame의 파티션이 된다.

따라서 사용 가능한 익스큐터를 이용해 병렬로 데이터를 읽음.

### 9.8.3 병렬로 데이터 쓰기

파일이나 데이터 수는 데이터를 쓰는 시점에 DataFrame이 가진 파티션 수에 따라 달라질 수 있음. 

기본적으로 데이터 파티션당 하나의 파일 작성. 

따라서 옵션에 지정된 파일명은 실제로는 다수의 파일을 가진 디렉터리. 그리고 그 디렉터리 안에 파티션당 하나의 파일로 데이터 저장.

**파티셔닝**

파티셔닝은 어떤 데이터를 어디에 저장할 것인지 제어할 수 있는 기능. 

파티셔닝된 디렉터리 또는 테이블에 파일을 쓸 때 디렉터리별로 컬럼 데이터를 인코딩해 저장. 

그러므로 데이터를 읽을 때 전체 데이터셋을 스캔하지 않고 필요한 컬럼의 데이터만 읽을 수 있음. 

파티셔닝은 필터링을 자주 사용하는 테이블을 가진 경우에 사용할 수 있는 가장 손쉬운 최적화 방식. 

**버케팅**

버케팅은 각 파일에 저장된 데이터를 제어할 수 있는 또 다른 파일 조직화 기법.

이 기법을 사용하면 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여있기 때문에 데이터를 읽을 때 셔플을 피할 수 있음.

즉, 데이터가 이후의 사용 방식에 맞춰 사전에 파티셔닝되므로 조인이나 집계 시 발생하는 고비용의 셔플을 피하라 수 있음.

특정 컬럼을 파티셔닝하면 수억 개의 디렉터리를 만들어낼 수 있음.

### 9.8.4 복합 데이터 유형 쓰기

스파크는 다양한 자체 데이터 타입을 제공. 

단 모든 데이터 파일 포맷에 적합한 것은 아니다. 

ex. csv 파일은 복합 데이터 타입을 지원하지 않지만 파케이나 ORC는 복합 데이터 타입 지원.

### 9.8.5 파일 크기 관리

작은 파일을 많이 생성하면 메타데이터에 엄청난 관리 부하 발생. 

HDFS 같은 많은 파일 시스템은 작은 크기의 파일을 잘 다루지 못함. 

maxRecordsPerFile 옵션에 파일당 레코드 수를 지정하면 각 파일에 기록될 레코드 수를 조절할 수 있으므로 파일 크기를 더 효과적으로 제어 가능.