## 사용자 정의 함수 : UDF
+ 특정 기능을 수행하는 함수는 UDF 형식으로 만들어 사용가능
+ 주로 파생변수를 만들 때 사용
+ udf(사용자정의함수명, 반환값타입)
+ udf함수는 udf(사용자정의함수값,
+ udf를 통한 파생변수는 withColumn(컬럼명, udf함수명)로 생성

#### 사원들의 연봉을 다음의 분류기준에 따라 변환해서 조회
##### 2100 이하 : slave salary
##### 2200 ~ 3200 : low salary
##### 3200 ~ 6200 : medium salary
##### 6200 ~ 15000 : high salary
##### 15000 이상 : master salary

In [123]:
emp = spark.read.csv("employees.csv", header=True, inferSchema=True)

#### 급여에 대한 등급을 출력하는 함수 정의

In [None]:
def sal_grade(x):
    grade = 'slave salary'
    if x >= 15000 : grade = 'master salary'
    elif x >= 6200 : grade = 'high salary'
    elif x >= 3200 : grade = 'medium salary'
    elif x >= 2100 : grade = 'low salary'
    return grade

#### sal_grade를 udf함수로 지정

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

In [None]:
sg_udf = udf(sal_grade, StringType())

#### 기존의 데이터프레임에 salgrade라는 파생변수(컬럼)을 추가

In [None]:
emp = emp.withColumn('salgrade', sg_udf(emp.SALARY))

#### 생성한 파생변수 확인

In [None]:
emp.select('SALARY', 'salgrade').show(5)

#### 타이타닉 승객 나이를 다음 조건에 따라 범주형 데이터로 변환하기
+ 1age, 5age, 10age, 20age, ..., 80age로 분류
+ 파생변수명 ages, 함수명 age_class

In [None]:
titanic = spark.read.csv("titanic.csv", header=True, inferSchema=True)

In [None]:
def age_class(x):
    aclass = '1age'
    if x >= 80 : aclass = 'master elder'
    elif x >= 60 : aclass = 'elder'
    elif x >= 40 : aclass = 'older'
    elif x >= 20 : aclass = 'youngling'
    elif x >= 10 : aclass = 'kid'
    else : aclass = 'baby'
    return aclass

In [None]:
ac_udf = udf(age_class, StringType())

In [None]:
titanic.withColumn('ages', ac_udf(titanic.age))

In [None]:
titanic.select('age', 'ages').show(5)

#### 타이타닉 승객의 나이별 승객수를 조회하세요

In [None]:
titanic.groupBy('ages').count().show()

## Join
+ 여러 테이블을 연결해서 데이터를 검색하는 것
+ 테이블의 결합기준은 각 테이블에 존재하는 공통 속성
+ 결합 유형 : inner join, outer join. self join
+ 데이터객체명.join(조인대상, 조인조건, 조인유형)

In [None]:
dept = spark.read.csv('departments.csv', header=True, inferSchema=True)

In [None]:
dept.show(5)

In [None]:
join_condition = emp.DEPARTMENT_ID == dept.DEPARTMENT_ID

#### 내부조인 : 공통 속성명이 다를 경우

In [None]:
empdept = emp.join(dept, join_condition, 'inner')

In [None]:
empdept.show(5)

#### 공통 속성명이 동일한 경우

In [None]:
empdept2 = emp.join(dept, 'DEPARTMENT_ID', 'inner')

In [None]:
empdept2.show(5)

#### 사원 이름과 부서명 조회

In [None]:
empdept2.select('FIRST_NAME', 'DEPARTMENT_NAME').show(5)

#### 외부조인

In [None]:
empdept3 = emp.join(dept, 'DEPARTMENT_ID', 'outer')

In [None]:
empdept3.show(5)

#### 사원이 한명도 없는 부서 조회

In [None]:
empdept3.filter(empdept3.FIRST_NAME.isNull()).select('DEPARTMENT_ID','DEPARTMENT_NAME').show()

#### 부서에 소속되지 않은 사원 조회

In [None]:
empdept3.filter(empdept3.DEPARTMENT_ID.isNull()).select('FIRST_NAME','LAST_NAME').show()

In [None]:
orders = spark.read.csv('orders.csv', header=True, inferSchema=True)

In [None]:
products = spark.read.csv('products.csv', header=True, inferSchema=True)

In [None]:
customers = spark.read.csv('customers.csv', header=True, inferSchema=True)

In [None]:
orderpro = orders.join(products, 'prodid', 'inner')

In [None]:
orderpro.show()

In [None]:
orderpro.filter(orderpro.userid == 'carrot').select('userid', 'prodname', 'price').show()

In [None]:
ordercus = orders.join(customers, 'userid', 'right_outer')

In [None]:
ordercus.show()

In [None]:
ordercus.filter(ordercus.orderid.isNull()).select('userid', 'name', 'grade').show()

In [None]:
orderpro2 = orders.join(products, 'prodid', 'outer')

In [None]:
orderpro2.filter(orderpro2.orderid.isNull()).select('prodname', 'maker').show()

#### 셀프조인 : 같은 이름의 데이블을 결합해서 원하는 데이터를 조회

#### 각 사원들의 상사 사번과 이름을 조회

In [None]:
from pyspark.sql.functions import col

In [None]:
mgr = emp

In [None]:
join_condition = emp.MANAGER_ID == mgr.EMPLOYEE_ID

In [None]:
empmgr = emp.alias('emp').join(emp.alias('mgr'), col('emp.MANAGER_ID') == col('mgr.EMPLOYEE_ID'), "inner")

In [None]:
empmgr.show(5)

In [None]:
empmgr.select('emp.EMPLOYEE_ID','emp.FIRST_NAME','emp.MANAGER_ID','mgr.EMPLOYEE_ID', 'mgr.FIRST_NAME').show(5)

# SparkSQL
+ 스파크 데이터프레임에 저장된 데이터들을 SQL 문법을 이용해서 탐색할 수 있도록 해 줌
+ spark.sql() 함수 사용

### 스파크 SQL을 위한 스파크 세션 생성

In [127]:
spark = SparkSession.builder.master('app').appName('sparkSQL').getOrCreate()

### SQL 사용을 위한 View 객체 생성

In [124]:
EMP = emp.createOrReplaceTempView("EMP")

In [125]:
cus = customers.createOrReplaceTempView("cus")

In [34]:
order = orders.createOrReplaceTempView("order")

In [126]:
prod = products.createOrReplaceTempView("prod")

### SQL 함수를 이용하여 SQL 질의문을 수행하고 결과 출력

In [None]:
spark.sql('select FIRST_NAME, LAST_NAME from EMP').show()

## SPAKR_DSL_SQL

In [42]:
customers.select('*').show()

+------+------+----+------+------+-----+
|userid|  name| age| grade|   job|coins|
+------+------+----+------+------+-----+
| apple|정소화|  20|  gold|  학생| 1000|
|banana|김선우|  25|   vip|간호사| 2500|
|carrot|고명석|  28|  gold|  교사| 4500|
| melon|성원용|  35|  gold|회사원| 5000|
|orange|김용욱|  22|silver|  학생|    0|
| peach|오형준|null|silver|  의사|  300|
|  pear|채광주|  31|silver|회사원|  500|
+------+------+----+------+------+-----+



In [32]:
spark.sql('select name, userid, grade from cus').show()

+------+------+------+
|  name|userid| grade|
+------+------+------+
|정소화| apple|  gold|
|김선우|banana|   vip|
|고명석|carrot|  gold|
|성원용| melon|  gold|
|김용욱|orange|silver|
|오형준| peach|silver|
|채광주|  pear|silver|
+------+------+------+



In [None]:
customers.select('userid', 'name', 'grade').show(5)

In [55]:
products.select('maker').distinct().show()

                                                                                

+--------+
|   maker|
+--------+
|한빛제과|
|대한식품|
|민국푸드|
+--------+



                                                                                

In [62]:
products.select('prodname', products.'price'.alias('value').show()

SyntaxError: unexpected EOF while parsing (1745456254.py, line 1)

In [63]:
products.select('prodname', 'price', (products.price + 500).alias('adjprice')).show(5)

+----------+-----+--------+
|  prodname|price|adjprice|
+----------+-----+--------+
|  그냥만두| 4500|    5000|
|  매운쫄면| 5500|    6000|
|  쿵떡파이| 2600|    3100|
|맛난초콜렛| 2500|    3000|
|  얼큰라면| 1200|    1700|
+----------+-----+--------+
only showing top 5 rows



In [None]:
spark.sql('select * from cus').show()

In [None]:
spark.sql('select prodid, maker from prod group by maker').show()

In [None]:
spark.sql('select distinct maker from prod').show()

In [61]:
spark.sql('select prodname, price value from prod').show()

+----------+-----+
|  prodname|value|
+----------+-----+
|  그냥만두| 4500|
|  매운쫄면| 5500|
|  쿵떡파이| 2600|
|맛난초콜렛| 2500|
|  얼큰라면| 1200|
|  통통우동| 1550|
|달콤비스켓| 1500|
+----------+-----+



In [65]:
spark.sql('select prodname, price, price + 500 adj from prod').show()

+----------+-----+----+
|  prodname|price| adj|
+----------+-----+----+
|  그냥만두| 4500|5000|
|  매운쫄면| 5500|6000|
|  쿵떡파이| 2600|3100|
|맛난초콜렛| 2500|3000|
|  얼큰라면| 1200|1700|
|  통통우동| 1550|2050|
|달콤비스켓| 1500|2000|
+----------+-----+----+



In [66]:
products.filter(products.maker == '한빛제과').select('prodname', 'stock', 'price').show()

+----------+-----+-----+
|  prodname|stock|price|
+----------+-----+-----+
|  쿵떡파이| 3600| 2600|
|맛난초콜렛| 1250| 2500|
|달콤비스켓| 1650| 1500|
+----------+-----+-----+



In [67]:
spark.sql("select prodname, stock, price, maker from prod where maker = '한빛제과'").show()

+----------+-----+-----+--------+
|  prodname|stock|price|   maker|
+----------+-----+-----+--------+
|  쿵떡파이| 3600| 2600|한빛제과|
|맛난초콜렛| 1250| 2500|한빛제과|
|달콤비스켓| 1650| 1500|한빛제과|
+----------+-----+-----+--------+



In [68]:
orders.filter(orders.amount > 15).select('prodid', 'amount', 'orddate').show()

+------+------+-------------------+
|prodid|amount|            orddate|
+------+------+-------------------+
|   p06|    45|2013-01-11 00:00:00|
|   p06|    36|2013-02-20 00:00:00|
|   p01|    19|2013-03-02 00:00:00|
|   p03|    22|2013-03-15 00:00:00|
|   p02|    50|2013-04-10 00:00:00|
|   p03|    20|2013-05-22 00:00:00|
+------+------+-------------------+



In [69]:
spark.sql("select prodid, amount, orddate from order where amount > 15").show()

+------+------+-------------------+
|prodid|amount|            orddate|
+------+------+-------------------+
|   p06|    45|2013-01-11 00:00:00|
|   p06|    36|2013-02-20 00:00:00|
|   p01|    19|2013-03-02 00:00:00|
|   p03|    22|2013-03-15 00:00:00|
|   p02|    50|2013-04-10 00:00:00|
|   p03|    20|2013-05-22 00:00:00|
+------+------+-------------------+



In [75]:
orders.filter((orders.amount > 15) & (orders.userid == 'apple')).select('prodid', 'amount', 'orddate').show()

+------+------+-------------------+
|prodid|amount|            orddate|
+------+------+-------------------+
|   p03|    22|2013-03-15 00:00:00|
+------+------+-------------------+



In [76]:
spark.sql("select prodid, amount, orddate from order where amount > 15 and userid == 'apple'").show()

+------+------+-------------------+
|prodid|amount|            orddate|
+------+------+-------------------+
|   p03|    22|2013-03-15 00:00:00|
+------+------+-------------------+



In [77]:
products.filter((products.price >= 2000) & (products.price <=3000)).select('prodname', 'price', 'maker').show()

+----------+-----+--------+
|  prodname|price|   maker|
+----------+-----+--------+
|  쿵떡파이| 2600|한빛제과|
|맛난초콜렛| 2500|한빛제과|
+----------+-----+--------+



In [85]:
products.filter(products.price.between(2000, 3000)).select('prodname', 'price', 'maker').show()

+----------+-----+--------+
|  prodname|price|   maker|
+----------+-----+--------+
|  쿵떡파이| 2600|한빛제과|
|맛난초콜렛| 2500|한빛제과|
+----------+-----+--------+



In [81]:
spark.sql("select prodname, price, maker from prod where price >=2000 and price <= 3000").show()

+----------+-----+--------+
|  prodname|price|   maker|
+----------+-----+--------+
|  쿵떡파이| 2600|한빛제과|
|맛난초콜렛| 2500|한빛제과|
+----------+-----+--------+



In [88]:
spark.sql("select prodname, price, maker from prod where price between 2000 and 3000").show()

+----------+-----+--------+
|  prodname|price|   maker|
+----------+-----+--------+
|  쿵떡파이| 2600|한빛제과|
|맛난초콜렛| 2500|한빛제과|
+----------+-----+--------+



In [89]:
customers.filter(customers.name.like('김%')).select('name','age','grade','coins').show()

+------+---+------+-----+
|  name|age| grade|coins|
+------+---+------+-----+
|김선우| 25|   vip| 2500|
|김용욱| 22|silver|    0|
+------+---+------+-----+



### ※ 따옴표 3개를 쓰면 줄바꿈 코드 인식가능(파이썬)

In [90]:
spark.sql("select name, age, grade,coins from cus where name like '김%'").show()

+------+---+------+-----+
|  name|age| grade|coins|
+------+---+------+-----+
|김선우| 25|   vip| 2500|
|김용욱| 22|silver|    0|
+------+---+------+-----+



In [93]:
customers.filter(customers.userid.like('_____')).select('userid','name','grade','coins').show()

+------+------+------+-----+
|userid|  name| grade|coins|
+------+------+------+-----+
| apple|정소화|  gold| 1000|
| melon|성원용|  gold| 5000|
| peach|오형준|silver|  300|
+------+------+------+-----+



In [95]:
spark.sql("select name, age, grade, userid from cus where userid like '_____'").show()

+------+----+------+------+
|  name| age| grade|userid|
+------+----+------+------+
|정소화|  20|  gold| apple|
|성원용|  35|  gold| melon|
|오형준|null|silver| peach|
+------+----+------+------+



In [97]:
customers.filter(customers.age.isNull()).select('name').show()

+------+
|  name|
+------+
|오형준|
+------+



In [98]:
spark.sql("select name from cus where age is null").show()

+------+
|  name|
+------+
|오형준|
+------+



In [99]:
customers.filter(customers.age.isNotNull()).select('name').show()

+------+
|  name|
+------+
|정소화|
|김선우|
|고명석|
|성원용|
|김용욱|
|채광주|
+------+



In [100]:
spark.sql("select name from cus where age is not null").show()

+------+
|  name|
+------+
|정소화|
|김선우|
|고명석|
|성원용|
|김용욱|
|채광주|
+------+



In [102]:
customers.select('name', 'grade', 'age').orderBy('age', ascending=False).show()

+------+------+----+
|  name| grade| age|
+------+------+----+
|성원용|  gold|  35|
|채광주|silver|  31|
|고명석|  gold|  28|
|김선우|   vip|  25|
|김용욱|silver|  22|
|정소화|  gold|  20|
|오형준|silver|null|
+------+------+----+



In [105]:
spark.sql("select name, grade, age from cus order by age desc").show()

+------+------+----+
|  name| grade| age|
+------+------+----+
|성원용|  gold|  35|
|채광주|silver|  31|
|고명석|  gold|  28|
|김선우|   vip|  25|
|김용욱|silver|  22|
|정소화|  gold|  20|
|오형준|silver|null|
+------+------+----+



In [106]:
orders.select('userid', 'prodid', 'amount', 'orddate').orderBy('prodid').orderBy('amount', ascending=False).show()

+------+------+------+-------------------+
|userid|prodid|amount|            orddate|
+------+------+------+-------------------+
|  pear|   p02|    50|2013-04-10 00:00:00|
|banana|   p06|    45|2013-01-11 00:00:00|
| melon|   p06|    36|2013-02-20 00:00:00|
| apple|   p03|    22|2013-03-15 00:00:00|
|carrot|   p03|    20|2013-05-22 00:00:00|
|banana|   p01|    19|2013-03-02 00:00:00|
|banana|   p04|    15|2013-04-11 00:00:00|
| apple|   p03|    10|2013-01-01 00:00:00|
|carrot|   p02|     8|2013-02-01 00:00:00|
| melon|   p01|     5|2013-01-10 00:00:00|
+------+------+------+-------------------+



In [128]:
spark.sql("select userid, prodid, amount, orddate from order order by prodid, amount desc").show()

+------+------+------+-------------------+
|userid|prodid|amount|            orddate|
+------+------+------+-------------------+
|banana|   p01|    19|2013-03-02 00:00:00|
| melon|   p01|     5|2013-01-10 00:00:00|
|  pear|   p02|    50|2013-04-10 00:00:00|
|carrot|   p02|     8|2013-02-01 00:00:00|
| apple|   p03|    22|2013-03-15 00:00:00|
|carrot|   p03|    20|2013-05-22 00:00:00|
| apple|   p03|    10|2013-01-01 00:00:00|
|banana|   p04|    15|2013-04-11 00:00:00|
|banana|   p06|    45|2013-01-11 00:00:00|
| melon|   p06|    36|2013-02-20 00:00:00|
+------+------+------+-------------------+



In [176]:

from pyspark.sql.functions import min,max,avg,sum,count,round,col,countDistinct

In [135]:
products.select(round(avg('price'),2).alias('avgp')).show()

+-------+
|   avgp|
+-------+
|2764.29|
+-------+



In [136]:
spark.sql("select round(avg(price),2) avgp from prod").show()

+-------+
|   avgp|
+-------+
|2764.29|
+-------+



In [122]:
sql = """ select avg(price) from prod """
spark.sql(sql).show() 

+-----------------+
|       avg(price)|
+-----------------+
|2764.285714285714|
+-----------------+



In [141]:
products.filter(products.maker == '한빛제과').select(sum('stock').alias('hansum')).show()

+------+
|hansum|
+------+
|  6500|
+------+



In [144]:
spark.sql("select sum(stock) hansum from prod where maker = '한빛제과'").show()

+------+
|hansum|
+------+
|  6500|
+------+



In [167]:
customers.select(count('userid')).show()

+-------------+
|count(userid)|
+-------------+
|            7|
+-------------+



In [149]:
sql = """select count(userid) usernum from cus"""
spark.sql(sql).show()

+-------+
|usernum|
+-------+
|      7|
+-------+



In [171]:
products.select(count('maker')).distinct().alias('makernum').show()

+------------+
|count(maker)|
+------------+
|           7|
+------------+



In [155]:
sql = """select count(maker) makernum from prod"""
spark.sql(sql).show()

+--------+
|makernum|
+--------+
|       7|
+--------+



In [175]:
products.select('maker').distinct().count()

3

In [178]:
products.select(countDistinct('maker').alias('cnt')).show()

+---+
|cnt|
+---+
|  3|
+---+



In [174]:
sql = """select count(distinct maker) cnt from prod """
spark.sql(sql).show()

+---+
|cnt|
+---+
|  3|
+---+



In [188]:
orders.groupBy('prodid').agg(sum('amount').alias('cnt')).orderBy('prodid').show()

+------+---+
|prodid|cnt|
+------+---+
|   p01| 24|
|   p02| 58|
|   p03| 52|
|   p04| 15|
|   p06| 81|
+------+---+



In [192]:
sql = """select prodid, sum(amount) from order group by prodid order by prodid"""
spark.sql(sql).show()

+------+-----------+
|prodid|sum(amount)|
+------+-----------+
|   p01|         24|
|   p02|         58|
|   p03|         52|
|   p04|         15|
|   p06|         81|
+------+-----------+



In [197]:
products.groupBy('maker').agg(count('prodid').alias('prodnum'), (max('price').alias('highpr'))).show()

+--------+-------+------+
|   maker|prodnum|highpr|
+--------+-------+------+
|한빛제과|      3|  2600|
|대한식품|      2|  4500|
|민국푸드|      2|  5500|
+--------+-------+------+



In [199]:
sql = """select maker, count(prodid) prodnum, max(price) hihgpr from prod group by maker"""
spark.sql(sql).show()

+--------+-------+------+
|   maker|prodnum|hihgpr|
+--------+-------+------+
|한빛제과|      3|  2600|
|대한식품|      2|  4500|
|민국푸드|      2|  5500|
+--------+-------+------+



In [203]:
products.groupBy('maker').agg(count('prodid').alias('prodnum'), (max('price').alias('highpr'))).filter(count('prodid') >= 3).show()

+--------+-------+------+
|   maker|prodnum|highpr|
+--------+-------+------+
|한빛제과|      3|  2600|
+--------+-------+------+



In [206]:
products.groupBy('maker').agg(count('prodid').alias('prodnum'), (max('price').alias('highpr'))).filter(col('prodnum') >= 3).show()

+--------+-------+------+
|   maker|prodnum|highpr|
+--------+-------+------+
|한빛제과|      3|  2600|
+--------+-------+------+



In [205]:
sql = """select maker, count(prodid) prodnum, max(price) hihgpr from prod group by maker having prodnum >= 3"""
spark.sql(sql).show()

+--------+-------+------+
|   maker|prodnum|hihgpr|
+--------+-------+------+
|한빛제과|      3|  2600|
+--------+-------+------+



In [210]:
customers.groupBy('grade').agg((count('userid').alias('usernum')), round(avg('coins'), 2).alias('avgcoin')).filter(col('avgcoin')>1000).show()

+-----+-------+-------+
|grade|usernum|avgcoin|
+-----+-------+-------+
| gold|      3| 3500.0|
|  vip|      1| 2500.0|
+-----+-------+-------+



In [211]:
sql = """select count(userid) usernum, round((avg(coins)),2) avgcoins from cus group by grade having avg(coins) > 1000 """
spark.sql(sql).show()

+-------+--------+
|usernum|avgcoins|
+-------+--------+
|      3|  3500.0|
|      1|  2500.0|
+-------+--------+



In [220]:
cusOrd = customers.join(orders, 'userid', 'inner')

In [216]:
prodOrd = products.join(orders, 'prodid', 'inner')

In [217]:
prodOrd.filter(col('userid') == 'banana').select('userid', 'prodname').show()

+------+----------+
|userid|  prodname|
+------+----------+
|banana|  통통우동|
|banana|  그냥만두|
|banana|맛난초콜렛|
+------+----------+



In [219]:
sql = """select o.userid, p.prodname  from prod p inner join order o using(prodid) where userid = 'banana'"""
spark.sql(sql).show()

+------+----------+
|userid|  prodname|
+------+----------+
|banana|  통통우동|
|banana|  그냥만두|
|banana|맛난초콜렛|
+------+----------+



In [235]:
cusOrd.filter(col('age') >= 30).select('age', 'prodid', 'orddate').show()

+---+------+-------------------+
|age|prodid|            orddate|
+---+------+-------------------+
| 35|   p01|2013-01-10 00:00:00|
| 35|   p06|2013-02-20 00:00:00|
| 31|   p02|2013-04-10 00:00:00|
+---+------+-------------------+



In [238]:
sql = """select c.age, o.prodid, o.orddate  from cus c inner join order o using(userid) where age >= 30"""
spark.sql(sql).show()

+---+------+-------------------+
|age|prodid|            orddate|
+---+------+-------------------+
| 35|   p01|2013-01-10 00:00:00|
| 35|   p06|2013-02-20 00:00:00|
| 31|   p02|2013-04-10 00:00:00|
+---+------+-------------------+



In [244]:
sql = """select c.age, o.prodid, substr(orddate,0,10) odate, o.orddate  from cus c inner join order o using(userid) where age >= 30"""
spark.sql(sql).show()

+---+------+----------+-------------------+
|age|prodid|     odate|            orddate|
+---+------+----------+-------------------+
| 35|   p01|2013-01-10|2013-01-10 00:00:00|
| 35|   p06|2013-02-20|2013-02-20 00:00:00|
| 31|   p02|2013-04-10|2013-04-10 00:00:00|
+---+------+----------+-------------------+



In [241]:
from pyspark.sql.functions import substring

In [242]:
cusOrd.filter(col('age') >= 30).select('age', 'prodid', 'orddate', substring('orddate',0,10).alias('odate')).show()

+---+------+-------------------+----------+
|age|prodid|            orddate|     odate|
+---+------+-------------------+----------+
| 35|   p01|2013-01-10 00:00:00|2013-01-10|
| 35|   p06|2013-02-20 00:00:00|2013-02-20|
| 31|   p02|2013-04-10 00:00:00|2013-04-10|
+---+------+-------------------+----------+



### 뷰를 만들어 두면 sql에서 join 코드 대체가능

In [246]:
PRODORDER = prodOrd.createOrReplaceTempView('PRODORDER')

In [248]:
CUSTORDER = cusOrd.createOrReplaceTempView('CUSORDER')

In [250]:
sql = """select userid, prodname from PRODORDER where userid = 'banana' """
spark.sql(sql).show()

+------+----------+
|userid|  prodname|
+------+----------+
|banana|  통통우동|
|banana|  그냥만두|
|banana|맛난초콜렛|
+------+----------+



In [228]:
cusordpro = orders.join(customers, 'userid', 'inner').join(products, 'prodid', 'inner')

In [233]:
cusordpro.filter(col('name') == '고명석').select('prodname').show()

+--------+
|prodname|
+--------+
|매운쫄면|
|쿵떡파이|
+--------+



In [234]:
spark.sql("select prodname from order o inner join cus c using(userid) inner join prod p using(prodid) where c.name = '고명석'").show()

+--------+
|prodname|
+--------+
|매운쫄면|
|쿵떡파이|
+--------+



In [251]:
cusordpro1 = cusordpro.createOrReplaceTempView('cusordpro1')

In [253]:
spark.sql("select prodname from cusordpro1 where name = '고명석'").show()

+--------+
|prodname|
+--------+
|매운쫄면|
|쿵떡파이|
+--------+



In [256]:
cusordpro.show()

+------+------+-------+------+---------------+-------------------+------+---+------+------+-----+----------+-----+-----+--------+
|prodid|userid|orderid|amount|           addr|            orddate|  name|age| grade|   job|coins|  prodname|stock|price|   maker|
+------+------+-------+------+---------------+-------------------+------+---+------+------+-----+----------+-----+-----+--------+
|   p03| apple|    o01|    10|  서울시 마포구|2013-01-01 00:00:00|정소화| 20|  gold|  학생| 1000|  쿵떡파이| 3600| 2600|한빛제과|
|   p01| melon|    o02|     5|  인천시 계양구|2013-01-10 00:00:00|성원용| 35|  gold|회사원| 5000|  그냥만두| 5000| 4500|대한식품|
|   p06|banana|    o03|    45|  경기도 부천시|2013-01-11 00:00:00|김선우| 25|   vip|간호사| 2500|  통통우동| 1000| 1550|민국푸드|
|   p02|carrot|    o04|     8|  부산시 금정구|2013-02-01 00:00:00|고명석| 28|  gold|  교사| 4500|  매운쫄면| 2500| 5500|민국푸드|
|   p06| melon|    o05|    36|  경기도 용인시|2013-02-20 00:00:00|성원용| 35|  gold|회사원| 5000|  통통우동| 1000| 1550|민국푸드|
|   p01|banana|    o06|    19|충청북도 보은군|2013-03-02 00:00:00

In [263]:
spark.sql("select prodname, price from prod where maker = (select maker from prod where prodname = '달콤비스켓'").show()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:40440)
Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/share/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:40440)