In [1]:
import sys

print(sys.version)

3.8.10 (default, Nov 14 2021, 21:32:59) 
[Clang 12.0.5 (clang-1205.0.22.9)]


### Spark 환경 설정

로컬 머신 내 다운받은 SPARK_HOME 경로를 정확히 지정합니다. findspark() 를 통해 경로 내 Spark 를 실행할 수 있습니다.
만약 AWS 환경이라면 EFS 등의 공유 스토리지에 Hadoop, Java, Spark 환경을 버전별로 구비해놓고 Read-only 로 마운트 해 사용할 수 있습니다.

In [2]:
DATASET_ROOT = "/Users/kun/github/1ambda/practical-data-pipeline-code"
SPARK_HOME = "/Users/kun/github/spark/spark-3.2.0-bin-hadoop3.2"

In [3]:
import findspark

findspark.init(SPARK_HOME)
findspark.add_packages([
    "org.apache.hadoop:hadoop-aws:3.3.1", 
    "com.amazonaws:aws-java-sdk-bundle:1.11.375",
    "mysql:mysql-connector-java:8.0.25",
])

### Spark Session 생성

로컬모드에서 실행할 Spark Session 을 만듭니다. (`.master("local[*]")`)
- 일반적인 Spark 설정은 `$SPARK_HOME/conf/spark-defaults.conf` 내에서 세팅해 공통환경으로 사용합니다. 다만 이 예제에서는 보여주기 위해 SparkConf 를 이용해 설정합니다.
- Hive Metastore URI 등 HMS 관련 설정은 `$SPARK_HOME/conf/hive-site.conf` 내에서 세팅해 공통 환경으로 사용합니다.
- 이 예제에서는 Minio 를 사용하므로 Access Key, Secret Key 를 사용합니다. AWS 위에서 실행된다면 [AWS Instance Profile](https://docs.aws.amazon.com/ko_kr/IAM/latest/UserGuide/id_roles_use_switch-role-ec2_instance-profiles.html) 을 이용할 수 있으므로 키를 세팅하지 않습니다.

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("example-app") \
    .enableHiveSupport() \
    .getOrCreate()

:: loading settings :: url = jar:file:/Users/kun/github/spark/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/kun/.ivy2/cache
The jars for the packages stored in: /Users/kun/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
mysql#mysql-connector-java added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1c500042-d1d7-42ed-928c-6aa6cb27e8c9;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in local-m2-cache
	found mysql#mysql-connector-java;8.0.25 in central
	found com.google.protobuf#protobuf-java;3.11.4 in central
:: resolution report :: resolve 272ms :: artifacts dl 14ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 from central in [default]
	com.google.protobuf#protobuf-java;3.11.4 from central in [default]
	mysql#mysql-connector-java;8.0.25 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.1

### Spark UI 확인

http://localhost:4040 에서 Spark UI 를 확인할 수 있습니다.  
만약 아래와 같은 메세지가 Spark Session 을 초기화 하는 과정에서 보였다면 Port 를 4041 (http://localhost:4041) 로 변경해 브라우저에서 확인합니다.

```
21/12/01 14:51:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
```

  
  

Production 환경에서는 사용자의 Jupyter Container (Kubernetes) 내에 https://github.com/jupyterhub/jupyter-server-proxy 등을 통해 제공할 수 있습니다.

### 데이터 읽기

Airbnb 데이터셋 3개를 읽습니다.
- `airbnb_listings.csv` 는 숙소 목록을 담고 있습니다.
- `airbnb_reviews.csv` 는 숙소에 대한 후기와 평점을 담고 있습니다.
- `airbnb_calendar.csv` 는 숙소의 일별 인벤토리 (재고) 여부 및 가격을 담고 있습니다.

In [9]:
dfListings = spark.read.load(f"{DATASET_ROOT}/_datasets/airbnb/airbnb_listings.csv", 
                             format="csv", inferSchema=True, header=True,
                             quote='"', escape='"', sep=',', multiline=True)


dfReviews = spark.read.load(f"{DATASET_ROOT}/_datasets/airbnb/airbnb_reviews.csv", 
                             format="csv", inferSchema=True, header=True,
                             quote='"', escape='"', sep=',', multiline=True)

dfCalendar = spark.read.load(f"{DATASET_ROOT}/_datasets/airbnb/airbnb_calendar.csv", 
                             format="csv", inferSchema=True, header=True,
                             quote='"', escape='"', sep=',', multiline=False)

                                                                                

In [10]:
dfListings.limit(5).toPandas()

  df[column_name] = series


Unnamed: 0,id,listing_url,scrape_id,last_scraped,name,summary,space,description,experiences_offered,neighborhood_overview,...,instant_bookable,is_business_travel_ready,cancellation_policy,require_guest_profile_picture,require_guest_phone_verification,calculated_host_listings_count,calculated_host_listings_count_entire_homes,calculated_host_listings_count_private_rooms,calculated_host_listings_count_shared_rooms,reviews_per_month
0,360,https://www.airbnb.com/rooms/360,20191129210509,2019-11-30,LoHi Secret garden at the Chickadee Cottage,Come enjoy our oasis is the city and stay at o...,Chickadee Cottage is the largest of our guest ...,Come enjoy our oasis is the city and stay at o...,none,those who are interested in our local brews - ...,...,t,f,moderate,t,t,2,2,0,0,6.0
1,590,https://www.airbnb.com/rooms/590,20191129210509,2019-11-30,Comfortable - and a great value!,"Large guest room in my home, where I also live...",I have been enjoying welcoming many wonderful ...,"Large guest room in my home, where I also live...",none,I love the diversity of my neighborhood and it...,...,f,f,flexible,f,f,2,0,2,0,4.53
2,592,https://www.airbnb.com/rooms/592,20191129210509,2019-11-30,private,This room is in the basement. It does not hav...,This is a basement room. You can sometimes he...,This room is in the basement. It does not hav...,none,,...,f,f,flexible,f,f,2,0,2,0,1.36
3,1940,https://www.airbnb.com/rooms/1940,20191129210509,2019-11-30,Baker Studio Close to EVERYTHING,Great place for a few nights or months! Signif...,"This newly built, highly functional studio is ...",Great place for a few nights or months! Signif...,none,Walking through the Baker historical neighborh...,...,f,f,strict_14_with_grace_period,f,f,1,1,0,0,1.33
4,2086,https://www.airbnb.com/rooms/2086,20191129210509,2019-11-30,Garden Level Condo,"A furnished, garden level, one bedroom/one bat...",Furnished one bedroom condo. Pets with additio...,"A furnished, garden level, one bedroom/one bat...",none,restaurants and park within walking distance. ...,...,f,f,strict_14_with_grace_period,f,f,1,1,0,0,0.57


In [11]:
dfReviews.limit(5).toPandas()

Unnamed: 0,listing_id,id,date,reviewer_id,reviewer_name,comments
0,360,307152490,2018-08-13,46723582,Madeleine,"This space was perfect! Great location, hosts,..."
1,360,311601388,2018-08-21,68751664,Janelle,Hidden Gem in Denver. Magical Secret garden ce...
2,360,312497032,2018-08-23,57773484,Ziad,Amazing stay at one of the best kept AirBnB se...
3,360,313089933,2018-08-24,11982181,Andrea,The cottage has a great setting with a garden ...
4,360,314492150,2018-08-26,14620568,Iris,"Super cute place, very peaceful (loved the gar..."


In [12]:
dfCalendar.limit(5).toPandas()

Unnamed: 0,listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights
0,1153002,2019-11-29,f,$145.00,$145.00,30,60
1,3138055,2019-11-29,f,$130.00,$130.00,2,1125
2,3138055,2019-11-30,f,$130.00,$130.00,2,1125
3,3138055,2019-12-01,t,$130.00,$130.00,2,1125
4,3138055,2019-12-02,f,$130.00,$130.00,2,1125


### 테이블 생성

로컬 환경에 Docker 를 이용해 MySQL 을 띄우고, [DataGrip](https://www.jetbrains.com/datagrip) 등 사용하기 편리한 도구를 이용해 테이블 2개를 생성합니다. 

```sql
CREATE TABLE pipeline.airbnb_stat_review
(
    listing_id       BIGINT UNSIGNED NOT NULL,
    count_review     BIGINT UNSIGNED NOT NULL,
    score_review_avg DOUBLE(10, 5)   NOT NULL,

    PRIMARY KEY (listing_id)

) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4
  COLLATE = utf8mb4_unicode_ci;
```

```sql
CREATE TABLE pipeline.airbnb_stat_sales_month
(
    listing_id       BIGINT UNSIGNED NOT NULL,

    month VARCHAR(8),
    count_order BIGINT UNSIGNED NOT NULL,
    price_order DOUBLE(10, 5) NOT NULL,

    PRIMARY KEY (listing_id, month)

) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4
  COLLATE = utf8mb4_unicode_ci;

```

위 테이블들은 데이터 및 배치 운영 관점에서 몇 가지 문제점들을 가지고 있습니다.  
이 부분은 차후에 논의하도록 하고 먼저 데이터를 적재해보겠습니다.

### 데이터 가공 및 적재: airbnb_stat_review

In [13]:
dfListingMeta = dfListings.selectExpr("CAST(id AS BIGINT) as listing_id", "CAST(review_scores_rating AS DOUBLE) as score_review_avg")

In [14]:
dfGroupedReview = dfReviews.selectExpr("CAST(listing_id AS BIGINT) as listing_id").groupBy("listing_id").agg(count("*").alias("count_review")) 

In [15]:
dfStatReview = dfListingMeta.alias("LISTING_META").join(
    other = dfGroupedReview.alias("LISTING_REVIEW_GROUPED"), 
    on = col("LISTING_META.listing_id") == col("LISTING_REVIEW_GROUPED.listing_id"), 
    how = "left"
).select(
    col("LISTING_META.listing_id"), 
    coalesce(col("LISTING_META.score_review_avg"), lit(0.0)).alias("score_review_avg"), 
    coalesce(col("LISTING_REVIEW_GROUPED.count_review"), lit(0)).alias("count_review"),
)

In [16]:
dfStatReview\
    .repartition(2)\
    .write\
    .mode("append")\
    .format("jdbc")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("url", "jdbc:mysql://localhost:3306?useSSL=false")\
    .option("dbtable", "pipeline.airbnb_stat_review")\
    .option("user", "root")\
    .option("password", "root")\
    .option("truncate", "false")\
    .save()

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
                                                                                

### 데이터 가공 및 적재: airbnb_stat_sales_month

In [17]:
dfCalendarParsed = dfCalendar\
    .withColumn("price", regexp_extract(col("price"), "[0-9]+.[0-9]+", 0).cast(DoubleType()))\
    .withColumn("adjusted_price", regexp_extract(col("adjusted_price"), "[0-9]+.[0-9]+", 0).cast(DoubleType()))\
    .withColumn("month", col('date').substr(lit(1), lit(7)))

In [19]:
dfCalendarParsed.limit(10).toPandas()

Unnamed: 0,listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights,month
0,1153002,2019-11-29,f,145.0,145.0,30,60,2019-11
1,3138055,2019-11-29,f,130.0,130.0,2,1125,2019-11
2,3138055,2019-11-30,f,130.0,130.0,2,1125,2019-11
3,3138055,2019-12-01,t,130.0,130.0,2,1125,2019-12
4,3138055,2019-12-02,f,130.0,130.0,2,1125,2019-12
5,3138055,2019-12-03,f,130.0,130.0,2,1125,2019-12
6,3138055,2019-12-04,f,130.0,130.0,2,1125,2019-12
7,3138055,2019-12-05,f,130.0,130.0,2,1125,2019-12
8,3138055,2019-12-06,f,130.0,130.0,2,1125,2019-12
9,3138055,2019-12-07,f,130.0,130.0,2,1125,2019-12


<br/>
`dfListingCalendar` 컬럼은 숙소의 일별 재고와 (예약 가능 여부) 가격을 나타냅니다.  

`available = f` 일 경우 판매된건으로 취급해 집계하기 위해 Group By 를 수행하고 Aggregation 함수를 사용합니다.

In [20]:
dfCalendarParsed.agg(min("date"), max("date")).show()

+----------+----------+
| min(date)| max(date)|
+----------+----------+
|2019-11-29|2020-11-28|
+----------+----------+



                                                                                

In [21]:
dfGroupedSales = dfCalendarParsed\
    .where(col("available") == lit('f'))\
    .groupBy("listing_id", "month")\
    .agg(sum("price").alias("price_order"), count("*").alias("count_order"))

In [22]:
dfListingMeta.createOrReplaceTempView("LISTING_META")
dfGroupedSales.createOrReplaceTempView("GROUPED_SALES")

In [37]:

dfStatSalesMonth = spark.sql("""
SELECT 
    /*+ REPARTITION(3, listing_id), BROADCAST(LISTING_META) */
    LISTING_META.listing_id,
    GROUPED_SALES.month,
    coalesce(GROUPED_SALES.price_order, 0.0) as price_order, 
    coalesce(GROUPED_SALES.count_order, 0) as count_order
    
FROM LISTING_META

INNER JOIN GROUPED_SALES
    ON LISTING_META.listing_id = GROUPED_SALES.listing_id
    
ORDER BY listing_id ASC, month ASC
""")

In [38]:
dfStatSalesMonth.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- month: string (nullable = true)
 |-- price_order: double (nullable = false)
 |-- count_order: long (nullable = false)



In [39]:
dfStatSalesMonth.limit(5).toPandas()

                                                                                

Unnamed: 0,listing_id,month,price_order,count_order
0,360,2019-12,1434.0,11
1,360,2020-01,537.0,4
2,360,2020-02,777.0,6
3,360,2020-03,508.0,4
4,360,2020-05,1143.0,9


In [40]:
dfStatSalesMonth.count()

                                                                                

46992

모든 숙소에 대해 예약이 없어도 2019.11 - 2020.11 까지 기간을 다 만들려면 어떻게 해야할까요?  
기간을 모두 가진 DataFrame 과 Cross Join 하는 경우를 생각해 봅시다.

In [33]:
dfStatSalesMonth\
    .repartition(10)\
    .write\
    .mode("append")\
    .format("jdbc")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("url", "jdbc:mysql://localhost:3306?useSSL=false")\
    .option("dbtable", "pipeline.airbnb_stat_sales_month")\
    .option("user", "root")\
    .option("password", "root")\
    .option("truncate", "false")\
    .save()

                                                                                

### 개선 논의

```sql
CREATE TABLE pipeline.airbnb_stat_review
(
    listing_id       BIGINT UNSIGNED NOT NULL,
    count_review     BIGINT UNSIGNED NOT NULL,
    score_review_avg DOUBLE(10, 5)   NOT NULL,

    PRIMARY KEY (listing_id)

) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4
  COLLATE = utf8mb4_unicode_ci;
```

```sql
CREATE TABLE pipeline.airbnb_stat_sales_month
(
    listing_id       BIGINT UNSIGNED NOT NULL,

    month VARCHAR(8), -- index 를 걸어야 할까요?
    count_order BIGINT UNSIGNED NOT NULL,
    price_order DOUBLE(10, 5) NOT NULL,

    PRIMARY KEY (listing_id, month)

) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4
  COLLATE = utf8mb4_unicode_ci;

```

<br/>
위 테이블들의 문제점은 다음과 같습니다.

#### 1) `score_review_avg` 와 같이 "평균" 을 미리 구해버리면, 데이터의 손실이 발생합니다. 
`score_view_sum` 과 같이 원본 데이터를 남겨 나중에 계산할 수 있도록 하는 방식이 좋습니다.  
NodeJS 와 같이 API 에서 소수를 정확하게 계산할 수 없는 경우가 있으나 그게 우려된다면 `avg`, `sum` 을 둘 다 적재하는 편이 낫습니다. 

#### 2) 통계적 데이터를 읽는 사용자 관점에서 "전체"에서의 평균 은 크게 의미가 없는 경우가 많습니다.  
대부분 나 또는 내 주변 (동일 지역, 동일 도시, 경쟁업체) 에만 관심이 있으므로 해당 기준으로 등수를 구하는 편이 의미가 있습니다.  
등수의 경우에도, 1등이 10000건 주문 2등이 100건 주문이라면 등수 차이는 의미가 없으므로 Percentile (백분위) 를 구해주는 편이 낫습니다. 

#### 3) 경쟁 업체를 구하기 위해서는 다양한 방법을 사용할 수 있습니다.
같은 지역 내 (상품 메타 기반) / 사용자가 같이 보는 (사용자 행위 데이터 기반) / 매출이 숫자와 총액, 상품 평균 단가가 비슷한 (판매 관점)

#### 4) 데이터 기간 범위가 더 상세해질 수 있습니다.
일별 데이터로 확장될 수 있기 때문에 `month VARCHAR(8)` 라는 컬럼으로 미리 정하기 보단 `period TIMESTAMP` 등을 이용해  
월별이면 2021-12-01 등 하루로 적재하고 API 에서 Group By 해 나갈 수 있습니다. 이 편이 필터링 (VARCHAR < TIMESTAMP 타입) 에도 더 좋습니다. 

<br/>

운영 관점에서도 다음 포인트를 고민해봅시다

#### A) 잘못 적재 했을 경우엔 어떻게 해야할까요?

DELETE 를 해야하는데, 전체를 할 수는 없고 금일 잘못 실행된 데이터만 삭제해야 한다면 데이터 업데이트 날짜를 표시하는 updated_at 같은 컬럼이 필요할까요?  
또한 배치를 Immutable 하게 만드려면 항상 해당 날짜 데이터를 DELETE 후 APPEND 하면 어떨지도 고민해 봅시다.

#### B) 사용자가 데이터를 사용하는 와중에 삭제되는 것을 보지 않길 원할 때

Spark JDBC Data Source 를 이용하면 Append 혹은 Truncate Overwrite (테이블은 유지하고 데이터 전체 삭제 후 Append) 만 가능합니다.  
따라서 Upsert 와 같은 내용을 구현하려면 foreachPartition 등을 이용해 직접 UPSERT (ON DUPLICATE KEY UPDATE, MERGE INTO) 를 이용할 수 있습니다. 
- https://issues.apache.org/jira/browse/SPARK-19335

### 과제: 데이터 읽기

JDBC Driver 를 이용해 Data 를 읽어봅시다. `upperBound` 를 조절해 가며 파티셔닝이 어떻게 되는지도 확인해 봅시다 (df.rdd.getNumPartitions)

```python
queryListingJdbc = """(SELECT * FROM pipeline.airbnb_stat_review WHERE listing_id >= 100000) AS target"""


df = spark\
    .read\
    .format("jdbc")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("url", "jdbc:mysql://localhost:3306?useSSL=false")\
    .option("dbtable", queryListingJdbc)\
    .option("user", "root")\
    .option("password", "root")\
    .option("partitionColumn", "listing_id")\
    .option("lowerBound", "0")\
    .option("upperBound", "1000")\
    .option("customSchema", "listing_id LONG")\
    .option("numPartitions", "5")\
    .load()
```

In [58]:
queryListingJdbc = """(SELECT * FROM pipeline.airbnb_stat_review WHERE listing_id >= 100000) AS target"""


dfLoaded = spark\
    .read\
    .format("jdbc")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("url", "jdbc:mysql://localhost:3306?useSSL=false")\
    .option("dbtable", queryListingJdbc)\
    .option("user", "root")\
    .option("password", "root")\
    .option("partitionColumn", "listing_id")\
    .option("lowerBound", "0")\
    .option("upperBound", "370209")\
    .option("customSchema", "listing_id LONG, count_review LONG")\
    .option("numPartitions", "5")\
    .load()

In [59]:
dfLoaded.limit(10).toPandas()

Unnamed: 0,listing_id,count_review,score_review_avg
0,142683,303,94.0
1,172196,25,99.0
2,184529,6,97.0
3,192430,115,94.0
4,217996,11,100.0
5,236207,62,99.0
6,283162,1,80.0
7,364585,135,96.0
8,370209,90,99.0
9,477016,315,97.0


In [60]:
dfLoaded.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- count_review: long (nullable = true)
 |-- score_review_avg: double (nullable = true)



In [61]:
dfLoaded.rdd.getNumPartitions()

5

In [62]:
dfLoaded.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().toPandas()

Unnamed: 0,partitionId,count
0,1,1
1,2,4
2,3,2
3,4,4844
