# SparkSQL

<div style="text-align: right">
2018.11.03 / Presenter: 임지훈
</div>

<br>
### References
- Spark 2 Programming for Big Data Analysis (빅데이터 분석을 위한 스파크2 프로그래밍), 백성민, Wikibooks
- Spark SQL, DataFrames and Datasets Guide, https://spark.apache.org/docs/latest/sql-programming-guide.html
- Pyspark Package, http://spark.apache.org/docs/latest/api/python/pyspark.html

<hr>
### Contents
1. Dataset
2. Word Count in DataFrame
3. Creating DataFrames
4. Key Operations
5. Pandas Integration
6. Hive Integration
<hr>

## 1. Dataset

In [1]:
# Create an RDD and try the map() method
# (sc is the SparkContext that the pyspark shell/notebook provides)
rdd = sc.parallelize(range(20))
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [2]:
rdd.map(lambda element : element+1).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

## 2. Word Count in DataFrame

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession\
        .builder\
        .appName("sample")\
        .master("local[*]")\
        .getOrCreate()

In [4]:
source = "file:///home/ubuntu/18-2Engineering/Week05_181103/resources/countMe.txt"         
df = spark.read.text(source)  
df.show()

+--------------------+
|               value|
+--------------------+
|A bad penny alway...|
|A barking dog nev...|
|A bird in the han...|
|A cat may look at...|
|A chain is only a...|
|A change is as go...|
|A dog is a man's ...|
|A drowning man wi...|
|A fish always rot...|
|A fool and his mo...|
|A friend in need ...|
|A golden key can ...|
|A good beginning ...|
|A good man is har...|
|A house divided a...|
|A person is known...|
|A house is not a ...|
|A journey of a th...|
|A leopard cannot ...|
|A little knowledg...|
+--------------------+
only showing top 20 rows



In [5]:
# Split each line on spaces, then explode the resulting array into one row per word
wordDF = df.select(explode(split(col("value"), " ")).alias("word"))

In [6]:
wordDF.show()

+-------+
|   word|
+-------+
|      A|
|    bad|
|  penny|
| always|
|  turns|
|     up|
|      A|
|barking|
|    dog|
|  never|
|  bites|
|      A|
|   bird|
|     in|
|    the|
|   hand|
|     is|
|  worth|
|    two|
|     in|
+-------+
only showing top 20 rows



In [7]:
# Count how many times each word occurs
result = wordDF.groupBy("word").count()

In [8]:
result.show()

+----------+-----+
|      word|count|
+----------+-----+
|      July|    1|
|     those|    5|
|     spoil|    3|
|    travel|    2|
|       few|    1|
|pack-drill|    1|
|    waters|    1|
|    harder|    1|
|      hope|    1|
|      some|    1|
|    taking|    1|
|   Sabbath|    1|
|     parts|    1|
|      lies|    1|
|    Mighty|    1|
|  Tomorrow|    2|
|   vinegar|    1|
|   stomach|    2|
|   showers|    2|
|   flowers|    2|
+----------+-----+
only showing top 20 rows
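To make the semantics of this pipeline concrete, here is a plain-Python model of it (no Spark involved), with `collections.Counter` standing in for `groupBy("word").count()`. The helper name `word_count` and the sample lines are illustrative assumptions. One detail it exposes: splitting on a single space turns consecutive spaces into empty-string "words", and, as far as I know, Spark's `split(col, " ")` behaves the same way.

```python
from collections import Counter

def word_count(lines):
    """Plain-Python model of explode(split(value, " ")) followed by groupBy("word").count()."""
    # split + explode: one element per word, across all lines
    words = (word for line in lines for word in line.split(" "))
    # groupBy("word").count(): number of occurrences per distinct word
    return Counter(words)

lines = ["A bad penny always turns up", "A barking dog never bites"]
counts = word_count(lines)
print(counts["A"], counts["dog"])
```

Unlike `result.show()`, a `Counter` is a single in-memory dictionary; Spark computes the same counts per partition and then merges them across the cluster.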



In [None]:
outdir = "file:///home/ubuntu/18-2Engineering/Week05_181103/output/"
result.write.format("csv").save(outdir)

In [9]:
spark.stop()

In [2]:
# The whole word-count pipeline above, as a single standalone script
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession\
        .builder\
        .appName("sample")\
        .master("local[*]")\
        .getOrCreate()
        
source = "file:///home/ubuntu/18-2Engineering/Week05_181103/resources/countMe.txt"         
df = spark.read.text(source)  

wordDF = df.select(explode(split(col("value"), " ")).alias("word"))
result = wordDF.groupBy("word").count()
result.show()

outdir = "file:///home/ubuntu/18-2Engineering/Week05_181103/output/"
result.write.format("csv").save(outdir)

spark.stop()

+----------+-----+
|      word|count|
+----------+-----+
|      July|    1|
|     those|    5|
|     spoil|    3|
|    travel|    2|
|       few|    1|
|pack-drill|    1|
|    waters|    1|
|    harder|    1|
|      hope|    1|
|      some|    1|
|    taking|    1|
|   Sabbath|    1|
|     parts|    1|
|      lies|    1|
|    Mighty|    1|
|  Tomorrow|    2|
|   vinegar|    1|
|   stomach|    2|
|   showers|    2|
|   flowers|    2|
+----------+-----+
only showing top 20 rows



AnalysisException: 'path file:/home/ubuntu/18-2Engineering/Week05_181103/output already exists.;'

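The error occurs because an earlier run already wrote to `output/`, and the default save mode (`errorifexists`) refuses to write into an existing path. Delete the directory first, or write with `result.write.mode("overwrite").format("csv").save(outdir)`. Then go to the output directory and check that the results were saved correctly.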
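Spark's `DataFrameWriter.mode()` accepts `"append"`, `"overwrite"`, `"ignore"`, and `"error"`/`"errorifexists"` (the default). The sketch below mimics those four behaviours on a local directory with a hypothetical helper, `save_lines`, which is not a Spark API; it only illustrates why the second save above failed and what each mode changes.

```python
import os
import tempfile

def save_lines(path, lines, mode="errorifexists"):
    """Hypothetical helper that mimics DataFrameWriter save modes on a local directory."""
    if os.path.exists(path):
        if mode in ("error", "errorifexists"):
            raise FileExistsError(f"path {path} already exists.")
        if mode == "ignore":
            return  # leave the existing output untouched
        if mode == "overwrite":
            for name in os.listdir(path):  # drop the old part files
                os.remove(os.path.join(path, name))
    os.makedirs(path, exist_ok=True)
    # every write (including "append") adds a new part file next to any existing ones
    part = os.path.join(path, "part-%05d.csv" % len(os.listdir(path)))
    with open(part, "w") as f:
        f.write("\n".join(lines) + "\n")

out = os.path.join(tempfile.mkdtemp(), "output")
save_lines(out, ["July,1"])
try:
    save_lines(out, ["July,1"])  # second run fails, like the AnalysisException above
except FileExistsError as e:
    print(e)
save_lines(out, ["July,1"], mode="overwrite")
print(sorted(os.listdir(out)))
```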

## 3. Creating DataFrames

In [10]:
import collections

from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time

#from word import Word

spark = SparkSession \
    .builder \
    .appName("sample") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext


#    .config("spark.sql.warehouse.dir", "file:///Users/beginspark/Temp/") \
#    .config("spark.driver.host", "127.0.0.1") \

### 3-1. Creating a DataFrame from external data sources

In [11]:
commonDir = "file:///home/ubuntu/18-2Engineering/Week05_181103/"
df1 = spark.read.json(commonDir + "resources/people.json")
df2 = spark.read.parquet(commonDir + "resources/users.parquet")
df3 = spark.read.text(commonDir + "resources/people.txt")

df1.show()
df2.show()
df3.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

+-----------+
|      value|
+-----------+
|Michael, 29|
|   Andy, 30|
| Justin, 19|
+-----------+



### 3-2. Creating a DataFrame from an RDD

In [12]:
row1 = Row(name="soonchan", age=26, job="googler")
row2 = Row(name="taeoh", age=25, job="mother")
row3 = Row(name="hyunwoo", age=25, job="father")
row4 = Row(name="euntaek", age=25, job="garbage")
row5 = Row(name="jihoon", age=2, job="student")

data = [row1, row2, row3, row4, row5]
rdd = spark.sparkContext.parallelize(data)

df4 = spark.createDataFrame(rdd)  # create the DataFrame from the RDD of Row objects
df4.show()

+---+-------+--------+
|age|    job|    name|
+---+-------+--------+
| 26|googler|soonchan|
| 25| mother|   taeoh|
| 25| father| hyunwoo|
| 25|garbage| euntaek|
|  2|student|  jihoon|
+---+-------+--------+



### 3-3. Creating a DataFrame from a list

In [13]:
row1 = Row(name="soonchan", age=26, job="googler")
row2 = Row(name="taeoh", age=25, job="mother")
row3 = Row(name="hyunwoo", age=25, job="father")
row4 = Row(name="euntaek", age=25, job="garbage")
row5 = Row(name="jihoon", age=2, job="student")

data = [row1, row2, row3, row4, row5]

df5 = spark.createDataFrame(data)
df5.show()

+---+-------+--------+
|age|    job|    name|
+---+-------+--------+
| 26|googler|soonchan|
| 25| mother|   taeoh|
| 25| father| hyunwoo|
| 25|garbage| euntaek|
|  2|student|  jihoon|
+---+-------+--------+
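Notice that the columns print as `age, job, name` even though each `Row` was written as `name, age, job`. In PySpark 2.x, `Row(**kwargs)` sorts its fields by name (Spark 3.0 stopped doing this on Python 3.6+), whereas a namedtuple, as in 3-5, keeps its declared order. The helper `spark2_row_field_order` below is hypothetical; it only models that rule in plain Python and is not part of PySpark.

```python
from collections import namedtuple

def spark2_row_field_order(**fields):
    """Hypothetical helper: the column order PySpark 2.x gives Row(**fields), i.e. names sorted."""
    return sorted(fields)

Person = namedtuple("Person", "name age job")

row_order = spark2_row_field_order(name="soonchan", age=26, job="googler")
tuple_order = list(Person._fields)  # namedtuples keep the declared order
print(row_order)
print(tuple_order)
```

The class-based example in 3-7 prints `count, word` rather than `word, count`, which suggests the same sorting is applied when the schema is inferred from an object's attributes.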



### 3-4. Creating a DataFrame with an explicit schema

In [14]:
sf1 = StructField("name", StringType(), True)
sf2 = StructField("age", IntegerType(), True)
sf3 = StructField("job", StringType(), True)

schema = StructType([sf1, sf2, sf3])

r1 = ("soonchan", 26, "googler")
r2 = ("taeoh", 25, "mother")
r3 = ("hyunwoo", 25, "father")
r4 = ("euntaek", 25, "garbage")
r5 = ("jihoon", 25, "student")
rows = [r1, r2, r3, r4, r5]

df6 = spark.createDataFrame(rows, schema)
df6.show()

+--------+---+-------+
|    name|age|    job|
+--------+---+-------+
|soonchan| 26|googler|
|   taeoh| 25| mother|
| hyunwoo| 25| father|
| euntaek| 25|garbage|
|  jihoon| 25|student|
+--------+---+-------+



### 3-5. Creating a DataFrame from named tuples

In [15]:
# First define the structure of a row (its column names) with a namedtuple
Person = collections.namedtuple('Person', 'name age job')

# sample dataframe 1
row1 = Person(name="soonchan", age=26, job="googler")
row2 = Person(name="taeoh", age=25, job="mother")
row3 = Person(name="hyunwoo", age=25, job="father")
row4 = Person(name="euntaek", age=25, job="garbage")
row5 = Person(name="jihoon", age=25, job="student")

data = [row1, row2, row3, row4, row5]

df7 = spark.createDataFrame(data)
df7.show()

+--------+---+-------+
|    name|age|    job|
+--------+---+-------+
|soonchan| 26|googler|
|   taeoh| 25| mother|
| hyunwoo| 25| father|
| euntaek| 25|garbage|
|  jihoon| 25|student|
+--------+---+-------+



### 3-6. Creating a DataFrame from tuples and naming the columns with toDF()

In [16]:
d1 = ("store2", "note", 20, 2000)
d2 = ("store2", "bag", 10, 5000)
d3 = ("store1", "note", 15, 1000)
d4 = ("store1", "pen", 20, 5000)
df8 = spark.createDataFrame([d1, d2, d3, d4]).toDF("store", "product", "amount", "price")
df8.show()

+------+-------+------+-----+
| store|product|amount|price|
+------+-------+------+-----+
|store2|   note|    20| 2000|
|store2|    bag|    10| 5000|
|store1|   note|    15| 1000|
|store1|    pen|    20| 5000|
+------+-------+------+-----+



### 3-7. Creating a DataFrame from class instances

In [17]:
# Define a class, create instances of it, and build a DataFrame from those objects
class Word:
    def __init__(self, word, count):
        self.word = word
        self.count = count

ldf = spark.createDataFrame([Word("w1", 1), Word("w2", 1)])
rdf = spark.createDataFrame([Word("w1", 1), Word("w3", 1)])
ldf.show()
rdf.show()

+-----+----+
|count|word|
+-----+----+
|    1|  w1|
|    1|  w2|
+-----+----+

+-----+----+
|count|word|
+-----+----+
|    1|  w1|
|    1|  w3|
+-----+----+



In [18]:
# createDataFrame
def createDataFrame(spark, sc):
    sparkHomeDir = "file:///home/ubuntu/18-2Engineering/Week05_181103"

    # 1. From files (sparkHomeDir has no trailing slash, so each path starts with "/")
    df1 = spark.read.json(sparkHomeDir + "/resources/people.json")
    df2 = spark.read.parquet(sparkHomeDir + "/resources/users.parquet")
    df3 = spark.read.text(sparkHomeDir + "/resources/people.txt")

    # 2. From a list of Row objects (ex5-17)
    row1 = Row(name="soonchan", age=26, job="googler")
    row2 = Row(name="taeoh", age=25, job="mother")
    row3 = Row(name="hyunwoo", age=25, job="father")
    row4 = Row(name="euntaek", age=25, job="garbage")
    row5 = Row(name="jihoon", age=25, job="student")
    data = [row1, row2, row3, row4,row5]
    df4 = spark.createDataFrame(data)

    # 3. From an RDD (ex5-20)
    rdd = spark.sparkContext.parallelize(data)
    df5 = spark.createDataFrame(rdd)

    # 4. With an explicit schema (ex5-23)
    sf1 = StructField("name", StringType(), True)
    sf2 = StructField("age", IntegerType(), True)
    sf3 = StructField("job", StringType(), True)
    schema = StructType([sf1, sf2, sf3])
    r1 = ("soonchan", 26, "googler")
    r2 = ("taeoh", 25, "mother")
    r3 = ("hyunwoo", 25, "father")
    r4 = ("euntaek", 25, "garbage")
    r5 = ("jihoon", 25, "student")
    rows = [r1, r2, r3, r4, r5]
    df6 = spark.createDataFrame(rows, schema)

    return df1, df2, df3, df4, df5, df6

## 4. Key Operations

### 4-1. Actions

show()

In [19]:
# show()
# Print the rows stored in the DataFrame (the first 20 by default)
df7.show()

+--------+---+-------+
|    name|age|    job|
+--------+---+-------+
|soonchan| 26|googler|
|   taeoh| 25| mother|
| hyunwoo| 25| father|
| euntaek| 25|garbage|
|  jihoon| 25|student|
+--------+---+-------+



head() / first()

In [20]:
# head(), first()
# Return the first row
df7.head()
# df7.first() returns the same row

Row(name='soonchan', age=26, job='googler')

take()

In [21]:
# take(n)
# Return the first n rows as a list of Row objects
df8.take(3)

[Row(store='store2', product='note', amount=20, price=2000),
 Row(store='store2', product='bag', amount=10, price=5000),
 Row(store='store1', product='note', amount=15, price=1000)]

count()

In [22]:
# count()
# Same as the RDD count() operation: returns the number of rows
df5.count()

5

collect()

In [23]:
# collect()
# Return all rows of the dataset to the driver as a local list
# Careful: on large data this can exhaust the driver's memory!
df7.collect()

[Row(name='soonchan', age=26, job='googler'),
 Row(name='taeoh', age=25, job='mother'),
 Row(name='hyunwoo', age=25, job='father'),
 Row(name='euntaek', age=25, job='garbage'),
 Row(name='jihoon', age=25, job='student')]

describe()

In [24]:
# describe(): count, mean, stddev, min and max of the given column
df7.describe("age").show()

+-------+-------------------+
|summary|                age|
+-------+-------------------+
|  count|                  5|
|   mean|               25.2|
| stddev|0.44721359549995765|
|    min|                 25|
|    max|                 26|
+-------+-------------------+



In [25]:
df7.describe("job").show()

+-------+-------+
|summary|    job|
+-------+-------+
|  count|      5|
|   mean|   null|
| stddev|   null|
|    min| father|
|    max|student|
+-------+-------+
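The numbers above can be reproduced with Python's `statistics` module. This sketch assumes, consistently with the output, that `describe()` reports the *sample* standard deviation (dividing by n - 1), and that for a string column only count, min and max are meaningful, with min/max compared lexicographically.

```python
import statistics

ages = [26, 25, 25, 25, 25]
jobs = ["googler", "mother", "father", "garbage", "student"]

age_summary = {
    "count": len(ages),
    "mean": statistics.mean(ages),
    "stddev": statistics.stdev(ages),  # sample standard deviation: divides by n - 1
    "min": min(ages),
    "max": max(ages),
}
# mean/stddev are undefined (null) for strings; min/max compare lexicographically
job_summary = {"count": len(jobs), "min": min(jobs), "max": max(jobs)}
print(age_summary)
print(job_summary)
```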



### 4-2. Basic operations

persist() / cache()

In [26]:
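# cache() is persist() with the default storage level (MEMORY_AND_DISK for DataFrames)
# Keeps the working data stored so later actions can reuse it instead of recomputing it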

from pyspark import StorageLevel

df7.persist(StorageLevel.MEMORY_AND_DISK_2)

DataFrame[name: string, age: bigint, job: string]

Open the Spark web UI at http://<host running the Spark shell>:4040 and look at the Storage tab.

In [27]:
df7.count()

5

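Check http://<host running the Spark shell>:4040 again: persist() is lazy, so the DataFrame appears in the Storage tab only after an action such as count() has run.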

printSchema()

In [28]:
# printSchema(), columns, dtypes, schema
# A method and properties for inspecting schema information

df7.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- job: string (nullable = true)



columns

In [29]:
# A property, not a method: returns a list of column names.
df7.columns

['name', 'age', 'job']

dtypes

In [30]:
# A property, not a method: returns a list of (column name, type) tuples.
df7.dtypes

[('name', 'string'), ('age', 'bigint'), ('job', 'string')]

schema

In [31]:
# A property, not a method: returns a StructType (not a list).
df7.schema

StructType(List(StructField(name,StringType,true),StructField(age,LongType,true),StructField(job,StringType,true)))

createOrReplaceTempView()

In [32]:
# createOrReplaceTempView()

# Registers the DataFrame as a table so it can be queried with SQL statements.
# The view is valid only while the SparkSession that created it is alive
# (it disappears when the session ends).

df7.createOrReplaceTempView("engineeringTeam")
spark.sql("SELECT name, age FROM engineeringTeam WHERE age > 25").show()

+--------+---+
|    name|age|
+--------+---+
|soonchan| 26|
+--------+---+
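As a rough analogy only, the same query can be run against SQLite from Python's standard library. The important difference is that `createOrReplaceTempView()` copies no data: it just gives the DataFrame's logical plan a name, whereas the SQLite temp table below actually stores the rows. The table name simply mirrors the view above.

```python
import sqlite3

rows = [
    ("soonchan", 26, "googler"),
    ("taeoh", 25, "mother"),
    ("hyunwoo", 25, "father"),
    ("euntaek", 25, "garbage"),
    ("jihoon", 25, "student"),
]

conn = sqlite3.connect(":memory:")
# A TEMP table disappears with its connection, much as a temp view disappears with its session
conn.execute("CREATE TEMP TABLE engineeringTeam (name TEXT, age INTEGER, job TEXT)")
conn.executemany("INSERT INTO engineeringTeam VALUES (?, ?, ?)", rows)
result = conn.execute("SELECT name, age FROM engineeringTeam WHERE age > 25").fetchall()
print(result)
conn.close()
```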



explain()

In [33]:
# explain()

# Prints the execution plan for the query. Because df7 was persisted above,
# the plan reads it through InMemoryTableScan.

spark.sql("SELECT name, age FROM engineeringTeam WHERE age > 25").explain()

== Physical Plan ==
*(1) Filter (isnotnull(age#133L) && (age#133L > 25))
+- InMemoryTableScan [name#132, age#133L], [isnotnull(age#133L), (age#133L > 25)]
      +- InMemoryRelation [name#132, age#133L, job#134], true, 10000, StorageLevel(disk, memory, 2 replicas)
            +- Scan ExistingRDD[name#132,age#133L,job#134]


### 4-3. Row, Column, functions

In [34]:
# Filtering with a SQL statement directly

spark.sql("SELECT name, age, job FROM engineeringTeam WHERE age > 20").show()

+--------+---+-------+
|    name|age|    job|
+--------+---+-------+
|soonchan| 26|googler|
|   taeoh| 25| mother|
| hyunwoo| 25| father|
| euntaek| 25|garbage|
|  jihoon| 25|student|
+--------+---+-------+



In [35]:
# Equivalent to the SQL query above.
# Operations written with Column expressions like this are called untyped transformations.

df7.where(df7['age'] > 20).show()
#df7.where(df7.age > 20).show() # this form also works

+--------+---+-------+
|    name|age|    job|
+--------+---+-------+
|soonchan| 26|googler|
|   taeoh| 25| mother|
| hyunwoo| 25| father|
| euntaek| 25|garbage|
|  jihoon| 25|student|
+--------+---+-------+



alias() (as() in Scala; `as` is a reserved word in Python)

In [36]:
# alias()

# Gives a column expression the name you want.
df7.select(df7['age'] + 1).show()

+---------+
|(age + 1)|
+---------+
|       27|
|       26|
|       26|
|       26|
|       26|
+---------+



In [37]:
df7.select((df7['age'] + 1).alias("age")).show()

+---+
|age|
+---+
| 27|
| 26|
| 26|
| 26|
| 26|
+---+



isin()

In [40]:
# isin()

# Checks whether the column's value is one of the values passed as arguments.
# The example below puts the valid values in a broadcast variable and uses them as a filter.

from pyspark.sql import Row

nums = spark.sparkContext.broadcast([1, 3, 5, 7, 9])
rdd = spark.sparkContext.parallelize(range(0, 10)).map(lambda v: Row(v))
df = spark.createDataFrame(rdd)
df.where(df._1.isin(nums.value)).show()

+---+
| _1|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
+---+



In [41]:
elder = spark.sparkContext.broadcast([26,27,28,29,30])
df7.select(df7['name'], df7['age'].isin(elder.value).alias("elder")).show()

+--------+-----+
|    name|elder|
+--------+-----+
|soonchan| true|
|   taeoh|false|
| hyunwoo|false|
| euntaek|false|
|  jihoon|false|
+--------+-----+
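A plain-Python model of the two uses of `isin()` above: as a filter inside `where()`, and as a boolean column inside `select()`. As I understand it, `nums.value` and `elder.value` are read on the driver and handed to `isin()` as literal values, so a plain Python list would behave the same here; the broadcast variable is not what distributes the filter. The variable names below are illustrative.

```python
valid = frozenset([1, 3, 5, 7, 9])            # stands in for nums.value
elder_ages = frozenset([26, 27, 28, 29, 30])  # stands in for elder.value

# df.where(df._1.isin(...)): keep only rows whose value is in the set
filtered = [v for v in range(0, 10) if v in valid]

# df7.select(name, age.isin(...).alias("elder")): a boolean column instead of a filter
people = [("soonchan", 26), ("taeoh", 25), ("jihoon", 25)]
elder_flags = [(name, age in elder_ages) for name, age in people]
print(filtered)
print(elder_flags)
```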



when()

In [33]:
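# when()
# Returns the value of the first matching condition, or the otherwise() value

from pyspark.sql import functions

ds = spark.range(0, 5)
# named typeCol so it does not shadow pyspark.sql.functions.col
typeCol = functions.when(ds.id % 2 == 0, "even").otherwise("odd").alias("type")
ds.select(ds.id, typeCol).show()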

+---+----+
| id|type|
+---+----+
|  0|even|
|  1| odd|
|  2|even|
|  3| odd|
|  4|even|
+---+----+
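`when()` calls can be chained, and the first matching condition wins. The sketch below models that evaluation for a single value in plain Python; `case_when` is a hypothetical helper, not a Spark function. It also shows that without `otherwise()` an unmatched row gets null (`None` here).

```python
def case_when(value, branches, otherwise=None):
    """Model of when(c1, v1).when(c2, v2)...otherwise(v) evaluated for one row."""
    for predicate, result in branches:
        if predicate(value):
            return result  # the first matching branch wins
    return otherwise       # no otherwise() in Spark -> null, i.e. None here

types = [case_when(i, [(lambda v: v % 2 == 0, "even")], "odd") for i in range(5)]
sizes = [case_when(i, [(lambda v: v < 2, "small"), (lambda v: v < 4, "medium")])
         for i in range(5)]
print(types)
print(sizes)
```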



In [34]:
d1 = ("store2", "note", 20, 2000)
d2 = ("store2", "bag", 10, 5000)
d3 = ("store1", "note", 15, 1000)
d4 = ("store1", "pen", 20, 5000)
df = spark.createDataFrame([d1, d2, d3, d4]).toDF("store", "product", "amount", "price")

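# Group by store, then aggregate each group
gdf = df.groupBy(df.store)
sorted(gdf.agg({"*": "count"}).collect())  # rows per store (not displayed: only the last expression is shown)

sorted(gdf.agg(functions.min(df.amount)).collect())  # minimum amount per store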

[Row(store='store1', min(amount)=15), Row(store='store2', min(amount)=10)]

max(), mean()

In [35]:
df7.select(max(df7['age'])).show()

+--------+
|max(age)|
+--------+
|      26|
+--------+



In [36]:
df7.select(max(df7['job'])).show()

+--------+
|max(job)|
+--------+
| student|
+--------+



In [37]:
df7.select(mean(df7['age'])).show()

+--------+
|avg(age)|
+--------+
|    25.2|
+--------+



In [38]:
df7.select(mean(df7['job'])).show()

+--------+
|avg(job)|
+--------+
|    null|
+--------+



collect_list() / collect_set()

In [39]:
# Gather a column's values into a single list or set column
# collect_list keeps duplicates; collect_set drops them

doubleDf = df7.union(df7)
doubleDf.show()

+--------+---+-------+
|    name|age|    job|
+--------+---+-------+
|soonchan| 26|googler|
|   taeoh| 25| mother|
| hyunwoo| 25| father|
| euntaek| 25|garbage|
|  jihoon| 25|student|
|soonchan| 26|googler|
|   taeoh| 25| mother|
| hyunwoo| 25| father|
| euntaek| 25|garbage|
|  jihoon| 25|student|
+--------+---+-------+



In [40]:
doubleDf.select(collect_list('name')).take(10)

[Row(collect_list(name)=['soonchan', 'taeoh', 'hyunwoo', 'euntaek', 'jihoon', 'soonchan', 'taeoh', 'hyunwoo', 'euntaek', 'jihoon'])]

In [41]:
doubleDf.select(collect_set('name')).take(10)

[Row(collect_set(name)=['hyunwoo', 'euntaek', 'taeoh', 'soonchan', 'jihoon'])]

In [42]:
doubleDf.select(collect_list("name")[0]).show()

+---------------------+
|collect_list(name)[0]|
+---------------------+
|             soonchan|
+---------------------+



In [43]:
# collect_set's element order is not guaranteed, so the value at index 4 can vary
doubleDf.select(collect_set("name")[4]).show()

+--------------------+
|collect_set(name)[4]|
+--------------------+
|              jihoon|
+--------------------+



count() / countDistinct()

In [44]:
doubleDf.select(count("name"), countDistinct("name")).show()

+-----------+--------------------+
|count(name)|count(DISTINCT name)|
+-----------+--------------------+
|         10|                   5|
+-----------+--------------------+
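The example above has no nulls, which hides one difference worth knowing: `count("*")` counts every row, while `count("name")` and `countDistinct("name")` skip nulls. A plain-Python model with an assumed null (`None`) in the data:

```python
names = ["soonchan", "taeoh", None, "soonchan", "jihoon"]

count_rows = len(names)                                    # count("*"): every row
count_name = sum(1 for n in names if n is not None)        # count("name"): skips nulls
count_distinct = len({n for n in names if n is not None})  # countDistinct("name")
print(count_rows, count_name, count_distinct)
```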



sum()

In [45]:
df8.show()

+------+-------+------+-----+
| store|product|amount|price|
+------+-------+------+-----+
|store2|   note|    20| 2000|
|store2|    bag|    10| 5000|
|store1|   note|    15| 1000|
|store1|    pen|    20| 5000|
+------+-------+------+-----+



In [46]:
df8.select(sum("price")).show()

+----------+
|sum(price)|
+----------+
|     13000|
+----------+



In [47]:
df8.select(sum("price") / count("price")).show()

+---------------------------+
|(sum(price) / count(price))|
+---------------------------+
|                     3250.0|
+---------------------------+



round() / sqrt()

In [48]:
df8.select(sqrt(sum("price") / count("price"))).show()

+---------------------------------+
|SQRT((sum(price) / count(price)))|
+---------------------------------+
|                 57.0087712549569|
+---------------------------------+



In [49]:
df8.select(round(sqrt(sum("price") / count("price")),3)).show()

+-------------------------------------------+
|round(SQRT((sum(price) / count(price))), 3)|
+-------------------------------------------+
|                                     57.009|
+-------------------------------------------+
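The same arithmetic in plain Python: 13000 / 4 = 3250.0, its square root is about 57.0088, and rounding to 3 places gives 57.009. One caveat: Spark's `round()` uses HALF_UP rounding (its `bround()` uses HALF_EVEN), while Python's built-in `round()` rounds halves to even. The helper `half_up_round` is an illustrative assumption, built on `decimal` to mimic HALF_UP.

```python
import math
from decimal import Decimal, ROUND_HALF_UP

prices = [2000, 5000, 1000, 5000]
avg = sum(prices) / len(prices)  # sum(price) / count(price)
root = math.sqrt(avg)            # sqrt(...)

def half_up_round(x, scale):
    """Round x to `scale` decimal places with HALF_UP, the mode Spark's round() uses."""
    quantum = Decimal(1).scaleb(-scale)  # e.g. scale=3 -> Decimal("0.001")
    return float(Decimal(repr(x)).quantize(quantum, rounding=ROUND_HALF_UP))

print(avg, root, half_up_round(root, 3))
print(round(2.5), half_up_round(2.5, 0))  # Python rounds half to even; HALF_UP does not
```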



desc() / asc()

In [50]:
df8.sort(desc("price")).show()

+------+-------+------+-----+
| store|product|amount|price|
+------+-------+------+-----+
|store2|    bag|    10| 5000|
|store1|    pen|    20| 5000|
|store2|   note|    20| 2000|
|store1|   note|    15| 1000|
+------+-------+------+-----+



In [51]:
df8.sort(desc("price"),asc("store")).show()

+------+-------+------+-----+
| store|product|amount|price|
+------+-------+------+-----+
|store1|    pen|    20| 5000|
|store2|    bag|    10| 5000|
|store2|   note|    20| 2000|
|store1|   note|    15| 1000|
+------+-------+------+-----+
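Sorting by several columns with mixed directions is a sort on a composite key. In plain Python, `desc("price")` can be modelled by negating the numeric price and `asc("store")` by using the store name as-is. Note that with `desc("price")` alone, the relative order of the two 5000 rows is not guaranteed in Spark; the second key is what makes the result deterministic.

```python
rows = [
    ("store2", "note", 20, 2000),
    ("store2", "bag", 10, 5000),
    ("store1", "note", 15, 1000),
    ("store1", "pen", 20, 5000),
]
# desc("price") -> negate price in the key; asc("store") -> store as-is
ordered = sorted(rows, key=lambda r: (-r[3], r[0]))
for r in ordered:
    print(r)
```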



agg()

In [52]:
df8.agg(max("amount"), min("price")).show()

+-----------+----------+
|max(amount)|min(price)|
+-----------+----------+
|         20|      1000|
+-----------+----------+



In [53]:
df8.select(max("amount"), min("price")).show()

+-----------+----------+
|max(amount)|min(price)|
+-----------+----------+
|         20|      1000|
+-----------+----------+



groupBy()

In [54]:
df8.groupBy("store").sum("price").show()

+------+----------+
| store|sum(price)|
+------+----------+
|store2|      7000|
|store1|      6000|
+------+----------+



In [55]:
df8.groupBy("store", "product").sum("price").show()

+------+-------+----------+
| store|product|sum(price)|
+------+-------+----------+
|store1|   note|      1000|
|store2|    bag|      5000|
|store1|    pen|      5000|
|store2|   note|      2000|
+------+-------+----------+
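Conceptually, `groupBy(...).sum("price")` produces one total per distinct combination of the grouping columns. A plain-Python model with a dictionary keyed by those combinations (the helper `group_sum` is illustrative; Spark returns groups in no particular order, so a dict comparison is the fair check):

```python
from collections import defaultdict

rows = [
    {"store": "store2", "product": "note", "amount": 20, "price": 2000},
    {"store": "store2", "product": "bag", "amount": 10, "price": 5000},
    {"store": "store1", "product": "note", "amount": 15, "price": 1000},
    {"store": "store1", "product": "pen", "amount": 20, "price": 5000},
]

def group_sum(rows, keys, value):
    """Model of groupBy(*keys).sum(value): one total per distinct key combination."""
    totals = defaultdict(int)
    for row in rows:
        totals[tuple(row[k] for k in keys)] += row[value]
    return dict(totals)

by_store = group_sum(rows, ["store"], "price")
by_store_product = group_sum(rows, ["store", "product"], "price")
print(by_store)
print(by_store_product)
```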



cube()

In [56]:
df8.cube("store", "product").sum("price")\
.sort(asc("store"), asc("product")).show()

+------+-------+----------+
| store|product|sum(price)|
+------+-------+----------+
|  null|   null|     13000|
|  null|    bag|      5000|
|  null|   note|      3000|
|  null|    pen|      5000|
|store1|   null|      6000|
|store1|   note|      1000|
|store1|    pen|      5000|
|store2|   null|      7000|
|store2|    bag|      5000|
|store2|   note|      2000|
+------+-------+----------+
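`cube("store", "product")` aggregates over every subset of the grouping columns: (store, product), (store), (product) and the grand total (). A column left out of a grouping set shows up as null. The plain-Python model below reproduces the 10 rows above; `cube_sum` is an illustrative helper. Because a subtotal's null looks the same as a real null in the data, the `grouping()`/`grouping_id()` functions used in the exercises at the end tell the two apart. (`rollup()`, by contrast, keeps only the prefixes: (store, product), (store), ().)

```python
from itertools import combinations

rows = [
    {"store": "store2", "product": "note", "price": 2000},
    {"store": "store2", "product": "bag", "price": 5000},
    {"store": "store1", "product": "note", "price": 1000},
    {"store": "store1", "product": "pen", "price": 5000},
]

def cube_sum(rows, keys, value):
    """Model of cube(*keys).sum(value): aggregate over every subset of the grouping keys."""
    totals = {}
    for size in range(len(keys) + 1):
        for kept in combinations(keys, size):
            for row in rows:
                # keys left out of this grouping set show up as None (null in Spark)
                group = tuple(row[k] if k in kept else None for k in keys)
                totals[group] = totals.get(group, 0) + row[value]
    return totals

totals = cube_sum(rows, ["store", "product"], "price")
# None sorts first, mimicking asc("store"), asc("product") with nulls first
for group, total in sorted(totals.items(), key=lambda kv: tuple(v or "" for v in kv[0])):
    print(group, total)
```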



In [57]:
# select() also accepts a list of column names
df8.select([c for c in df8.columns]).show()

+------+-------+------+-----+
| store|product|amount|price|
+------+-------+------+-----+
|store2|   note|    20| 2000|
|store2|    bag|    10| 5000|
|store1|   note|    15| 1000|
|store1|    pen|    20| 5000|
+------+-------+------+-----+



## 5. Pandas Integration

In [58]:
!pip install pyarrow



In [42]:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
# Use Apache Arrow to speed up conversions between pandas and Spark DataFrames (Spark 2.3+)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sales_data = {'name' : ['store2', 'store2', 'store1', 'store1'],
             'product' : ['note', 'bag', 'note', 'pen'],
             'amount' : [20, 10, 15, 20],
             'price' : [2000, 5000, 1000, 5000]}
sales_data

{'amount': [20, 10, 15, 20],
 'name': ['store2', 'store2', 'store1', 'store1'],
 'price': [2000, 5000, 1000, 5000],
 'product': ['note', 'bag', 'note', 'pen']}

In [43]:
pdf = pd.DataFrame(sales_data)
pdf

| | amount | name | price | product |
|---|---|---|---|---|
| 0 | 20 | store2 | 2000 | note |
| 1 | 10 | store2 | 5000 | bag |
| 2 | 15 | store1 | 1000 | note |
| 3 | 20 | store1 | 5000 | pen |


pandas DataFrame -> PySpark DataFrame

In [44]:
df = spark.createDataFrame(pdf)
df.show()

+------+------+-----+-------+
|amount|  name|price|product|
+------+------+-----+-------+
|    20|store2| 2000|   note|
|    10|store2| 5000|    bag|
|    15|store1| 1000|   note|
|    20|store1| 5000|    pen|
+------+------+-----+-------+



In [45]:
df =spark.createDataFrame(pdf).groupBy("name").count()
df.show()

+------+-----+
|  name|count|
+------+-----+
|store2|    2|
|store1|    2|
+------+-----+



PySpark DataFrame -> pandas DataFrame

In [46]:
pdf2 = df.toPandas()
pdf2

| | name | count |
|---|---|---|
| 0 | store2 | 2 |
| 1 | store1 | 2 |


Pandas UDF

In [47]:
totalDf = spark.createDataFrame(pdf)
totalDf.show()

+------+------+-----+-------+
|amount|  name|price|product|
+------+------+-----+-------+
|    20|store2| 2000|   note|
|    10|store2| 5000|    bag|
|    15|store1| 1000|   note|
|    20|store1| 5000|    pen|
+------+------+-----+-------+



In [48]:
# A scalar pandas UDF: amount and price arrive as pandas Series, one batch at a time
def get_total_price(amount, price):
    return amount * price

total_price = pandas_udf(get_total_price, returnType=LongType())
totalDf.withColumn("total_price", total_price(totalDf["amount"], totalDf["price"])).show()

+------+------+-----+-------+-----------+
|amount|  name|price|product|total_price|
+------+------+-----+-------+-----------+
|    20|store2| 2000|   note|      40000|
|    10|store2| 5000|    bag|      50000|
|    15|store1| 1000|   note|      15000|
|    20|store1| 5000|    pen|     100000|
+------+------+-----+-------+-----------+



## 6. Hive Integration

Reference: https://creativedata.atlassian.net/wiki/spaces/SAP/pages/82255289/Pyspark+-+Read+Write+files+from+Hive

In [65]:
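from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext

# getOrCreate() would otherwise return the session created earlier without Hive support,
# and the metastore URI must be set before a new SparkContext starts, so stop that session first
spark.stop()
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")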

sparkSession = (SparkSession
                .builder
                .appName('example-pyspark-read-and-write-from-hive')
                .enableHiveSupport()
                .getOrCreate())

row1 = Person(name="soonchan", age=26, job="googler")
row2 = Person(name="taeoh", age=25, job="mother")
row3 = Person(name="hyunwoo", age=25, job="father")
row4 = Person(name="euntaek", age=25, job="garbage")
row5 = Person(name="jihoon", age=25, job="student")

data = [row1, row2, row3, row4, row5]

df = sparkSession.createDataFrame(data)

df.write.saveAsTable('example')

## Try It Yourself!

In [66]:
# Sections 5.5.2.1.1 to 5.5.2.2.4
def runBasicOpsEx(spark, sc, df):
    df.show()
    print(df.head())
    print(df.first())
    print(df.take(2))
    print(df.count())
    print(df.collect())
    df.describe("age").show()
    df.persist(StorageLevel.MEMORY_AND_DISK_2)
    df.printSchema()
    print(df.columns)
    print(df.dtypes)
    print(df.schema)
    df.createOrReplaceTempView("users")
    spark.sql("select name, age from users where age > 20").show()
    spark.sql("select name, age from users where age > 20").explain()


# Section 5.5.2.4
def runColumnEx(spark, sc, df):
    df.where(df.age > 10).show()


# Section 5.5.2.4.2
def runAlias(spark, sc, df):
    df.select(df.age + 1).show()
    df.select((df.age + 1).alias("age")).show()


# Section 5.5.2.4.3
def runIsinEx(spark, sc):
    nums = spark.sparkContext.broadcast([1, 3, 5, 7, 9])
    rdd = spark.sparkContext.parallelize(range(0, 10)).map(lambda v: Row(v))
    df = spark.createDataFrame(rdd)
    df.where(df._1.isin(nums.value)).show()


# Section 5.5.2.4.4
def runWhenEx(spark, sc):
    ds = spark.range(0, 5)
    col = when(ds.id % 2 == 0, "even").otherwise("odd").alias("type")
    ds.select(ds.id, col).show()


# Section 5.5.2.4.5
def runMaxMin(spark, df):
    min_col = min("age")
    max_col = max("age")
    df.select(min_col, max_col).show()


# Sections 5.5.2.4.6 to 5.5.2.4.9
def runAggregateFunctions(spark, df1, df2):
    # collect_list, collect_set
    doubledDf1 = df1.union(df1)
    doubledDf1.select(functions.collect_list(doubledDf1["name"])).show(truncate=False)
    doubledDf1.select(functions.collect_set(doubledDf1["name"])).show(truncate=False)

    # count, countDistinct
    doubledDf1.select(functions.count(doubledDf1["name"]), functions.countDistinct(doubledDf1["name"])).show(
        truncate=False)

    # sum
    df2.printSchema()
    df2.select(sum(df2["price"])).show(truncate=False)

    # grouping, grouping_id
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]), grouping(df2["store"])).show(truncate=False)
    df2.cube(df2["store"], df2["product"]).agg(sum(df2["amount"]), grouping_id(df2["store"], df2["product"])).show(
        truncate=False)


# Sections 5.5.2.4.10 to 5.5.2.4.11
def runCollectionFunctions(spark):
    df = spark.createDataFrame([{'numbers': '9,1,5,3,9'}])
    arrayCol = split(df.numbers, ",")

    # array_contains, size
    df.select(arrayCol, array_contains(arrayCol, 2), size(arrayCol)).show(truncate=False)

    # sort_array()
    df.select(arrayCol, sort_array(arrayCol)).show(truncate=False)

    # explode, posexplode
    df.select(explode(arrayCol)).show(truncate=False)
    df.select(posexplode(arrayCol)).show(truncate=False)


# Sections 5.5.2.4.12 to 5.5.2.4.14
def runDateFunctions(spark):
    f1 = StructField("d1", StringType(), True)
    f2 = StructField("d2", StringType(), True)
    schema1 = StructType([f1, f2])

    df = spark.createDataFrame([("2017-12-25 12:00:05", "2017-12-25")], schema1)
    df.show(truncate=False)

    # current_date, unix_timestamp, to_date
    d3 = current_date().alias("d3")
    d4 = unix_timestamp(df["d1"]).alias("d4")
    d5 = to_date(df["d2"]).alias("d5")
    d6 = to_date(unix_timestamp(df["d1"]).cast("timestamp")).alias("d6")
    df.select(df["d1"], df["d2"], d3, d4, d5, d6).show(truncate=False)

    # add_months, date_add, last_day
    d7 = add_months(d6, 2).alias("d7")
    d8 = date_add(d6, 2).alias("d8")
    d9 = last_day(d6).alias("d9")
    df.select(df["d1"], df["d2"], d7, d8, d9).show(truncate=False)

    # window
    f3 = StructField("date", StringType(), True)
    f4 = StructField("product", StringType(), True)
    f5 = StructField("amount", IntegerType(), True)
    schema2 = StructType([f3, f4, f5])

    r2 = ("2017-12-25 12:01:00", "note", 1000)
    r3 = ("2017-12-25 12:01:10", "pencil", 3500)
    r4 = ("2017-12-25 12:03:20", "pencil", 23000)
    r5 = ("2017-12-25 12:05:00", "note", 1500)
    r6 = ("2017-12-25 12:05:07", "note", 2000)
    r7 = ("2017-12-25 12:06:25", "note", 1000)
    r8 = ("2017-12-25 12:08:00", "pencil", 500)
    r9 = ("2017-12-25 12:09:45", "note", 30000)

    dd = spark.createDataFrame([r2, r3, r4, r5, r6, r7, r8, r9], schema2)

    # Sum the amounts per product within tumbling 5-minute windows
    timeCol = unix_timestamp(dd["date"]).cast("timestamp")
    windowCol = window(timeCol, "5 minutes")
    dd.groupBy(windowCol, dd["product"]).agg(sum(dd["amount"])).show(truncate=False)


# Section 5.5.2.4.15
def runMathFunctions(spark):
    # In Python a DataFrame can also be created from tuples, as below
    df1 = spark.createDataFrame([(1.512,), (2.234,), (3.42,)], ['value'])
    df2 = spark.createDataFrame([(25.0,), (9.0,), (10.0,)], ['value'])

    df1.select(round(df1["value"], 1)).show()
    df2.select(functions.sqrt('value')).show()


# Sections 5.5.2.4.16 to 5.5.2.4.20
def runOtherFunctions(spark, personDf):
    df = spark.createDataFrame([("v1", "v2", "v3")], ["c1", "c2", "c3"])

    # array
    df.select(df.c1, df.c2, df.c3, array("c1", "c2", "c3").alias("newCol")).show(truncate=False)

    # desc, asc
    personDf.show()
    personDf.sort(functions.desc("age"), functions.asc("name")).show()

    # PySpark 2.1.0 does not support desc_nulls_first, desc_nulls_last, asc_nulls_first or asc_nulls_last

    # split, length (in PySpark a column can be referenced as df["col"] or df.col)
    df2 = spark.createDataFrame([("Splits str around pattern",)], ['value'])
    df2.select(df2.value, split(df2.value, " "), length(df2.value)).show(truncate=False)

    # row_number, rank (window functions)
    f1 = StructField("date", StringType(), True)
    f2 = StructField("product", StringType(), True)
    f3 = StructField("amount", IntegerType(), True)
    schema = StructType([f1, f2, f3])

    p1 = ("2017-12-25 12:01:00", "note", 1000)
    p2 = ("2017-12-25 12:01:10", "pencil", 3500)
    p3 = ("2017-12-25 12:03:20", "pencil", 23000)
    p4 = ("2017-12-25 12:05:00", "note", 1500)
    p5 = ("2017-12-25 12:05:07", "note", 2000)
    p6 = ("2017-12-25 12:06:25", "note", 1000)
    p7 = ("2017-12-25 12:08:00", "pencil", 500)
    p8 = ("2017-12-25 12:09:45", "note", 30000)

    dd = spark.createDataFrame([p1, p2, p3, p4, p5, p6, p7, p8], schema)
    w1 = Window.partitionBy("product").orderBy("amount")
    w2 = Window.orderBy("amount")
    dd.select(dd.product, dd.amount, functions.row_number().over(w1).alias("rownum"),
              functions.rank().over(w2).alias("rank")).show()


# Section 5.5.2.4.21
def runUDF(spark, df):
    # Create a UDF with functions.udf (the return type defaults to StringType, so declare BooleanType)
    fn1 = functions.udf(lambda job: job == "student", BooleanType())
    df.select(df["name"], df["age"], df["job"], fn1(df["job"])).show()
    # Register a UDF through the SparkSession so SQL queries can call it
    spark.udf.register("fn2", lambda job: job == "student", BooleanType())
    df.createOrReplaceTempView("persons")
    spark.sql("select name, age, job, fn2(job) from persons").show()


# Section 5.5.2.4.24
def runAgg(spark, df):
    df.agg(max("amount"), min("price")).show()
    df.agg({"amount": "max", "price": "min"}).show()


# Section 5.5.2.4.26
def runDfAlias(spark, df):
    df.select(df["product"]).show()
    df.alias("aa").select("aa.product").show()


# Section 5.5.2.4.27
def runGroupBy(spark, df):
    df.groupBy("store", "product").agg({"price": "sum"}).show()


# Section 5.5.2.4.28
def runCube(spark, df):
    df.cube("store", "product").agg({"price": "sum"}).show()


# Section 5.5.2.4.29
def runDistinct(spark):
    d1 = ("store1", "note", 20, 2000)
    d2 = ("store1", "bag", 10, 5000)
    d3 = ("store1", "note", 20, 2000)
    rows = [d1, d2, d3]
    cols = ["store", "product", "amount", "price"]
    df = spark.createDataFrame(rows, cols)
    df.distinct().show()
    df.dropDuplicates(["store"]).show()


# Section 5.5.2.4.30
def runDrop(spark, df):
    df.drop(df["store"]).show()


# Section 5.5.2.4.31
def runIntersect(spark):
    a = spark.range(1, 5)
    b = spark.range(2, 6)
    c = a.intersect(b)
    c.show()


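# Section 5.5.2.4.32
def runExcept(spark):
    df1 = spark.range(1, 6)
    df2 = spark.createDataFrame([(2,), (4,)], ['value'])
    # except is a reserved word in Python, so PySpark provides the subtract method instead
    # subtract behaves like Scala's except (EXCEPT DISTINCT); columns are matched by position, not by name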
    df1.subtract(df2).show()


# Section 5.5.2.4.33
def runJoin(spark, ldf, rdf):
    joinTypes = "inner,outer,leftouter,rightouter,leftsemi".split(",")
    for joinType in joinTypes:
        print("============= %s ===============" % joinType)
        ldf.join(rdf, ["word"], joinType).show()


# Section 5.5.2.4.35
def runNa(spark, ldf, rdf):
    result = ldf.join(rdf, ["word"], "outer").toDF("word", "c1", "c2")
    result.show()
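    # In Python, either na.drop or dropna can be used
    # thresh: drop rows that have fewer than thresh non-null values among c1 and c2
    # thresh=2 keeps only rows where both c1 and c2 are non-null;
    # thresh=1 would keep rows where at least one of c1 or c2 is non-null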
    result.na.drop(thresh=2, subset=["c1", "c2"]).show()
    result.dropna(thresh=2, subset=["c1", "c2"]).show()
    # fill
    result.na.fill({"c1": 0}).show()
    # In Python, passing a dict as to_replace maps old values to new values (the value argument is then ignored)
    # Without a dict, pass the list of values to replace (1st argument) and the list of replacements (2nd argument)
    result.na.replace(to_replace={"w1": "word1", "w2": "word2"}, value="", subset="word").show()
    result.na.replace(["w1", "w2"], ["word1", "word2"], "word").show()


# Section 5.5.2.4.36
def runOrderBy(spark):
    df = spark.createDataFrame([(3, "z"), (10, "a"), (5, "c")], ["idx", "name"])
    df.orderBy("name", "idx").show()
    df.orderBy("idx", "name").show()


# Section 5.5.2.4.37
def runRollup(spark, df):
    df.rollup("store", "product").agg({"price": "sum"}).show()


# Section 5.5.2.4.38
def runStat(spark):
    df = spark.createDataFrame([("a", 6), ("b", 4), ("c", 12), ("d", 6)], ["word", "count"])
    df.show()
    df.stat.crosstab("word", "count").show()


# Section 5.5.2.4.39
def runWithColumn(spark):
    df1 = spark.createDataFrame([("prod1", "100"), ("prod2", "200")], ["pname", "price"])
    df2 = df1.withColumn("dcprice", df1["price"] * 0.9)
    df3 = df2.withColumnRenamed("dcprice", "newprice")
    df1.show()
    df2.show()
    df3.show()


# Section 5.5.2.4.40
def runSave(spark):
    import time  # time.time() gives each output directory a unique suffix
    sparkHomeDir = "file:///Users/beginspark/Apps/spark"
    df = spark.read.json(sparkHomeDir + "/examples/src/main/resources/people.json")
    df.write.save("/Users/beginspark/Temp/default/%d" % time.time())
    df.write.format("json").save("/Users/beginspark/Temp/json/%d" % time.time())
    df.write.format("json").partitionBy("age").save("/Users/beginspark/Temp/parti/%d" % time.time())
    # Save modes: append, overwrite, error (the default), ignore
    df.write.mode("overwrite").saveAsTable("ohMyTable")
    spark.sql("select * from ohMyTable").show()
    # bucketBy is not available in PySpark 2.1 (DataFrameWriter.bucketBy was added in 2.3)

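The two window functions in `runOtherFunctions` number rows differently. As a rough plain-Python model (not PySpark, and not how Spark evaluates windows), the sketch below reuses the eight `(product, amount)` pairs from that example: `row_number` counts 1, 2, 3, ... within each product, while `rank` orders all rows together, lets ties share a rank, and skips the next number. The helper names `row_number` and `rank` here are illustrative only.

```python
from collections import defaultdict

# (product, amount) pairs taken from the runOtherFunctions example (p1..p8)
sales = [("note", 1000), ("pencil", 3500), ("pencil", 23000), ("note", 1500),
         ("note", 2000), ("note", 1000), ("pencil", 500), ("note", 30000)]


def row_number(values):
    """Number values 1, 2, 3, ... in ascending order; ties get distinct numbers."""
    return [(v, i) for i, v in enumerate(sorted(values), start=1)]


def rank(values):
    """Competition ranking: equal values share a rank and the next rank is skipped."""
    ordered = sorted(values)
    result = []
    for i, v in enumerate(ordered, start=1):
        if i > 1 and v == ordered[i - 2]:
            result.append((v, result[-1][1]))  # tie: reuse the previous rank
        else:
            result.append((v, i))  # otherwise the rank is the 1-based position
    return result


# Mimics Window.partitionBy("product").orderBy("amount")
by_product = defaultdict(list)
for product, amount in sales:
    by_product[product].append(amount)
rownums = {product: row_number(amounts) for product, amounts in by_product.items()}

# Mimics Window.orderBy("amount") over all rows
ranks = rank([amount for _, amount in sales])

print(rownums["pencil"])  # [(500, 1), (3500, 2), (23000, 3)]
print(ranks[:4])          # [(500, 1), (1000, 2), (1000, 2), (1500, 4)]
```

In Spark itself the relative order of the two `note` rows with amount 1000 is not guaranteed, so their row numbers may come out swapped.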

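`cube` and `rollup` (sections 5.5.2.4.28 and 5.5.2.4.37) both add subtotal rows on top of a `groupBy`, but over different grouping sets. The plain-Python sketch below is an illustration, not Spark's implementation; it assumes three made-up `(store, product, price)` rows and uses `None` where Spark would print `null` for a rolled-up column. `rollup("store", "product")` sums over (store, product), (store), and (), while `cube` additionally sums over (product) alone.

```python
from collections import defaultdict
from itertools import combinations

# Made-up rows for illustration (not the book's sample_df2)
rows = [
    {"store": "store1", "product": "note", "price": 2000},
    {"store": "store1", "product": "bag", "price": 5000},
    {"store": "store2", "product": "note", "price": 1500},
]


def grouping_sets(keys, kind):
    """Return the grouping sets used by rollup or cube for the given key columns."""
    if kind == "rollup":
        # Prefixes of the key list, longest first: (store, product), (store,), ()
        return [tuple(keys[:n]) for n in range(len(keys), -1, -1)]
    if kind == "cube":
        # Every subset of the key list: (store, product), (store,), (product,), ()
        return [s for n in range(len(keys), -1, -1) for s in combinations(keys, n)]
    raise ValueError("kind must be 'rollup' or 'cube'")


def sum_by(rows, keys, kind, value):
    """Sum `value` for every grouping set; None marks a rolled-up (null) column."""
    totals = defaultdict(int)
    for gset in grouping_sets(keys, kind):
        for row in rows:
            group = tuple(row[k] if k in gset else None for k in keys)
            totals[group] += row[value]
    return dict(totals)


rolled = sum_by(rows, ["store", "product"], "rollup", "price")
cubed = sum_by(rows, ["store", "product"], "cube", "price")
print(rolled[("store1", None)], rolled[(None, None)])  # 7000 8500
print(cubed[(None, "note")])                            # 3500
```

This simple model only works when the grouping columns contain no real nulls; Spark tells a subtotal row apart from a genuine null with `grouping_id()`.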
In [67]:
# [How to run the examples] Uncomment the example you want below and run the cell!

# runBasicOpsEx(spark, sc, sample_df)
# createDataFrame(spark, sc)
# runColumnEx(spark, sc, sample_df)
# runAlias(spark, sc, sample_df)
# runIsinEx(spark, sc)
# runWhenEx(spark, sc)
# runMaxMin(spark, sample_df)
# runAggregateFunctions(spark, sample_df, sample_df2)
# runCollectionFunctions(spark)
# runDateFunctions(spark)
# runOtherFunctions(spark, sample_df)
# runUDF(spark, sample_df)
# runAgg(spark, sample_df2)
# runDfAlias(spark, sample_df2)
# runGroupBy(spark, sample_df2)
# runCube(spark, sample_df2)
# runDistinct(spark)
# runDrop(spark, sample_df2)
# runIntersect(spark)
# runExcept(spark)
# runJoin(spark, ldf, rdf)
# runNa(spark, ldf, rdf)
# runOrderBy(spark)
# runRollup(spark, sample_df2)
# runStat(spark)
# runWithColumn(spark)
# runSave(spark)
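The `thresh` argument used in `runNa` counts non-null values, not nulls. A minimal plain-Python model of the keep/drop rule of `dropna(thresh=..., subset=...)` is sketched below, with `None` standing in for null and three made-up rows; the helper `dropna_thresh` is hypothetical and only illustrates the rule.

```python
from typing import Dict, List, Optional, Sequence

Row = Dict[str, Optional[object]]


def dropna_thresh(rows: List[Row], thresh: int, subset: Sequence[str]) -> List[Row]:
    """Keep rows with at least `thresh` non-null values in `subset`; drop the rest."""
    kept = []
    for row in rows:
        non_null = sum(1 for col in subset if row.get(col) is not None)
        if non_null >= thresh:
            kept.append(row)
    return kept


# Made-up rows shaped like an outer-join result; None plays the role of null
rows = [
    {"word": "w1", "c1": 1, "c2": 2},
    {"word": "w2", "c1": 3, "c2": None},
    {"word": "w3", "c1": None, "c2": 4},
]

print([r["word"] for r in dropna_thresh(rows, 2, ["c1", "c2"])])  # ['w1']
print([r["word"] for r in dropna_thresh(rows, 1, ["c1", "c2"])])  # ['w1', 'w2', 'w3']
```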