# 데이터소스
- 스파크 핵심 데이터소스 
  - CSV
  - JSON
  - 파케이
  - ORC
  - JDBC/ODBC연결
  - 일반 텍스트 파일

- 핵심 서드파티 데이터소스
  - 카산드라
  - HBase
  - 몽고디비
  - AWS Redshift
  - XML

In [0]:
path='/FileStore/tables/all/*.csv'

## 데이터소스 API의 구조

### 읽기 API 구조
- 스파크에서 데이터를 읽을 땐 기본적으로 <strong>DataFrameReader</strong>를 사용함
- DataFrameReader는 <strong>SparkSession의 read 속성</strong>으로 접근
- DataFrameReader를 얻고 나서는 다음과 같은 값을 지정해야함
  - format: 포맷 지정(default: 파케이)
  - option: 데이터 읽는 방법 지정 ex)키-값 쌍이면 option('key', 'value')
  - schema: 데이터 소스에서 스키마를 제공하거나 스키마 추론 기능을 사용하려고 할 때 지정

In [0]:
spark.read

In [0]:
df = spark.read.format('csv')\
.option('mode', 'FAILFAST')\
.option('inferSchema', 'true')\
.option('path', path)\
.load()

---
- 위 예시에서는 읽기 모드를 'failfast'로 지정함
- <strong>읽기 모드: 스파크가 형식에 맞지 않는 데이터를 만났을 때의 동작 방식을 지정하는 옵션</strong>
  - permissive(default): 오류 레코드의 모든 필드를 null로 설정하고 모든 오류 레코드를 _corrupt_record라는 문자열 컬럼에 기록
  - dropMalformed: 형식에 맞지 않는 레코드가 포함된 로우 제거
  - failFast: 형식에 맞지 않는 레코드를 만나면 즉시 종료

### 쓰기 API 구조
- 데이터 읽기와 매우 유사한데 DataFrameReader 대신 <strong>DataFrameWriter</strong>를 사용
- 데이터소스에 항상 데이터를 기록해야하므로 <strong>DataFrame의 write속성</strong>을 사용
  - DataFrame별로 DataFrameWriter에 접근
- DataFrameWriter를 얻고나서는 다음과 같은 값을 지정해야함
  - format
  - option
  - 파일 기반의 데이터소스만 해당
    - partitionBy
    - bucketBy
    - sortBy

In [0]:
df.write.format('csv')\
.option('mode', 'OVERWRITE')\
.option('dateFormat', 'yyyy-MM-dd')\
.option('path', '/FileStore/tables/temp/temp.csv')\
.save()

- 위 예시에서는 저장 모드를 'overwrite'로 지정함
- 저장 모드: 스파크가 지정된 위치에서 동일한 파일이 발견됐을 때의 동작 방식 지정
  - append: 해당 경로에 이미 존재하는 파일 목록에 결과 파일 추가
  - overwrite: 이미 존재하는 모든 데이터를 완전히 덮어씀
  - errorIfExists(default): 오류를 발생시키면서 쓰기 작업이 실패
  - ignore: 아무런 처리 X

## CSV 파일
- CSV(comma-separated values)는 <strong>콤마(,)로 구분된 값</strong>을 의미
- 각 줄이 단일 레코드
- 각 필드를 콤마로 구분하는 일반적인 텍스트 파일 포맷
- 해당 포맷이 다루기 까다로운 이유
  - 운영 환경에서는 어떤 내용, 어떤 구조로 되어 있는지 등 다양한 전제를 만들어낼 수 없기 때문
  - 그래서 <strong>CSV reader는 많은 수의 옵션</strong>을 제공
  - [링크](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html?highlight=csv#pyspark.sql.DataFrameReader.csv)

### CSV 파일 읽기

In [0]:
df =spark.read.format('csv')\
.option('header', 'true')\
.option('mode','FAILFAST')\
.load(path)

In [0]:
df.show(5)

### CSV 파일 쓰기

In [0]:
df.write.format('csv').mode('overwrite').option('sep','\t').save('/FileStore/tables/temp/tsv_file.tsv')

-----
- 위처럼 csv파일을 읽어 들여 tsv파일로 내보는 처리도 간단

## JSON 파일
- <strong>자바스크립트 객체 표기법(JavaScript Object Notation)</strong>
- 스파크에서는 JSON파일을 사용할 때 <strong>줄로 구분된 JSON</strong>을 기본적으로 사용
  - <strong>큰 JSON 객체나 배열을 하나씩 가지고 있는 파일</strong>을 다루는 것과 대조적인 부분
  - <strong>multiLine옵션</strong>으로 바꿀 순 있음
  - 근데 줄로 구분된 방식이 더 안정적이라서 디폴트임
    - 구조화 되어 있음
    - 최소한의 기본 데이터 타입이 존재함
    
- JSON은 객체이므로 CSV보다 옵션 수가 적음
  - [링크](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html?highlight=json#pyspark.sql.DataFrameReader.json)

### JSON 파일 읽기

In [0]:
j_path='/FileStore/tables/2010_summary.json'
spark.read.format('json').option('mode', 'FAILFAST').option('inferSchema', 'true').load(j_path).show(5)

### JSON 파일 쓰기

In [0]:
df.write.format('json').mode('overwrite').save('/FileStore/tables/temp/j_file.json')

-----
- 데이터소스에 관계없이 json파일에 저장할 수 있음
- 위처럼 csv DataFrame을 json파일의 소스로 재사용 가능

## 파케이 파일
- 다양한 스토리지 최적화 기술을 제공하는 오픈소스로 만들어진 <strong>컬럼 기반의 데이터 저장 방식</strong>
- 저장 공간 절약 가능
- <strong>전체 파일을 읽는 대신 개별 컬럼</strong>을 읽음
- 컬럼 기반 압축 기능 제공
- 아파치 스파크와 호환이 잘 됨
  - 그래서 스파크의 기본 파일 포맷
- json이나 csv보다 읽기 연산이 더 효율적
- 장기 저장용 데이터는 파케이 포맷으로 저장하는 것이 좋음
- 복합 데이터 타입 지원
  - csv에서는 배열을 사용할 수 없음
- 옵션이 거의 없음(2개)
  - 스파크 개념에 아주 잘 부합하고 알맞게 정의된 명세를 가지고 있기 때문
  - [링크](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html?highlight=csv#pyspark.sql.DataFrameReader.parquet)

### 파케이 파일 읽기

In [0]:
p_path='/FileStore/tables/2010-summary.parquet'
spark.read.format('parquet').load(p_path).show(5)

### 파케이 파일 쓰기

In [0]:
df.write.format('parquet').mode('overwrite').save('/FileStore/tables/temp/p_file.parquet')

---
- 다른 포맷과 동일

## ORC 파일
- <strong>ORC는 하둡 워크로드를 위해 설계된 자기 기술적(self-describing)이며 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷</strong>
- 대규모 스트리밍 읽기에 최적화
- 필요한 로우를 신속하게 찾아낼 수 있음
- 스파크는 ORC파일 포맷을 별도의 옵션 지정 없이 효율적으로 사용 가능
- 파케이와의 차이?
  - 파케이는 스파크에 최적화
  - ORC는 하이브에 최적화

### ORC 파일 읽기

In [0]:
o_path='/FileStore/tables/2010-summary.orc/part_r_00000_2c4f7d96_e703_4de3_af1b_1441d172c80f_snappy.orc'
spark.read.format('orc').load(o_path).show(5)

### ORC 파일 쓰기

In [0]:
df.write.format('orc').mode('overwrite').save('/FileStore/tables/temp/o_file.orc')

## SQL 데이터베이스
- 매우 강력한 <strong>커넥터</strong> 중 하나
- SQL을 지원하는 다양한 시스템에 SQL 데이터소스 연결
  - MySQL
  - PostgreSQL
  - Oracle
  - SQLite
- 데이터 베이스는 원시 파일 형태가 아니므로 고려해야할 옵션이 많음
  - 데이터 베이스 인증 정보나 접속 관련 옵션
  - 스파크 클러스터에서 데이터 베이스 시스템에 접속 가능한지 네트워크 상태 확인

### SQL 데이터베이스 읽기
- 다른 데이터소스처럼 포맷과 옵션을 지정한 후 데이터를 읽어들임

In [0]:
driver='org.sqlite.JDBC'
s_path='dbfs:/FileStore/tables/my_sqlite.db'
url= 'jdbc:sqlite:'+s_path
tablename='flight_info'

In [0]:
%fs ls /FileStore/tables

path,name,size
dbfs:/FileStore/tables/2010-summary.orc/,2010-summary.orc/,0
dbfs:/FileStore/tables/2010-summary.parquet/,2010-summary.parquet/,0
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121
dbfs:/FileStore/tables/2010_summary.json,2010_summary.json,21353
dbfs:/FileStore/tables/all/,all/,0
dbfs:/FileStore/tables/my_sqlite.db,my_sqlite.db,11264
dbfs:/FileStore/tables/my_sqlite_copy.db,my_sqlite_copy.db,11264
dbfs:/FileStore/tables/temp/,temp/,0


In [0]:
#파일 경로 문제 참조
#https://stackoverflow.com/questions/68202341/analysisexception-path-does-not-exist-dbfs-databricks-python-lib-python3-7-si
#https://docs.databricks.com/dev-tools/databricks-utils.html
copy_path='/FileStore/tables/my_sqlite_copy.db'
new_path= '/tmp/my_sqlite_copy.db'
dbutils.fs.cp(copy_path, new_path)

In [0]:
%fs ls /tmp

path,name,size
dbfs:/tmp/hive/,hive/,0
dbfs:/tmp/my_sqlite_copy.db,my_sqlite_copy.db,11264


In [0]:
url= 'jdbc:sqlite:'+new_path

In [0]:
url

In [0]:
dbDF = spark.read.format('jdbc').option('url', url).option('dbtable',tablename).option('driver', driver).load()

---
- 그냥 이렇게하고 일반 데이터프레임처럼 쓰면 됨
  - 근데 자꾸 에러가...

#### 쿼리푸시다운
- 스파크는 DataFrame을 만들기 전에 DB자체에서 데이터를 필터링하도록 만들 수 있음

In [0]:
dbDF.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')")

----
- 위 처럼 필터를 명시하면 스파크는 해당 필터에 대한 처리를 DB에 위임한다.
 - 위임하는걸 push down이라고 함

In [0]:
pushdownQuery="(select distinct(dest_country_name) from flight_info) as flight_info"
dbDF = spark.read.format('jdbc').option('url', url).option('dbtable',pushdownQuery).option('driver', driver).load()

----
- 또 위처럼 전체 쿼리를 DB에 직접 전달해서 DataFrame으로 결과를 받아야하는 경우엔 테이블명 대신 SQL 쿼리를 명시하면 됨

#### 데이터베이스 병렬로 읽기
- 스파크는 파일 크기, 파일 유형, 압축 방식에 따른 <strong>'분할 가능성'</strong>에 따라
  - 여러 파일을 읽어 하나의 파티션으로 만들거나 
  - 여러 파티션을 하나의 파일로 만드는 알고리즘을 가짐

- 파일이 가진 이런 유연성은 SQL 데이터베이스에도 존재하지만 몇 가지 수동 설정이 필요
  - 옵션 목록 중 <strong>numPartitions</strong> 옵션으로 읽기 및 쓰기용 <strong>동시 작업 수를 제한할 수 있는 최대 파티션 수</strong> 설정 가능

In [0]:
dbDF = spark.read.format('jdbc').option('url', url).option('dbtable',tablename).option('driver', driver)\
.option('numPartitions',10).load()

#### 슬라이딩 윈도우 기반의 파티셔닝
- 조건절을 기반으로 분할할 수 있는 방법

In [0]:
props ={'driver':'org.sqlite.JDBC'}
#분할 기준 컬럼
colName= 'count' 
#처음과 마지막 파티션 사이의 최솟값과 최댓값 (이 범위 밖의 모든 값은 첫 번째 또는 마지막 파티션에 속함)
lowerBound=0
upperBound=348113
#파티션 수
numPartitions=10

In [0]:
spark.read.jdbc(url, tablename, column=colName, properties=props, lowerBound=lowerBound, upperBound=upperBound,numPartitions=numPartitions )

### SQL 데이터베이스 쓰기
- URI를 지정하고 지정한 쓰기 모드에 따라 데이터를 쓰면 됨

In [0]:
newPath='jdbc:sqlite://tmp/new_qlite.db'
df.write.jdbc(newPath, tablename, properties=props )

## 텍스트 파일
- 파일의 각 줄은 DataFrame의 레코드

### 텍스트 파일 읽기

In [0]:
spark.read.format('text').load('/FileStore/tables/2010_summary.csv').selectExpr("split(value,',') as rows").show(5)

### 텍스트 파일 쓰기

In [0]:
df.select('StockCode').write.text('/FileStore/tables/tmp.txt')

In [0]:
%fs ls /FileStore/tables

path,name,size
dbfs:/FileStore/tables/2010-summary.orc/,2010-summary.orc/,0
dbfs:/FileStore/tables/2010-summary.parquet/,2010-summary.parquet/,0
dbfs:/FileStore/tables/2010_summary.csv,2010_summary.csv,7121
dbfs:/FileStore/tables/2010_summary.json,2010_summary.json,21353
dbfs:/FileStore/tables/all/,all/,0
dbfs:/FileStore/tables/my_sqlite.db,my_sqlite.db,11264
dbfs:/FileStore/tables/my_sqlite_copy.db,my_sqlite_copy.db,11264
dbfs:/FileStore/tables/temp/,temp/,0
dbfs:/FileStore/tables/tmp.txt/,tmp.txt/,0


## 고급 I/O개념