In [49]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("241211_01_SparkSQL_SQLtest")
    .getOrCreate()
)


In [50]:
from pyspark.sql import Row

# Sample users: user_id is assigned 1..11 in list order; D and E have no address (None -> SQL NULL).
user_data = [
    Row(user_id=uid, username=name, address=addr)
    for uid, (name, addr) in enumerate(
        [
            ('A', '서울'),
            ('B', '대전'),
            ('C', '경기도'),
            ('D', None),
            ('E', None),
            ('F', '서울'),
            ('G', '경기도'),
            ('H', '대구'),
            ('I', '부산'),
            ('J', '전주'),
            ('K', '광주'),
        ],
        start=1,
    )
]

In [51]:
# Build a DataFrame from the user rows and expose it to SQL as the temporary view `users`.
user_df = spark.createDataFrame(data=user_data)
user_df.createOrReplaceTempView(name='users')


In [52]:
# Sample books as (book_id, title, author_fname, author_lname, pages, released_year, stock_quantity).
books_data = [
    Row(book_id=bid, title=title, author_fname=fname, author_lname=lname,
        pages=pages, released_year=year, stock_quantity=qty)
    for bid, title, fname, lname, pages, year, qty in [
        (1, "Book A", "John", "Doe", 300, 2005, 55),
        (2, "Book B", "Jane", "Smith", 250, 2010, 40),
        (3, "Book C", "Emily", "Jones", 180, 2015, 20),
        (4, "Book D", "Chris", "Brown", 320, 2012, 75),
        (5, "Book E", "Anna", "Davis", 270, 2008, 35),
    ]
]


In [53]:
# Build a DataFrame from the book rows and expose it to SQL as the temporary view `books`.
books_df = spark.createDataFrame(data=books_data)
books_df.createOrReplaceTempView(name='books')

In [54]:
# Show every row of the `users` view; missing addresses print as null.
query_users = '''
select * from users;
'''
spark.sql(query_users).show()

+-------+--------+-------+
|user_id|username|address|
+-------+--------+-------+
|      1|       A|   서울|
|      2|       B|   대전|
|      3|       C| 경기도|
|      4|       D|   null|
|      5|       E|   null|
|      6|       F|   서울|
|      7|       G| 경기도|
|      8|       H|   대구|
|      9|       I|   부산|
|     10|       J|   전주|
|     11|       K|   광주|
+-------+--------+-------+



In [55]:
# List users with a placeholder '주소없음' ("no address") where address is NULL.
# COALESCE(address, default) is the idiomatic SQL null-default and is equivalent to
# IF(address IS NULL, '주소없음', address).
query_users = '''
SELECT username,
    COALESCE(address, '주소없음') AS address
FROM users;
'''

spark.sql(query_users).show()

+--------+--------+
|username| address|
+--------+--------+
|       A|    서울|
|       B|    대전|
|       C|  경기도|
|       D|주소없음|
|       E|주소없음|
|       F|    서울|
|       G|  경기도|
|       H|    대구|
|       I|    부산|
|       J|    전주|
|       K|    광주|
+--------+--------+



In [56]:
# books 테이블: stock_quantity 구간별 라벨
# stock_quantity >= 50 → '재고 많음', >= 30 → '재고 중간', 그 외 → '재고 없음'

In [57]:
# Bucket stock_quantity with nested IF(): >= 50 -> '재고 많음', >= 30 -> '재고 중간', otherwise '재고 없음'.
# NOTE(review): the CASE WHEN version in the following cell labels the last bucket '재고 부족'
# instead of '재고 없음', so the two outputs differ; align the labels if a like-for-like comparison is intended.
books_sql = '''
SELECT stock_quantity, 
	   IF(stock_quantity >= 50, '재고 많음',
		  IF(stock_quantity >= 30, '재고 중간', '재고 없음')) AS quantity_level
FROM books;
'''
spark.sql(books_sql).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 없음|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



In [58]:
# Same stock bucketing written with CASE WHEN; branches are tested top to bottom, so the first match wins.
books_sql_1= '''
SELECT stock_quantity, 
	   CASE 
		   WHEN stock_quantity >= 50 THEN '재고 많음'
		   WHEN stock_quantity >= 30 THEN '재고 중간'
		   ELSE '재고 부족'
	   END AS quantity_level
FROM books;
'''
spark.sql(books_sql_1).show()

+--------------+--------------+
|stock_quantity|quantity_level|
+--------------+--------------+
|            55|     재고 많음|
|            40|     재고 중간|
|            20|     재고 부족|
|            75|     재고 많음|
|            35|     재고 중간|
+--------------+--------------+



In [59]:
#실행계획 비교

In [60]:
# Print the physical plan of the nested-IF query; the output shows a single Project over the scan.
spark.sql(books_sql).explain()

== Physical Plan ==
*(1) Project [stock_quantity#405L, if ((stock_quantity#405L >= 50)) 재고 많음 else if ((stock_quantity#405L >= 30)) 재고 중간 else 재고 없음 AS quantity_level#465]
+- *(1) Scan ExistingRDD[book_id#399L,title#400,author_fname#401,author_lname#402,pages#403L,released_year#404L,stock_quantity#405L]




In [61]:
# Print the physical plan of the CASE WHEN query for comparison with the nested-IF plan.
spark.sql(books_sql_1).explain()

== Physical Plan ==
*(1) Project [stock_quantity#405L, CASE WHEN (stock_quantity#405L >= 50) THEN 재고 많음 WHEN (stock_quantity#405L >= 30) THEN 재고 중간 ELSE 재고 부족 END AS quantity_level#468]
+- *(1) Scan ExistingRDD[book_id#399L,title#400,author_fname#401,author_lname#402,pages#403L,released_year#404L,stock_quantity#405L]




In [62]:
# DISTINCT author last names. The printed plan shows a HashAggregate before and after an
# Exchange hashpartitioning step, i.e. DISTINCT is executed as a grouping with a shuffle.
books_sql_2 = '''
select distinct author_lname from books;
'''
spark.sql(books_sql_2).explain()

== Physical Plan ==
*(2) HashAggregate(keys=[author_lname#402], functions=[])
+- Exchange hashpartitioning(author_lname#402, 200), ENSURE_REQUIREMENTS, [id=#514]
   +- *(1) HashAggregate(keys=[author_lname#402], functions=[])
      +- *(1) Project [author_lname#402]
         +- *(1) Scan ExistingRDD[book_id#399L,title#400,author_fname#401,author_lname#402,pages#403L,released_year#404L,stock_quantity#405L]




In [63]:
# Run the DISTINCT query; it has no ORDER BY, so the row order is not fixed.
spark.sql(books_sql_2).show()

+------------+
|author_lname|
+------------+
|       Jones|
|       Davis|
|       Smith|
|         Doe|
|       Brown|
+------------+



In [64]:
# Number of books per author last name. The plan shows partial_count before the shuffle and
# count after it; the result column is named count(1) because it has no alias.
books_sql_3 = '''
select author_lname, count(*)
from books
group by author_lname;
'''
spark.sql(books_sql_3).explain()
spark.sql(books_sql_3).show()

== Physical Plan ==
*(2) HashAggregate(keys=[author_lname#402], functions=[count(1)])
+- Exchange hashpartitioning(author_lname#402, 200), ENSURE_REQUIREMENTS, [id=#561]
   +- *(1) HashAggregate(keys=[author_lname#402], functions=[partial_count(1)])
      +- *(1) Project [author_lname#402]
         +- *(1) Scan ExistingRDD[book_id#399L,title#400,author_fname#401,author_lname#402,pages#403L,released_year#404L,stock_quantity#405L]


+------------+--------+
|author_lname|count(1)|
+------------+--------+
|       Jones|       1|
|       Davis|       1|
|       Smith|       1|
|         Doe|       1|
|       Brown|       1|
+------------+--------+



데이터 변경

In [65]:
# Rebuild the book rows with an extra borrowed_by column holding the borrower's user_id
# (None means the book is not borrowed). Borrowers are listed in books_data order (book_id 1..5).
books_data_with_user = [
    Row(**book.asDict(), borrowed_by=borrower)
    for book, borrower in zip(books_data, [1, 2, 3, None, 6])
]

# Build the DataFrame and register it as `books`, replacing the earlier view of that name.
books_df_with_user = spark.createDataFrame(data=books_data_with_user)
books_df_with_user.createOrReplaceTempView(name="books")

In [66]:
# DataFrame-API alternative to the Row-based cell: add borrowed_by to books_df based on book_id.
# `when`/`lit` are imported here because the pyspark.sql.functions import cell comes later in
# the notebook; without this import the cell raises NameError on Restart & Run All.
# NOTE(review): updated_books_df is reassigned by the later stock_quantity cell before it is
# written out, so this result is never used downstream.
from pyspark.sql.functions import lit, when

updated_books_df = books_df.withColumn(
    "borrowed_by",
    when(books_df.book_id == 1, 1)
    .when(books_df.book_id == 2, 2)
    .when(books_df.book_id == 3, 3)
    .when(books_df.book_id == 4, lit(None))
    .when(books_df.book_id == 5, 6)
    .otherwise(None)
)


In [67]:
# Show the current `books` view, which now includes the borrowed_by column.
# NOTE: this rebinds books_sql, which the earlier IF-vs-CASE explain cell also used.
books_sql = '''
SELECT *
FROM books;
'''

spark.sql(books_sql).show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            20|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



In [68]:
# Import only the column functions this notebook uses. A wildcard import would shadow Python
# builtins such as sum, max, min, round, abs and filter with their Spark column versions.
from pyspark.sql.functions import lit, when

In [69]:
# Preprocessing: set stock_quantity to 50 for book_id 3 and keep every other row's value as is.
updated_books_df = books_df_with_user.withColumn(
    "stock_quantity",
    when(books_df_with_user["book_id"] == 3, 50)
    .otherwise(books_df_with_user["stock_quantity"]),
)
updated_books_df.show()

+-------+------+------------+------------+-----+-------------+--------------+-----------+
|book_id| title|author_fname|author_lname|pages|released_year|stock_quantity|borrowed_by|
+-------+------+------------+------------+-----+-------------+--------------+-----------+
|      1|Book A|        John|         Doe|  300|         2005|            55|          1|
|      2|Book B|        Jane|       Smith|  250|         2010|            40|          2|
|      3|Book C|       Emily|       Jones|  180|         2015|            50|          3|
|      4|Book D|       Chris|       Brown|  320|         2012|            75|       null|
|      5|Book E|        Anna|       Davis|  270|         2008|            35|          6|
+-------+------+------------+------------+-----+-------------+--------------+-----------+



# 데이터 저장

In [30]:
# Save modes: overwrite, append, ignore, error (alias: errorifexists).
# Spark writes a directory of part files at this path, despite the .csv suffix.
updated_books_df.write.mode("overwrite").option("header", True).csv("data/output/sqltest_updated_books.csv")

In [33]:

# Save the users DataFrame as CSV with a header row, replacing any previous output at this path.
user_df.write.mode("overwrite").option("header", True).csv("data/output/sqltest_users.csv")

In [None]:
#조인 실습

In [None]:
# Columns to return from the join: book_id, title, author_fname, author_lname, username, address
# (previously a bare tuple of undefined names, which raised NameError when the cell was run)

In [34]:
# INNER JOIN: keep only books whose borrowed_by matches a user_id, so the book with
# borrowed_by = NULL (book 4) is not in the result.
join_query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books b INNER JOIN users u ON b.borrowed_by = u.user_id;
'''
spark.sql(join_query).show()



+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      5|Book E|        Anna|       Davis|       F|   서울|
|      1|Book A|        John|         Doe|       A|   서울|
|      3|Book C|       Emily|       Jones|       C| 경기도|
|      2|Book B|        Jane|       Smith|       B|   대전|
+-------+------+------------+------------+--------+-------+



In [None]:
# books LEFT JOIN users

In [None]:
# 사용자의 책 대여 목록 > 전체 사용자 > 대여한 정보가 있으면 나오고, 없으면 NULL
# books RIGHT JOIN users

In [35]:
# Books borrowed by users who live in a specific region (서울).
# Filtering on u.address in WHERE discards the NULL-extended rows a LEFT JOIN would keep,
# so the query is effectively an inner join; INNER JOIN states that intent (same result).
join_query = '''
SELECT book_id, title, author_fname, author_lname, username, address
FROM books b INNER JOIN users u ON b.borrowed_by = u.user_id
WHERE u.address = '서울';
'''

spark.sql(join_query).show()

+-------+------+------------+------------+--------+-------+
|book_id| title|author_fname|author_lname|username|address|
+-------+------+------------+------------+--------+-------+
|      5|Book E|        Anna|       Davis|       F|   서울|
|      1|Book A|        John|         Doe|       A|   서울|
+-------+------+------------+------------+--------+-------+



In [39]:
# Number of books borrowed per user, including users who borrowed nothing (count 0).
# LEFT JOIN keeps every user; COUNT(b.book_id) ignores the NULLs produced for unmatched users.
# Fixes: the aliases u/b were used but never declared, and the column is `username`, not `user_name`.
join_query = '''
SELECT u.user_id, u.username, COUNT(b.book_id) AS borrow_count
FROM users u LEFT JOIN books b ON u.user_id = b.borrowed_by
GROUP BY u.user_id, u.username
ORDER BY u.user_id
'''
spark.sql(join_query).show()

In [None]:
# book_category: 'Long' when pages >= 300, otherwise 'Short'.
# Named book_category_query (not join_query) because it is not a join and should not
# overwrite the per-user join query defined earlier.
book_category_query = '''
SELECT book_id, title, pages,
       CASE
           WHEN pages >= 300 THEN 'Long'
           ELSE 'Short'
       END AS book_category
FROM books
'''
spark.sql(book_category_query).show()

In [40]:
# Label each book's stock: > 50 -> '충분', >= 30 -> '보통', otherwise '부족'.
# NOTE(review): the top bucket uses `> 50`, while the earlier quantity_level queries use `>= 50`;
# a stock of exactly 50 falls into '보통' here. Confirm which threshold is intended.
stock_status_query = '''
SELECT book_id, title, stock_quantity,
       CASE 
           WHEN stock_quantity > 50 THEN '충분'
           WHEN stock_quantity >= 30 THEN '보통'
           ELSE '부족'
       END AS stock_status
FROM books
'''

spark.sql(stock_status_query).show()


+-------+------+--------------+------------+
|book_id| title|stock_quantity|stock_status|
+-------+------+--------------+------------+
|      1|Book A|            55|        충분|
|      2|Book B|            40|        보통|
|      3|Book C|            20|        부족|
|      4|Book D|            75|        충분|
|      5|Book E|            35|        보통|
+-------+------+--------------+------------+



In [None]:
# Find books whose title contains a keyword (here 'A') with LIKE; % matches any run of characters.
# The previous fragment '''WHERE title LIKE '%A%''' ended the triple-quoted string early and left
# the SQL pattern's closing quote missing, so it is written as a complete query in a """ string.
title_keyword_query = """
SELECT book_id, title
FROM books
WHERE title LIKE '%A%'
"""
spark.sql(title_keyword_query).show()

In [None]:
# The author whose books are borrowed the most.
# COUNT(borrowed_by) counts only rows that have a borrower (NULL = not borrowed); the previous
# COUNT(book_id) counted every book regardless of borrowing.
# Ties are broken by name so that LIMIT 1 always returns the same author.
top_borrowed_author_query = '''
SELECT author_fname, author_lname, COUNT(borrowed_by) AS borrow_count
FROM books
GROUP BY author_fname, author_lname
ORDER BY borrow_count DESC, author_lname, author_fname
LIMIT 1
'''
spark.sql(top_borrowed_author_query).show()

In [None]:
# Borrowed books per release year: count the books that have a borrower, grouped by released_year.
# The query was previously only defined; it is now executed so the cell shows its result.
borrowed_by_year_query = '''
SELECT b.released_year, COUNT(b.borrowed_by) AS borrow_count
FROM books AS b
WHERE b.borrowed_by IS NOT NULL
GROUP BY b.released_year
ORDER BY b.released_year
'''
spark.sql(borrowed_by_year_query).show()


In [None]:
# Borrowed books per user region: join users to the books they borrowed and count per address.
# The inner JOIN excludes users who have not borrowed anything.
# The query was previously only defined; it is now executed so the cell shows its result.
region_borrow_query = '''
SELECT u.address AS user_region, COUNT(b.book_id) AS borrowed_count
FROM users AS u
JOIN books AS b ON u.user_id = b.borrowed_by
GROUP BY u.address
ORDER BY borrowed_count DESC
'''
spark.sql(region_borrow_query).show()

In [None]:
# Among books that are not borrowed (borrowed_by IS NULL), return the one with the most pages.
# The query was previously only defined; it is now executed so the cell shows its result.
largest_unborrowed_query = '''
SELECT *
FROM books
WHERE borrowed_by IS NULL
ORDER BY pages DESC
LIMIT 1
'''
spark.sql(largest_unborrowed_query).show()

In [None]:
# 재고가 부족한 책과 대여 상태: 재고가 30개 미만인 책과 해당 책이 대여된 상태인지 확인합니다.

In [None]:
# Books with stock_quantity below 30, labelled '미대여' (not borrowed) or '대여됨' (borrowed).
# The query was previously only defined; it is now executed so the cell shows its result.
low_stock_query = '''
SELECT book_id, title, stock_quantity, 
       CASE 
           WHEN borrowed_by IS NULL THEN '미대여'
           ELSE '대여됨'
       END AS borrow_status
FROM books
WHERE stock_quantity < 30
'''
spark.sql(low_stock_query).show()

In [70]:
# Save the users DataFrame again (same path and options as the earlier users write; overwrites it).
user_df.write.mode("overwrite").option("header", True).csv("data/output/sqltest_users.csv")

In [71]:
# Save the updated books to a second output directory with a header row.
# NOTE(review): the path ends in ".csv_02"; "sqltest_updated_books_02.csv" may have been intended. Confirm before renaming.
updated_books_df.write.mode("overwrite").option("header", True).csv("data/output/sqltest_updated_books.csv_02")

In [None]:
# Stop the Spark session at the end of the notebook; later cells can no longer run queries.
spark.stop()