### Spark Cluster
![img](./data/iShot_2023-03-17_10.26.09.png)  
1. Hadoop 등에서 채용하고 있는 전형적인 Cluster & Mater/Slave 구조
1. Cluster Manager
    * stand alone, hadoop yarn, 메소스
1. Master
    * Driver program
1. Slave(Worker)
    * Executor
        * Task 수행
        * Data 캐싱
---

### Spark 에서 사용 가능한 언어
* Scala
* Java
* Python
* SQL
* R

---

### Spark Shell & Spark App
1. Spark 구동 방법
    * Shell(대화형) or App
1. Shell(대화형)
    * 터미널을 이용하여 spark master 에 명령을 전달, 이를 수행
    * shell 에서 spark context를 자체 생성
1. App
    * Spark Context를 생성 후 명령을 실행
    * Spark Submit 프로그램을 통해 jar 파일 실행

---

### SparkSession
SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에 실행

In [6]:
import findspark
findspark.init()
findspark.find()

'/Users/ken/Project/spark/spark'

In [7]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("Read S3 Parquet") \
    .getOrCreate()

In [8]:
# SparkContext 내용 확인
spark

In [11]:
# ex
# spark 분산 환경에서 아래 1000개의 row 는 각각 다른 executor 에 할당되어 필요한 작업 수행
myRange = spark.range(1000).toDF("number")
myRange.show(3)

+------+
|number|
+------+
|     0|
|     1|
|     2|
+------+
only showing top 3 rows



---

### DataFrame
* DataFrame은 테이블 형태와 동일
* 일반 테이블과 다른 점은 DataFrame은 다수의 서버에 분산 저장되어 있음

In [12]:
# ex
# 데이터 생성 및 DataFrame 생성
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("David", 40)]
df = spark.createDataFrame(data, ["name", "age"])
# DataFrame 출력
df.show()
# DataFrame의 스키마 출력
df.printSchema()
# DataFrame의 특정 컬럼 선택
df.select("name").show()


+-------+---+
|   name|age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
|  David| 40|
+-------+---+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+-------+
|   name|
+-------+
|  Alice|
|    Bob|
|Charlie|
|  David|
+-------+



--- 

### Transformation

1. narrow dependency
    * fitler 등
    * 각 파티션 내에서 이루어 지는 작업
    * 파티션 간 영향을 주지 않음
1. wide dependency
    * grouping, sorting 등
    * 파티션 간 데이터를 교환이 필요한 작업
    * 셔플링이 발생 (성능 저하)
1. lazy evaluation
    * spark 의 장점 중 하나
    * 각 단계마다 작업을 수행하지 않음
    * 실행 계획 수립 후 최적화 진행

---

### Spark 간단하게 사용해 보기

In [1]:
import findspark
findspark.init()
findspark.find()


'/Users/ken/Project/spark/spark'

In [2]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("Read S3 Parquet") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/17 10:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_order = spark.read \
    .format("parquet") \
    .option("header", True) \
    .load("data/order.parquet")

df_custom = spark.read \
    .format("parquet") \
    .option("header", True) \
    .load("data/custom.parquet")

df_product = spark.read \
    .format("parquet") \
    .option("header", True) \
    .load("data/product.parquet")


In [22]:
# df_custom.printSchema()
# df_custom.show()
df_custom.describe().show()


+-------+----+--------------------+------------------+------+-------------+------------------+-----+-------------------------+
|summary|  Op|     dms_update_time|                id|  name| phone_number|               age|  sex|                  address|
+-------+----+--------------------+------------------+------+-------------+------------------+-----+-------------------------+
|  count| 100|                 100|               100|   100|          100|               100|  100|                      100|
|   mean|null|                null|              50.5|  null|         null|             38.99| null|                     null|
| stddev|null|                null|29.011491975882016|  null|         null|12.109346085824066| null|                     null|
|    min|   I|2023-03-11 07:58:...|                 1|강순자|010-5187-4041|                20|  man| 강원도 고양시 서초중앙길|
|    max|   I|2023-03-11 07:58:...|               100|황진호|070-2143-5674|                59|woman|충청북도 안양시 역삼523로|
+-------+--

In [None]:
# df_product.printSchema()
# df_product.show()
# df_product.describe().show()


In [19]:
# df_order.printSchema()
# df_order.show()
df_order.describe().show()

+-------+-----+--------------------+-----------------+-----------------+------------------+----------------+------------------+----------+
|summary|   Op|     dms_update_time|               id|          cust_id|            prd_id|       order_cnt|       order_price|  order_dt|
+-------+-----+--------------------+-----------------+-----------------+------------------+----------------+------------------+----------+
|  count|20000|               20000|            20000|            20000|             20000|           20000|             20000|     20000|
|   mean| null|                null|          10000.5|          50.0301|          49.80605|          2.5011|       51463.80045|      null|
| stddev| null|                null|5773.647027659381|28.43825998055321|28.795082761420648|1.11196250522697|28319.863064266883|      null|
|    min|    I|2023-03-11 07:58:...|                1|                1|                 1|               1|             10000|2023-03-01|
|    max|    I|2023-03-11 0

In [30]:
df_custom.createOrReplaceTempView("custom")
df_order.createOrReplaceTempView("orders")
df_product.createOrReplaceTempView("product")


In [37]:
# 2023년 판매 내역
sql1 = """
SELECT c.name, c.age, p.name AS product_name, o.order_cnt, o.order_price
FROM custom c
JOIN orders o ON c.id = o.cust_id
JOIN product p ON p.id = o.prd_id
WHERE o.order_dt BETWEEN '2023-01-01' AND '2023-12-31'
"""

In [39]:
# 고객별로 구매한 상품 수, 총 구매액, 최근 주문일
sql2 = """
SELECT c.id AS customer_id, c.name AS customer_name, COUNT(DISTINCT o.prd_id) AS num_products_purchased, 
       SUM(o.order_price * o.order_cnt) AS total_purchase_amount, MAX(o.order_dt) AS last_order_date
FROM custom c
JOIN orders o ON c.id = o.cust_id
JOIN product p ON o.prd_id = p.id
GROUP BY c.id, c.name
ORDER BY total_purchase_amount DESC;
"""

In [41]:
# 가격이 가장 비싼 상품을 조회
sql3 = """
SELECT name, price 
FROM product 
WHERE price = (SELECT MAX(price) FROM product)
"""

In [43]:
# 여자 고객들의 수와 남자 고객들의 수를 조회
sql4 = """
SELECT sex, COUNT(*) AS num_customers
FROM custom
GROUP BY sex
"""

In [60]:
# 2023년 1월부터 3월까지의 총 구매액을 계산
sql5 = """
SELECT int(SUM(order_price * order_cnt)) AS total_purchase_amount
FROM orders
WHERE order_dt >= '2023-01-01' AND order_dt <= '2023-03-31'
"""

In [65]:
# 가장 많이 주문한 상위 10개의 상품과 해당 상품을 주문한 고객의 정보를 조회
sql6 = """
SELECT p.name AS product_name, 
       c.name AS customer_name, 
       c.phone_number, 
       c.age, 
       c.sex, 
       c.address,
       o.order_cnt, 
       o.order_price, 
       o.order_dt
FROM (
  SELECT prd_id, SUM(order_cnt) AS total_cnt
  FROM orders
  GROUP BY prd_id
  ORDER BY total_cnt DESC
  LIMIT 10
) AS t
JOIN orders AS o ON t.prd_id = o.prd_id
JOIN product AS p ON o.prd_id = p.id
JOIN custom AS c ON o.cust_id = c.id
"""

In [67]:
# 30대 이상의 고객들이 구매한 상품 중 가격이 10,000원 이상인 상품들을 구매한 총 금액이 높은 순서로 조회
sql7 = """
SELECT c.name, p.name, SUM(o.order_cnt * p.price) as total_price
FROM custom c
JOIN orders o ON c.id = o.cust_id
JOIN product p ON p.id = o.prd_id
WHERE c.age > 30
GROUP BY c.name, p.name
HAVING SUM(o.order_cnt * p.price) > 10000
ORDER BY total_price DESC;
"""


In [68]:
spark.sql(sql7).show()

+------+------------+-----------+
|  name|        name|total_price|
+------+------------+-----------+
|김승현|차돌된장찌개|  1081000.0|
|홍은주|        닭발|  1012000.0|
|김민준|    부대찌개|   950000.0|
|최현준|    마파두부|   880000.0|
|김하은|  오므라이스|   840000.0|
|김민준|      떡볶이|   828000.0|
|한상철|    돼지갈비|   828000.0|
|김경희|      팟타이|   816000.0|
|이미영|      고로케|   805000.0|
|전서현|    부대찌개|   800000.0|
|이승현|    감자튀김|   800000.0|
|안경숙|    초계국수|   800000.0|
|김민준|      마카롱|   792000.0|
|이준호|      어묵탕|   783000.0|
|이지은|        닭발|   782000.0|
|이준호|      토스트|   770000.0|
|이상철|  아이스크림|   760000.0|
|김민준|    샤브샤브|   752000.0|
|남성훈|    크로와상|   752000.0|
|김민준|차돌된장찌개|   752000.0|
+------+------------+-----------+
only showing top 20 rows

