## DataFrame
- Immutable, Lazy Evaluations(Action에서 수행), Distributed
- sql 문을 사용하기 위해 Table 형식으로 만들어줌
- 구조적 데이터 처리, RDBMS 처럼 컬럼으로 구성된 분산 데이터 컬렉션

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

In [4]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [5]:
sc

### RDD vs DataFrame
- DataFrame : Row 객체를 가지고 schema 가 있는 구조화된 데이터로 만들어줌
- RDD : schema 없음
- sql 문을 쓰려면 table 형태로 데이터 타입이 붙은 구조화된 데이터로 만들어 줘야함
- jdbc 가지고 여기저기 활용할 수 있음

In [6]:
# Read a CSV file into a DataFrame.
# No header/inferSchema options are passed, so the columns come back with
# default names (_c0, _c1) and string type.
df = spark.read.csv("data/ages.csv")

In [7]:
# The reader automatically turns the file into structured data
# (named columns, each with a data type).
df.dtypes

[('_c0', 'string'), ('_c1', 'string')]

In [8]:
# Read JSON through the generic reader API: choose the format, then load the path.
df = spark.read.format('json').load('data/people.json')

In [9]:
df.dtypes

[('age', 'bigint'), ('name', 'string')]

In [10]:
df.collect()

[Row(age=None, name='Michael'),
 Row(age=30, name='Andy'),
 Row(age=19, name='Justin')]

In [11]:
rdd = sc.textFile('data/people.json')

In [12]:
rdd.collect()

['{"name":"Michael"}',
 '{"name":"Andy", "age":30}',
 '{"name":"Justin", "age":19}']

In [13]:
# Create a DataFrame from an RDD whose elements are JSON strings.
df_rdd = spark.read.json(rdd)

In [14]:
df_rdd

DataFrame[age: bigint, name: string]

In [65]:
# Build an RDD of raw JSON documents (one string per record); the following
# cells turn it into a table and query it with SQL.
rdd = sc.parallelize((
    """
        {
            "id":"123",
            "name":"Katie",
            "age":19,
            "eyeColor":"brown"
        }
    """,
    """
       {
            "id":"234",
            "name":"Michael",
            "age":22,
            "eyeColor":"green"
        }
    """,
    """
       {
            "id":"345",
            "name":"Simone",
            "age":23,
            "eyeColor":"blue"
        }
    """
))

In [66]:
rdd.collect()

['\n        {\n            "id":"123",\n            "name":"Katie",\n            "age":19,\n            "eyeColor":"brown"\n        }\n    ',
 '\n       {\n            "id":"234",\n            "name":"Michael",\n            "age":22,\n            "eyeColor":"green"\n        }\n    ',
 '\n       {\n            "id":"345",\n            "name":"Simone",\n            "age":23,\n            "eyeColor":"blue"\n        }\n    ']

In [70]:
# Converting the JSON RDD into a DataFrame lets Spark infer the schema
# (column names and types) from the data itself.
df = spark.read.json(rdd)

In [71]:
df

DataFrame[age: bigint, eyeColor: string, id: string, name: string]

### 임시테이블 생성
- createOrReplaceTempView
- view 테이블을 즉석에서 만듦

In [72]:
# Register df as a temporary view named "test" so it can be queried with SQL.
df.createOrReplaceTempView("test")

In [73]:
# Display the rows through the DataFrame method (rather than a SQL statement).
df.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [69]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [29]:
# A DataFrame manages its data as Row objects, so results come back as Rows:
#   spark.sql(...)  -> returns a DataFrame
#   .collect()      -> returns the Row objects
all_rows_df = spark.sql("select * from test")
all_rows_df.collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

In [74]:
spark.sql("select count(*) from test").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [110]:
spark.sql("select id, age from test where age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [75]:
df.select("id","age").show()

+---+---+
| id|age|
+---+---+
|123| 19|
|234| 22|
|345| 23|
+---+---+



In [77]:
df.select("id","age").filter("age==22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [78]:
df.select(df.id, df.age).show()

+---+---+
| id|age|
+---+---+
|123| 19|
|234| 22|
|345| 23|
+---+---+



### table schema 구조 설정하기

In [16]:
# Plain Python tuples; the field order matches the explicit schema
# (id, name, age, eyeColor) that is applied to this RDD further down.
people_rows = [
    (123, "Soojung", 26, "brown"),
    (456, "hyeyoung", 26, "black"),
    (789, "minji", 29, "yello"),
]
rdd = sc.parallelize(people_rows)

In [17]:
# Wildcard import so that every Spark SQL data type is available by name.
from pyspark.sql.types import * 

In [18]:
# Explicit table schema: column name, data type, nullable flag (True for all).
scheme = (
    StructType()
    .add("id", LongType(), True)
    .add("name", StringType(), True)
    .add("age", LongType(), True)
    .add("eyeColor", StringType(), True)
)

In [19]:
# RDD + explicit schema = DataFrame
df = spark.createDataFrame(rdd, schema=scheme)

In [20]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [21]:
df.createOrReplaceTempView("test2")

In [23]:
spark.sql("select * from test2").show()

+---+--------+---+--------+
| id|    name|age|eyeColor|
+---+--------+---+--------+
|123| Soojung| 26|   brown|
|456|hyeyoung| 26|   black|
|789|   minji| 29|   yello|
+---+--------+---+--------+



In [60]:
sc.stop()

In [52]:
# The preceding sc.stop() also invalidates the existing `spark` session, which
# still wraps the stopped context. Rebuild the session and take the context
# from it, so the spark.read / spark.sql cells that follow keep working when
# the notebook is run top to bottom.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [53]:
sc

### Airport, airline and route data  

In [24]:
flightPath = "data/departuredelays.csv"
airportPath = "data/airport-codes-na.txt"

In [25]:
# header=True takes the column names from the first line. inferSchema is not
# set, so every column (including delay and distance) is read as a string.
flight = spark.read.csv(flightPath, header=True)

In [29]:
flight.take(1)

[Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL')]

In [30]:
flight

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [31]:
# Tab-separated file with a header line; column types are inferred from the data.
airport = spark.read.csv(airportPath, header=True, inferSchema=True, sep="\t")

In [32]:
airport.take(1)

[Row(City='Abbotsford', State='BC', Country='Canada', IATA='YXX')]

### JOIN airport and flight
- 어느 나라 어느 주에서 delay 가 얼마 정도인지 알고 싶다. 
- 공항마다 delay 률을 분석 

In [33]:
airport.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [34]:
flight.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [35]:
flight.createOrReplaceTempView("flight")

In [36]:
airport.createOrReplaceTempView("airport")

#### Spark Query 

In [38]:
# Join key: airport.IATA and flight.origin both hold the airport code.
# Total delay per Washington-state (WA) airport, largest first.
wa_delay_query = """
        select a.City, f.origin, sum(f.delay) as delay
            from flight f
                join airport a 
                    on a.IATA = f.origin
        where a.State = "WA" 
        group by a.City, f.origin 
        order by delay desc
    """
spark.sql(wa_delay_query).show()

+-------+------+--------+
|   City|origin|   delay|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



#### DataFrame API 

In [41]:
from pyspark.sql import functions as F

In [44]:
# Same report as the SQL query, written with the DataFrame API.
# agg: delay is stored per row, so it is aggregated with a built-in
# function from pyspark.sql.functions (F.sum).
(
    airport.join(flight, airport.IATA == flight.origin)
    .where(airport.State == "WA")
    .select(airport.City, flight.origin, flight.delay)
    .groupBy(airport.City, flight.origin)
    .agg(F.sum(flight.delay))
    .orderBy("sum(delay)", ascending=False)
    .show()
)

+-------+------+----------+
|   City|origin|sum(delay)|
+-------+------+----------+
|Seattle|   SEA|  159086.0|
|Spokane|   GEG|   12404.0|
|  Pasco|   PSC|     949.0|
+-------+------+----------+

