# spqrk dataset

## 스파크 중요 개념 : RDD, dataframe
* RDD(Resillient Distributed Data) : 탄력적이고 분산된 데이터셋 
* HDFS와는 달리 쓰기 불가능 데이터셋
* 다양한 연산(map, reduce, count, filter, join) 수행 가능
* 작업은 lazy하게 병렬로 수행되고 메모리에 저장됨

# History of Spark API
* RDD   (2011)  : v1 부터 지원, 분산 데이터셋, 연산을 제어하는 코드 작성이 어려움
* dataframe (2013) : v1.3부터 지원, 데이터를 스키마형태로 추상화, 속도 개선
* dataset (2015) : v1.6부터 지원, 데이터의 자료형 검사, 직렬화 지원
* dataset (2016) : v2.0부터 지원, dataframe과 dataset을 dataset으로 통합
* 스파크 애플리케이션 개발 : RDD 이용, SparkContext 사용
* SQL on Spark : dataset,dataframe 이용, SparkSession 사용

## RDD를 이용해서 데이터프레임 생성

In [72]:
from pyspark.sql.types import StringType, IntegerType

In [73]:
# 리스트를 이용해서 데이터프레임 만들기
# 리스트로 RDD 객체 생성
data = ['apple', 'peach', 'banana', 'mango', 'pineapple']
words = sc.parallelize(data)   # rdd객체 생성
words.collect()

['apple', 'peach', 'banana', 'mango', 'pineapple']

In [74]:
# RDD 객체를 데이터프레임으로 만들기
# 데이터프레임 : 행과 열로 구성된 2차원 데이터객체
# createDataFrame(RDD객체, 타입)
df = spark.createDataFrame(words, StringType())
df.show()         # 데이터프레임 내용 출력

+---------+
|    value|
+---------+
|    apple|
|    peach|
|   banana|
|    mango|
|pineapple|
+---------+



In [75]:
# 리스트 객체로 데이터프레임으로 만들기 - (과일명, 가격)
# 데이터프레임 생성시 컬럼명 지정
data =  [('apple', 1500), ('peach', 2000), ('banana', 1500), ('mango', 2500), ('pineapple', 3000)]
fruits = spark.createDataFrame(data,['fruit', 'price'])
fruits.collect()

[Row(fruit='apple', price=1500),
 Row(fruit='peach', price=2000),
 Row(fruit='banana', price=1500),
 Row(fruit='mango', price=2500),
 Row(fruit='pineapple', price=3000)]

In [76]:
fruits.show()

+---------+-----+
|    fruit|price|
+---------+-----+
|    apple| 1500|
|    peach| 2000|
|   banana| 1500|
|    mango| 2500|
|pineapple| 3000|
+---------+-----+



In [77]:
# 리스트 객체로 데이터프레임 만들기 2
# 데이터프레임 생성시 컬럼명과 자료형 지정(컬럼명:자료형)
fruits = spark.createDataFrame(data, "fruit:string, price:int")
fruits.collect()

[Row(fruit='apple', price=1500),
 Row(fruit='peach', price=2000),
 Row(fruit='banana', price=1500),
 Row(fruit='mango', price=2500),
 Row(fruit='pineapple', price=3000)]

In [78]:
# 데이터프레임에서 컬럼 조회 : select(컬럼명)
fruits.select('fruit').show()
fruits.select('price').show()

+---------+
|    fruit|
+---------+
|    apple|
|    peach|
|   banana|
|    mango|
|pineapple|
+---------+

+-----+
|price|
+-----+
| 1500|
| 2000|
| 1500|
| 2500|
| 3000|
+-----+



# 스파크세션을 이용한 고급 데이터프레임 다루기

In [79]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

In [80]:
# 데이터프레임 스키마 정의 - employees
# 스파크 세션 객체 직접 생성
spark = SparkSession.builder.appName('emp').getOrCreate()

In [81]:
# 데이터프레임 생성전 스키마 정의
# add(컬럼명, 데이터타입)
emp_schema = StructType().add('empno', 'integer').add('fname', 'string')\
.add('lanme', 'string').add('hdate', 'string').add('sal', 'integer')\
.add('deptid', 'integer')

In [82]:
# 지정한 스키마를 이용해서 데이터프레임 생성
# 데이터프레임의 각 행은 set 객체로 정의
# 위에서 정의한 스키마는 schema 속성으로 지정
df = spark.createDataFrame(
   [(123,'steve','king','2003-06-17',35000,None),
   (456,'john','seo','2005-12-15',20000,50),
   (789,'david',None,'2004-03-01',22000,90)], 
   schema=emp_schema)

In [83]:
# 데이터프레임 스키마 확인
df.printSchema()

root
 |-- empno: integer (nullable = true)
 |-- fname: string (nullable = true)
 |-- lanme: string (nullable = true)
 |-- hdate: string (nullable = true)
 |-- sal: integer (nullable = true)
 |-- deptid: integer (nullable = true)



In [84]:
# 데이터프레임 갯수 출력
df.count()

3

In [85]:
# 데이터프레임 요약 보기
# summary(요약항목)
df.summary().show()

+-------+-----+-----+-----+----------+------------------+------------------+
|summary|empno|fname|lanme|     hdate|               sal|            deptid|
+-------+-----+-----+-----+----------+------------------+------------------+
|  count|    3|    3|    2|         3|                 3|                 2|
|   mean|456.0| null| null|      null|25666.666666666668|              70.0|
| stddev|333.0| null| null|      null| 8144.527815247077|28.284271247461902|
|    min|  123|david| king|2003-06-17|             20000|                50|
|    25%|  123| null| null|      null|             20000|                50|
|    50%|  456| null| null|      null|             22000|                50|
|    75%|  789| null| null|      null|             35000|                90|
|    max|  789|steve|  seo|2005-12-15|             35000|                90|
+-------+-----+-----+-----+----------+------------------+------------------+



In [86]:
# 특정칼럼에 대한 요약정보 보기
# select(대상칼럼들)
df.select('empno', 'sal', 'deptid').summary().show()

+-------+-----+------------------+------------------+
|summary|empno|               sal|            deptid|
+-------+-----+------------------+------------------+
|  count|    3|                 3|                 2|
|   mean|456.0|25666.666666666668|              70.0|
| stddev|333.0| 8144.527815247077|28.284271247461902|
|    min|  123|             20000|                50|
|    25%|  123|             20000|                50|
|    50%|  456|             22000|                50|
|    75%|  789|             35000|                90|
|    max|  789|             35000|                90|
+-------+-----+------------------+------------------+



In [87]:
# csv파일로 데이터프레임 생성하기
# read.csv(파일이름, 헤더여부, 스키마여부)
emp = spark.read.csv('EMPLOYEES.csv', header=True, inferSchema=True)

In [88]:
# 데이터프레임 스키마 확인
emp.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: double (nullable = true)
 |-- MANAGER_ID: integer (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



## 데이터프레임 데이터 탐색
+ select : 컬럼 선택
+ filter : 조건 검색
+ where : 고급 조건 검색
+ orderBy : 정렬
+ groupBy : 그룹화

In [89]:
# 모든 사원의 이름 조회 : select(['칼럼명', '칼럼명'])
emp.select(['FIRST_NAME']).show()  # show(num) num은 num만큼

+-----------+
| FIRST_NAME|
+-----------+
|     Steven|
|      Neena|
|        Lex|
|  Alexander|
|      Bruce|
|      David|
|      Valli|
|      Diana|
|      Nancy|
|     Daniel|
|       John|
|     Ismael|
|Jose Manuel|
|       Luis|
|        Den|
|  Alexander|
|     Shelli|
|      Sigal|
|        Guy|
|      Karen|
+-----------+
only showing top 20 rows



In [90]:
# 급여가 7000 이상인 사원 조회 : filter(조건식)
# 특정 컬럼을 지정 : 데이터프레임이름[컬럼명]
emp.filter(emp['SALARY'] >= 7000).show(5)

+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER| HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13|  AD_VP| 17000|          null|       100|           90|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2006-01-03|IT_PROG|  9000|          null|       102|           60|
|        108|     Nancy|Greenberg|NGREENBE|515.124.4569|2002-08-17| FI_MGR| 12008|          null|       101|          100|
+-----------+---

In [91]:
# 급여가 7000 이상인 사원의 수 조회
emp.filter(emp['SALARY'] >= 7000).count()

47

In [92]:
# 2006-02-05부터 2006-11-15사이에 고용된 사원 조회
emp.filter((emp['HIRE_DATE'] >= '2006-02-05') & (emp['HIRE_DATE'] <= '2006-11-15')).show()

+-----------+-----------+-----------+--------+------------------+----------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID| FIRST_NAME|  LAST_NAME|   EMAIL|      PHONE_NUMBER| HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+-----------+-----------+--------+------------------+----------+----------+------+--------------+----------+-------------+
|        106|      Valli|  Pataballa|VPATABAL|      590.423.4560|2006-02-05|   IT_PROG|  4800|          null|       103|           60|
|        112|Jose Manuel|      Urman| JMURMAN|      515.124.4469|2006-03-07|FI_ACCOUNT|  7800|          null|       108|          100|
|        118|        Guy|     Himuro| GHIMURO|      515.127.4565|2006-11-15|  PU_CLERK|  2600|          null|       114|           30|
|        126|      Irene|Mikkilineni|IMIKKILI|      650.124.1224|2006-09-28|  ST_CLERK|  2700|          null|       120|           50|
|        134|    Michael|     Rogers| MROGERS|      650

In [93]:
# 2006-02-05부터 2006-11-15사이에 고용된 사원들의 부서번호 조회
emp.filter((emp['HIRE_DATE'] >= '2006-02-05') & (emp['HIRE_DATE'] <= '2006-11-15')).select(['DEPARTMENT_ID']).show(5)

+-------------+
|DEPARTMENT_ID|
+-------------+
|           60|
|          100|
|           30|
|           50|
|           50|
+-------------+
only showing top 5 rows



In [94]:
# 직책이 IP-PROG인 사원수를 조회
emp.filter(emp['JOB_ID'] == 'IT_PROG').show()
emp.filter(emp['JOB_ID'] == 'IT_PROG').count()

+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER| HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2006-01-03|IT_PROG|  9000|          null|       102|           60|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568|2007-05-21|IT_PROG|  6000|          null|       103|           60|
|        105|     David|   Austin| DAUSTIN|590.423.4569|2005-06-25|IT_PROG|  4800|          null|       103|           60|
|        106|     Valli|Pataballa|VPATABAL|590.423.4560|2006-02-05|IT_PROG|  4800|          null|       103|           60|
|        107|     Diana|  Lorentz|DLORENTZ|590.423.5567|2007-02-07|IT_PROG|  4200|          null|       103|           60|
+-----------+---

5

In [96]:
# 부서별 사원수 조회
# groupBy(집계대상칼럼).집계함수
emp.groupBy('DEPARTMENT_ID').count().show()

+-------------+-----+
|DEPARTMENT_ID|count|
+-------------+-----+
|         null|    1|
|           20|    2|
|           40|    1|
|          100|    6|
|           10|    1|
|           50|   45|
|           80|   34|
|           70|    1|
|           90|    3|
|           60|    5|
|          110|    2|
|           30|    6|
+-------------+-----+



In [97]:
# 직책별 사원수 조회
emp.groupBy('JOB_ID').count().show()

+----------+-----+
|    JOB_ID|count|
+----------+-----+
|FI_ACCOUNT|    5|
|    MK_MAN|    1|
|   IT_PROG|    5|
|    FI_MGR|    1|
|AC_ACCOUNT|    1|
|    HR_REP|    1|
|  PU_CLERK|    5|
|    AC_MGR|    1|
|    PR_REP|    1|
|    ST_MAN|    5|
|    MK_REP|    1|
|    SA_REP|   30|
|    SA_MAN|    5|
|    PU_MAN|    1|
|  SH_CLERK|   20|
|   AD_PRES|    1|
|  ST_CLERK|   20|
|   AD_ASST|    1|
|     AD_VP|    2|
+----------+-----+



In [101]:
# 부서별 사원수 조회후 부서번호 순으로 정렬
emp.groupBy('DEPARTMENT_ID').count()\
.orderBy('DEPARTMENT_ID').show()

+-------------+-----+
|DEPARTMENT_ID|count|
+-------------+-----+
|         null|    1|
|           10|    1|
|           20|    2|
|           30|    6|
|           40|    1|
|           50|   45|
|           60|    5|
|           70|    1|
|           80|   34|
|           90|    3|
|          100|    6|
|          110|    2|
+-------------+-----+



In [102]:
# 직책별 사원수 조회후 직책 순으로 정렬
emp.groupBy('JOB_ID').count()\
.orderBy('JOB_ID').show()

+----------+-----+
|    JOB_ID|count|
+----------+-----+
|AC_ACCOUNT|    1|
|    AC_MGR|    1|
|   AD_ASST|    1|
|   AD_PRES|    1|
|     AD_VP|    2|
|FI_ACCOUNT|    5|
|    FI_MGR|    1|
|    HR_REP|    1|
|   IT_PROG|    5|
|    MK_MAN|    1|
|    MK_REP|    1|
|    PR_REP|    1|
|  PU_CLERK|    5|
|    PU_MAN|    1|
|    SA_MAN|    5|
|    SA_REP|   30|
|  SH_CLERK|   20|
|  ST_CLERK|   20|
|    ST_MAN|    5|
+----------+-----+



In [104]:
# 직책별 사원수 조회후 사원수를 기준으로 내림차순으로 정렬
emp.groupBy('JOB_ID').count().orderBy('count').show()
# 직책별 사원수 조회후 사원수를 기준으로 오름차순으로 정렬
emp.groupBy('JOB_ID').count().orderBy('count', ascending=False).show()

+----------+-----+
|    JOB_ID|count|
+----------+-----+
|    FI_MGR|    1|
|    MK_MAN|    1|
|   AD_ASST|    1|
|    AC_MGR|    1|
|    PR_REP|    1|
|AC_ACCOUNT|    1|
|    MK_REP|    1|
|    PU_MAN|    1|
|    HR_REP|    1|
|   AD_PRES|    1|
|     AD_VP|    2|
|   IT_PROG|    5|
|  PU_CLERK|    5|
|FI_ACCOUNT|    5|
|    ST_MAN|    5|
|    SA_MAN|    5|
|  SH_CLERK|   20|
|  ST_CLERK|   20|
|    SA_REP|   30|
+----------+-----+

+----------+-----+
|    JOB_ID|count|
+----------+-----+
|    SA_REP|   30|
|  SH_CLERK|   20|
|  ST_CLERK|   20|
|FI_ACCOUNT|    5|
|  PU_CLERK|    5|
|    ST_MAN|    5|
|   IT_PROG|    5|
|    SA_MAN|    5|
|     AD_VP|    2|
|    MK_MAN|    1|
|    FI_MGR|    1|
|AC_ACCOUNT|    1|
|    MK_REP|    1|
|    HR_REP|    1|
|    AC_MGR|    1|
|    PU_MAN|    1|
|   AD_PRES|    1|
|    PR_REP|    1|
|   AD_ASST|    1|
+----------+-----+



## 집계함수 사용하기
* agg(집계함수명)

In [106]:
import pyspark.sql.functions as F

In [110]:
# 직책별 평균 급여를 조회해서 내림차순으로 정렬
# 단, '평균급여' 컬럼의 이름에 함수명이 포함되어 출력
emp.groupby('JOB_ID').agg(F.avg('SALARY'))\
.orderBy('avg(SALARY)', ascending=False).show()

+----------+-----------+
|    JOB_ID|avg(SALARY)|
+----------+-----------+
|   AD_PRES|    24000.0|
|     AD_VP|    17000.0|
|    MK_MAN|    13000.0|
|    SA_MAN|    12200.0|
|    FI_MGR|    12008.0|
|    AC_MGR|    12008.0|
|    PU_MAN|    11000.0|
|    PR_REP|    10000.0|
|    SA_REP|     8350.0|
|AC_ACCOUNT|     8300.0|
|FI_ACCOUNT|     7920.0|
|    ST_MAN|     7280.0|
|    HR_REP|     6500.0|
|    MK_REP|     6000.0|
|   IT_PROG|     5760.0|
|   AD_ASST|     4400.0|
|  SH_CLERK|     3215.0|
|  ST_CLERK|     2785.0|
|  PU_CLERK|     2780.0|
+----------+-----------+



In [115]:
# 직책별 평균 급여를 조회해서 내림차순으로 정렬 (별칭부여:alias)
emp.groupby('JOB_ID').agg(F.avg('SALARY').alias('mean sal'))\
.orderBy('mean sal', ascending=False).show()

+----------+--------+
|    JOB_ID|mean sal|
+----------+--------+
|   AD_PRES| 24000.0|
|     AD_VP| 17000.0|
|    MK_MAN| 13000.0|
|    SA_MAN| 12200.0|
|    FI_MGR| 12008.0|
|    AC_MGR| 12008.0|
|    PU_MAN| 11000.0|
|    PR_REP| 10000.0|
|    SA_REP|  8350.0|
|AC_ACCOUNT|  8300.0|
|FI_ACCOUNT|  7920.0|
|    ST_MAN|  7280.0|
|    HR_REP|  6500.0|
|    MK_REP|  6000.0|
|   IT_PROG|  5760.0|
|   AD_ASST|  4400.0|
|  SH_CLERK|  3215.0|
|  ST_CLERK|  2785.0|
|  PU_CLERK|  2780.0|
+----------+--------+



In [123]:
# 사원들의 직책을 모두 출력하세요, 
# 단 중복없이 하나씩만 표시되도록 합니다 (distinct)
emp.select('JOB_ID').distinct().show()

+----------+
|    JOB_ID|
+----------+
|FI_ACCOUNT|
|    MK_MAN|
|   IT_PROG|
|    FI_MGR|
|AC_ACCOUNT|
|    HR_REP|
|  PU_CLERK|
|    AC_MGR|
|    PR_REP|
|    ST_MAN|
|    MK_REP|
|    SA_REP|
|    SA_MAN|
|    PU_MAN|
|  SH_CLERK|
|   AD_PRES|
|  ST_CLERK|
|   AD_ASST|
|     AD_VP|
+----------+



In [125]:
# 모든 직책 수는? (중복제외하고 카운팅)
emp.select('JOB_ID').distinct().count()

19

In [153]:
# 사원의 이름, 직책, 급여 출력하세요
# 단, 5% 인상한 급여도 같이 출력합니다
# select first_naem, job_id, salary, salary*1.05 form emp
emp.select('FIRST_NAME', 'JOB_ID', 'SALARY', (emp.SALARY * 1.05).alias('105% sal')).show(5)

+----------+-------+------+--------+
|FIRST_NAME| JOB_ID|SALARY|105% sal|
+----------+-------+------+--------+
|    Steven|AD_PRES| 24000| 25200.0|
|     Neena|  AD_VP| 17000| 17850.0|
|       Lex|  AD_VP| 17000| 17850.0|
| Alexander|IT_PROG|  9000|  9450.0|
|     Bruce|IT_PROG|  6000|  6300.0|
+----------+-------+------+--------+
only showing top 5 rows



In [160]:
# 20번 또는 50번 부서에 근무하며, 
# 급여가 5000 ~ 12,000 사이인 사원들의 LAST_NAME 및 급여를 조회하세요
# emp.filter(((emp['DEPARTMENT_ID'] == 20) | (emp['DEPARTMENT_ID'] == 50)) & \
#             ((emp['SALARY'] >= 5000) & (emp['SALARY'] <= 12000)))\
# .select('LAST_NAME', 'SALARY').show()

emp.filter((emp['DEPARTMENT_ID'] == 20) | (emp['DEPARTMENT_ID'] == 50))\
    .filter((emp['SALARY'] >= 5000) & (emp['SALARY'] <= 12000))\
    .select('LAST_NAME', 'SALARY').show()

+---------+------+
|LAST_NAME|SALARY|
+---------+------+
|    Weiss|  8000|
|    Fripp|  8200|
| Kaufling|  7900|
|  Vollman|  6500|
|  Mourgos|  5800|
|      Fay|  6000|
+---------+------+

