# 사용자 activity 로그를 Hive table 로 제공하기 위한 Spark Application

---


## spark 버전 및 연결 확인

In [1]:
!spark-submit --version
//!spark-shell --version
//!spark-sql --version

//spark.version // 3.3.2
//sc.version // 3.3.2

24/05/27 19:36:37 WARN Utils: Your hostname, Hwikeunui-MacBookAir.local resolves to a loopback address: 127.0.0.1; using 172.29.40.132 instead (on interface en0)


24/05/27 19:36:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 16.0.1
Branch HEAD
Compiled by user liangchi on 2023-02-10T19:57:40Z
Revision 5103e00c4ce5fcc4264ca9c4df12295d42557af6
Url https://github.com/apache/spark
Type --help for more information.



Intitializing Scala interpreter ...

Spark Web UI available at http://172.29.40.132:4041
SparkContext available as 'sc' (version = 3.3.2, master = local[*], app id = local-1716806200986)
SparkSession available as 'spark'


In [11]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
                        .appName("DE Work")
                        .enableHiveSupport()
                        .getOrCreate()

val FILE_PATH = ".//data//2019-Oct.csv"
val df_test = spark.read.format("csv")
                        .option("header","true")
                        .load(FILE_PATH)
df_test.show(10)

24/05/28 15:29:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    null| 543.10|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.

import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@61507cd
FILE_PATH: String = .//data//2019-Oct.csv
df_test: org.apache.spark.sql.DataFrame = [event_time: string, event_type: string ... 7 more fields]


---


## 데이터 처리
\* 사용데이터 : eCommerce behavior data from multi category store(kaggle)  
  \** 2019-Oct.csv (총 42,448,764건 / 5.67GB)  
  \** 2019-Nov.csv (총 67,501,979건 / 9.01GB)  
  \** 데이터 출처 : (https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store)

- KST 기준 daily partition 처리
- 재처리 후 parquet, snappy 처리
- External Table 방식으로 설계
- 추가 기간 처리에 대응가능하도록 구현
- 배치 장애시 복구를 위한 장치 구현

**데이터 로드**
- inferschema 옵션을 false로 하고 스키마를 직접 선언하여 성능향상 가능(소요시간 단축)

In [32]:
import org.apache.spark.sql.SparkSession

//val SPARK_HOME=sys.env.get("SPARK_HOME").mkString
val spark = SparkSession.builder()
                        .appName("DE Work")
                        .enableHiveSupport()
                        .getOrCreate()

// 소스 데이터 경로
val srcFilePath = "..//src_data//2019-Oct.csv"
val df = spark.read.format("csv")
                    .option("header","true")
                    .option("inferschema","true")  // 스키마 자동 추정
                    .load(srcFilePath)
df.show(10)

24/05/28 16:24:12 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 09:00:00|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 09:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 09:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    null|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 09:00:01|      view|   1307067|2053013558920217191|  computers.noteboo

import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@61507cd
srcFilePath: String = ..//src_data//2019-Oct.csv
df: org.apache.spark.sql.DataFrame = [event_time: timestamp, event_type: string ... 7 more fields]


**스키마 확인**

In [33]:
df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



**UTC -> KST 변환 컬럼 생성 (파티션 용 컬럼(yyyy-MM-dd))**
  
<고려사항>
- utc time 에서 kst time 으로 변환 시 일부 데이터는 일, 월, 년도가 변경될 수 있음 

In [55]:
df.createOrReplaceTempView("df") // sql 쿼리로 데이터를 처리하기 위해 템프 뷰 생성
val kst_df = spark.sql("""select *, 
                                 to_date(from_utc_timestamp(event_time, 'Asia/Seoul')) as kst_event_date
                             from df
                       """).sample(0.01) // 테스트를 위한 데이터 샘플링 (약 10%)
kst_df.createOrReplaceTempView("kst_df") // sql 쿼리로 데이터를 처리하기 위해 템프 뷰 생성
//println(kst_df.count()) //423,671

// 파티션용 생성 컬럼 확인 및 날짜가 바뀌는 데이터 확인 (11월 1일로 바뀌는 데이터)
spark.sql("""select event_time,
                    kst_event_date
                from kst_df
                where kst_event_date = "2019-11-01"
          """).show(10)

+-------------------+--------------+
|         event_time|kst_event_date|
+-------------------+--------------+
|2019-10-31 15:00:06|    2019-11-01|
|2019-10-31 15:00:08|    2019-11-01|
|2019-10-31 15:00:22|    2019-11-01|
|2019-10-31 15:00:40|    2019-11-01|
|2019-10-31 15:00:44|    2019-11-01|
|2019-10-31 15:00:50|    2019-11-01|
|2019-10-31 15:01:08|    2019-11-01|
|2019-10-31 15:01:20|    2019-11-01|
|2019-10-31 15:01:22|    2019-11-01|
|2019-10-31 15:01:26|    2019-11-01|
+-------------------+--------------+
only showing top 10 rows



kst_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [event_time: timestamp, event_type: string ... 8 more fields]


**스키마 확인**

In [52]:
kst_df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- kst_event_date: date (nullable = true)



**parquet 형식의 snappy 압축 방식으로 데이터 처리**

In [54]:
// Parquet 데이터를 저장할 디렉토리 경로
val snkDataDir = "..//snk_data"

// 테이블 refresh
//kst_df.cache()
//spark.sql("refresh table kst_df")

// 결과 데이터(샘플) parquet 저장
kst_df.write.option("compression", "snappy") // 압축방식 snappy
            .mode("overwrite")
            .partitionBy("kst_event_date") // partition 컬럼
            .parquet(s"${snkDataDir}//") // parquet 저장

24/05/28 16:43:11 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/05/28 16:43:19 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


snkDataDir: String = ..//snk_data


---


**HIVE External Table 생성**  
  
\* 임의로 작성하였습니다

In [None]:
// DDL
CREATE EXTERNAL TABLE user_atv_log(
  event_time timestamp,
  event_type string,
  product_id string,
  category_id string,
  category_code string, 
  brand string,
  price double, 
  user_id string,
  user_session string
)
partitioned by(kst_event_date string)
STORED AS PARQUET
LOCATION '/parquetDataPath/';

---


**추가 기간 처리 대응**  
- 월별 데이터이므로 매월 특정 날짜,시간에 스케쥴러를 등록하여 배치처리
- 소스데이터가 언제, 어디에, 어떻게 저장되는지에 따라 다른방법으로 개발
- Airflow와 같은 데이터파이프라인 자동화 툴 사용 가능
- 단순 스케쥴링을 필요로하는 작업은 Airflow를 사용하기 보다는 cron과 같은 job 스케쥴러를 사용할 수 있음
- yyyy-MMM 파일명 형식의 csv 파일일 경우 아래 소스처럼 어플리케이션이 실행되는 날짜에 해당하는 대상 데이터 작업 가능


In [90]:
import java.text.SimpleDateFormat
import java.util.Locale

//val now_date = java.time.LocalDate.now.toString
val oct_ = "2019-10-01"
val nov_ = "2019-11-01"

val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
val outputFormat = new SimpleDateFormat("yyyy-MMM", Locale.ENGLISH)

//val work_file_name = outputFormat.format(inputFormat.parse(now_date))
val oct_file_name = outputFormat.format(inputFormat.parse(oct_))
val nov_file_name = outputFormat.format(inputFormat.parse(nov_))

//println(s"${work_file_name}.csv") // 2024-May.csv
println(s"${oct_file_name}.csv") // 2019-Oct.csv
println(s"${nov_file_name}.csv") // 2019-Nov.csv

2019-Oct.csv
2019-Nov.csv


import java.text.SimpleDateFormat
import java.util.Locale
oct_: String = 2019-10-01
nov_: String = 2019-11-01
inputFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@f67a0200
outputFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@ef7955a0
oct_file_name: String = 2019-Oct
nov_file_name: String = 2019-Nov


**배치 장애시 복구 방법**  
- 대상 작업의 중요도에 따라 배치 장애 복구작업의 우선순위가 달라질 수 있음
    - 원인파악이 먼저인지, 복구가 먼저인지
- 해당 작업의 경우 프로그램 소스 앞단에 기본적인 예외처리 가능
    - 대상 작업 소스 데이터 파일이 없는 경우 프로그램 종료
- 배치 파이프라인 툴 Airflow 관련 기능 사용
    - 성공, 실패 알림기능 설정 (이메일, 슬랙 등)
    - task를 분리하여 빠른 원인파악 및 대응 가능
    - dag_arguments에서 retry 관련 설정(retries, retry_delay 등) 및 excution_timeout(최대실행시간, 초과 시 fail) 관련 설정