# Chapter 3 : 데이터프레임

데이터프레임은 관계형 데이터베이스의 테이블에서 칼럼 이름으로 구성된 변경 불가능한 분산 데이터 컬렉션이다.|

In [1]:
spark

In [2]:
sc

In [15]:
# RDD를 생성하는 코드
stringJSONRDD = sc.parallelize((""" 
  { "id": "123", "name": "Katie", "age": 19, "eyeColor": "brown"}""",
   """{ "id": "234", "name": "Michael", "age": 22, "eyeColor": "green"}""", 
   """{ "id": "345", "name": "Simone", "age": 23, "eyeColor": "blue" }""")
)

In [16]:
# RDD를 생성했으니, 이것을 spark.read.json 함수를 사용해서 RDD로 바꿀 것이다.
swimmersJSON = spark.read.json(stringJSONRDD)

In [17]:
# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [18]:
# DataFrame API
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [13]:
type(swimmersJSON)

pyspark.sql.dataframe.DataFrame

## RDD로 연동하기

기존에 있는 RDD를 데이터프레임(또는 Dataset[T])으로 변경하는 두 가지 방법이 있다.

1. reflection을 사용해 스키마을 추측: 더욱 자세한 코드를 작성하게 할 수 있다.

2. 스키마를 직접적으로 코드에 명시: 열과 데이터 타입이 런타임에 드러날 때 데이터프레임의 구조를 만들도록 한다.

### reflection을 이용한 스키마 추측하기

데이터프레임을 빌드하고 쿼리를 수행하는 과정에서 데이터프레임에 대한 스키마는 자동으로 정의된다. 최초에 행 객체는 key-value 리스트가 행 클래스의 **kwargs로 전달되면서 구성된다. 그 후, 스파크 SQL은 이 행 객체의 RDD를 데이터프레임으로 변경한다. 키는 column이고 데이터 타입은 데이터 샘플링을 통해 추측된다.

In [23]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



만약 id가 string이 아니라 long타입이기를 의도한 것이라서 스키마를 명시하고 싶다면 스키마를 프로그래밍하듯이 명시해야 한다

[참고사항]

아래 코드에서 id로 들어가는 데이터들에 따옴표를 씌운다음 long으로 명시하면 스키마 자체는 명시한대로 long이 되지만 실제 DataFrame으로 만들 때 '123'이 long 타입으로 변환되지 않아서 에러가 난다.

다시말해 string형식으로 적고 스키마를 long으로 했을 때 강제형변환이 되지는 않는다는 것이다. 심심하면 123,234,345에 따옴표를 씌워보자. 밑에서 swimmers.count()를 할 때 런타임에러가 날 것이다. 

에러가 저 때 나는 이유는 스파크특성상 count()라는 액션을 취하기 전까지 데이터가 실제로 만들어지지 않기 때문이다. 이 말이 이해가 되지 않는다면 1장 2장을 다시 복습할 것

In [19]:
from pyspark.sql.types import *

# Generate our own CSV data 
stringCSVRDD = sc.parallelize(
    [(123, 'Katie', 19, 'brown'), 
     (234, 'Michael', 22, 'green'), 
     (345, 'Simone', 23, 'blue')])

# The schema is encoded in a string, 
# using StructType we define the schema using various pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
    StructField("id", LongType(), True),    
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [20]:
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [21]:
# Print the schema
# Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [22]:
print(type(swimmers))

<class 'pyspark.sql.dataframe.DataFrame'>


## 데이터프레임 API로 Query하기

collect(), show(), take() 함수를 이용해 데이터프레임 내의 데이터를 볼 수 있다. show()와 take() 함수는 리턴되는 행의 개수를 제한하는 옵션을 포함한다.

In [26]:
swimmers.collect()

[Row(id=123, name='Katie', age=19, eyeColor='brown'),
 Row(id=234, name='Michael', age=22, eyeColor='green'),
 Row(id=345, name='Simone', age=23, eyeColor='blue')]

In [27]:
# Show the values 
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [28]:
swimmers.show(2)

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
+---+-------+---+--------+
only showing top 2 rows



In [32]:
swimmers.take(1)

[Row(id=123, name='Katie', age=19, eyeColor='brown')]

In [25]:
# Using Databricks `display` command to view the data easier
display(swimmers)

DataFrame[id: bigint, name: string, age: bigint, eyeColor: string]

### filter문 사용하기

나이가 22살인 행에서 id와 age추출하기

In [55]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [62]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



눈 색깔이 b로 시작하는 수영선수의 이름 얻기

In [56]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



## SQL로 쿼리하기

데이터프레임 API로 진행했던 연산을 SQL로 해보자

In [33]:
# Execute SQL Query and return the data
spark.sql("select * from swimmers").show()

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

In [58]:
# Get count of rows in SQL
spark.sql("select count(1) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [61]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [63]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



## 데이터프레임 시나리오 : 비행 기록 성능

In [66]:
# Set File Paths
flightPerfFilePath = "flight-data/departuredelays.csv"
airportsFilePath = "flight-data/airport-codes-na.txt"

# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")

# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the Departure Delays dataset 
flightPerf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [75]:
# 도시와 출발지 코드에 다라 비행 지연 합을 쿼리하기 (워싱턴 주에 대해)
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [76]:
# 모든 주에 대해 계속 진행하기
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State").show()

+-----+---------+
|State|   Delays|
+-----+---------+
|   SC|  80666.0|
|   AZ| 401793.0|
|   LA| 199136.0|
|   MN| 256811.0|
|   NJ| 452791.0|
|   OR| 109333.0|
|   VA|  98016.0|
| null| 397237.0|
|   RI|  30760.0|
|   WY|  15365.0|
|   KY|  61156.0|
|   NH|  20474.0|
|   MI| 366486.0|
|   NV| 474208.0|
|   WI| 152311.0|
|   ID|  22932.0|
|   CA|1891919.0|
|   CT|  54662.0|
|   NE|  59376.0|
|   MT|  19271.0|
+-----+---------+
only showing top 20 rows



In [88]:
# 쿼리 결과에 대한 모든 결과 보기
queryResult = spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State").collect()
queryResult

[Row(State='SC', Delays=80666.0),
 Row(State='AZ', Delays=401793.0),
 Row(State='LA', Delays=199136.0),
 Row(State='MN', Delays=256811.0),
 Row(State='NJ', Delays=452791.0),
 Row(State='OR', Delays=109333.0),
 Row(State='VA', Delays=98016.0),
 Row(State=None, Delays=397237.0),
 Row(State='RI', Delays=30760.0),
 Row(State='WY', Delays=15365.0),
 Row(State='KY', Delays=61156.0),
 Row(State='NH', Delays=20474.0),
 Row(State='MI', Delays=366486.0),
 Row(State='NV', Delays=474208.0),
 Row(State='WI', Delays=152311.0),
 Row(State='ID', Delays=22932.0),
 Row(State='CA', Delays=1891919.0),
 Row(State='CT', Delays=54662.0),
 Row(State='NE', Delays=59376.0),
 Row(State='MT', Delays=19271.0),
 Row(State='NC', Delays=394256.0),
 Row(State='VT', Delays=14755.0),
 Row(State='MD', Delays=362845.0),
 Row(State='MO', Delays=344538.0),
 Row(State='IL', Delays=1630792.0),
 Row(State='ME', Delays=15214.0),
 Row(State='ND', Delays=27402.0),
 Row(State='WA', Delays=172439.0),
 Row(State='MS', Delays=33827.0