<a href="https://colab.research.google.com/github/Jaejuna/SparkML/blob/main/%08Spark_ch5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
!pyspark --version

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=f712da82b7cc505c551113961abc0c71c5873fc3a14d97ba8673e53ee052d5a9
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/
                        
Us

In [8]:
# Mount Google Drive at /content/drive; the flight datasets loaded later in
# this notebook are read from paths under that mount point
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 스파크 SQL UDF

In [9]:
from pyspark.sql import SparkSession

# Create the SparkSession used by every later cell (application name "ch05")
spark = SparkSession.builder.appName("ch05").getOrCreate()

# 스파크 SQL UDF 파이썬 예제
from pyspark.sql.types import LongType

# Cube function that is registered as the Spark SQL UDF "cubed"
def cubed(s):
  """Return ``s`` cubed, or ``None`` when ``s`` is ``None``.

  Spark hands SQL NULL values to Python UDFs as ``None``; without the guard
  the multiplication raises ``TypeError`` and fails the whole query.
  """
  if s is None:
    return None
  return s * s * s

# Sample rows: one single-field tuple per integer from 1 through 9
data = [(n,) for n in range(1, 10)]
df = spark.createDataFrame(data, schema=['id'])

# Register the Python function as a Spark SQL UDF named "cubed" that returns a long
spark.udf.register("cubed", cubed, LongType())

# Expose the DataFrame to SQL as the temporary view "udf_test"
df.createOrReplaceTempView('udf_test')

In [10]:
# Query the temporary view, applying the registered "cubed" UDF to every id
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show() 

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
|  9|     729|
+---+--------+



## 판다스 UDF

In [11]:
# 판다스 가져오기
import pandas as pd

# 파이스파크 SQL 함수와 pandas_udf 가져오기
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Cube function for the pandas UDF. Unlike the scalar Python UDF defined
# earlier, it takes and returns a pandas Series instead of a single value.
# NOTE: this rebinds the notebook-level name `cubed`.
def cubed(a: pd.Series) -> pd.Series:
  """Return the element-wise cube of ``a``."""
  squared = a * a
  return squared * a

# Wrap the cube function as a pandas UDF whose declared return type is long
cubed_udf = pandas_udf(cubed, returnType=LongType())

In [12]:
# Create a pandas Series
x = pd.Series([1,2,3])

# Call the plain Python function behind the pandas UDF directly on local pandas data
print(cubed(x))

0     1
1     8
2    27
dtype: int64


In [13]:
# Create a Spark DataFrame with spark.range(1, 4)
df = spark.range(1, 4)

# Apply the vectorized Spark UDF (pandas UDF) as a column expression
df.select("id", cubed_udf(col('id'))).show()

+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



## 스파크 SQL 셸 사용하기

In [None]:
# in $SPARK_HOME directory
# ./bin/spark-sql

/bin/bash: ./bin/spark-sql: No such file or directory


In [None]:
-- Create a Spark SQL table
CREATE TABLE people (name STRING, age INT);

-- Insert data into the table.
-- General form when loading from a query:
--   INSERT INTO people SELECT name, age FROM <source>
-- Here the rows are supplied directly with VALUES clauses
INSERT INTO people VALUES ("Michael", NULL);
INSERT INTO people VALUES ("Andy", 30);
INSERT INTO people VALUES ("Samantha", 19);

-- Run Spark SQL queries
SHOW TABLES;
SELECT * FROM people WHERE age < 20;
SELECT name FROM people WHERE age IS NULL;

### 비라인 작업

In [None]:
# in $SPARK_HOME directory
# ./sbin/start-thriftserver.sh

In [None]:
# Test the Thrift server by starting Beeline
./bin/beeline
# Point Beeline at the local Thrift server (the JDBC URL needs "://" after the scheme)
!connect jdbc:hive2://localhost:10000

In [None]:
# Run Spark SQL queries through Beeline
SHOW tables;
SELECT * FROM people;

In [None]:
# Stop the Thrift server
./sbin/stop-thriftserver.sh

### PostgreSQL

In [None]:
# Read option 1: load from a JDBC source through the generic format("jdbc") reader
jdbcDF1 = (spark
           .read
           .format("jdbc")
           .options(url="jdbc:postgresql://[DBSERVER]",
                    dbtable="[SCHEMA].[TABLENAME]",
                    user="[USERNAME]",
                    password="[PASSWORD]")
           .load())

# Read option 2: load from a JDBC source through the jdbc() shortcut
jdbcDF2 = spark.read.jdbc(
    url="jdbc:postgresql://[DBSERVER]",
    table="[SCHEMA].[TABLENAME]",
    properties={'user': "[USERNAME]", 'password': "[PASSWORD]"})

# Write option 1: save to a JDBC source through the generic format("jdbc") writer
(jdbcDF1
 .write
 .format("jdbc")
 .options(url="jdbc:postgresql://[DBSERVER]",
          dbtable="[SCHEMA].[TABLENAME]",
          user="[USERNAME]",
          password="[PASSWORD]")
 .save())

# Write option 2: save to a JDBC source through the jdbc() shortcut
jdbcDF2.write.jdbc(
    url="jdbc:postgresql://[DBSERVER]",
    table="[SCHEMA].[TABLENAME]",
    properties={'user': "[USERNAME]", 'password': "[PASSWORD]"})

### MySQL

In [None]:
# Load data from a MySQL JDBC source with the load function
jdbcDF = (spark
          .read
          .format("jdbc")
          .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
          # MySQL Connector/J driver class (the original value was misspelled);
          # Connector/J 8+ also provides com.mysql.cj.jdbc.Driver
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "[TABLENAME]")
          .option('user', "[USERNAME]")
          .option('password', "[PASSWORD]")
          .load())

# Save data to a MySQL JDBC source with the save function
(jdbcDF
 .write
 .format("jdbc")
 .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")
 # MySQL Connector/J driver class (the original value was misspelled);
 # Connector/J 8+ also provides com.mysql.cj.jdbc.Driver
 .option("driver", "com.mysql.jdbc.Driver")
 .option("dbtable", "[TABLENAME]")
 .option('user', "[USERNAME]")
 .option('password', "[PASSWORD]")
 .save())

### 애저 코스모스 DB

In [None]:
# Load data from Azure Cosmos DB
## Read configuration
query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"
readConfig = {
    "Endpoint" : "https://[ACCOUNT].documents.azure.com:443",
    "MasterKey" : "[MASTER KEY]",
    "Database" : "[DATABASE]",
    "preferredRegions" : "Central US; East US2",
    "Collection" : "[COLLECTION]",
    # The comma after this entry was missing, which made the whole dict a SyntaxError
    "SamplingRatio" : "1.0",
    "schema_samplesize" : "1000",
    "query_pagesize" : "2147483647",
    # Custom query passed to the connector
    "query_custom" : query
}

# Connect through azure-cosmosdb-spark and build a Spark DataFrame
df = (spark
      .read
      .format("com.microsoft.azure.cosmosdb.spark")
      # options() (plural) accepts the whole config as keyword arguments;
      # option() takes a single key/value pair and rejects **readConfig
      .options(**readConfig)
      .load())

# Count the flights
df.count()

# Save data to Azure Cosmos DB
# Write configuration (with Upsert set to "true")
writeConfig = dict(
    Endpoint="https://[ACCOUNT].documents.azure.com:443",
    MasterKey="[MASTER KEY]",
    Database="[DATABASE]",
    Collection="[COLLECTION]",
    Upsert="true",
)

# Upsert the DataFrame into Azure Cosmos DB using the write configuration
(df.write
 .format("com.microsoft.azure.cosmosdb.spark")
 .options(**writeConfig)
 .save())

## 고차 함수

In [15]:
# DF 생성
from pyspark.sql.types import *

# Schema: a single column "celsius" holding an array of integers
schema = StructType([
    StructField("celsius", ArrayType(IntegerType())),
])

# Two rows, each consisting of one array of temperature readings
t_list = (
    [[35, 36, 32, 30, 40, 42, 38]],
    [[31, 32, 34, 55, 56]],
)
t_c = spark.createDataFrame(t_list, schema)

# Register the DataFrame for SQL as the temporary view "tC"
t_c.createOrReplaceTempView("tC")

# Print the DataFrame
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



In [None]:
# Higher-order function query (transform): convert every Celsius reading in
# the array to Fahrenheit. The original had one closing parenthesis too many
# after "+ 32", which made the statement unparseable.
spark.sql("""
SELECT celsius,
  transform(celsius, t -> ((t * 9) div 5) + 32) AS fahrenheit
FROM tC
""").show()

In [None]:
# Higher-order function query (filter): keep only the readings above 38
spark.sql("""
SELECT celsius, 
filter(celsius, t -> t > 38) AS high
FROM tC
""").show()

In [None]:
# Higher-order function query (exists): does the array contain a reading equal to 38?
spark.sql("""
SELECT celsius, 
  exists(celsius, t -> t = 38) as threshold
FROM tC
""").show()

In [None]:
# Higher-order function query (reduce): add up the readings starting from 0,
# then divide the sum by the array size and convert the result to Fahrenheit
# (all divisions use the integer `div` operator)
spark.sql("""
SELECT celsius, 
  reduce(
    celsius,
    0,
    (t, acc) -> t + acc,
    acc -> (acc div size(celsius) * 9 div 5) + 32
  ) as avgFahrenheit
FROM tC
""").show()

## 일반적인 데이터 프레임 및 스파크 SQL 작업

In [19]:
# 파일 경로 설정
from pyspark.sql.functions import expr
# Both flight datasets live in the same folder on the mounted Google Drive
flightsDir = "/content/drive/MyDrive/BOAZ/엔지/Spark Study/databricks-datasets/learning-spark-v2/flights"
tripdelaysFilePath = flightsDir + "/departuredelays.csv"
airportsnaFilePath = flightsDir + "/airport-codes-na.txt"

# Read the airports dataset: tab-separated, with a header row and inferred column types
airportsna = (spark.read
              .format("csv")
              .option("header", "true")
              .option("inferSchema", "true")
              .option("sep", "\t")
              .load(airportsnaFilePath))

# Register it for SQL as the temporary view "airports_na"
airportsna.createOrReplaceTempView("airports_na")

# Read the departure-delays dataset (header row, inferSchema not set) and cast
# the delay and distance columns to INT in the same chain
departureDelays = (spark.read
                   .format("csv")
                   .option("header", "true")
                   .load(tripdelaysFilePath)
                   .withColumn("delay", expr("CAST(delay as INT) as delay"))
                   .withColumn("distance", expr("CAST(distance as INT) as distance")))

# Register it for SQL as the temporary view "departureDelays"
departureDelays.createOrReplaceTempView("departureDelays")

# Create a small temporary table (view): SEA -> SFO flights with a positive
# delay whose date string starts with '01010'
foo = (departureDelays
       .filter(expr("""origin == 'SEA' AND destination == 'SFO' and date like '01010%' and delay > 0""")))
foo.createOrReplaceTempView("foo")


In [20]:
# Preview the three temporary views with SQL queries
spark.sql("SELECT * FROM airports_na LIMIT 10").show()
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()
spark.sql("SELECT * FROm foo").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        AT

### Union

In [21]:
# Union the two tables and register the result as the temporary view "bar"
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

# Show the unioned result, filtered to SEA -> SFO flights in the same date range used for foo
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO' AND date LIKE '01010%' AND delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [None]:
# bar is the union of foo and departureDelays, so the rows that came from foo
# appear twice; this is the SQL form of the same filter
spark.sql("""
  SELECT *
  FROM bar
  WHERE origin = 'SEA'
    AND destination = 'SFO'
    AND date LIKE '01010%'
    AND delay > 0
""").show()

### Join

In [None]:
# Inner join of the departure-delay sample (foo) with the airport information.
# The airports DataFrame in this notebook is named `airportsna`; the undefined
# name `airports` raised a NameError.
foo.join(
    airportsna,
    airportsna.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()

In [None]:
# SQL version of the same inner join, using the foo and airports_na temporary views
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
  FROM foo f
  JOIN airports_na a
    ON a.IATA = f.origin
""").show()

### 윈도우

In [None]:
-- Review flights departing from SEA, SFO and JFK to a specific set of destinations
DROP TABLE IF EXISTS departureDelaysWindow;

-- Total delay per (origin, destination) pair
CREATE TABLE departureDelaysWindow AS
SELECT origin, destination, SUM(delay) AS TotalDelays
FROM departureDelays
WHERE origin IN ('SEA', 'SFO', 'JFK')
AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, destination;

SELECT * FROM departureDelaysWindow;

In [None]:
-- For one origin airport, find the three destinations with the largest total delay
-- ([ORIGIN] is a placeholder for the airport code)
SELECT origin, destination, SUM(TotalDelays) AS TotalDelays
FROM departureDelaysWindow
WHERE origin = '[ORIGIN]'
GROUP BY origin, destination
ORDER BY SUM(TotalDelays) DESC
LIMIT 3

In [None]:
# Improve on the previous query with dense_rank(): rank destinations by
# TotalDelays within each origin and keep ranks 1 through 3 for every origin
spark.sql("""
SELECT origin, destination, TotalDelays, rank
FROM (
  SELECT origin, destination, TotalDelays,
  dense_rank() OVER (PARTITION BY origin ORDER BY TotalDelays DESC) AS rank
  FROM departureDelaysWindow
) t
WHERE rank <= 3
""").show()

### 수정

In [24]:
# 열 추가 
from pyspark.sql.functions import expr

# Show the DataFrame before the change
foo.show()

# Add a "status" column: 'On-time' when delay <= 10, otherwise 'Delayed'
foo2 = (foo.withColumn(
    "status",
    expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END" )
))

foo2.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



In [25]:
# Drop the "delay" column
foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



In [26]:
# Rename the "status" column to "flight_status"
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



In [None]:
-- Flights departing from SEA: destination, month (first two characters of date), delay
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
FROM departureDelays
WHERE origin = 'SEA'

In [None]:
-- Pivot the previous query: average and maximum delay per destination for
-- months 1 (JAN) and 2 (FEB). The inner query must be wrapped in
-- "SELECT * FROM ( ... )"; the opening part was missing, leaving a stray ")".
SELECT * FROM (
  SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
  FROM departureDelays
  WHERE origin = 'SEA'
)
PIVOT (
    CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination