In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import pandas as pd
import numpy as np

In [2]:
# Start a Spark context on the 'local' master and wrap it in a SQLContext,
# which the cells below use to create DataFrames and run SQL.
sc = SparkContext(master='local')
sqlCtx = SQLContext(sc)

In [3]:
# Sample employee records as (name, department id) pairs.
emp = [
    ('홍길동', 1),
    ('이순신', 2),
    ('임꺽정', 3),
    ('김철수', 3),
    ('김철수1', 5),
]
# Sample department records; presumably (department name, department id).
dept = [
    ('개발', 1),
    ('연구', 2),
    ('영업', 3),
    ('기획', 4),
]

In [4]:
# Distribute the local `emp` list as an RDD; the bare name shows its repr.
empRdd = sc.parallelize(emp)
empRdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [7]:
# Bring the RDD's elements back as a plain Python list.
empRdd.collect()

[('홍길동', 1), ('이순신', 2), ('임꺽정', 3), ('김철수', 3), ('김철수1', 5)]

#### Rdd : 분산 리스트
#### DataFrame : 분산 데이터 프레임

In [9]:
# Convert the RDD into a DataFrame; with no names given the columns
# default to _1 and _2.
empDF = empRdd.toDF()
empDF

DataFrame[_1: string, _2: bigint]

In [10]:
# Print the DataFrame rows as a text table.
empDF.show()

+-------+---+
|     _1| _2|
+-------+---+
| 홍길동|  1|
| 이순신|  2|
| 임꺽정|  3|
| 김철수|  3|
|김철수1|  5|
+-------+---+



In [12]:
# Build a DataFrame directly from the local Python list `emp`
# (no RDD is passed here, unlike the toDF() cell above).
empDF1 = sqlCtx.createDataFrame(emp)
empDF1.show()

+-------+---+
|     _1| _2|
+-------+---+
| 홍길동|  1|
| 이순신|  2|
| 임꺽정|  3|
| 김철수|  3|
|김철수1|  5|
+-------+---+



In [14]:
# Similar in spirit to rdd.collect(): converts the distributed DataFrame
# into a pandas DataFrame.
# Use it once the data has already been filtered down.
df = empDF1.toPandas()
df

Unnamed: 0,_1,_2
0,홍길동,1
1,이순신,2
2,임꺽정,3
3,김철수,3
4,김철수1,5


In [15]:
# Column names can be supplied explicitly as the second argument.
empDF2 = sqlCtx.createDataFrame(emp,['name','deptid'])
empDF2.show()

+-------+------+
|   name|deptid|
+-------+------+
| 홍길동|     1|
| 이순신|     2|
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



In [16]:
# Comparable to pandas' info(): prints each column's name, type and nullability.
empDF2.printSchema()

root
 |-- name: string (nullable = true)
 |-- deptid: long (nullable = true)



In [17]:
# Register the DataFrame as a temporary view named 'my' so that SQL
# queries can refer to it by that table name.
empDF2.createOrReplaceTempView('my')

In [22]:
# To be queried this way, the data must already be a distributed (Spark) DataFrame.

sql = "select * from my"
sqlDF = sqlCtx.sql(sql) # Spark DataFrame
sqlDF.show()

+-------+------+
|   name|deptid|
+-------+------+
| 홍길동|     1|
| 이순신|     2|
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



In [24]:
# Select the name column for rows whose deptid is strictly greater than 2
# (rows with deptid = 2 are excluded).
sql = "select name from my where deptid >2"
sqlDF_name = sqlCtx.sql(sql) # Spark DataFrame
sqlDF_name.show()

+-------+
|   name|
+-------+
| 임꺽정|
| 김철수|
|김철수1|
+-------+



In [33]:
# Pattern-matching examples with LIKE / RLIKE. Only the last query is
# active; the commented-out ones are alternatives to swap in.
# Rows whose name contains '김':
# sql = "select * from my where name like '%김%'"
# sql = "select * from my where name rlike '김'"
# Rows whose name ends with '수':
# sql = "select * from my where name rlike '수$'"
# Rows whose name contains '김' or '정'.
# `sql` is assigned here explicitly so the cell does not silently re-run
# whatever query an earlier cell left in `sql`.
sql = "select * from my where name rlike '[김정]'"
sqlDF_name = sqlCtx.sql(sql) # Spark DataFrame
sqlDF_name.show()

+-------+------+
|   name|deptid|
+-------+------+
| 임꺽정|     3|
| 김철수|     3|
|김철수1|     5|
+-------+------+



In [38]:
# Sorting
# Alternative: sort by name in descending order.
# sql = "select * from my order by name desc"
# Sort by deptid and keep only the first 3 rows.
sql = "select * from my order by deptid limit 3"
sqlDF_name = sqlCtx.sql(sql) # Spark DataFrame
sqlDF_name.show()

+------+------+
|  name|deptid|
+------+------+
|홍길동|     1|
|이순신|     2|
|임꺽정|     3|
+------+------+



In [53]:
# Aggregate functions (max, min, avg, sum, count); this query uses
# max, sum, avg and count over the whole table, producing a single row.
sql = "select max(deptid),sum(deptid),avg(deptid),count(deptid) from my "
sqlDF_group = sqlCtx.sql(sql) # Spark DataFrame
sqlDF_group.show()

# Plain column arithmetic: evaluated per row, not aggregated.
sql1 = "select deptid*2 from my"
sqlDF_2 = sqlCtx.sql(sql1)
sqlDF_2.show()

+-----------+-----------+-----------+-------------+
|max(deptid)|sum(deptid)|avg(deptid)|count(deptid)|
+-----------+-----------+-----------+-------------+
|          5|         14|        2.8|            5|
+-----------+-----------+-----------+-------------+

+----------------------------+
|(deptid * CAST(2 AS BIGINT))|
+----------------------------+
|                           2|
|                           4|
|                           6|
|                           6|
|                          10|
+----------------------------+



In [51]:
# Show the one-row aggregate result as a pandas DataFrame.
sqlDF_group.toPandas()

Unnamed: 0,max(deptid),sum(deptid),avg(deptid),count(deptid)
0,5,14,2.8,5


#####

In [55]:
# Raw records as comma-separated "name,salary" strings.
data1 = [
    '홍길동,1000',
    '이순신,2000',
    '임꺽정,3000',
    '김철수,4000',
    '이황,5000',
    '이이,6000',
]

In [56]:
# Distribute the raw strings as an RDD; the bare name shows its repr.
data1Rdd = sc.parallelize(data1)
data1Rdd

ParallelCollectionRDD[218] at parallelize at PythonRDD.scala:195

In [57]:
# Parse each "name,salary" string: split on the comma, then build a
# (name, int salary) tuple from the two fields.
dept1Rdd1 = (
    data1Rdd
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[0], int(fields[1])))
)
dept1Rdd1.collect()

[('홍길동', 1000),
 ('이순신', 2000),
 ('임꺽정', 3000),
 ('김철수', 4000),
 ('이황', 5000),
 ('이이', 6000)]

In [58]:
# Turn the parsed tuples into a DataFrame with explicit column names.
salaryDF1 = dept1Rdd1.toDF(['name','salary'])
salaryDF1.show()

+------+------+
|  name|salary|
+------+------+
|홍길동|  1000|
|이순신|  2000|
|임꺽정|  3000|
|김철수|  4000|
|  이황|  5000|
|  이이|  6000|
+------+------+



In [59]:
# Register the salary DataFrame as a temporary view named 'emp'
# (this is the SQL table name, separate from the Python list `emp`).
salaryDF1.createOrReplaceTempView('emp')

#### sub query

In [61]:
# Subquery: keep only the row(s) whose salary equals the maximum salary
# in the `emp` view.
# Written as a triple-quoted string like the other multi-line queries in
# this notebook (a backslash continuation inside a string literal breaks
# if any whitespace follows the backslash), and with the standard SQL `=`
# comparison instead of `==`.
sql = """select name, salary from emp
        where salary = (select max(salary) from emp)"""
sqlDF = sqlCtx.sql(sql)
sqlDF.show()

+----+------+
|name|salary|
+----+------+
|이이|  6000|
+----+------+



#### case when then (조건별)

In [63]:
# CASE WHEN: label each salary in a new `sal` column. Branches are tested
# in order: salary >= 4000 -> '많음' (high), otherwise salary >= 2000 ->
# '보통' (medium), everything else -> '적음' (low).
sql= """select name, salary,
        case 
            when salary >= 4000 then '많음'
            when salary >= 2000 then '보통'
            else '적음'
        end as sal
        from emp"""
sqlDF = sqlCtx.sql(sql)
sqlDF.show()

+------+------+----+
|  name|salary| sal|
+------+------+----+
|홍길동|  1000|적음|
|이순신|  2000|보통|
|임꺽정|  3000|보통|
|김철수|  4000|많음|
|  이황|  5000|많음|
|  이이|  6000|많음|
+------+------+----+



### Hive ql 함수
#### spark Sql : 
- hive(hadoop 에코시스템(ecosystem) : sql을 이용하여 데이터를 분석하는 툴)
- spark sql : hive ql 의 sql문법과 함수를 따른다
- 표준 sql : https://www.w3schools.com/sql/
- https://rfriend.tistory.com/213 : (hive함수)
- https://spark.apache.org/docs/latest/api/sql/index.html : spark doc

In [66]:
# substr(column, n): returns the text from position n through the end
# (position 2 drops the first character of each name here).
# NOTE(review): the column aliased `tax` is salary * (1 - 0.033), i.e. the
# amount left after a 3.3% deduction, not the deducted amount itself —
# confirm the alias is what was intended.
sql= """select 
            substr(name,2) as name, 
            salary, 
            round(salary*(1-0.033),2) as tax
        from emp """
sqlDF = sqlCtx.sql(sql)
sqlDF.show()

+----+------+-------+
|name|salary|    tax|
+----+------+-------+
|길동|  1000| 967.00|
|순신|  2000|1934.00|
|꺽정|  3000|2901.00|
|철수|  4000|3868.00|
|  황|  5000|4835.00|
|  이|  6000|5802.00|
+----+------+-------+



In [79]:
# stddev_samp(): sample standard deviation (rounded to 2 decimals here).
# percentile(column, p): value at the given percentile of the column;
# 0.5 gives the median and 0.75 the third quartile.
sql= """select
            round(stddev_samp(salary),2) as std, 
            percentile(salary,0.5) as p1,
            percentile(salary,0.75) as p2
        from emp """
sqlDF = sqlCtx.sql(sql)
sqlDF.show()

+-------+------+------+
|    std|    p1|    p2|
+-------+------+------+
|1870.83|3500.0|4750.0|
+-------+------+------+



In [80]:
# Shut down the Spark context now that the notebook is finished.
sc.stop()