## 사용자 정의함수 : UDF
+ 특정 기능을 수행하는 함수는 UDF 형식으로 만들어 사용 가능
+ 주로 파생변수를 만들 때 사용
+ udf(사용자 정의함수명, 반환값타입)
+ udf를 통한 파생변수는 withColumn(컬럼명, udf함수명(대상컬럼))으로 생성 (udf는 반드시 컬럼에 적용한 결과를 전달)

In [1]:
# 사원들의 연봉을 다음의 분류기준에 따라 적절히 변환해서 조회하세요

In [2]:
# Salary-grade function: returns the grade label for one salary value.
def sal_grade(x):
    """Return the salary-grade label for a single salary value.

    Grades are checked from the highest threshold down; the first match wins:
    ``'master salary'`` for x >= 15000, ``'high salary'`` for x >= 6200,
    ``'medium salary'`` for x >= 3200, ``'low salary'`` for x >= 2100,
    and ``'slave salary'`` for anything lower.

    Args:
        x: Salary for one row as passed in by Spark; ``None`` when the
            source cell is null.

    Returns:
        The grade label, or ``None`` for a null salary. This keeps the
        derived column null instead of the UDF raising and failing the task.
    """
    # Spark hands SQL NULL to a Python UDF as None, and ``None >= 15000``
    # raises TypeError in Python 3.
    if x is None:
        return None

    for threshold, grade in ((15000, 'master salary'),
                             (6200, 'high salary'),
                             (3200, 'medium salary'),
                             (2100, 'low salary')):
        if x >= threshold:
            return grade
    return 'slave salary'

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

In [4]:
# Wrap the Python function sal_grade as a Spark UDF whose result column is a string.
sg_udf = udf(f=sal_grade, returnType=StringType())

In [5]:
# Load employees.csv, using the first row as column names and inferring column types.
emp = spark.read.options(header=True, inferSchema=True).csv('employees.csv')

                                                                                

In [6]:
# Add a derived column 'salgrade' to the existing DataFrame, computed by the UDF from SALARY.
emp = emp.withColumn('salgrade', sg_udf(emp['SALARY']))

In [7]:
# Inspect the new derived column next to the salary it was computed from.
emp.select(['SALARY', 'salgrade']).show(n=5)

[Stage 2:>                                                          (0 + 1) / 1]

+------+-------------+
|SALARY|     salgrade|
+------+-------------+
| 24000|master salary|
| 17000|master salary|
| 17000|master salary|
|  9000|  high salary|
|  6000|medium salary|
+------+-------------+
only showing top 5 rows



                                                                                

In [8]:
# 타이타닉 승객의 나이를 다음 조건에 따라 범주형 데이터로 변환하세요
# 1age, 5age, 10age, 20age, ... , 80age로 분류
# 파생변수명은 ages, 함수명은 age_class로 지정

In [9]:
def age_class(x):
    """Map an age in years to the age-band label stored in the ``ages`` column.

    Bands, checked from the highest threshold down (first match wins):
    ``'80age'`` for x >= 80, ``'70age'`` ... ``'10age'`` for each lower
    decade, ``'5age'`` for 5 <= x < 10, and ``'1age'`` for anything below 5
    (including fractional infant ages such as 0.9167).

    Args:
        x: Age for one row as passed in by Spark; ``None`` when the source
            cell is null.

    Returns:
        The band label, or ``None`` for a null age. This keeps the derived
        column null instead of the UDF raising and failing the Spark task.
    """
    # Spark hands SQL NULL to a Python UDF as None, and ``None >= 80``
    # raises TypeError in Python 3.
    if x is None:
        return None

    for lower_bound in (80, 70, 60, 50, 40, 30, 20, 10, 5):
        if x >= lower_bound:
            return f'{lower_bound}age'
    return '1age'

In [10]:
# Register age_class as a Spark UDF that produces a string column.
ac_udf = udf(f=age_class, returnType=StringType())

In [11]:
# Load titanic.csv with a header row and inferred column types.
ttn = spark.read.options(header=True, inferSchema=True).csv('titanic.csv')

In [12]:
# Derive the categorical 'ages' column from the numeric 'age' column.
ttn = ttn.withColumn('ages', ac_udf(ttn['age']))

In [13]:
# Compare raw ages with their assigned bands.
ttn.select(['age', 'ages']).show(n=5)

+------+-----+
|   age| ages|
+------+-----+
|  29.0|20age|
|0.9167| 1age|
|   2.0| 1age|
|  30.0|30age|
|  25.0|20age|
+------+-----+
only showing top 5 rows



In [14]:
# Count Titanic passengers in each age band.
ttn.groupby('ages').count().show()

                                                                                

+-----+-----+
| ages|count|
+-----+-----+
| 1age|   51|
|70age|    7|
|50age|   70|
|40age|  135|
|30age|  231|
|80age|    1|
|60age|   30|
|10age|  143|
|20age|  607|
| 5age|   31|
+-----+-----+



                                                                                

# join

In [1]:
# Reload the employees table and preview its first rows.
emp = spark.read.options(header=True, inferSchema=True).csv('employees.csv')
emp.show(n=5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00|  AD_VP| 17000|          null|       100|           90|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2006-01-03 00:00:00|IT_PROG|  9000|          null|       102|           60|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568|2007-05-21 00:00:00|

In [2]:
# Load the departments table and preview its first rows.
dept = spark.read.options(header=True, inferSchema=True).csv('departments.csv')
dept.show(n=5)

+-------------+---------------+----------+-----------+
|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+---------------+----------+-----------+
|           10| Administration|       200|       1700|
|           20|      Marketing|       201|       1800|
|           30|     Purchasing|       114|       1700|
|           40|Human Resources|       203|       2400|
|           50|       Shipping|       121|       1500|
+-------------+---------------+----------+-----------+
only showing top 5 rows



In [3]:
# Join using an explicit column-equality expression. This form is required when
# the key columns have different names. Both sides keep their own columns, so the
# result carries DEPARTMENT_ID (and MANAGER_ID) twice.
join_condition = emp['DEPARTMENT_ID'] == dept['DEPARTMENT_ID']
empdept = emp.join(dept, on=join_condition, how='inner')
empdept.show(n=5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-------------+---------------+----------+-----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-------------+---------------+----------+-----------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|           90|           90|      Executive|       100|       1700|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|           90|      Executive|       100|       1700|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00

In [5]:
# When the key column has the same name on both sides, pass the name itself:
# the result then contains a single DEPARTMENT_ID column, placed first.
empdept = emp.join(dept, on='DEPARTMENT_ID', how='inner')
empdept.show(n=5)

+-------------+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+---------------+----------+-----------+
|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+---------------+----------+-----------+
|           90|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|      Executive|       100|       1700|
|           90|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|      Executive|       100|       1700|
|           90|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00|  AD_VP| 17000|          null|       100|      Executiv

In [7]:
# Show each employee's first name together with their department name.
empdept.select(['FIRST_NAME', 'DEPARTMENT_NAME']).show(n=5)

+----------+---------------+
|FIRST_NAME|DEPARTMENT_NAME|
+----------+---------------+
|    Steven|      Executive|
|     Neena|      Executive|
|       Lex|      Executive|
| Alexander|             IT|
|     Bruce|             IT|
+----------+---------------+
only showing top 5 rows



In [8]:
# Full outer join: keeps employees with no department and departments with no employees.
empdept2 = emp.join(dept, on='DEPARTMENT_ID', how='full_outer')
empdept2.show(n=5)



+-------------+-----------+----------+---------+------+------------------+-------------------+------+------+--------------+----------+------------------+----------+-----------+
|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME| EMAIL|      PHONE_NUMBER|          HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|   DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+-----------+----------+---------+------+------------------+-------------------+------+------+--------------+----------+------------------+----------+-----------+
|          210|       null|      null|     null|  null|              null|               null|  null|  null|          null|      null|        IT Support|      null|       1700|
|          230|       null|      null|     null|  null|              null|               null|  null|  null|          null|      null|       IT Helpdesk|      null|       1700|
|          190|       null|      null|     null|  null|              null|               null|  null|  null|       

                                                                                

In [11]:
# Departments that have no employees at all. In the full outer join these are the
# rows where no employee row matched. Test the employee identifier (EMPLOYEE_ID)
# rather than FIRST_NAME: a first name may be null for an actual employee, whereas
# the id is presumably always set for a real employee row (verify against schema).
empdept2.filter(empdept2.EMPLOYEE_ID.isNull()).select('DEPARTMENT_ID', 'DEPARTMENT_NAME').show()



+-------------+--------------------+
|DEPARTMENT_ID|     DEPARTMENT_NAME|
+-------------+--------------------+
|          210|          IT Support|
|          230|         IT Helpdesk|
|          190|         Contracting|
|          140|  Control And Credit|
|          250|        Retail Sales|
|          120|            Treasury|
|          220|                 NOC|
|          130|       Corporate Tax|
|          240|    Government Sales|
|          160|            Benefits|
|          200|          Operations|
|          170|       Manufacturing|
|          150|Shareholder Services|
|          260|          Recruiting|
|          270|             Payroll|
|          180|        Construction|
+-------------+--------------------+



                                                                                

In [12]:
# Employees not assigned to any department: the joined DEPARTMENT_ID key is null.
empdept2.where(empdept2['DEPARTMENT_ID'].isNull()).select(['FIRST_NAME', 'LAST_NAME']).show()



+----------+---------+
|FIRST_NAME|LAST_NAME|
+----------+---------+
| Kimberely|    Grant|
+----------+---------+



                                                                                

In [14]:
# Load the customers table. NOTE: the variable is spelled 'costomers' (sic);
# later cells refer to that exact name.
costomers = spark.read.options(header=True, inferSchema=True).csv('Customers.csv')

In [17]:
# Load the products table.
products = spark.read.options(header=True, inferSchema=True).csv('Productc.csv')

In [16]:
# Load the orders table.
orders = spark.read.options(header=True, inferSchema=True).csv('Orders.csv')

In [1]:
# Show the name and price of each product ordered by the customer 'carrot'.
# The products DataFrame is bound to `products` (plural); the previous
# reference to `product` was an undefined name and raised NameError.
cust0dr = orders.join(products, 'prodid', 'inner')
cust0dr.filter(cust0dr.userid == 'carrot').select('prodname', 'price').show()

NameError: name 'orders' is not defined

In [24]:
# Customers who never placed an order: right outer join orders onto customers,
# then keep only rows where no order matched (orderid is null).
cust0dr = orders.join(costomers, on='userid', how='right')
cust0dr.where(cust0dr['orderid'].isNull()).select(['userid', 'name', 'grade']).show()

+------+------+------+
|userid|  name| grade|
+------+------+------+
|orange|김용욱|silver|
| peach|오형준|silver|
+------+------+------+



In [25]:
# Products that were never ordered, with their maker: right outer join orders onto
# products, then keep only rows where no order matched (orderid is null).
cust0dr2 = orders.join(products, on='prodid', how='right')
cust0dr2.where(cust0dr2['orderid'].isNull()).select(['prodname', 'maker']).show()

+----------+--------+
|  prodname|   maker|
+----------+--------+
|  얼큰라면|대한식품|
|달콤비스켓|한빛제과|
+----------+--------+



In [30]:
from pyspark.sql.functions import col

In [31]:
# Self join: combine a table with itself to look up each employee's manager.
# The same DataFrame is aliased twice ('emp' and 'mgr') so the two sides can be
# referenced separately; rows pair emp.MANAGER_ID with mgr.EMPLOYEE_ID.
# This is an inner join, so an employee whose MANAGER_ID is null does not appear.
empmgr = emp.alias('emp').join(
    emp.alias('mgr'),
    on=col('emp.MANAGER_ID') == col('mgr.EMPLOYEE_ID'),
    how='inner',
)
empmgr.show(n=5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-----------+----------+---------+-------+------------+-------------------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|  EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-----------+----------+---------+-------+------------+-------------------+-------+------+--------------+----------+-------------+
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|        100|    Steven|     King|  SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 240

In [32]:
# Show each employee's id, name and manager id next to the manager's id and name.
# The 'emp.' / 'mgr.' prefixes select the column from the matching alias.
empmgr.select([
    'emp.EMPLOYEE_ID', 'emp.FIRST_NAME', 'emp.MANAGER_ID',
    'mgr.EMPLOYEE_ID', 'mgr.FIRST_NAME',
]).show(n=5)

+-----------+----------+----------+-----------+----------+
|EMPLOYEE_ID|FIRST_NAME|MANAGER_ID|EMPLOYEE_ID|FIRST_NAME|
+-----------+----------+----------+-----------+----------+
|        101|     Neena|       100|        100|    Steven|
|        102|       Lex|       100|        100|    Steven|
|        103| Alexander|       102|        102|       Lex|
|        104|     Bruce|       103|        103| Alexander|
|        105|     David|       103|        103| Alexander|
+-----------+----------+----------+-----------+----------+
only showing top 5 rows



### SparkSQL
+ 스파크 데이터프레임에 저장된 데이터들을 SQL 문법을 이용해서 탐색할 수 있도록 해줌
+ spark.sql() 함수 사용 (조회할 데이터프레임은 먼저 createOrReplaceTempView()로 임시 뷰로 등록해야 함)
+ OLTP 보다는 OLAP 처리에 적합

In [36]:
# Create (or reuse, if one already exists) the SparkSession used for Spark SQL.
# SparkSession must be imported explicitly. 'app' is not a valid Spark master URL
# and fails in a fresh session, so use a local master on all available cores.
# When a session already exists, getOrCreate() returns it and the master setting
# has no effect.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('sparkSQL').getOrCreate()

In [40]:
# Register the employees DataFrame as a temporary view named EMP so SQL can query it.
# createOrReplaceTempView() returns None, so its result is not assigned to anything.
emp.createOrReplaceTempView("EMP")

# Run a SQL query with spark.sql() and print the result.
spark.sql('select FIRST_NAME, LAST_NAME from EMP').show()

+-----------+----------+
| FIRST_NAME| LAST_NAME|
+-----------+----------+
|     Steven|      King|
|      Neena|   Kochhar|
|        Lex|   De Haan|
|  Alexander|    Hunold|
|      Bruce|     Ernst|
|      David|    Austin|
|      Valli| Pataballa|
|      Diana|   Lorentz|
|      Nancy| Greenberg|
|     Daniel|    Faviet|
|       John|      Chen|
|     Ismael|   Sciarra|
|Jose Manuel|     Urman|
|       Luis|      Popp|
|        Den|  Raphaely|
|  Alexander|      Khoo|
|     Shelli|     Baida|
|      Sigal|    Tobias|
|        Guy|    Himuro|
|      Karen|Colmenares|
+-----------+----------+
only showing top 20 rows

