In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Examples of Chapter 09, Data Source") \
    .getOrCreate()

# 9. 데이터 소스

## 9.1 데이터소스 API의 구조

#### 읽기 모드
+ Permissive: 오류 레코드의 모든 필드를 null로 설정하고 모든 오류 레코드를 \_corrup_record 라는 문자열 컬럼에 기록
+ dropMalfromed: 형식에 맞지 않는 레코드가 포함된 로우 제거
+ failFast: 형식에 맞지 않는 레코드를 만나면 즉시 종료

#### 쓰기 모드
+ append: 해당 경로에 이미 존재하는 파일 몰록에 결과 파일을 추가
+ overwrite: 이미 존재하는 모든 데이터를 완전히 덮어 씀
+ errorIfExists: 해당 경로에 데이터나 파일이 존재하는 경우 오류를 발생시키면서 쓰기 작업이 실패됨
+ ignore: 해당 경로에 데이터나 파일이 존재하는 경우 아무런 처리도 하지 않음


## 9.2 CSV

### 9.2.1 CSV 파일 쓰기

In [2]:
""" Open Session """
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

In [3]:
from pyspark.sql.types import StructField, StructType, StringType, LongType 

myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False)
])

csvFile = spark.read.format("csv")\
    .option("header", "true")\
    .option("mode", "permissive")\
    .schema(myManualSchema)\
    .load("../BookSamples/data/flight-data/csv/2010-summary.csv")\

In [4]:
csvFile.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### 9.2.3 CSV 파일 쓰기

In [5]:
csvFile.write.format("csv").mode("overwrite")\
    .option("sep", "\t")\
    .save("../tmp/my-tsv-file.tsv")

In [6]:
csvFile = spark.read.format("csv")\
    .option("sep", "\t")\
    .option("header", "true")\
    .option("mode", "permissive")\
    .schema(myManualSchema)\
    .load("../tmp/my-tsv-file.tsv")

In [7]:
csvFile.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
+-----------------+-------------------+-----+
only showing top 5 rows



In [8]:
!ls ../tmp/my-tsv-file.tsv

part-00000-9b156885-d714-4aa5-9950-357735831517-c000.csv  _SUCCESS


## 9.3 JSON
+ 스파크에서는 JSON 파일을 사용할 때 중로 구분된 JSON을 기본적으로 사용
+ MultiLine 옵션을 사용해 줄로 구분된 방식과 여러 줄로 구성된 방식을 선택적으로 사용할 수 있음

### 9.3.2 JSON 읽기

In [9]:
spark.read.format("json").option("mode", "FAILFAST").option("inferSchema", "true")\
    .load("../BookSamples/data/flight-data/json/2010-summary.json").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



### 9.3.3 JSON 쓰기
+ 파티션당 하나의 파일을 만들며 전체 DataFrame을 단일 폴더에 저장
+ JSON 객체는 한 줄에 하나씩 기록됨

In [10]:
csvFile.write.format("json").mode("overwrite")\
    .option("mode", "append")\
    .save("../tmp/my-json-file.json")

## 9.4 Parquet & ORC
+ 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며 컬럼 기반의 압축 기능을 제공
+ 아파치 스파크와 잘 호환되기 때문에 기본 파일 포맷이 됨
+ 복합 데이터 타입을 지원
+ ORC 파일도 파케이와 유사하나 Hive에 최적화 됨

### 9.4.1 Parquet 파일 읽기
+ 스키마가 파일 자체에 내장되어 있음

In [11]:
parquetFile = spark.read.format("parquet").load("../BookSamples/data/flight-data/parquet/2010-summary.parquet/")

### 9.4.2 Parquet 파일 쓰기

In [12]:
parquetFile.write.format("parquet").mode("overwrite")\
    .save("../tmp/my-parquet-file.parquet")

## 9.6 SQL 데이터베이스

In [13]:
!pip install mysql-connector



In [14]:
""" 테스트를 위해 데이터를 DB에 추가함 """
import mysql.connector
from mysql.connector import Error
from mysql.connector import errorcode
from datetime import datetime
def insertInSummaryTable(index, dest, origin, count):
    try:
        conn = mysql.connector.connect(host='mariadb',
                                       database='sparklab',
                                       user='root',
                                       port=3306,
                                       password='9838')
        cursor = conn.cursor(prepared=True)
        
        sql_createtable_query = """
        CREATE TABLE IF NOT EXISTS summary_2010(
            DEST_COUNTRY_NAME VARCHAR(255) NOT NULL,
            ORIGIN_COUNTRY_NAME VARCHAR(255) NOT NULL,
            count INT NOT NULL)
        """
        result = cursor.execute(sql_createtable_query)
        conn.commit()        
        
        sql_insert_query = """
        INSERT INTO summary_2010 VALUES (%s,%s,%s)
        """
        insert_tuple = (dest, origin, int(count)) # np.int64를 일반적인 int로 형 변환
        result = cursor.execute(sql_insert_query, insert_tuple)
        conn.commit()
        print("Success {} into MySQL table called {}".format(insert_tuple, "summary_2010"))

    except mysql.connector.Error as error :
        conn.rollback()
        print("Failed to insert into MySQL table {}".format(error))

    finally:
        if(conn.is_connected()):
            cursor.close()
            conn.close()
            print("MySQL connection is closed")

In [15]:
df = parquetFile.select("*").toPandas()
for row in df.to_records():
    insertInSummaryTable(*row)

Success ('United States', 'Romania', 1) into MySQL table called summary_2010
MySQL connection is closed
Success ('United States', 'Ireland', 264) into MySQL table called summary_2010
MySQL connection is closed
Success ('United States', 'India', 69) into MySQL table called summary_2010
MySQL connection is closed
Success ('Egypt', 'United States', 24) into MySQL table called summary_2010
MySQL connection is closed
Success ('Equatorial Guinea', 'United States', 1) into MySQL table called summary_2010
MySQL connection is closed
Success ('United States', 'Singapore', 25) into MySQL table called summary_2010
MySQL connection is closed
Success ('United States', 'Grenada', 54) into MySQL table called summary_2010
MySQL connection is closed
Success ('Costa Rica', 'United States', 477) into MySQL table called summary_2010
MySQL connection is closed
Success ('Senegal', 'United States', 29) into MySQL table called summary_2010
MySQL connection is closed
Success ('United States', 'Marshall Islands'

In [16]:
dataframe_mysql = spark.read.format("jdbc").options(
    url="jdbc:mysql://mariadb/sparklab",
    driver = "com.mysql.jdbc.Driver",
    dbtable = "summary_2010",
    user="root",
    password="9838").load()

from pyspark.sql.functions import count, col
dataframe_mysql.select(col("DEST_COUNTRY_NAME")).distinct().show()

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|            Anguilla|
|              Russia|
|            Paraguay|
|             Senegal|
|              Sweden|
|            Kiribati|
|              Guyana|
|         Philippines|
|            Malaysia|
|           Singapore|
|                Fiji|
|              Turkey|
|             Germany|
|         Afghanistan|
|              Jordan|
|               Palau|
|Turks and Caicos ...|
|              France|
|              Greece|
|              Taiwan|
+--------------------+
only showing top 20 rows



### 9.6.2 쿼리 푸시다운
+ DataFrame을 만들기 전에 데이터베이스 자체에서 데이터를 처리하도록 명령하는 것
    + 필터를 명시하면 해당 필터에 대한 처리는 데이터베이스에 위임
    + 괄호로 쿼리를 묶고 이름을 변경

In [17]:
dataframe_mysql.filter("DEST_COUNTRY_NAME in ('Anguilla', 'Sweden')").explain()

== Physical Plan ==
*(1) Scan JDBCRelation(summary_2010) [numPartitions=1] [DEST_COUNTRY_NAME#87,ORIGIN_COUNTRY_NAME#88,count#89] PushedFilters: [*In(DEST_COUNTRY_NAME, [Anguilla,Sweden])], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


In [19]:
pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM summary_2010) AS summary_2010 """
dbDataFrame = spark.read.format("jdbc")\
    .option("url", "jdbc:mysql://mariadb/sparklab")\
    .option("dbtable", pushdownQuery)\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("user", "root").option("password", "9838")\
    .load()

In [20]:
dbDataFrame.explain() # 쿼리가 실행 계획에 그대로 반영됨

== Physical Plan ==
*(1) Scan JDBCRelation((SELECT DISTINCT(DEST_COUNTRY_NAME) FROM summary_2010) AS summary_2010) [numPartitions=1] [DEST_COUNTRY_NAME#98] PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


#### 데이터베이스 병렬로 읽기
+ numPartitions 옵션을 사용해 일기 및 쓰기용 동시 작업 수를 제한할 수 있는 최대 파티션 수를 설정

In [21]:
dbDataFrame = spark.read.format("jdbc")\
    .option("url", "jdbc:mysql://mariadb/sparklab")\
    .option("dbtable", pushdownQuery)\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("user", "root").option("password", "9838")\
    .option("numPartitions", 10)\
    .load()

In [22]:
dbDataFrame.select("DEST_COUNTRY_NAME").distinct().show()

+--------------------+
|   DEST_COUNTRY_NAME|
+--------------------+
|            Anguilla|
|              Russia|
|            Paraguay|
|             Senegal|
|              Sweden|
|            Kiribati|
|              Guyana|
|         Philippines|
|            Malaysia|
|           Singapore|
|                Fiji|
|              Turkey|
|             Germany|
|         Afghanistan|
|              Jordan|
|               Palau|
|Turks and Caicos ...|
|              France|
|              Greece|
|              Taiwan|
+--------------------+
only showing top 20 rows



+ 스파크 자체 파티션에 결과 데이터를 저장

In [24]:
props = {"driver":"com.mysql.jdbc.Driver",
         "user":"root",
         "password":"9838"}
predicates = [
    "DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
    "DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'"]

url = "jdbc:mysql://mariadb/sparklab"

spark.read.jdbc(url, "summary_2010", predicates=predicates, properties=props).show()
spark.read.jdbc(url, "summary_2010", predicates=predicates, properties=props)\
    .rdd.getNumPartitions()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|           Sweden|      United States|   65|
|    United States|             Sweden|   73|
|         Anguilla|      United States|   21|
|    United States|           Anguilla|   20|
+-----------------+-------------------+-----+



2

+ 연관성이 없는 조건절을 정의하여 중복 로우가 발생한 사례

In [25]:
predicates = [
    "DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
    "DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'"] # 조건마다 병렬처리 하고 결합, 중복열이 많이 발생할 수 있음

spark.read.jdbc(url, "summary_2010", predicates=predicates, properties=props).show()
spark.read.jdbc(url, "summary_2010", predicates=predicates, properties=props).count() # 원래 컬럼은 255개임

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|    1|
|       United States|            Ireland|  264|
|       United States|              India|   69|
|               Egypt|      United States|   24|
|   Equatorial Guinea|      United States|    1|
|       United States|          Singapore|   25|
|       United States|            Grenada|   54|
|          Costa Rica|      United States|  477|
|             Senegal|      United States|   29|
|       United States|   Marshall Islands|   44|
|              Guyana|      United States|   17|
|       United States|       Sint Maarten|   53|
|               Malta|      United States|    1|
|             Bolivia|      United States|   46|
|            Anguilla|      United States|   21|
|Turks and Caicos ...|      United States|  136|
|       United States|        Afghanistan|    2|
|Saint Vincent and..

510

#### 슬라이딩 윈도우 기반의 파티셔닝
+ 조건절을 기반으로 분할

In [26]:
colName = "count"
lowerBound = 0
upperBound = 348113
numPartitions = 10

spark.read.jdbc(url, "summary_2010", properties=props,
                lowerBound=lowerBound,
                upperBound=upperBound, # 예제 데이터베이스의 데이터 최대 개수
                numPartitions=numPartitions).count() # 255

255

### 9.6.3 SQL 데이터베이스 쓰기

+ overwrite 모드

In [27]:
csvFile.write.jdbc(url, "summary_2010", mode="overwrite", properties=props)
spark.read.jdbc(url, "summary_2010", properties=props).count()

254

+ append 모드, 레코드 증가 확인

In [28]:
csvFile.write.jdbc(url, "summary_2010", mode="append", properties=props)
spark.read.jdbc(url, "summary_2010", properties=props).count()

508

## 9.7 텍스트 파일
### 9.7.1 텍스트 파일 읽기
+ textFile 메서드는 텍스트 파일을 직접 지정 (※ 현재 메서드가 없음)
+ text는 파티션 수행 결과로 만들어진 디렉터리를 인식

In [31]:
spark.read.text("../BookSamples/data/flight-data/csv/2010-summary.csv")\
    .selectExpr("split(value, ',') as rows").show()

+--------------------+
|                rows|
+--------------------+
|[DEST_COUNTRY_NAM...|
|[United States, R...|
|[United States, I...|
|[United States, I...|
|[Egypt, United St...|
|[Equatorial Guine...|
|[United States, S...|
|[United States, G...|
|[Costa Rica, Unit...|
|[Senegal, United ...|
|[United States, M...|
|[Guyana, United S...|
|[United States, S...|
|[Malta, United St...|
|[Bolivia, United ...|
|[Anguilla, United...|
|[Turks and Caicos...|
|[United States, A...|
|[Saint Vincent an...|
|[Italy, United St...|
+--------------------+
only showing top 20 rows



+ 텍스트 파일을 쓸 때는 일반적으로 문자열 컬럼이 하나만 존재해야 함
+ 파티셔닝 작업을 수행하면 더 많은 컬럼을 저장하나 디텔터리에 폴더별로 컬럼을 저장

In [33]:
csvFile.limit(10).select("DEST_COUNTRY_NAME", "count")\
    .write.mode("overwrite").partitionBy("count").text("../BookSamples/data/tmp/five-csv-file2py.csv")

In [34]:
!ls ./data/tmp/five-csv-file2py.csv -al

total 52
drwxr-xr-x 12 root root 4096 Jun 13 00:40  .
drwxr-xr-x  3 root root 4096 Jun 13 00:40  ..
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=1'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=17'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=24'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=25'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=264'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=29'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=44'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=477'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=54'
drwxr-xr-x  2 root root 4096 Jun 13 00:40 'count=69'
-rw-r--r--  1 root root    0 Jun 13 00:40  _SUCCESS
-rw-r--r--  1 root root    8 Jun 13 00:40  ._SUCCESS.crc


## 9.8 고급 I/O 개념
+ 추천하는 파일 포맷과 압축 방식은 파케이 파일 포맷과 GZIP 압축방식
+ 기본적으로 데이터 파티션당 하나의 파일이 작성됨
    + 아래는 5개의 파일을 생성

In [35]:
csvFile.repartition(5).write.mode("overwrite").format("csv").save("./data/tmp/multiple.csv")

In [36]:
!ls ./data/tmp/multiple.csv

part-00000-0ae2749c-63c8-496c-afb1-d901de82a63b-c000.csv
part-00001-0ae2749c-63c8-496c-afb1-d901de82a63b-c000.csv
part-00002-0ae2749c-63c8-496c-afb1-d901de82a63b-c000.csv
part-00003-0ae2749c-63c8-496c-afb1-d901de82a63b-c000.csv
part-00004-0ae2749c-63c8-496c-afb1-d901de82a63b-c000.csv
_SUCCESS


#### 파티셔닝
+ 디렉터리별로 컬럼 데이터를 인코딩해 저장
+ 전체 데이터셋을 스캔하지 않고 필요한 컬럼의 데이터만 읽기 가능

In [37]:
csvFile.limit(5).write.mode("overwrite").partitionBy("DEST_COUNTRY_NAME")\
    .save("./data/tmp/partitioned-files.parquet")

In [38]:
!ls ./data/tmp/partitioned-files.parquet/

'DEST_COUNTRY_NAME=Egypt'	       'DEST_COUNTRY_NAME=United States'
'DEST_COUNTRY_NAME=Equatorial Guinea'   _SUCCESS


#### 버켓팅
+ 동일한 버킷 ID를 가진 데이터가 하나의 물리적 파티션에 모두 모여있기 때문에 데이터를 읽을 때 셔플을 피할 수 있음
+ 스파크 관리 테이블에서만 사용 가능

In [39]:
""" 버켓 단위로 데이터를 모아 일정 수의 파일로 저장 """
numberBuckets = 10
columnToBucketBy = "count"

csvFile.write.format("parquet").mode("overwrite").bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

#### 기타
+ 복합데이터(리스트 등)는 파케이, ORC 타입에서 지원
+ 스파크는 작은 용량의 파일이 많을 때 취약하나 반대의 상황도 좋지 않음
+ maxRecordsPerFile 옵션을 통해 파일당 레코드 수를 지정할 수 있음