<a href="https://colab.research.google.com/github/da-head0/Spark-study/blob/main/Spark_Definitive_Guide_2_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar -xvf spark-3.2.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark

In [24]:
!git clone https://github.com/databricks/Spark-The-Definitive-Guide.git

Cloning into 'Spark-The-Definitive-Guide'...
remote: Enumerating objects: 2035, done.[K
remote: Total 2035 (delta 0), reused 0 (delta 0), pack-reused 2035[K
Receiving objects: 100% (2035/2035), 523.88 MiB | 22.32 MiB/s, done.
Resolving deltas: 100% (305/305), done.
Checking out files: 100% (1738/1738), done.


In [25]:
%cd Spark-The-Definitive-Guide

/content/Spark-The-Definitive-Guide/Spark-The-Definitive-Guide


In [26]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

In [27]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# 조인
- 거의 모든 스파크 작업에 필수적으로 사용됨
- 스파크는 서로 다른 데이터를 조합할 수 있으므로 여러 데이터소스를 활용해 조인을 실행 
- 메모리 부족 상황을 회피하는 방법, 이전에 풀지 못했던 문제를 해결하는 데 도움이 된다.
- 가장 많이 사용하는 조인 표현식은 왼쪽과 오른쪽 데이터셋에 지정된 키가 동일한지 비교하는 equal join -> 키가 일치하면 왼쪽과 오른쪽 데이터셋을 결합
- 일치하는 키가 없는 로우는 조인에 포함 X
- 복합 데이터 타입을 조인에 사용할 수 있음. 배열 타입의 키게 조인할 키가 존재하는지 확인해 조인을 수행할 수 있음.

In [28]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")])\
  .toDF("id", "status")

In [29]:
# 생성한 데이터셋을 테이블로 등록
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

In [30]:
# 내부 조인 - DataFrame이나 테이블에 존재하는 키를 평가하고 true로 평가되는 로우만 결합
joinExpression = person["graduate_program"] == graduateProgram['id']

In [31]:
person.join(graduateProgram, joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [32]:
# 조인 타입 명확하게 지정
joinType = "inner"

person.join(graduateProgram, joinExpression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [33]:
# 외부 조인 - DataFrame이나 테이블에 존재하는 키를 평가하여 참이나 거짓으로 평가한 로우를 포함 + 조인. 일치하는 로우가 없다면 해당 위치에 null 삽입
joinType = "outer"

person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



- 스파크 버전 3점대에서는 앞 칼럼 순으로 정렬이 되는듯?

In [34]:
# 왼쪽 외부 조인 - 왼쪽 DataFrame의 모든 로우와 왼쪽 DataFrame과 일치하는 오른쪽 DataFrame의 로우를 함께 포함
joinType = "left_outer"

person.join(graduateProgram, joinExpression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [35]:
# 오른쪽 위부 조인
joinType = "right_outer"

person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [36]:
person.show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+



In [37]:
graduateProgram.show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [38]:
# 왼쪽 안티 조인 - 오른쪽 데이터프레임의 어떤 값도 포함하지 않고 두 번째 데이터프레임은 값이 존재하는지 확인하기 위해 값만 비교하는 용도
joinType = "left_anti"

person.join(graduateProgram, joinExpression, joinType).show()

+---+----+----------------+------------+
| id|name|graduate_program|spark_status|
+---+----+----------------+------------+
+---+----+----------------+------------+



### 조인 사용 시 문제점
- 복합 데이터 타입의 조인

In [39]:
# 동일한 이름을 가진 2개의 키를 사용한다면 
# 1. 불리언 형태의 조인 표현식을 문자열이나 시퀀스 형태로 바꿈 -> 조인을 할때 두 컬럼 중 하나가 자동으로 제거됨
person.join(gradProgramDupe, "graduate_program").select("graduate_program").show()

NameError: ignored

In [None]:
# 2. 조인 후 컬럼 제거 -> 원본 DataFrame을 사용해 컬럼 참조
person.join(gradProgramDupe, joinExpr).drop(person.col("graduate_program")).select("graduate_program").show()

val joinExpr = person.col("graduate_program") === graduateProgran.col("id")
person.join(graduateProgram, joinExpr).drop(gradateProgram.col("id")).show()
# col 메서드 사용에 주목 -> 컬럼 고유의 ID로 해당 컬럼을 암시적으로 지정

In [None]:
# 3. 조인 전 컬럼명 변경
val gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")
val joinExpr = person.col("graduate_program") === gradProgram3.col("grad_id")
person.join(gradProgram3, joinExpr).show()

# 9. 데이터소스

In [None]:
# 데이터 읽기의 핵심 구조
DataFrameReader.format(...).option("key", "value").schema(...).load()

In [None]:
# 데이터 읽기의 기초
DataFramereader -> SparkSession의 read 속성으로 접근

spark.read.format("csv")
  .option("mode", "FAILFAST") # 형식에 맞지 않는 레코드를 만나면 즉시 종료
  .option("inferSchema", "true")
  .option("path", "path/to/file")
  .schema(someSchema)
  .load()

### csv file
- 운영 환경에서는 어떤 내용이 들어있는지, 어떠한 구조로 되어 있는지 등 다양한 전제를 만들어낼 수 없음
- 스파크는 DataFrame 정의 시점이 아닌 잡 실행 시점에만 오류 발생

In [None]:
csvFile = spark.read.format("csv")\
  .option("header", "true")\
  .option("mode", "FAILFAST")\
  .option("inferSchema", "true")\
  .load("data/flight-data/csv/2010-summary.csv")

In [None]:
csvFile.write.format("csv").mode("overwrite").option("sep", "\t")\
  .save("/tmp/my-tsv-file.tsv")

### 파케이 파일
- 컬럼 기반의 데이터 저장 방식, 분석 워크로드에 최적화되어 있음. 저장소 공간을 절약할 수 있고 전체 파일을 읽는 대신 개별 컬럼을 읽을 수 있으며, 컬럼 기반의 압축 기능 제공.
- 아파치 스파크와 잘 호환되는 스파크의 기본 파일 포맷. 
- 읽기 연산 시 json이나 csv보다 훨씬 효율적으로 동작하므로 장기 저장용 데이터는 파케이 포맷으로 저장하는 것이 좋음
- 복합 데이터타입을 지원해 컬럼이 배열, 맵, 구조체 타입이라 해도 문제없이 읽고 쓸 수 있음

### SQLite
- 로컬 머신에 간단히 설치할 수 있음 (분산 환경이라면 다른 데이터베이스를 사용해야 함)
- 데이터베이스 접속 시 사용자 인증이 필요 없음

In [None]:
# 데이터베이스 병렬로 읽기
dbDataFrame = spark.read.format("jdbc")\
  .option("url", url).option("dbtable", tablename).option("driver",  driver)\
  .option("numPartitions", 10).load()