## 1. aws connect

In [2]:
!aws --version

aws-cli/1.33.4 Python/3.11.6 Linux/6.5.11-linuxkit botocore/1.34.122


In [3]:
!aws s3 ls

2024-01-25 15:01:41 aws-glue-assets-280432648345-ap-northeast-2
2024-01-08 06:11:06 aws-lambda-dummy-prod-serverlessdeploymentbucket-uu3bbgjbjfza
2024-01-12 09:39:11 book-data-pipeline-prod-serverlessdeploymentbucket-bjra5omsi63o
2024-01-06 09:08:06 docker-selenium-lambda-pr-serverlessdeploymentbuck-wbhslhm4ylrn
2024-01-08 05:08:38 serverless.metrics-firehose-backup-wnrela9vnmmexwlkffxd44


### issue(solved)
- spark, aws 연결에 필요한 설정 및 디펜던시를 갖춰주었으나 java classpath에서 구성해준 파일들을 제대로 찾아오지 못함
- 컨테이너 내부에서 aws cli 사용했을때에는 config가 제대로 설정되어 연결되는데, spark session에서 aws연결 안됨 
- 그래서 s3에서 데이터를 읽어오지 못하고 classnotfound 오류


#### 해결 방법

spark session 생성 시 
- 1. master 메소드에 local넣어줌
      - 현재 단일 컨테이너 local mode. 클러스터 매니저 또한 local이라고 명시를 해줘야 정확하게 context를 읽어올 수 있다.
- 2. spark.jars.packages -> spark.jars
      - conf 파일에 필요한 패키지 명사해줘서 갖고오는 
      - spark.jars.packages 설정은 Maven 패키지를 로드하는 데 사용됨, 로컬 JAR 파일을 로드하려면 spark.jars 설정으로 진행 필요


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
.appName("spark-local-environment-test")\
.master("local[*]") \
.config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
.config('spark.jars', '$SPARK_HOME/jars/hadoop-aws-3.2.0.jar')\
.config('spark.jars', '$SPARK_HOME/jars/aws-java-sdk-bundle-1.11.375.jar')\
.getOrCreate()

spark

In [4]:
import os

s3_uri = "s3a://book-data-pipeline-prod-serverlessdeploymentbucket-bjra5omsi63o/AWSDynamoDB/01707815361219-bbc1fc5b"
df = spark.read.format('json').load(os.path.join(s3_uri, 'data/*.json.gz'))
df.show()

+--------------------------+
|                      Item|
+--------------------------+
|      {{'Fishel, Cathar...|
|   {{'S.N. 고엔카'}, {1...|
|   {{'이순신'}, {158128...|
|   {{'김현구'}, {223179...|
|      {{'Urry'}, {22208...|
| {{'토피 편집부'}, {213...|
|{{'파멜라 크리베'}, {10...|
|   {{'이득우'}, {214994...|
|   {{'최광희'}, {212662...|
|      {{''}, {20541571}...|
|   {{'김아영'}, {145331...|
|   {{'김은영'}, {136171...|
|{{'존 야블론스키'}, {16...|
|   {{'장조원'}, {206334...|
|      {{''}, {16624530}...|
|   {{'손원호'}, {208102...|
|   {{'김연주'}, {220596...|
|   {{'변희재'}, {215112...|
|   {{'유대영'}, {213759...|
|{{'배창호,조오영'}, {22...|
+--------------------------+
only showing top 20 rows



# 2. common task 

In [6]:
user_list_df = spark.read.option("header", "true").option("inferSchema", "true").csv('./data/user_list.csv')
user_list_df.printSchema()
user_list_df.show()
user_list_df.count()

root
 |-- REG_DATE: timestamp (nullable = true)
 |-- SEX_ID: string (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- WITHDRAW_DATE: string (nullable = true)
 |-- PREF_NAME: string (nullable = true)
 |-- USER_ID_hash: string (nullable = true)

+-------------------+------+---+-------------------+---------+--------------------+
|           REG_DATE|SEX_ID|AGE|      WITHDRAW_DATE|PREF_NAME|        USER_ID_hash|
+-------------------+------+---+-------------------+---------+--------------------+
|2012-03-28 14:14:18|     f| 25|                 NA|     NULL|d9dca3cb44bab12ba...|
|2011-05-18 00:41:48|     f| 34|                 NA|   東京都|560574a339f1b25e5...|
|2011-06-13 16:36:58|     m| 41|                 NA|   愛知県|e66ae91b978b3229f...|
|2012-02-08 12:56:15|     m| 25|                 NA|     NULL|43fc18f32eafb0571...|
|2011-05-22 23:43:56|     m| 62|                 NA| 神奈川県|dc6df8aa860f8db0d...|
|2011-05-27 16:17:19|     f| 50|                 NA|   広島県|f430e8302c1fd0915...|
|201

22873

In [7]:
coupon_detail_df = spark.read.option("header", "true").option("inferSchema", "true").csv('./data/coupon_detail_train.csv')
coupon_detail_df.printSchema()
coupon_detail_df.show()
coupon_detail_df.count()

root
 |-- ITEM_COUNT: integer (nullable = true)
 |-- I_DATE: timestamp (nullable = true)
 |-- SMALL_AREA_NAME: string (nullable = true)
 |-- PURCHASEID_hash: string (nullable = true)
 |-- USER_ID_hash: string (nullable = true)
 |-- COUPON_ID_hash: string (nullable = true)

+----------+-------------------+----------------------------+--------------------+--------------------+--------------------+
|ITEM_COUNT|             I_DATE|             SMALL_AREA_NAME|     PURCHASEID_hash|        USER_ID_hash|      COUPON_ID_hash|
+----------+-------------------+----------------------------+--------------------+--------------------+--------------------+
|         1|2012-03-28 15:06:06|                        兵庫|c820a8882374a4e47...|d9dca3cb44bab12ba...|34c48f84026e08355...|
|         1|2011-07-04 23:52:54|      銀座・新橋・東京・上野|1b4eb2435421ede98...|560574a339f1b25e5...|767673b7a777854a9...|
|         1|2011-07-16 00:52:49|          恵比寿・目黒・品川|36b5f9ba46c44b655...|560574a339f1b25e5...|4f3b5b91d98311925...

168996

#### 1. join
user_list, coupon_detail_train 데이터를 "USER_ID_hash" 기준으로 user_list로 inner join

In [11]:
joined_df = user_list_df.join(coupon_detail_df, on='USER_ID_hash')
joined_df.show()

+--------------------+-------------------+------+---+-------------+---------+----------+-------------------+----------------------------+--------------------+--------------------+
|        USER_ID_hash|           REG_DATE|SEX_ID|AGE|WITHDRAW_DATE|PREF_NAME|ITEM_COUNT|             I_DATE|             SMALL_AREA_NAME|     PURCHASEID_hash|      COUPON_ID_hash|
+--------------------+-------------------+------+---+-------------+---------+----------+-------------------+----------------------------+--------------------+--------------------+
|d9dca3cb44bab12ba...|2012-03-28 14:14:18|     f| 25|           NA|     NULL|         1|2012-03-28 15:06:06|                        兵庫|c820a8882374a4e47...|34c48f84026e08355...|
|560574a339f1b25e5...|2011-05-18 00:41:48|     f| 34|           NA|   東京都|         1|2011-07-04 23:52:54|      銀座・新橋・東京・上野|1b4eb2435421ede98...|767673b7a777854a9...|
|560574a339f1b25e5...|2011-05-18 00:41:48|     f| 34|           NA|   東京都|         1|2011-07-16 00:52:49|          恵

#### hack
spark dataframe 출력 아주 가독성 떨어짐, pandas dataframe처럼 출력하는법

In [12]:
from IPython.display import display, display_pretty, clear_output, JSON

spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
display(joined_df)

USER_ID_hash,REG_DATE,SEX_ID,AGE,WITHDRAW_DATE,PREF_NAME,ITEM_COUNT,I_DATE,SMALL_AREA_NAME,PURCHASEID_hash,COUPON_ID_hash
d9dca3cb44bab12ba...,2012-03-28 14:14:18,f,25,,,1,2012-03-28 15:06:06,兵庫,c820a8882374a4e47...,34c48f84026e08355...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-04 23:52:54,銀座・新橋・東京・上野,1b4eb2435421ede98...,767673b7a777854a9...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:52:49,恵比寿・目黒・品川,36b5f9ba46c44b655...,4f3b5b91d98311925...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:54:53,恵比寿・目黒・品川,2f30f46937cc90047...,4f3b5b91d98311925...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:55:52,恵比寿・目黒・品川,4d000c64a55ac573d...,4f3b5b91d98311925...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:57:09,恵比寿・目黒・品川,d8b030c8a4a2c1051...,4f3b5b91d98311925...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,2,2011-07-16 00:58:29,恵比寿・目黒・品川,2c98138766edf5d5b...,4f3b5b91d98311925...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,2,2011-09-07 00:04:47,渋谷・青山・自由が丘,5a45d62c72a4cc415...,259a22533cab38146...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-10-19 22:21:50,渋谷・青山・自由が丘,592bb67c0553648d6...,f4a6f861a266799b0...
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-12-01 16:53:45,恵比寿・目黒・品川,509883c481716b1f3...,4257a8a169b40bbbc...


#### 2. date column
"REG_DATE" 기준으로 "yyyy-MM-dd" 형식의 date 열 만들기

pyspark.sql.functions.date_format(date, format)[source]¶

Converts a date/timestamp/string to a value of string in the format specified by the date format given by the second argument.

In [23]:
import pyspark.sql.functions as F

date_df = joined_df.withColumn('date', F.date_format(F.col("REG_DATE"), "yyyy-MM-dd"))
display(date_df)

USER_ID_hash,REG_DATE,SEX_ID,AGE,WITHDRAW_DATE,PREF_NAME,ITEM_COUNT,I_DATE,SMALL_AREA_NAME,PURCHASEID_hash,COUPON_ID_hash,date
d9dca3cb44bab12ba...,2012-03-28 14:14:18,f,25,,,1,2012-03-28 15:06:06,兵庫,c820a8882374a4e47...,34c48f84026e08355...,2012-03-28
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-04 23:52:54,銀座・新橋・東京・上野,1b4eb2435421ede98...,767673b7a777854a9...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:52:49,恵比寿・目黒・品川,36b5f9ba46c44b655...,4f3b5b91d98311925...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:54:53,恵比寿・目黒・品川,2f30f46937cc90047...,4f3b5b91d98311925...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:55:52,恵比寿・目黒・品川,4d000c64a55ac573d...,4f3b5b91d98311925...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-07-16 00:57:09,恵比寿・目黒・品川,d8b030c8a4a2c1051...,4f3b5b91d98311925...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,2,2011-07-16 00:58:29,恵比寿・目黒・品川,2c98138766edf5d5b...,4f3b5b91d98311925...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,2,2011-09-07 00:04:47,渋谷・青山・自由が丘,5a45d62c72a4cc415...,259a22533cab38146...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-10-19 22:21:50,渋谷・青山・自由が丘,592bb67c0553648d6...,f4a6f861a266799b0...,2011-05-18
560574a339f1b25e5...,2011-05-18 00:41:48,f,34,,東京都,1,2011-12-01 16:53:45,恵比寿・目黒・品川,509883c481716b1f3...,4257a8a169b40bbbc...,2011-05-18


### 3. 필요한 열 선택 / ETL 처리

date	year	month	day	dow	week

user_id	sex_id	name	area_name	purchased_id	

item_count	cost	cost_VAT						

In [27]:
df = (
    date_df
    .withColumn('year', F.year(F.col('date')))
    .withColumn('month', F.month(F.col('date')))
    .withColumn('day', F.dayofmonth(F.col('date')))
    .withColumn('dow', F.dayofweek(F.col('date')))
    .withColumn('week', F.weekofyear(F.col('date')))   
    .withColumn('user_id', F.col('USER_ID_hash'))
    .withColumn('sex_id', F.col('SEX_ID'))
    .withColumn('name', F.col('PREF_NAME'))
    .withColumn('area_name', F.col('SMALL_AREA_NAME'))
    .withColumn('purchased_id', F.col('PURCHASEID_hash'))
    .withColumn('item_count', F.col('ITEM_COUNT'))
    .withColumn('cost', F.col('ITEM_COUNT')*8000)
    .withColumn('cost_VAT', F.col('cost')*1.1)
    .select('date', 'year', 'month', 'day', 'dow', 'week', 'user_id', 'sex_id', 'name', 'area_name', 'purchased_id', 'item_count', 'cost', 'cost_VAT')
)
display(df)

date,year,month,day,dow,week,user_id,sex_id,name,area_name,purchased_id,item_count,cost,cost_VAT
2012-03-28,2012,3,28,4,13,d9dca3cb44bab12ba...,f,,兵庫,c820a8882374a4e47...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,銀座・新橋・東京・上野,1b4eb2435421ede98...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,36b5f9ba46c44b655...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,2f30f46937cc90047...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,4d000c64a55ac573d...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,d8b030c8a4a2c1051...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,2c98138766edf5d5b...,2,16000,17600.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,渋谷・青山・自由が丘,5a45d62c72a4cc415...,2,16000,17600.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,渋谷・青山・自由が丘,592bb67c0553648d6...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,509883c481716b1f3...,1,8000,8800.0


### 4. null
- 구분열 결측치 "-"으로, 수치열 결측치 0 으로 처리
- "NA"로 된 값도 위에 정의한 값으로 결측치 처리

In [28]:
df_null_replaced = (
    df
    .na.fill('-', subset=['date', 'year', 'month', 'day', 'dow', 'week', 'user_id', 'sex_id', 'name', 'area_name', 'purchased_id'])
    .replace('NA', '-', subset=['date', 'user_id', 'sex_id', 'name', 'area_name', 'purchased_id'])
    .na.fill(0, subset=['item_count', 'cost', 'cost_VAT'])
)
display(df_null_replaced)

date,year,month,day,dow,week,user_id,sex_id,name,area_name,purchased_id,item_count,cost,cost_VAT
2012-03-28,2012,3,28,4,13,d9dca3cb44bab12ba...,f,-,兵庫,c820a8882374a4e47...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,銀座・新橋・東京・上野,1b4eb2435421ede98...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,36b5f9ba46c44b655...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,2f30f46937cc90047...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,4d000c64a55ac573d...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,d8b030c8a4a2c1051...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,2c98138766edf5d5b...,2,16000,17600.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,渋谷・青山・自由が丘,5a45d62c72a4cc415...,2,16000,17600.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,渋谷・青山・自由が丘,592bb67c0553648d6...,1,8000,8800.0
2011-05-18,2011,5,18,4,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,509883c481716b1f3...,1,8000,8800.0


### 5. date 컬럼을 활용해 year / month / day / dow / weeks 열 만들기
- dow의 경우 일요일이 0, 토요일이 6으로 나타나도록
- dayofweek: (1 = Sunday, 2 = Monday, ..., 7 = Saturday) -> needs to extract 1

In [29]:
df_null_replaced = df_null_replaced.withColumn('dow', F.col('dow')-1)
display(df_null_replaced)

date,year,month,day,dow,week,user_id,sex_id,name,area_name,purchased_id,item_count,cost,cost_VAT
2012-03-28,2012,3,28,3,13,d9dca3cb44bab12ba...,f,-,兵庫,c820a8882374a4e47...,1,8000,8800.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,銀座・新橋・東京・上野,1b4eb2435421ede98...,1,8000,8800.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,36b5f9ba46c44b655...,1,8000,8800.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,2f30f46937cc90047...,1,8000,8800.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,4d000c64a55ac573d...,1,8000,8800.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,d8b030c8a4a2c1051...,1,8000,8800.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,2c98138766edf5d5b...,2,16000,17600.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,渋谷・青山・自由が丘,5a45d62c72a4cc415...,2,16000,17600.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,渋谷・青山・自由が丘,592bb67c0553648d6...,1,8000,8800.0
2011-05-18,2011,5,18,3,20,560574a339f1b25e5...,f,東京都,恵比寿・目黒・品川,509883c481716b1f3...,1,8000,8800.0


### 6.구분열 기준 수치열 합으로 중복 제거 

In [36]:
result = (
    df_null_replaced
    .groupby('date', 'year', 'month', 'day', 'dow', 'week', 'user_id', 'sex_id', 'name', 'area_name', 'purchased_id')
    .agg(
        F.sum('item_count').alias('item_count_agg'), 
        F.sum('cost').alias('cost_agg'), 
        F.sum('cost_VAT').alias('cost_VAT_agg'), 
    )
)
display(result)

date,year,month,day,dow,week,user_id,sex_id,name,area_name,purchased_id,item_count_agg,cost_agg,cost_VAT_agg
2012-01-30,2012,1,30,1,5,f55c2f67be0321adc...,f,千葉県,栃木,2779ec028f8a6b0e8...,1,8000,8800.0
2011-10-29,2011,10,29,6,43,7e7fef0e10cf867c9...,m,-,愛知,7655b2feee390e81c...,1,8000,8800.0
2012-02-25,2012,2,25,6,8,f246fa8279a41f21e...,m,-,奈良,1f6142427b23ecd94...,2,16000,17600.0
2011-07-09,2011,7,9,6,27,79893d409081aeaed...,f,大阪府,キタ,e4593ed444796dfea...,2,16000,17600.0
2010-11-17,2010,11,17,3,46,6434a69c5df328f00...,f,東京都,立川・町田・八王子他,86cc54962769e8cff...,1,8000,8800.0
2011-11-13,2011,11,13,0,45,0c19892c7e2fc274a...,m,福岡県,渋谷・青山・自由が丘,7ea8f1a8cc9fd02a2...,3,24000,26400.000000000004
2011-07-30,2011,7,30,6,30,7734cde45d6a1b8b0...,m,-,新宿・高田馬場・中野・吉祥寺,e955ca644c7861299...,1,8000,8800.0
2011-03-10,2011,3,10,4,10,8e5755396392ba36a...,m,東京都,赤坂・六本木・麻布,f018ef6c7fa679195...,1,8000,8800.0
2010-11-16,2010,11,16,2,46,3eb03ecb7f08e8544...,m,愛知県,ミナミ他,ab75ceb398adde2f2...,1,8000,8800.0
2011-05-27,2011,5,27,5,21,7afc4e216672636f6...,m,神奈川県,赤坂・六本木・麻布,830467a8a0754cdba...,1,8000,8800.0
