## 사용자 정의 함수 : UDF
+ 특정 기능을 수행하는 함수는 UDF 형식으로 만들어 사용 가능
+ 주로 파생변수를 만들 때 사용
+ udf(사용자정의함수명, 반환값타입)
+ udf를 통한 파생변수명은 withColumn이라는 함수를 사용

In [9]:
# 사원들의 급여를 다음의 분류기준에 따라 적절히 변환해서 조회하세요.
# 2100 이하 : slave salary
# 2200 ~ 3100 : low salary
# 3200 ~ 6100 : medium salary
# 6200 ~ 15000 : high salary
# 15000 이상 : master salary

# 급여에 대한 등급을 출력하는 함수 정의
def sal_grade(x):
    grade = 'slave salary'
    
    if x >= 15000: grade = 'master salary'
    elif x >= 6200: grade = 'high salary'
    elif x >= 3200: grade = 'medium salary'
    elif x >= 2100: grade = 'low salary'
    
    return grade

In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

In [11]:
emp = spark.read.csv('employees.csv', header=True, inferSchema=True)
emp.show(3)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00|  AD_VP| 17000|          null|       100|           90|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
only showing top 3 rows



In [12]:
# 위에서 작성한 함수를 udf함수로 지정
sg_udf = udf(sal_grade, StringType())

In [13]:
# 기존의 데이터프레임에 salgrade라는 파생변수(컬럼)을 하나 추가함
emp = emp.withColumn('salgrade', sg_udf(emp.SALARY))

# 생성한 파생변수 확인
emp.select('SALARY', 'salgrade').show()

+------+-------------+
|SALARY|     salgrade|
+------+-------------+
| 24000|master salary|
| 17000|master salary|
| 17000|master salary|
|  9000|  high salary|
|  6000|medium salary|
|  4800|medium salary|
|  4800|medium salary|
|  4200|medium salary|
| 12008|  high salary|
|  9000|  high salary|
|  8200|  high salary|
|  7700|  high salary|
|  7800|  high salary|
|  6900|  high salary|
| 11000|  high salary|
|  3100|   low salary|
|  2900|   low salary|
|  2800|   low salary|
|  2600|   low salary|
|  2500|   low salary|
+------+-------------+
only showing top 20 rows



In [14]:
# 타이타닉 승객의 나이를 다음 조건에 따라 범주형 데이터로 변환하세요.
# 1age, 5age, 10age, 20age, ..., 80age로 분류
# 파생변수명은 ages, 함수명은 age_class 로 지정

def age_class(x):
    aclass = '1age'
    if x >=80: aclass='80age'
    elif x >=70: aclass='70age'
    elif x >=60: aclass='60age'
    elif x >=50: aclass='50age'
    elif x >=40: aclass='40age'
    elif x >=30: aclass='30age'
    elif x >=20: aclass='20age'
    elif x >=10: aclass='10age'
    elif x >=5: aclass='5age'
    return aclass

ac_udf = udf(age_class, StringType())

titanic = spark.read.csv('titanic.csv', header=True, inferSchema=True)
titanic.show(3)

+------+--------+--------------------+------+------+-----+-----+------+--------+--------+----+----+------------+
|pclass|survived|                name|   sex|   age|sibsp|parch|ticket|    fare|embarked|life|seat|        port|
+------+--------+--------------------+------+------+-----+-----+------+--------+--------+----+----+------------+
|     1|       1|Allen, Miss. Elis...|female|  29.0|    0|    0| 24160|211.3375|       S|live| 1st|southampthon|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|113781|  151.55|       S|live| 1st|southampthon|
|     1|       0|Allison, Miss. He...|female|   2.0|    1|    2|113781|  151.55|       S|dead| 1st|southampthon|
+------+--------+--------------------+------+------+-----+-----+------+--------+--------+----+----+------------+
only showing top 3 rows



In [15]:
titanic = titanic.withColumn('ages', ac_udf(titanic.age))
titanic.select('age', 'ages').show(5)

+------+-----+
|   age| ages|
+------+-----+
|  29.0|20age|
|0.9167| 1age|
|   2.0| 1age|
|  30.0|30age|
|  25.0|20age|
+------+-----+
only showing top 5 rows



In [18]:
# 타이타닉 승객의 나이대별 승객수를 조회하세요.
titanic.groupBy('ages').count().show()

+-----+-----+
| ages|count|
+-----+-----+
| 1age|   51|
|70age|    7|
|50age|   70|
|40age|  135|
|30age|  231|
|80age|    1|
|60age|   30|
|10age|  143|
|20age|  607|
| 5age|   31|
+-----+-----+



## JOIN
+ 여러 데이블을 연결해서 데이터를 검색하는 것
+ 테이블의 결합기준은 각 테이블에 존재하는 공통 속성임
+ 결합 유형 ::
    + inner join, outer join, self join
+ 데이터객체명.join(조인대상, 조인조건, 조인유형)

In [9]:
dept = spark.read.csv('departments.csv', header=True, inferSchema=True)

dept.show(5)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:44637)
Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:44637)

In [2]:
emp = spark.read.csv('employees.csv', header=True, inferSchema=True)
emp.show(5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00|  AD_VP| 17000|          null|       100|           90|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2006-01-03 00:00:00|IT_PROG|  9000|          null|       102|           60|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568|2007-05-21 00:00:00|

In [3]:
join_condition = emp.DEPARTMENT_ID == dept.DEPARTMENT_ID

In [4]:
empdept = emp.join(dept, join_condition, 'inner')
empdept.show(5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-------------+---------------+----------+-----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-------------+---------------+----------+-----------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|           90|           90|      Executive|       100|       1700|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|           90|      Executive|       100|       1700|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00

In [5]:
# 내부조인 : 공통 속성명이 동일한 경우
empdept = emp.join(dept, 'DEPARTMENT_ID', 'inner')
empdept.show(5)

+-------------+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+---------------+----------+-----------+
|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+---------------+----------+-----------+
|           90|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|      Executive|       100|       1700|
|           90|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|      Executive|       100|       1700|
|           90|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00|  AD_VP| 17000|          null|       100|      Executiv

In [6]:
empdept.select('FIRST_NAME','DEPARTMENT_NAME').show(5)

+----------+---------------+
|FIRST_NAME|DEPARTMENT_NAME|
+----------+---------------+
|    Steven|      Executive|
|     Neena|      Executive|
|       Lex|      Executive|
| Alexander|             IT|
|     Bruce|             IT|
+----------+---------------+
only showing top 5 rows



In [7]:
# 외부조인 
empdept2 = emp.join(dept, 'DEPARTMENT_ID', 'outer')
empdept2.show(10)



+-------------+-----------+----------+---------+--------+------------------+-------------------+------+------+--------------+----------+------------------+----------+-----------+
|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|      PHONE_NUMBER|          HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|   DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+-----------+----------+---------+--------+------------------+-------------------+------+------+--------------+----------+------------------+----------+-----------+
|          210|       null|      null|     null|    null|              null|               null|  null|  null|          null|      null|        IT Support|      null|       1700|
|          230|       null|      null|     null|    null|              null|               null|  null|  null|          null|      null|       IT Helpdesk|      null|       1700|
|          190|       null|      null|     null|    null|              null|               null|  null|  

                                                                                

In [8]:
# 사원이 한명도 없는 부서를 조회
empdept2.filter(empdept2.FIRST_NAME.isNull()).select('DEPARTMENT_ID','DEPARTMENT_NAME').show()

[Stage 25:>                                                         (0 + 1) / 1]ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o67.showString

In [None]:
# 부서에 소속되지 않는 사원을 조회
empdept2.filter(empdept2.DEPARTMENT_ID.isNull()).select('FIRST_NAME','LAST_NAME').show()

In [None]:
customers = spark.read.csv('customers.csv',header=True, inferSchema=True)

In [None]:
products = spark.read.csv('products.csv',header=True, inferSchema=True)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:44637)
Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:44637)
Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception

In [None]:
orders = spark.read.csv('orders.csv',header=True, inferSchema=True)

In [None]:
prodOdr= orders.join(products, 'prodid', 'inner')
prodOdr.filter(prodOdr.userid == 'carrot').select('prodname', 'price').show()

In [None]:
custOdr = orders.join(customers, 'userid', 'right_outer')

custOdr.filter(custOdr.orderid.isNull()).select('userid', 'name', 'grade').show()


In [None]:
prodOdr2 = orders.join(products, 'prodid', 'right_outer')
prodOdr2.filter(prodOdr2.orderid.isNull()).select('prodname', 'maker').show()

In [None]:
# 셀프조인 : 같은 이름의 테이블을 결합해서 원하는 데이터를 조회
# 각 사원들의 상사를 조회
from pyspark.sql.functions import col

empmgr = emp.alias('emp').join(emp.alias('mgr'), col('emp.MANAGER_ID') == col('mgr.EMPLOYEE_ID'),'inner')
empmgr.show(5)


#empmgr.select('emp.EMPLOYEE_ID','emp.FIRST_NAME','emp.MANAGER_ID','mgr.EMPLOYEE_ID','mgr.FIRST_NAME').show(5)


### SparkSQL
+ 스파크 데이터프레임에 저장된 데이터들을 SQL문법을 이용해서 탐색할 수 있도록 해 줌
+ spark.sql() 함수 사용
+ 


In [2]:
import warnings
warnings.filterwarnings(action='ignore')

In [3]:
# 스파크 SQL을 위한 스파크 세션 생성
spark = SparkSession.builder.master('app').appName('sparkSQL').getOrCreate()

In [4]:
# SQL 사용을 위한 View 객체 생성
EMP = emp.createOrReplaceTempView("EMP")

NameError: name 'emp' is not defined

In [5]:
# sql 함수를 이용해서 SQL 질의문 실행하고 결과 출력
spark.sql('select FIRST_NAME, LAST_NAME from EMP').show(5)

21/10/07 07:51:26 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


AnalysisException: 'Table or view not found: EMP; line 1 pos 34'

In [None]:
CUSTOMER = customers.createOrReplaceTempView("CUSTOMER")

In [None]:
spark.sql('select * from customer').show(5)