In [3]:
from pyspark.sql import SparkSession

from pyspark.sql.types import LongType


In [4]:
# SparkSession 생성
spark = (SparkSession
         .builder
         .appName("SparkMllibExampleApp")
         .getOrCreate())

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/28 18:37:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### spark UDF

In [5]:
# 큐브 함수 생성
def cubed(s):
    return s*s*s

# UDF로 등록
spark.udf.register("cubed", cubed, LongType())

# 임시 뷰 생성
spark.range(1,9).createOrReplaceTempView("udf_test")

In [6]:
spark.sql("select * from udf_test").show()

                                                                                

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
+---+



In [7]:
spark.sql("select id, cubed(id) as id_cubed from udf_test").show()

[Stage 1:>                                                          (0 + 8) / 8]

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



                                                                                

### pandas UDF

In [8]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf

In [10]:
# 큐브 함수 선언
def cubed(a: pd.Series) -> pd.Series:
    return a*a*a

# 큐브 함수에 대한 판다스 UDF 생성
cubed_udf = pandas_udf(cubed, returnType=LongType())

In [13]:
# 로컬 실행
%time
x = pd.Series([1,2,3])
print(cubed(x))

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 2.62 µs
0     1
1     8
2    27
dtype: int64


In [15]:
# 스파크 UDF 실행
df = spark.range(1,4)

# 벡터화된 스파크 UDF 함수 실행
%time
df.select("id", cubed_udf(col("id"))).show()

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 2.86 µs
+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



### 고차 함수

In [19]:
# 데이터 생성
from pyspark.sql.types import *
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")

# 데이터 프레임 출력
t_c.show()

                                                                                

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



In [21]:
# transform()
spark.sql("""SELECT celsius,
                    transform(celsius, t -> ((t * 9) div 5) + 32 ) AS fahrenheit
             FROM tC""").show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



In [22]:
# filter()
spark.sql("""SELECT celsius,
                    filter(celsius, t -> t>38) AS high
             FROM tC""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [23]:
# exists()
spark.sql("""SELECT celsius,
                    exists(celsius, t -> t = 38) as threshold
             FROM tC""").show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



In [25]:
# reduce() - 왜 안 될까...
spark.sql("""SELECT celsius,
                    reduce(celsius, 0,
                           (t, acc) -> t + acc,
                           acc -> (acc div size(celsius) * 9 div 5) + 32) AS avgFahrenheit
             FROM tC""").show()

AnalysisException: Undefined function: reduce. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.reduce.; line 1 pos 16

### 일반적인 데이터 프레임 및 스파크 SQL 작업 - 예시

#### 예시 데이터 생성 과정
1. 두 개의 파일을 가져와서 두 개의 데이터 프레임을 만들기.
    - 공항 정보 데이터 (airportsna)
    - 미국 비행 지연 데이터 (departureDelays)
2. expr()을 사용하여 delay(지연) 및 distance(거리) 칼럼을 STRING에서 INT로 변환
3. 작은 테이블 foo를 만들기

In [44]:
# 데이터 경로 설정
from pyspark.sql.functions import expr
tripdelaysFilePath = "../data/departuredelays.csv"
airportsnaFilePath = "../data/airport-codes-na.txt"

# 공항 데이터세트 읽어오기
airportna = (spark.read.format("csv")
                       .options(header="true", inferSchema="true", sep="\t")
                       .load(airportsnaFilePath))

airportna.createOrReplaceTempView("airports_na")

# 출발 지연 데이터 읽어오기
departureDelays = (spark.read.format("csv")
                       .options(header="true")
                       .load(tripdelaysFilePath))

# 일부 칼럼 데이터 타입 변경
departureDelays = (departureDelays.withColumn("delay", expr("CAST(delay as INT) as delay"))
                                  .withColumn("distance", expr("CAST(distance as INT) as distance")))

departureDelays.createOrReplaceTempView("departureDelays")

# 임시 작은 테이블 생성 - 예시 코드를 위해
foo = (departureDelays.filter(expr("""origin == "SEA" AND destination == "SFO" AND date like '01010%' AND delay > 0""")))
foo.createOrReplaceTempView("foo")

In [45]:
foo.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [48]:
spark.sql("""SELECT * FROM airports_na LIMIT 10""").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [49]:
spark.sql("""SELECT * FROM departureDelays LIMIT 10""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [50]:
spark.sql("""SELECT * FROM foo""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



#### Union
- 동일한 스키마를 가진 두 개의 서로 다른 데이터 프레임을 함께 결합

In [52]:
# 두 테이블 결합
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

# 결합된 결과 보기 - 예상: foo 데이터가 중복된다.
bar.filter(expr("""origin == "SEA" AND destination == "SFO" AND date like '01010%' AND delay > 0""")).show()



+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



                                                                                

#### Join
- default : inner join
- inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti 옵션 적용 가능

In [53]:
# 출발 지연 데이터와 공항 정보의 조인1
foo.join(airportna,
         airportna.IATA == foo.origin
         ).select("City", "State", "date", "delay", "distance", "destination").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [54]:
# sql
spark.sql("""SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
             FROM foo f
             JOIN airports_na a
             ON a.IATA = f.origin""").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



#### 윈도우 연산

In [58]:
# 특정 출발지-도착지 쌍의 지연 합
departureDelayWindow = spark.sql("""SELECT origin, destination, SUM(delay) AS TOTALDelays
                                    FROM departureDelays
                                    WHERE origin IN ('SEA', 'SFO', 'JFK')
                                    AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
                                    GROUP BY origin, destination""")

departureDelayWindow.createOrReplaceTempView("departureDelayWindow")
departureDelayWindow.show()



+------+-----------+-----------+
|origin|destination|TOTALDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   JFK|        SFO|      35619|
|   JFK|        DEN|       4315|
|   JFK|        ATL|      12141|
|   JFK|        SEA|       7856|
|   JFK|        LAX|      35755|
|   SEA|        LAX|       9359|
|   SFO|        ORD|      27412|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   SFO|        ATL|       5091|
|   SEA|        DEN|      13645|
|   SEA|        ATL|       4535|
|   SEA|        ORD|      10041|
|   SFO|        JFK|      24100|
|   SFO|        LAX|      40798|
|   SEA|        JFK|       4667|
+------+-----------+-----------+



                                                                                

In [74]:
# density_rank()
spark.sql("""
            SELECT origin, destination, TotalDelays, rank
            FROM (
                SELECT origin, destination, TotalDelays,
                dense_rank() OVER (PARTITION BY origin ORDER BY TotalDelays DESC) AS rank
                FROM departureDelayWindow
            ) t
            WHERE rank <= 3""").show()



+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
+------+-----------+-----------+----+



                                                                                