In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Book-User').getOrCreate()

24/12/11 10:28:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## books, users 테이블

In [2]:
from pyspark.sql import Row

books_data = [
    Row(book_id=1, title="Book A", author_fname="John", author_lname="Doe", pages=300, released_year=2005, stock_quantity=55),
    Row(book_id=2, title="Book B", author_fname="Jane", author_lname="Smith", pages=250, released_year=2010, stock_quantity=40),
    Row(book_id=3, title="Book C", author_fname="Emily", author_lname="Jones", pages=180, released_year=2015, stock_quantity=20),
    Row(book_id=4, title="Book D", author_fname="Chris", author_lname="Brown", pages=320, released_year=2012, stock_quantity=75),
    Row(book_id=5, title="Book E", author_fname="Anna", author_lname="Davis", pages=270, released_year=2008, stock_quantity=35)
]

users_data = [
    Row(user_id=1, username='A', address='서울'),
    Row(user_id=2, username='B', address='대전'),
    Row(user_id=3, username='C', address='경기도'),
    Row(user_id=4, username='D', address=None),
    Row(user_id=5, username='E', address=None),
    Row(user_id=6, username='F', address='서울'),
    Row(user_id=7, username='G', address='경기도'),
    Row(user_id=8, username='H', address='대구'),
    Row(user_id=9, username='I', address='부산'),
    Row(user_id=10, username='J', address='전주'),
    Row(user_id=11, username='K', address='광주')
]

books_df = spark.createDataFrame(books_data)
users_df = spark.createDataFrame(users_data)

In [5]:
import pyspark.sql.functions as F

books_df.createOrReplaceTempView("books")
users_df.createOrReplaceTempView("users")

spark.sql('select * from books where released_year=2005').show()

[Stage 0:>                                                          (0 + 1) / 1]

+-------+------+------------+------------+-----+-------------+--------------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|
+-------+------+------------+------------+-----+-------------+--------------+
|      1|Book A|        John|         Doe|  300|         2005|            55|
+-------+------+------------+------------+-----+-------------+--------------+



                                                                                

## SELECT 문에서 조건식 사용

```
IF(condition, true_value, false_value)
```

```SELECT
    CASE column_name
        WHEN value1 THEN result1
        WHEN value2 THEN result2
        ELSE default_value
    END AS result
FROM table_name;
```

In [6]:
spark.sql('select address, address is Null as is_null from users').show()

+-------+-------+
|address|is_null|
+-------+-------+
|   서울|  false|
|   대전|  false|
| 경기도|  false|
|   null|   true|
|   null|   true|
|   서울|  false|
| 경기도|  false|
|   대구|  false|
|   부산|  false|
|   전주|  false|
|   광주|  false|
+-------+-------+



In [7]:
query = '''
select username,
    if (address is null, '주소없음', address) as address
from users
'''

spark.sql(query).show()

+--------+--------+
|username| address|
+--------+--------+
|       A|    서울|
|       B|    대전|
|       C|  경기도|
|       D|주소없음|
|       E|주소없음|
|       F|    서울|
|       G|  경기도|
|       H|    대구|
|       I|    부산|
|       J|    전주|
|       K|    광주|
+--------+--------+



In [9]:
query = '''
select address,
    if (address in ('경기도','서울'), '수도권', '지방') as region
from users
'''

spark.sql(query).show()

+-------+------+
|address|region|
+-------+------+
|   서울|수도권|
|   대전|  지방|
| 경기도|수도권|
|   null|  지방|
|   null|  지방|
|   서울|수도권|
| 경기도|수도권|
|   대구|  지방|
|   부산|  지방|
|   전주|  지방|
|   광주|  지방|
+-------+------+



In [17]:
book_sql1 = '''
SELECT stock_quantity,
    IF(stock_quantity >= 50, '재고 많음',
        IF(stock_quantity >= 30, '재고 중간', '재고 없음')) AS quantity_level
FROM books;
'''

spark.sql(book_sql1).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 없음|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



In [15]:
book_sql2 = '''
SELECT stock_quantity,
    CASE 
        WHEN stock_quantity >= 50 THEN '재고 많음'
        WHEN stock_quantity >= 30 THEN '재고 중간'
        ELSE '재고 없음'
    END AS quantity_level
FROM books;
'''

spark.sql(book_sql2).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 없음|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



## 실행계획 비교

In [14]:
spark.sql(book_sql1).explain()

== Physical Plan ==
*(1) Project [stock_quantity#6L, if ((stock_quantity#6L >= 50)) 재고 많음 else if ((stock_quantity#6L >= 30)) 재고 중간 else 재고 없음 AS quantity_level#141]
+- *(1) Scan ExistingRDD[book_id#0L,title#1,author_fname#2,author_lname#3,pages#4L,released_year#5L,stock_quantity#6L]




In [16]:
spark.sql(book_sql2).explain()

== Physical Plan ==
*(1) Project [stock_quantity#6L, CASE WHEN (stock_quantity#6L >= 50) THEN 재고 많음 WHEN (stock_quantity#6L >= 30) THEN 재고 중간 ELSE 재고 없음 END AS quantity_level#156]
+- *(1) Scan ExistingRDD[book_id#0L,title#1,author_fname#2,author_lname#3,pages#4L,released_year#5L,stock_quantity#6L]




## GROUP BY

In [18]:
query = '''
SELECT author_lname, count(*)
FROM books
GROUP BY author_lname;
'''

spark.sql(query).show()
spark.sql(query).explain()



+------------+--------+
|author_lname|count(1)|
+------------+--------+
|       Jones|       1|
|       Davis|       1|
|       Smith|       1|
|         Doe|       1|
|       Brown|       1|
+------------+--------+

== Physical Plan ==
*(2) HashAggregate(keys=[author_lname#3], functions=[count(1)])
+- Exchange hashpartitioning(author_lname#3, 200), ENSURE_REQUIREMENTS, [id=#195]
   +- *(1) HashAggregate(keys=[author_lname#3], functions=[partial_count(1)])
      +- *(1) Project [author_lname#3]
         +- *(1) Scan ExistingRDD[book_id#0L,title#1,author_fname#2,author_lname#3,pages#4L,released_year#5L,stock_quantity#6L]




## 데이터에 열 추가

In [20]:
books_data_with_user = [
    Row(book_id=1, title="Book A", author_fname="John", author_lname="Doe", pages=300, released_year=2005, stock_quantity=55, borrowed_by=1),
    Row(book_id=2, title="Book B", author_fname="Jane", author_lname="Smith", pages=250, released_year=2010, stock_quantity=40, borrowed_by=2),
    Row(book_id=3, title="Book C", author_fname="Emily", author_lname="Jones", pages=180, released_year=2015, stock_quantity=20, borrowed_by=3),
    Row(book_id=4, title="Book D", author_fname="Chris", author_lname="Brown", pages=320, released_year=2012, stock_quantity=75, borrowed_by=None),
    Row(book_id=5, title="Book E", author_fname="Anna", author_lname="Davis", pages=270, released_year=2008, stock_quantity=35, borrowed_by=6)
]

books_df = books_df.withColumn("borrowed_by", 
                   F.when(books_df["book_id"] == 1, 1)  
                    .when(books_df["book_id"] == 2, 2) 
                    .when(books_df["book_id"] == 3, 3)
                    .when(books_df["book_id"] == 5, 6)
                    .otherwise(None) 
)

books_df.show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            20|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



## 데이터 저장
- 여러 개의 파티션에 나뉘어 저장

In [21]:
books_df.write.csv('data/output/books.csv', header=True, mode='overwrite' )
users_df.write.csv('data/output/users.csv', header=True, mode='overwrite')

## 데이터 불러오기

In [22]:
books_df1 = spark.read.csv('data/output/books.csv', header=True)
books_df1.show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      3|Book C|       Emily|       Jones|  180|         2015|            20|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [23]:
users_df1 = spark.read.csv('data/output/users.csv', header=True)
users_df1.show()

+-------+--------+-------+
|user_id|username|address|
+-------+--------+-------+
|      6|       F|   서울|
|      7|       G| 경기도|
|      8|       H|   대구|
|      9|       I|   부산|
|     10|       J|   전주|
|     11|       K|   광주|
|      1|       A|   서울|
|      2|       B|   대전|
|      3|       C| 경기도|
|      4|       D|   null|
|      5|       E|   null|
+-------+--------+-------+



In [24]:
books_df1.createOrReplaceTempView('books')
users_df1.createOrReplaceTempView('users')

## INNER JOIN

In [26]:
query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books AS b
INNER JOIN users AS u
ON b.borrowed_by = u.user_id;
'''

spark.sql(query).show()

+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      3|Book C|       Emily|       Jones|       C| 경기도|
|      5|Book E|        Anna|       Davis|       F|   서울|
|      1|Book A|        John|         Doe|       A|   서울|
|      2|Book B|        Jane|       Smith|       B|   대전|
+-------+------+------------+------------+--------+-------+



In [30]:
query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books AS b
INNER JOIN users AS u
ON b.borrowed_by = u.user_id
WHERE address == '서울';
'''

spark.sql(query).show()

+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      5|Book E|        Anna|       Davis|       F|   서울|
|      1|Book A|        John|         Doe|       A|   서울|
+-------+------+------------+------------+--------+-------+



## LEFT JOIN

In [27]:
query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books AS b
LEFT JOIN users AS u
ON b.borrowed_by = u.user_id;
'''

spark.sql(query).show()

+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      3|Book C|       Emily|       Jones|       C| 경기도|
|      4|Book D|       Chris|       Brown|    null|   null|
|      5|Book E|        Anna|       Davis|       F|   서울|
|      1|Book A|        John|         Doe|       A|   서울|
|      2|Book B|        Jane|       Smith|       B|   대전|
+-------+------+------------+------------+--------+-------+



In [35]:
# 사용자 별로 대여한 책 수
query = '''
SELECT 
    u.user_id, 
    u.username, 
    COUNT(b.book_id) AS borrowed_books_count
FROM users AS u
LEFT JOIN books AS b
ON b.borrowed_by = u.user_id
GROUP BY user_id, username;
'''

spark.sql(query).show()



+-------+--------+--------------------+
|user_id|username|borrowed_books_count|
+-------+--------+--------------------+
|     11|       K|                   0|
|      1|       A|                   1|
|      5|       E|                   0|
|      2|       B|                   1|
|      7|       G|                   0|
|      3|       C|                   1|
|      6|       F|                   1|
|      4|       D|                   0|
|      9|       I|                   0|
|      8|       H|                   0|
|     10|       J|                   0|
+-------+--------+--------------------+



## RIGHT JOIN

In [29]:
query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books AS b
RIGHT JOIN users AS u
ON b.borrowed_by = u.user_id;
'''

spark.sql(query).show()

+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      5|Book E|        Anna|       Davis|       F|   서울|
|   null|  null|        null|        null|       G| 경기도|
|   null|  null|        null|        null|       H|   대구|
|   null|  null|        null|        null|       I|   부산|
|   null|  null|        null|        null|       J|   전주|
|   null|  null|        null|        null|       K|   광주|
|      1|Book A|        John|         Doe|       A|   서울|
|      2|Book B|        Jane|       Smith|       B|   대전|
|      3|Book C|       Emily|       Jones|       C| 경기도|
|   null|  null|        null|        null|       D|   null|
|   null|  null|        null|        null|       E|   null|
+-------+------+------------+------------+--------+-------+



## CASE 

In [36]:
# pages >= 300 이면 long, else short

query = '''
SELECT 
    book_id,
    title,
    pages,
    CASE 
        WHEN pages >= 300 THEN 'long'
        ELSE 'short'
    END AS book_length
FROM books
'''

spark.sql(query).show()

+-------+------+-----+-----------+
|book_id| title|pages|book_length|
+-------+------+-----+-----------+
|      3|Book C|  180|      short|
|      4|Book D|  320|       long|
|      5|Book E|  270|      short|
|      1|Book A|  300|       long|
|      2|Book B|  250|      short|
+-------+------+-----+-----------+



In [37]:
# stock_quantity >= 50 이면 충분, >= 30 이면 보통, 부족

query = '''
SELECT 
    book_id,
    title,
    stock_quantity,
    CASE 
        WHEN stock_quantity >= 50 THEN '충분'
        WHEN stock_quantity >= 30 THEN '보통'
        ELSE '부족'
    END AS stock_quantity_level
FROM books
'''

spark.sql(query).show()

+-------+------+--------------+--------------------+
|book_id| title|stock_quantity|stock_quantity_level|
+-------+------+--------------+--------------------+
|      3|Book C|            20|                부족|
|      4|Book D|            75|                충분|
|      5|Book E|            35|                보통|
|      1|Book A|            55|                충분|
|      2|Book B|            40|                보통|
+-------+------+--------------+--------------------+



## LIMIT

In [40]:
# 대여가 가장 많이 된 책의 작가 조회
query = '''
SELECT author_fname, author_lname, count(book_id) as borrow_count
FROM books
GROUP BY author_fname, author_lname
ORDER BY borrow_count DESC
LIMIT 1
'''

spark.sql(query).show()



+------------+------------+------------+
|author_fname|author_lname|borrow_count|
+------------+------------+------------+
|        Anna|       Davis|           1|
+------------+------------+------------+





## 실습

In [45]:
# 책의 발행 연도별 대여 현황
query = '''
SELECT released_year, COUNT(borrowed_by) AS borrow_count
FROM books AS b
INNER JOIN users AS u
ON b.borrowed_by = u.user_id
GROUP BY released_year
'''

spark.sql(query).show()

+-------------+------------+
|released_year|borrow_count|
+-------------+------------+
|         2005|           1|
|         2008|           1|
|         2015|           1|
|         2010|           1|
+-------------+------------+



In [46]:
# 사용자 지역별 대여된 책 수
query = '''
SELECT address, COUNT(borrowed_by) AS borrow_count
FROM books AS b
INNER JOIN users AS u
ON b.borrowed_by = u.user_id
GROUP BY address
'''

spark.sql(query).show()

+-------+------------+
|address|borrow_count|
+-------+------------+
|   대전|           1|
| 경기도|           1|
|   서울|           2|
+-------+------------+



In [50]:
# 재고가 부족한 책과 대여 상태
# 재고가 30개 미만인 책과 해당 책이 대여된 상태인지 확인
query = '''
SELECT title, CASE WHEN borrowed_by IS NULL THEN 'Not Borrowed'
                    ELSE 'Borrowed' END AS borrow_status
FROM books
WHERE stock_quantity < 30 
'''

spark.sql(query).show()

+------+-------------+
| title|borrow_status|
+------+-------------+
|Book C|     Borrowed|
+------+-------------+



In [51]:
spark.stop()