## DataFrame

데이터프레임은 관계형 데이터베이스의 테이블에서 칼럼으로 구성된 변경 불가능한 분산 데이터 컬랙션이다.  
아파치 스파크 1.0에서 스키마 RDD가 소개됐고 스키마 RDD가 아파치 스파크 1.3버전에서 데이터프레임이라는 이름으로 바뀜.  
파이썬 Pandas의 데이터프레임 또는 R의 데이터프레임에 익숙한 사람에게  
스파크 데이터 프레임은 구조적인 데이터로 작업을 쉽게 해준다.

#### 파이썬 RDD의 약점
스파크 API를 파이썬에서 실행하는 것은 자바 JVM과 py4j 사이의 커뮤니케이션 오버헤드가 발생해서 느려질 수 있다.

스파크 1.x 버전에서의 데이터프레임에 익숙하다면  
스파크 2.x 버전에서는 SQLContext 대신 SparkSession을 사용한다는 걸 알 수 있다.  
HiveContext SQLContext StreamingContext SparkContext 등이 SparkSession 으로 모두 통합되었다.(스파크2.x)  

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession 
conf = pyspark.SparkConf() \
                .setAppName('appName') \
                .setMaster('local[2]')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [55]:
sc.stop()

In [12]:
# 경로의 해당 파일 내용 복붙
stringJSONRDD = sc.parallelize(("""
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }
"""))

In [15]:
swimmersJSON = spark.read.json(stringJSONRDD)

In [14]:
print(type(swimmersJSON))

<class 'pyspark.sql.dataframe.DataFrame'>


In [16]:
dir(swimmersJSON)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collectAsArrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_sort_cols',
 '_support_repr_html',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'exceptAll',
 'explain',
 'fillna',
 'filter',
 'first',
 'foreach',
 'f

In [20]:
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [8]:
print(type(swimmersJSON))

<class 'pyspark.sql.dataframe.DataFrame'>


In [18]:
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [21]:
spark.sql("select * from swimmersJSON").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

RDD를 데이터프레임으로 변경되는 두가지 방법  
리플렉션을 사용해 스키마를 추측하는 방법  
스키마를 직접 코드상에 명시하는 방법  

In [22]:
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [23]:
from pyspark.sql.types import *

In [24]:
stringCSVRDD = sc.parallelize([
    (123, 'Katie', 19, 'brown'),
    (234, 'Michael', 22, 'green'),
    (345, 'Simone', 23, 'blue')
])

In [25]:
schemaString = "id name age eyeColor"

In [26]:
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyecolor", StringType(), True)
])

In [27]:
swimmers = spark.createDataFrame(stringCSVRDD, schema)

In [28]:
swimmers.createOrReplaceTempView("swimmers")

In [29]:
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyecolor: string (nullable = true)



## Querying with SQL

In [31]:
# DataBricks notebook에서만 지원되는 방법 => DataBricks에서 서비스 받을 수 있다.
%sql
-- Query all data
select * from swimmers
# 안 되는게 정상이라니??

SyntaxError: invalid syntax (<ipython-input-31-52e67229a90e>, line 3)

In [32]:
spark.sql('select count(1) from swimmers').show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



## Querying with the DataFrame API

In [33]:
display(swimmers)

DataFrame[id: bigint, name: string, age: bigint, eyecolor: string]

In [34]:
swimmers.count()

3

In [35]:
swimmers.select('id','age').show()

+---+---+
| id|age|
+---+---+
|123| 19|
|234| 22|
|345| 23|
+---+---+



In [36]:
swimmers.select('id','age').filter('age = 22').show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [37]:
swimmers.select('name', 'eyeColor').filter('eyeColor like "b%"').show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



- 스파크 SQL과 데이터프레임으로 작업할때 읹으면 안되는 중요한 점
- CSV, JSON을 비롯한 다양한 데이터 포멧을 작업하기 쉽지만 가장 일반적인 포멧은 파케이(parquet) 파일 포멧이다
- 스파크에서는 읽기, 쓰기 다 지원하며, 원본데이터의 스키마 정보를 잘 보존 해준다.

In [45]:
# On-time Flight performance
# DataFrame Queries
flightperFilePath = './data/flight-data(2)/departuredelays.csv'
airportsFilePath = './data/flight-data(2)/airport-codes-na.txt'

In [46]:
airports = spark.read.csv(airportsFilePath, header = 'true', inferSchema = 'true', sep = '\t')

- spark.read.csv()
- 첫번째 파라미터 : 데이터 파일경로
- 두번째 파라미터 : 첫번째 줄이 헤더인지 여부
- 세번째 파라미터 : 스파크가 리플렉션으로 데이터 타입 추측
- 네번째 파라미터 : 구분자 지정

In [47]:
airports.createOrReplaceTempView('airports')

In [48]:
flightPerf = spark.read.csv(flightperFilePath, header = 'true')
flightPerf.createOrReplaceTempView('FlightPerformance')

In [49]:
flightPerf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [50]:
spark.read.format('com.databricks.spark.csv').option('header', 'true').load(airportsFilePath)

DataFrame[City	State	Country	IATA: string]

In [51]:
# 워싱턴에 위치해 있는 공항
spark.sql('select * from airports where State = "WA"').show()
airports.select('*').filter('State = "WA"').show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Bellingham|   WA|    USA| BLI|
| Moses Lake|   WA|    USA| MWH|
|      Pasco|   WA|    USA| PSC|
|    Pullman|   WA|    USA| PUW|
|    Seattle|   WA|    USA| SEA|
|    Spokane|   WA|    USA| GEG|
|Walla Walla|   WA|    USA| ALW|
|  Wenatchee|   WA|    USA| EAT|
|     Yakima|   WA|    USA| YKM|
+-----------+-----+-------+----+

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Bellingham|   WA|    USA| BLI|
| Moses Lake|   WA|    USA| MWH|
|      Pasco|   WA|    USA| PSC|
|    Pullman|   WA|    USA| PUW|
|    Seattle|   WA|    USA| SEA|
|    Spokane|   WA|    USA| GEG|
|Walla Walla|   WA|    USA| ALW|
|  Wenatchee|   WA|    USA| EAT|
|     Yakima|   WA|    USA| YKM|
+-----------+-----+-------+----+



In [53]:
spark.sql('select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = "WA" group by a.City, f.origin order by sum(f.delay) desc').show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [54]:
# 미국의 주별 지연 정보 검색
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country ='USA' group by a.State").show()

+-----+---------+
|State|   Delays|
+-----+---------+
|   SC|  80666.0|
|   AZ| 401793.0|
|   LA| 199136.0|
|   MN| 256811.0|
|   NJ| 452791.0|
|   OR| 109333.0|
|   VA|  98016.0|
| null| 397237.0|
|   RI|  30760.0|
|   WY|  15365.0|
|   KY|  61156.0|
|   NH|  20474.0|
|   MI| 366486.0|
|   NV| 474208.0|
|   WI| 152311.0|
|   ID|  22932.0|
|   CA|1891919.0|
|   CT|  54662.0|
|   NE|  59376.0|
|   MT|  19271.0|
+-----+---------+
only showing top 20 rows



데이터프레임은 스칼라나 자바에서 프로그래머가 정의한 사용자 클래스에 의해 타입을 확실히 명시해야 하는 JVM 객체이다!!!!!  
이는 타입에 대한 견고함이 적은 파이스파크에서는 데이터프레임 API를 지원하지 않는 경우가 있다.  
파이스파크에서 사용 가능하지 않는 데이터프레임 API의 부분은 RDD로 변환하거나 UDF를 사용해야 가능해진다.  

(http://bit.ly/2dbfoFT)