# Spark SQL

## Spark SQl 개요
* RDD는 스키마를 표현할수 없다는 단점이 있음, 스파크SQL모듈은 이를 보완할 수 있는 데이터 모델(데이터셋)과 API를 제공.
* RDD와 마찬가지로 액션과 트랜스포메이션으로 구성, 데이터셋의 트렌스포메이션 연산은 타입과 비타입 연산으로 나뉨  
타입 연산(typed operations) : 기존과 동일한 타입으로 연산 (RDD)  
비타입 연산(untyped opterations) : 기존과 다른 타입으로 연산 (int를 org.apache.spark.sql.Column 등) 로우와 칼럼 단위로 데이터를 처리함. 
* 데이터프레임은 org.apache.spark.sql.Row타입의 요소로 구성된 데이터 셋

In [3]:
import collections
import time
import numpy as np
import pandas as pd
import pyarrow as pa

from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.image import ImageSchema
from pyspark.sql.types import LongType, BooleanType
from pyspark.sql.functions import pandas_udf, PandasUDFType

# from word import Word

### 코드 작성 및 단어 수 세기 예제

In [2]:
# 스파크 세션 생성
spark = SparkSession\
          .builder\
          .appName("Sample")\
          .master("local[*]")\
          .getOrCreate()



In [7]:
# 데이터 프레임 생성
source = "/usr/local/Cellar/apache-spark/2.3.2/README.md"

# 스파크 세션의 read() 메서드는 데이터를 읽기 위한 DataFrameReader 인스턴스를 돌려주는데 이 DataFrameReader를 이용해 다양한 유형의
# 데이터로 부터 데이터 프레임을 생성할 수 있음. 아래의 예제는 text 파일이기 떄문에 text()를 사용.
df = spark.read.text(source)


In [4]:
# 단어 수 카운트 코드
# row와 column 단위로 데이터를 처리함 (비타입 연산)

wordDF = df.select(explode(split(col("value"), " ")).alias("word"))
# df는 스파크 세션의 read() 메서드를 이용해 생성한 데이터프레임.
# 칼럼명을 따로 지정하지 않으면 "value"를 기본값으로 생성
# 따라서 df클래스의 메서드 select을 사용해 column 선택(?)
#      explode/split(= funtions 오브젝트의 메서드)를 각각 수행.
#      .alias 는 column 클래스의 메서드

result = wordDF.groupBy("word").count()

# 
result.show()
# result.write().text("<path_to_save>")
spark.stop()


+--------------------+-----+
|                word|count|
+--------------------+-----+
|              online|    1|
|              graphs|    1|
|          ["Parallel|    1|
|          ["Building|    1|
|              thread|    1|
|       documentation|    3|
|            command,|    2|
|         abbreviated|    1|
|            overview|    1|
|                rich|    1|
|                 set|    2|
|         -DskipTests|    1|
|                name|    1|
|page](http://spar...|    1|
|        ["Specifying|    1|
|              stream|    1|
|                run:|    1|
|                 not|    1|
|            programs|    2|
|               tests|    2|
+--------------------+-----+
only showing top 20 rows



## 스파크세션
* 스파크세션은 데이터프레임(DataFrame) 또는 데이터셋(DataSet)을 생성하거나 사용자 정의함수(UDF) 등록 가능.
* bulder 메서드를 이용해 생성 가능.
* 과거엔 SQLContext와 HiveContext를 사용했으나 스파크2.0에선 합쳐짐. SQLContext + HiveContext = SparkSession

In [9]:
# 스파크 세션 생성
spark = SparkSession\
          .builder\
          .appName("Sample")\
          .master("local[*]")\
          .getOrCreate()

## 데이터프레임, 로우, 컬럼
* DataFrame은 Dataset[Row]인 경우로 R의 데이터프레임이나 데이터베이스의 테이블과 비슷한 구조.

### 데이터프레임 생성
* 데이터프레임은 스파크세션을 이용해 생성, 외부파일이나 이미 생성된 RDD를 데이터프레임 형태로 생성 가능.  
  
    [외부 파일로 부터 데이터 생성]  
    (1) 스파크세션의 read() 메서드를 호출해 DataFrameReader 인스턴스를 생성함.  
    (2) format() 메서드로 데이터소스의 유형을 지정함.  
    (3) option() 메서드로 데이터소스 처리에 필요한 옵션을 저장함.  
    (4) load() 메서드로 대상 파일을 읽고 데이터프레임을 생성함.  
  
    [기존 RDD 및 로컬에서 데이터 생성]  
    (1) RDD와 달리 스키마 정보를 함꼐 지정.( 리플렉션 API를 활용 or  직접 정보 작성)  

#### 리플렉션을 통한 데이터프레임 생성

In [11]:
row1 = Row(name="hayoon", age=7, job="student")
row2 = Row(name="sunwoo", age=13, job="student")
row3 = Row(name="hajoo", age=5, job="kindergartener")
row4 = Row(name="jinwoo", age=13, job="student")
data = [row1, row2, row3, row4]
sample_df = spark.createDataFrame(data)
sample_df.show()

+---+--------------+------+
|age|           job|  name|
+---+--------------+------+
|  7|       student|hayoon|
| 13|       student|sunwoo|
|  5|kindergartener| hajoo|
| 13|       student|jinwoo|
+---+--------------+------+



#### 기존 RDD를 통한 데이터프레임 생성

In [7]:
# 기존 RDD를 dataframe으로 생성
rdd = spark.sparkContext.parallelize(data)
sample_df2 = spark.createDataFrame(data)
sample_df2.show()

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'

#### 명시적 타입 지정을 통한 데이터프레임 생성
* 스키마 정보를 입력해야 함.  
StructField : 컬럼 정보  
StructType : 로우 정보

In [8]:
# 명시적 타입 지정을 통해 데이터프레임 생성 - 사용가능한 타입은 org.apache.spark.sql.types.DataType에서 참조 가능

sf1 = StructField("name", StringType(), True) # 컬럼에 대한 스키마 정보
#                 컬럼이름,  타입         , null 허용 여부
sf2 = StructField("age", IntegerType(), True)
sf3 = StructField("job", StringType(), True)
sf4 = StructField("salary", IntegerType(), True)
sf5 = StructField("target", IntegerType(), True)
schema = StructType([sf1, sf2, sf3, sf4, sf5]) # 로우에 대한 스키마 정보
r1 = ("hayoon", 7, "student",2000,1)
r2 = ("sunwoo", 13, "student",3000,0)
r3 = ("hajoo", 5, "kindergartener",2000,1)
r4 = ("jinwoo", 13, "student",1000,0)
r5 = ("hyukjoo", 27, "DS",4000,0)
rows = [r1, r2, r3, r4,r5]
sample_df3 = spark.createDataFrame(rows, schema)
sample_df3.show()

+-------+---+--------------+------+------+
|   name|age|           job|salary|target|
+-------+---+--------------+------+------+
| hayoon|  7|       student|  2000|     1|
| sunwoo| 13|       student|  3000|     0|
|  hajoo|  5|kindergartener|  2000|     1|
| jinwoo| 13|       student|  1000|     0|
|hyukjoo| 27|            DS|  4000|     0|
+-------+---+--------------+------+------+



### 데이터프레임 주요 연산 및 사용법
* 데이터프레임은 org.apache.spark.sql.Row 타입의 객체로 구성된 데이터 셋 !!  
* 데이터셋과 동일한 타입임에도 별도의 데이터프레임이라고 하는 이유는 사용 가능한 트랜스포메이션 연산의 종류가 다르기 때문.
#### 액션 연산

In [12]:
# show() - 데이터셋(프레임)에 저장된 데이터를 화면에 출력
sample_df3.show()

+-------+---+--------------+------+------+
|   name|age|           job|salary|target|
+-------+---+--------------+------+------+
| hayoon|  7|       student|  2000|     1|
| sunwoo| 13|       student|  3000|     0|
|  hajoo|  5|kindergartener|  2000|     1|
| jinwoo| 13|       student|  1000|     0|
|hyukjoo| 27|            DS|  4000|     0|
+-------+---+--------------+------+------+



In [14]:
# head()/first() - 데이터셋의 첫 번째 로우를 돌려줌.
sample_df3.head() # = sample_df3.first()
# = take 함수와 동일
sample_df3.take(1)

[Row(name='hayoon', age=7, job='student', salary=2000, target=1)]

In [15]:
# count() - Row개수를 리턴해줌
sample_df3.count()

5

In [16]:
# collect()/ collectAsList() - 데이터셋의 모든 데이터를 로컬 컬렉션 형태로 돌려줌. !!메모리에 적재됨으로 메모리 부족 에러가 발생하지 않도록 주의!!
sample_df3.collect()

[Row(name='hayoon', age=7, job='student', salary=2000, target=1),
 Row(name='sunwoo', age=13, job='student', salary=3000, target=0),
 Row(name='hajoo', age=5, job='kindergartener', salary=2000, target=1),
 Row(name='jinwoo', age=13, job='student', salary=1000, target=0),
 Row(name='hyukjoo', age=27, job='DS', salary=4000, target=0)]

In [15]:
# describe
sample_df3.describe(["age","salary"]).show()

+-------+-----------------+-----------------+
|summary|              age|           salary|
+-------+-----------------+-----------------+
|  count|                5|                5|
|   mean|             13.0|           2400.0|
| stddev|8.602325267042627|1140.175425099138|
|    min|                5|             1000|
|    max|               27|             4000|
+-------+-----------------+-----------------+



#### 기본 연산
* 데이터셋이 제공하는 연산은 크게 기본/ 타입 트랜스포메이션/ 비타입 트랜스포메이션/ 액션 연산으로 구성.  
* 비타입 트랜스포메이션 연산은 데이터프레임인 경우만 사용 가능.  
* 타입 트랜스포메이션 연산은 데이터셋인 경우만 사용 가능.  

In [17]:
# 작업 중인 데이터를 메모리에 저장함.
df.persist(StorageLevel.MEMORY_AND_DISK_2)

DataFrame[value: string]

In [18]:
# 스키마 정보를 조회.
df.printSchema()

root
 |-- value: string (nullable = true)



In [23]:
# 스키마 정보를 조회하는 다른 방법들.
df.columns
df.dtypes
# df.schema

['name', 'age', 'job', 'salary', 'target']

In [26]:
# 데이터프레임을 테이블 처럼 SQL을 사용해 처리할 수 있게 등록해 줌.
sample_df3.createOrReplaceTempView("users")


In [28]:
# 쿼리를 날리는 예시

spark.sql("select name, age from users where age > 20").show()
spark.sql("select name, age from users where age > 20").explain()

+-------+---+
|   name|age|
+-------+---+
|hyukjoo| 27|
+-------+---+

== Physical Plan ==
*(1) Project [name#21, age#22]
+- *(1) Filter (isnotnull(age#22) && (age#22 > 20))
   +- Scan ExistingRDD[name#21,age#22,job#23,salary#24,target#25]


#### 비타입 트랜스포메이션 연산
* 데이터의 실제 타입을 사용하지 않는 변환 연산을 수행.  
예) SELECT * FROM PERSON WHERE AGE > 10과 같은 SQL문에서 AGE 칼럼은 정수값을 갖지만 타입을 정수라고 기재하지 않음. 스파크 2.0에서는 데이터 셋이 등장하면서 타입을 고려한 연산과 그렇지 않은 연산 2가지가 존재해 구분차 '비타입'이 생김.

In [None]:
# 5.5.2.1.1절 ~ 5.5.2.2.4절
def runBasicOpsEx(spark, sc, df):
    df.show()
    df.head()
    df.first()
    df.take(2)
    df.count()
    df.collect()
    df.describe("age").show()
   

# 5.5.2.4절
def runColumnEx(spark, sc, df):
    df.where(df.age > 10).show()


# 5.5.2.4.2절
def runAlias(spark, sc, df):
    df.select(df.age + 1).show()
    df.select((df.age + 1).alias("age")).show()


# 5.5.2.4.3절
def runIsinEx(spark, sc):
    nums = spark.sparkContext.broadcast([1, 3, 5, 7, 9])
    rdd = spark.sparkContext.parallelize(range(0, 10)).map(lambda v: Row(v))
    df = spark.createDataFrame(rdd)
    df.where(df._1.isin(nums.value)).show()


# 5.5.2.4.4절
def runWhenEx(spark, sc):
    ds = spark.range(0, 5)
    col = when(ds.id % 2 == 0, "even").otherwise("odd").alias("type")
    ds.select(ds.id, col).show()


# 5.5.2.4.5절
def runMaxMin(spark, df):
    min_col = min("age")
    max_col = max("age")
    df.select(min_col, max_col).show()


# 5.5.2.4.6절 ~ 5.5.2.4.9절
def runAggregateFunctions(spark, df1, df2):
    # collect_list, collect_set
    doubledDf1 = df1.union(df1)
    doubledDf1.select(functions.collect_list(doubledDf1["name"])).show(truncate=False)
    doubledDf1.select(functions.collect_set(doubledDf1["name"])).show(truncate=False)

    # count, countDistinct
    doubledDf1.select(functions.count(doubledDf1["name"]), functions.countDistinct(doubledDf1["name"])).show(
        truncate=False)

    # sum
    df2.printSchema()
    df2.select(sum(df2["price"])).show(truncate=False)

    # grouping, grouping_id
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]), grouping(df2["store"])).show(truncate=False)
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]), grouping_id(df2["store"], df2["product"])).show(
        truncate=False)
    
    # grouping_id를 이용한 정렬
    df2.cube(df2["store"], df2["product"]) \
        .agg(sum("amount").alias("sum"), grouping_id("store", "product").alias("gid")) \
        .filter("gid != '2'") \
        .sort(asc("store"), col("gid")) \
        .na.fill({"store":"Total", "product":"-"}) \
        .select("store", "product", "sum") \
        .show(truncate=False)


# 5.5.2.4.10 ~ 5.5.2.4.11 절
def runCollectionFunctions(spark):
    df = spark.createDataFrame([{'numbers': '9,1,5,3,9'}])
    arrayCol = split(df.numbers, ",")

    # array_contains, size
    df.select(arrayCol, array_contains(arrayCol, 2), size(arrayCol)).show(truncate=False)

    # sort_array()
    df.select(arrayCol, sort_array(arrayCol)).show(truncate=False)

    # explode, posexplode
    df.select(explode(arrayCol)).show(truncate=False)
    df.select(posexplode(arrayCol)).show(truncate=False)


# 5.5.2.4.10 ~ 5.5.2.4.11 절
def runCollectionFunctions(spark):
    df = spark.createDataFrame([{'numbers': '9,1,5,3,9'}])
    arrayCol = split(df.numbers, ",")

    # array_contains, size
    df.select(arrayCol, array_contains(arrayCol, 2), size(arrayCol)).show(truncate=False)

    # sort_array()
    df.select(arrayCol, sort_array(arrayCol)).show(truncate=False)

    # explode, posexplode
    df.select(explode(arrayCol)).show(truncate=False)
    df.select(posexplode(arrayCol)).show(truncate=False)


# 5.5.2.4.12 ~ 5.5.2.4.14절
def runDateFunctions(spark):
    f1 = StructField("d1", StringType(), True)
    f2 = StructField("d2", StringType(), True)
    schema1 = StructType([f1, f2])

    df = spark.createDataFrame([("2017-12-25 12:00:05", "2017-12-25")], schema1)
    df.show(truncate=False)

    # current_date, unix_timestamp, to_date
    d3 = current_date().alias("d3")
    d4 = unix_timestamp(df["d1"].alias("d4"))
    d5 = to_date(df["d2"].alias("d5"))
    d6 = to_date(d4.cast("timestamp")).alias("d6")
    df.select(df["d1"], df["d2"], d3, d4, d5, d6).show(truncate=False)

    # add_months, date_add, last_day
    d7 = add_months(d6, 2).alias("d7")
    d8 = date_add(d6, 2).alias("d8")
    d9 = last_day(d6).alias("d9")
    df.select(df["d1"], df["d2"], d7, d8, d9).show(truncate=False)

    # window
    f3 = StructField("date", StringType(), True)
    f4 = StructField("product", StringType(), True)
    f5 = StructField("amount", IntegerType(), True)
    schema2 = StructType([f3, f4, f5])

    r2 = ("2017-12-25 12:01:00", "note", 1000)
    r3 = ("2017-12-25 12:01:10", "pencil", 3500)
    r4 = ("2017-12-25 12:03:20", "pencil", 23000)
    r5 = ("2017-12-25 12:05:00", "note", 1500)
    r6 = ("2017-12-25 12:05:07", "note", 2000)
    r7 = ("2017-12-25 12:06:25", "note", 1000)
    r8 = ("2017-12-25 12:08:00", "pencil", 500)
    r9 = ("2017-12-25 12:09:45", "note", 30000)

    dd = spark.createDataFrame([r2, r3, r4, r5, r6, r7, r8, r9], schema2);

    timeCol = unix_timestamp(dd["date"]).cast("timestamp");
    windowCol = window(timeCol, "5 minutes");
    dd.groupBy(windowCol, dd["product"]).agg(sum(dd["amount"])).show(truncate=False);


# 5.5.2.4.15절
def runDateFunctions(spark):
    # 파이썬의 경우 아래와 같이 튜플을 이용하여 데이터프레임을 생성하는 것도 가능함
    df1 = spark.createDataFrame([(1.512,), (2.234,), (3.42,)], ['value'])
    df2 = spark.createDataFrame([(25.0,), (9.0,), (10.0,)], ['value'])

    df1.select(round(df1["value"], 1)).show()
    df2.select(functions.sqrt('value')).show()


# 5.5.2.4.16 ~ 5.5.2.4.20절
def runOtherFunctions(spark, personDf):
    df = spark.createDataFrame([("v1", "v2", "v3")], ["c1", "c2", "c3"]);

    # array
    df.select(df.c1, df.c2, df.c3, array("c1", "c2", "c3").alias("newCol")).show(truncate=False)

    # desc, asc
    personDf.show()
    personDf.sort(functions.desc("age"), functions.asc("name")).show()

    # pyspark 2.1.0 버전은 desc_nulls_first, desc_nulls_last, asc_nulls_first, asc_nulls_last 지원하지 않음

    # split, length (pyspark에서 컬럼은 df["col"] 또는 df.col 형태로 사용 가능)
    df2 = spark.createDataFrame([("Splits str around pattern",)], ['value'])
    df2.select(df2.value, split(df2.value, " "), length(df2.value)).show(truncate=False)

    # rownum, rank
    f1 = StructField("date", StringType(), True)
    f2 = StructField("product", StringType(), True)
    f3 = StructField("amount", IntegerType(), True)
    schema = StructType([f1, f2, f3])

    p1 = ("2017-12-25 12:01:00", "note", 1000)
    p2 = ("2017-12-25 12:01:10", "pencil", 3500)
    p3 = ("2017-12-25 12:03:20", "pencil", 23000)
    p4 = ("2017-12-25 12:05:00", "note", 1500)
    p5 = ("2017-12-25 12:05:07", "note", 2000)
    p6 = ("2017-12-25 12:06:25", "note", 1000)
    p7 = ("2017-12-25 12:08:00", "pencil", 500)
    p8 = ("2017-12-25 12:09:45", "note", 30000)

    dd = spark.createDataFrame([p1, p2, p3, p4, p5, p6, p7, p8], schema)
    w1 = Window.partitionBy("product").orderBy("amount")
    w2 = Window.orderBy("amount")
    dd.select(dd.product, dd.amount, functions.row_number().over(w1).alias("rownum"),
              functions.rank().over(w2).alias("rank")).show()


# 5.5.2.4.21절
def runUDF(spark, df):
    # functions를 이용한 등록
    fn1 = functions.udf(lambda job: job == "student")
    df.select(df["name"], df["age"], df["job"], fn1(df["job"])).show()
    # SparkSession을 이용한 등록
    spark.udf.register("fn2", lambda job: job == "student")
    df.createOrReplaceTempView("persons")
    spark.sql("select name, age, job, fn2(job) from persons").show()


# 5.5.2.4.24절
def runAgg(spark, df):
    df.agg(max("amount"), min("price")).show()
    df.agg({"amount": "max", "price": "min"}).show()


# 5.5.2.4.26절
def runDfAlias(spark, df):
    df.select(df["product"]).show()
    df.alias("aa").select("aa.product").show()


# 5.5.2.4.27절
def runGroupBy(spark, df):
    df.groupBy("store", "product").agg({"price": "sum"}).show()


# 5.5.3.4.28절
def runCube(spark, df):
    df.cube("store", "product").agg({"price": "sum"}).show()


# 5.5.2.4.29절
def runDistinct(spark):
    d1 = ("store1", "note", 20, 2000)
    d2 = ("store1", "bag", 10, 5000)
    d3 = ("store1", "note", 20, 2000)
    rows = [d1, d2, d3]
    cols = ["store", "product", "amount", "price"]
    df = spark.createDataFrame(rows, cols)
    df.distinct().show()
    df.dropDuplicates(["store"]).show()


# 5.5.2.4.30절
def runDrop(spark, df):
    df.drop(df["store"]).show()


# 5.5.2.4.31절
def runIntersect(spark):
    a = spark.range(1, 5)
    b = spark.range(2, 6)
    c = a.intersect(b)
    c.show()


# 5.5.2.4.32절
def runExcept(spark):
    df1 = spark.range(1, 6)
    df2 = spark.createDataFrame([(2,), (4,)], ['value'])
    # 파이썬의 경우 except 대신 subtract 메서드 사용
    # subtract의 동작은 except와 같음
    df1.subtract(df2).show()


# 5.5.2.4.33절
def runJoin(spark, ldf, rdf):
    joinTypes = "inner,outer,leftouter,rightouter,leftsemi".split(",")
    for joinType in joinTypes:
        print("============= %s ===============" % joinType)
        ldf.join(rdf, ["word"], joinType).show()


# 5.5.2.4.35절
def runNa(spark, ldf, rdf):
    result = ldf.join(rdf, ["word"], "outer").toDF("word", "c1", "c2")
    result.show()
    # 파이썬의 경우 na.drop또는 dropna 사용 가능
    # c1과 c2 칼럼의 null이 아닌 값의 개수가 thresh 이하일 경우 drop
    # thresh=1로 설정할 경우 c1 또는 c2 둘 중의 하나만 null 아닌 값을 가질 경우
    # 결과에 포함시킨다는 의미가 됨
    result.na.drop(thresh=2, subset=["c1", "c2"]).show()
    result.dropna(thresh=2, subset=["c1", "c2"]).show()
    # fill
    result.na.fill({"c1": 0}).show()
    # 파이썬의 경우 to_replace에 딕셔너리를 지정하여 replace를 수행(이 경우 value에 선언한 값은 무시됨
    # 딕셔너리를 사용하지 않을 경우 키 목록(첫번째 인자)과 값 목록(두번째 인자)을 지정하여 replace 수행
    result.na.replace(to_replace={"w1": "word1", "w2": "word2"}, value="", subset="word").show()
    result.na.replace(["w1", "w2"], ["word1", "word2"], "word").show()


# 5.5.2.4.36절
def runOrderBy(spark):
    df = spark.createDataFrame([(3, "z"), (10, "a"), (5, "c")], ["idx", "name"])
    df.orderBy("name", "idx").show()
    df.orderBy("idx", "name").show()


# 5.5.2.4.37절
def runRollup(spark, df):
    df.rollup("store", "product").agg({"price": "sum"}).show();


# 5.5.2.4.38절
def runStat(spark):
    df = spark.createDataFrame([("a", 6), ("b", 4), ("c", 12), ("d", 6)], ["word", "count"])
    df.show()
    df.stat.crosstab("word", "count").show()


# 5.5.2.4.39절
def runWithColumn(spark):
    df1 = spark.createDataFrame([("prod1", "100"), ("prod2", "200")], ["pname", "price"])
    df2 = df1.withColumn("dcprice", df1["price"] * 0.9)
    df3 = df2.withColumnRenamed("dcprice", "newprice")
    df1.show()
    df2.show()
    df3.show()


# 5.5.2.4.40절
def runSave(spark):
    sparkHomeDir = "file:///Users/beginspark/Apps/spark"
    df = spark.read.json(sparkHomeDir + "/examples/src/main/resources/people.json")
    df.write.save("/Users/beginspark/Temp/default/%d" % time.time())
    df.write.format("json").save("/Users/beginspark/Temp/json/%d" % time.time())
    df.write.format("json").partitionBy("age").save("/Users/beginspark/Temp/parti/%d" % time.time())
    # saveMode: append, overwrite, error, ignore
    df.write.mode("overwrite").saveAsTable("ohMyTable")
    spark.sql("select * from ohMyTable").show()
    df.write.format('parquet').bucketBy(20, 'age').mode("overwrite").saveAsTable("bucketTable")
    spark.sql("select * from bucketTable").show()


## 데이터셋
* 기존의 RDD와 데이터프레임을 모두 활용하기 위해서 생김.
* Python에서는 현재 미지원

In [None]:
spark = SparkSession \
    .builder \
    .appName("sample") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "file:///Users/beginspark/Temp/") \
    .config("spark.driver.host", "127.0.0.1") \
    .getOrCreate()

sc = spark.sparkContext

# 파이썬에서 데이터프레임 생성 시 네임드튜플(namedtuple), 튜플(tuple)
# Row, 커스텀 클래스(class), 딕셔너리(dictionary) 등을
# 사용하여 생성할 수 있다
Person = collections.namedtuple('Person', 'name age job')

# sample dataframe 1
row1 = Person(name="hayoon", age=7, job="student")
row2 = Person(name="sunwoo", age=13, job="student")
row3 = Person(name="hajoo", age=5, job="kindergartener")
row4 = Person(name="jinwoo", age=13, job="student")
data = [row1, row2, row3, row4]
sample_df = spark.createDataFrame(data)

d1 = ("store2", "note", 20, 2000)
d2 = ("store2", "bag", 10, 5000)
d3 = ("store1", "note", 15, 1000)
d4 = ("store1", "pen", 20, 5000)
sample_df2 = spark.createDataFrame([d1, d2, d3, d4]).toDF("store", "product", "amount", "price")

ldf = spark.createDataFrame([Word("w1", 1), Word("w2", 1)])
rdf = spark.createDataFrame([Word("w1", 1), Word("w3", 1)])


# createDataFrame
def createDataFrame(spark, sc):
    sparkHomeDir = "file:/Users/beginspark/Apps/spark"

    # 1. 외부 데이터소스로부터 데이터프레임 생성
    df1 = spark.read.json(sparkHomeDir + "/examples/src/main/resources/people.json")
    df2 = spark.read.parquet(sparkHomeDir + "/examples/src/main/resources/users.parquet")
    df3 = spark.read.text(sparkHomeDir + "/examples/src/main/resources/people.txt")

    # 2. 로컬 컬렉션으로부터 데이터프레임 생성 (ex5-17)
    row1 = Row(name="hayoon", age=7, job="student")
    row2 = Row(name="sunwoo", age=13, job="student")
    row3 = Row(name="hajoo", age=5, job="kindergartener")
    row4 = Row(name="jinwoo", age=13, job="student")
    data = [row1, row2, row3, row4]
    df4 = spark.createDataFrame(data)

    # 3. 기존 RDD로부터 데이터프레임 생성 (ex5-20)
    rdd = spark.sparkContext.parallelize(data)
    df5 = spark.createDataFrame(data)

    # 4. 스키마 지정을 통한 데이터프레임 생성(ex5-23)
    sf1 = StructField("name", StringType(), True)
    sf2 = StructField("age", IntegerType(), True)
    sf3 = StructField("job", StringType(), True)
    schema = StructType([sf1, sf2, sf3])
    r1 = ("hayoon", 7, "student")
    r2 = ("sunwoo", 13, "student")
    r3 = ("hajoo", 5, "kindergartener")
    r4 = ("jinwoo", 13, "student")
    rows = [r1, r2, r3, r4]
    df6 = spark.createDataFrame(rows, schema)
    
    # 5. 이미지를 이용한 데이터프레임 생성
    path = sparkHomeDir + "/data/mllib/images"
    recursive = True
    numPartitions = 2
    dropImageFailures = True
    sampleRatio = 1.0
    seed = 0    
    imgdf = ImageSchema.readImages(path, recursive, numPartitions, dropImageFailures, sampleRatio, seed)
    
    imgdf = imgdf.select(imgdf["image.origin"], imgdf["image.height"], imgdf["image.width"], imgdf["image.nChannels"], imgdf["image.mode"])
    # imgdf.printSchema()
    # imgdf.show(10, False)    



    
# 5.5.2.4.41.2절 
def run_conversion_with_arrow(spark):
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    sales_data = {'name': ['store2', 'store2', 'store1', 'store1'],
                  'product': ['note', 'bag', 'note', 'pen'],
                  'amount': [20, 10, 15, 20],
                  'price': [2000, 5000, 1000, 5000]}
    pdf = pd.DataFrame(sales_data)
    print(pdf)
    
    # pandas dataframe -> spark dataframe
    df = spark.createDataFrame(pdf).groupBy("name").count()
    df.show(10, False)
    # spark dataframe -> pandas dataframe    
    pdf2 = df.toPandas()
    print(pdf2)

    
# 5.5.2.4.41.3절
def run_pandas_scala_udf(spark):
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    # pandas udf 정의 
    total_price = pandas_udf(get_total_price, returnType=LongType())
    # spark dataframe
    sample_df2.show()
    # padas 함수 적용 
    sample_df2.withColumn("total_price", total_price(col("amount"), col("price"))).show()

    
def get_total_price(amount, price):
    return amount * price


# 5.5.2.4.43절
def run_pandas_grouped_map_udf(spark):
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    # spark dataframe
    sample_df2.show()
    # padas 함수 적용 
    sample_df2.groupby("product").apply(get_total_price_bydf).show()

    
@pandas_udf("store string, product string, amount int, price int, total_price int", PandasUDFType.GROUPED_MAP)
def get_total_price_bydf(pdf):
    amount = pdf.amount
    pdf['totol_price'] = (pdf.amount * pdf.price)
    return pdf    


# 실행은 bin/pyspark --jars <path>/beginning-spark-examples.jar와 같이 실행 
def run_java_udf(pdf, df):
    # python udf
    spark.udf.register("fn2", lambda job: job == "student")
    df.createOrReplaceTempView("persons")
    spark.sql("select name, age, job, fn2(job) from persons").show()
    # functions를 이용한 등록
    spark.udf.registerJavaFunction("judf", "com.wikibooks.spark.ch5.Judf", BooleanType())
    spark.sql("select name, age, job, judf(job) from persons").show()


# 5.5.2.4.41.4절(파이썬의 경우 partitionByRange 는 지원하지 않음(spark2.3.0))
def runPartitionByRange(spark):
    pass 


# 5.5.2.4.41.5절
def runColRegex(spark):
    d1 = ("store2", "note", 20, 2000)
    d2 = ("store2", "bag", 10, 5000)
    df = spark.createDataFrame([d1, d2]).toDF("store_nm", "prod_nm", "amount", "price")
    df.select(df.colRegex("`.*nm`")).show()


# 5.5.2.4.41.6절    
def runUnionByName(spark):
    d1 = ("store2", "note", 20, 2000)
    d2 = ("store2", "bag", 10, 5000)
    df1 = spark.createDataFrame([d1, d2]).toDF("store_nm", "prod_nm", "amount", "price")
    df2 = df1.select("price", "amount", "prod_nm", "store_nm")
    df1.union(df2).show()
    df1.unionByName(df2).show()


# 5.5.2.4.41.7절
def runToJson(spark):
    d1 = ("store2", "note", 20, 2000)
    d2 = ("store2", "bag", 10, 5000)
    df = spark.createDataFrame([d1, d2]).toDF("store_nm", "prod_nm", "amount", "price")
    df.select(to_json(struct("store_nm", "prod_nm", "amount", "price")).alias("value")).show(truncate=False)


# 5.5.2.4.41.7절
def runFromJson(spark):
    v1 = ("""{"store_nm":"store2", "prod_nm":"note", "amount":20, "price":2000}""",)
    v2 = ("""{"store_nm":"store2", "prod_nm":"bag","amount":10, "price":5000}""",)

    f1 = StructField("store_nm", StringType(), True)
    f2 = StructField("prod_nm", StringType(), True)
    f3 = StructField("amount", IntegerType(), True)
    f4 = StructField("price", IntegerType(), True)
    schema = StructType([f1, f2, f3, f4])
    
    options = {"multiLine" : "false"}
    df1 = spark.createDataFrame([v1, v2]).toDF("value")
    df2 = df1.select(from_json(df1.value, schema, options).alias("value"))
    df3 = df2.select(df2['value']['store_nm'], df2['value']['prod_nm'], df2['value']['amount'], df2['value']['price'])
    df3.toDF('store_nm', 'prod_nm', 'amount', 'price').show(truncate=False)

    
# [예제 실행 방법] 아래에서 원하는 예제의 주석을 제거하고 실행!!
# createDataFrame(spark, sc)
# runBasicOpsEx(spark, sc, sample_df)
# runColumnEx(spark, sc, sample_df)
# runAlias(spark, sc, sample_df)
# runIsinEx(spark, sc)
# runWhenEx(spark, sc)
# runMaxMin(spark, sample_df)
# runAggregateFunctions(spark, sample_df, sample_df2)
# runCollectionFunctions(spark)
# runDateFunctions(spark)
# runDateFunctions(spark)
# runOtherFunctions(spark, sample_df)
# runUDF(spark, sample_df)
# runAgg(spark, sample_df2)
# runDfAlias(spark, sample_df2)
# runGroupBy(spark, sample_df2)
# runCube(spark, sample_df2)
# runDistinct(spark)
# runDrop(spark, sample_df2)
# runIntersect(spark)
# runExcept(spark)
# runJoin(spark, ldf, rdf)
# runNa(spark, ldf, rdf)
# runOrderBy(spark)
# runRollup(spark, sample_df2)
# runWithColumn(spark)
# runSave(spark)
# run_conversion_with_arrow(spark)
# run_pandas_scala_udf(spark)
# run_pandas_grouped_map_udf(spark)
# runPartitionByRange(spark)
# runColRegex(spark)
# runUnionByName(spark)
# runToJson(spark)
# runFromJson(spark)

In [2]:
def runAggregateFunctions(spark, df1, df2):
    # collect_list, collect_set
    doubledDf1 = df1.union(df1)
    doubledDf1.select(functions.collect_list(doubledDf1["name"])).show(truncate=False)
    doubledDf1.select(functions.collect_set(doubledDf1["name"])).show(truncate=False)

    # count, countDistinct
    doubledDf1.select(functions.count(doubledDf1["name"]), functions.countDistinct(doubledDf1["name"])).show(
        truncate=False)

    # sum
    df2.printSchema()
    df2.select(sum(df2["price"])).show(truncate=False)

    # grouping, grouping_id
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]), grouping(df2["store"])).show(truncate=False)
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]), grouping_id(df2["store"], df2["product"])).show(
        truncate=False)
    
    # grouping_id를 이용한 정렬
    df2.cube(df2["store"], df2["product"]) \
        .agg(sum("amount").alias("sum"), grouping_id("store", "product").alias("gid")) \
        .filter("gid != '2'") \
        .sort(asc("store"), col("gid")) \
        .na.fill({"store":"Total", "product":"-"}) \
        .select("store", "product", "sum") \
        .show(truncate=False)