## 5. 구조적 API 기본 연산
- DataFrame은 row 타입의 레코드와 각 레코드를 수행할 연산 표현식을 나타내는 여러 컬럼으로 구성
- 스키마는 각 컬럼명과 데이터 타입 정의
- DataFrame의 파티셔닝은 DataFrame이나 Dataset이 클러스터에서 물리적으로 배치되는 형태를 정의
- 파티셔닝 스키마는 파티션을 배치하는 방법을 정의


In [1]:
# 데이터 프레임 생성 방식
df = spark.read.format('json').load('file:///home/ubuntu/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json')

In [2]:
# 스키마 확인 과정
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



### 5.1 스키마
- 스키마는 DataFrame의 컬럼명과 데이터 타입을 정의함. 

In [3]:
spark.read.format('json').load('file:///home/ubuntu/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json').schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

- 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체
- 스키마는 복합 데이터 타입인 StructType을 가질 수 있음. 

In [4]:
from pyspark.sql.types import StructField, StructType, StringType, LongType

myManualSchema = StructType([
    StructField('DEST_COUNTRY_NAME', StringType(), True),
    StructField('ORIGIN_COUNTRY_NAME', StringType(), True),
    StructField('count', LongType(), False, metadata = {'hello': 'world'})
])
df = spark.read.format('json').schema(myManualSchema).load('file:///home/ubuntu/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json')

In [5]:
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

### 5.2 컬럼과 표현식
- Pandas와 유사한 표현식을 가짐

#### 5.2.1 컬럼

In [6]:
from pyspark.sql.functions import col, column

print(col('someColumnName'))
print(column('someColumnName'))

Column<b'someColumnName'>
Column<b'someColumnName'>


- 컬럼은 컬럼명을 카탈로그에 저장된 정보와 비교하기 전꺼지 미확인 상태로 남음. -> 분석기가 동작하는 단계에서 컬럼과 테이블을 분석함. 

#### 5.2.2 표현식
- DataFrame 레코드의 여러 값에 대한 트랜스포메이션 집합을 의미함
- expr('somecolumn') = col('somecolumn')
- expr('somecolumn - 5') = col('somecolumn') - 5 = expr('somecolumn) - 5
- 위의 예제가 같은 이유는 스파크가 연산 순서를 지정하는 논리적 트리로 컴파일하기 때문임


In [7]:
from pyspark.sql.functions import expr

expr("(((someCol + 5) * 200) - 6) < otherCol")

Column<b'((((someCol + 5) * 200) - 6) < otherCol)'>

- SQL의 Select 구문에 이전 표현식을 사용해도 잘 동작하고 동일한 결과 반환
- 실행 시점에 동일한 논리 트리로 컴파일되기 때문

In [8]:
# DataFrame 컬럼에 접근하기
spark.read.format('json').load('file:///home/ubuntu/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json').columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

### 5.3 레코드와 로우

In [9]:
df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

#### 5.3.1 로우 생성하기

In [10]:
from pyspark.sql import Row

myRow = Row('Hello', None, 1, False)

In [11]:
# 로우 데이터에 접근하는 방식
print(myRow[0])
print(myRow[2])

Hello
1


### 5.4 DataFrame의 트랜스포메이션
- 로우나 컬럼 추가
- 로우나 컬럼 제거
- 로우를 컬럼으로 변환하거나, 그 반대로 변환
- 컬럼값을 기준으로 로우 순서 변경

In [12]:
# DataFrame 생성 방식
# 임시 뷰 등록 방식
df = spark.read.format('json').load('file:///home/ubuntu/Spark-The-Definitive-Guide/data/flight-data/json/2015-summary.json')
df.createOrReplaceTempView('dfTable')

In [13]:
# row 객체를 가진 seq 타입을 직접 변환해 DataFrame 생성할 수 있음
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType

mySchema = StructType([
    StructField('Some', StringType(), True),
    StructField('col', StringType(), True),
    StructField('names', LongType(), False)
])

myRow = Row('Hello', None, 1)
myDf = spark.createDataFrame([myRow], mySchema)
myDf.show()

+-----+----+-----+
| Some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+



In [14]:
# select와 selectExpr을 사용하면 SQL을 실행한 것처럼 DataFrame에 적용가능
df.select('DEST_COUNTRY_NAME').show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [15]:
# 여러 컬럼 선택하는 방법
df.select('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME').show(2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [16]:
# 여러 방식으로 섞어서 사용 가능
df.select(expr('DEST_COUNTRY_NAME'), col('DEST_COUNTRY_NAME'), column('DEST_COUNTRY_NAME')).show(2)

+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows



- 컬럼 객체와 문자열을 함께 쓰면 에러 생김.
- expr은 가장 유연한 참조 방식

In [17]:
df.select(expr('DEST_COUNTRY_NAME as destination')).show(2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [18]:
# 컬럼명을 바꾸는 방식은 두가지로 할 수 있음
df.select(expr('DEST_COUNTRY_NAME as destination').alias('DEST_COUNTRY_NAME')).show(2)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [19]:
# selectExpr로 한번에 할 수 있음
df.selectExpr('DEST_COUNTRY_NAME as newCOlumnName', 'DEST_COUNTRY_NAME').show(2)

+-------------+-----------------+
|newCOlumnName|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



- selectExpr을 통해 복잡한 표현식을 간단하게 만들 수 있음
- 비집계형 SQL 구문도 지정할 수 있음
- 단 컬럼을 식별해야함


In [20]:
df.selectExpr("*", '(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry' ).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [21]:
# 집계함수 지정
df.selectExpr('avg(count)', 'count(distinct(DEST_COUNTRY_NAME))').show(2)

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



#### 5.4.3 스파크 데이터 타입으로 변환
- 새로운 컬럼이 아닌 명시적인 값을 스파크에 전달해야될 경우, 리터럴을 사용함
- 리터럴은 프로그래밍 언어의 리터럴값을 스파크가 이해할 수 있는 값으로 변환
- SQL에서 리터럴은 상숫값을 의미함

In [22]:
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias('one')).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



#### 5.4.4 컬럼 추가
- withColumn 매서드를 활용

In [23]:
df.withColumn('numberOne', lit(1)).show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [24]:
# 출발지와 도착지가 같은지 여부를 불리언 타입으로 표현하는 예제
df.withColumn('withCountry', expr('ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME')).show(2)

+-----------------+-------------------+-----+-----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withCountry|
+-----------------+-------------------+-----+-----------+
|    United States|            Romania|   15|      false|
|    United States|            Croatia|    1|      false|
+-----------------+-------------------+-----+-----------+
only showing top 2 rows



In [25]:
# withColumn으로 컬럼명 바꾸는 예제
df.withColumn('Destination', expr('DEST_COUNTRY_NAME')).columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count', 'Destination']

In [26]:
# withColumnRenamed로도 변경 가능
df.withColumnRenamed('DEST_COUNTRY_NAME', 'DEST').columns

['DEST', 'ORIGIN_COUNTRY_NAME', 'count']

In [27]:
# 예약 문자와 키워드는 컬럼명에 사용할 수 없음(공백이나 하이픈(-))
# 예약 문자를 컬럼에 사용하려면 '을 통해 이스케이핑 해야함
# 만들때는 문제 없지만, 불러올때는 활용해야함
# 표현식 대신에 문자열을 사용해 명시적으로 컬럼을 참조하면 리터럴로 해석되기 때문에 예약 문자가 포함된 컬럼을 참조할 수 있음

dfwithLongColName = df.withColumn('This long column-name',
                                 expr('origin_country_name'))
print(dfwithLongColName.selectExpr('`this long column-name`',
                            '`this long column-name` as `new col`').show(2))
print(dfwithLongColName.selectExpr('`this long column-name`',
                            '`this long column-name` as `new col`').show(2))

+---------------------+-------+
|this long column-name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows

None
+---------------------+-------+
|this long column-name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows

None


- 스파크는 기본적으로 대소문자 구분하지 않음.

In [28]:
# 컬럼 제거
df.drop('ORIGIN_COUNTRY_NAME').columns

['DEST_COUNTRY_NAME', 'count']

In [29]:
# 한번에 제거
dfwithLongColName.drop('ORIGIN_COUNTRY_NAME', 'DEST_COUNTRY_NAME')

DataFrame[count: bigint, This long column-name: string]

In [30]:
# 데이터 타입 변경
df.withColumn('count2', col('count').cast('string'))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint, count2: string]

In [31]:
# filter와 where는 비슷한 역할을 함.
print(df.filter(col('count') < 2).show(2))
print(df.where('count < 2').show(2))

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

None
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

None


In [32]:
# 여러개 필터링 해야되는 경우 and로 차례대로 연결하고, 판단은 스파크에 맡긴다. 
df.where(col('count') < 2).where(col('ORIGIN_COUNTRY_NAME')!= 'Croatia').show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [33]:
# 고유한 로우는 distinct를 사용함
# 하나 이상의 로우를 사용해야함
df.select('ORIGIN_COUNTRY_NAME', 'DEST_COUNTRY_NAME').distinct().count()

256

In [34]:
df.select('ORIGIN_COUNTRY_NAME').distinct().count()

125

In [35]:
# 무작위 샘플 만들기
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()

126

In [36]:
# 임의로 분할하기
# seed가 꼭 필요함
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count()

False

In [37]:
# DataFrame은 불변성을 가지기 때문에 레코드를 추가하려면 원본과 새로운 DataFrame을 통합해야함. 

from pyspark.sql import Row


schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New New Country", "Other Country", 1)
]

parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

In [38]:
newDF.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|      New Country|      Other Country|    5|
|  New New Country|      Other Country|    1|
+-----------------+-------------------+-----+



In [39]:
df.union(newDF).where('count = 1').where(col('ORIGIN_COUNTRY_NAME') != 'United States').show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|  New New Country|      Other Country|    1|
+-----------------+-------------------+-----+



In [40]:
# 로우 정렬하기
print(df.sort('count').show(5))
print(df.orderBy('count', 'DEST_COUNTRY_NAME').show(5))
print(df.orderBy(col('count').desc(), col('DEST_COUNTRY_NAME').asc()).show(5))

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|               Malta|      United States|    1|
|Saint Vincent and...|      United States|    1|
|       United States|            Croatia|    1|
|       United States|          Gibraltar|    1|
|       United States|          Singapore|    1|
+--------------------+-------------------+-----+
only showing top 5 rows

None
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|     Burkina Faso|      United States|    1|
|    Cote d'Ivoire|      United States|    1|
|           Cyprus|      United States|    1|
|         Djibouti|      United States|    1|
|        Indonesia|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows

None
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME

In [43]:
# 파티션별 정렬 수행 -> 성능 최적화
spark.read.format('json').load('file:///home/ubuntu/Spark-The-Definitive-Guide/data/flight-data/json/*-summary.json').sortWithinPartitions('count')

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]

In [44]:
# 로우 수 제한
df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+



In [50]:
# 자주 필터링하는 컬럼 기준으로 데이터 분할
print(df.rdd.getNumPartitions())
print(df.repartition(5))
print(df.repartition(col('DEST_COUNTRY_NAME')))
print(df.repartition(5, col('DEST_COUNTRY_NAME')))

1
DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]
DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]


In [54]:
# 전체 데이터를 셔플하지 않고 파티션을 병합하는 경우
print(df.repartition(5, col('DEST_COUNTRY_NAME')).coalesce(2))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: bigint]


In [55]:
# 스파크는 드라이버에서 클러스터 상태 정보 유지
collectDF = df.limit(10)
collectDF.take(5)
collectDF.show()
collectDF.show(5)
collectDF.show(5, False)
collectDF.collect()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Grenada', count=62),
 Row(DEST_COUNTRY_NAME='Costa Rica', ORIGIN_COUNTRY_NAME='United States', count=588),
 Row(DEST_COUNTRY_NAME='Senegal', ORIGIN_COUNTRY_NAME='United States', count=40),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

In [57]:
# 반복 처리를 위한 메서드
collectDF.toLocalIterator()

<itertools.chain at 0x7f0d7c84f0b8>