### `events.csv` 전처리

##### 데이터 불러오기

In [None]:
# 필요한 라이브러리 임포트 및 SparkSession 설정
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start (or reuse) the Spark session used by every cell in this notebook.
spark = (SparkSession.builder
    .appName("DE_project")
    .getOrCreate())

In [None]:
# Load events.csv and show its structure.
# An explicit schema is used instead of inferSchema=true: schema inference makes
# Spark scan the whole file one extra time before the actual read. The DDL below
# reproduces the schema that inference produced for this file. `platform` stays
# a string because missing values in this file appear as the literal "\N".
EVENTS_SCHEMA = (
    "`display_id` INT, `uuid` STRING, `document_id` INT, "
    "`timestamp` INT, `platform` STRING, `geo_location` STRING"
)

df_events = (spark.read
    .option("header", "true")
    .schema(EVENTS_SCHEMA)
    .csv("../rawdata/events.csv")
)

print("Events 데이터 스키마:")
df_events.printSchema()
print("\n샘플 데이터:")
df_events.show(5)

Events 데이터 스키마:
root
 |-- display_id: integer (nullable = true)
 |-- uuid: string (nullable = true)
 |-- document_id: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- platform: string (nullable = true)
 |-- geo_location: string (nullable = true)


샘플 데이터:
+----------+--------------+-----------+---------+--------+------------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|
+----------+--------------+-----------+---------+--------+------------+
|         1|cb8c55702adb93|     379743|       61|       3|   US>SC>519|
|         2|79a85fa78311b9|    1794259|       81|       2|   US>CA>807|
|         3|822932ce3d8757|    1179111|      182|       2|   US>MI>505|
|         4|85281d0a49f7ac|    1777797|      234|       2|   US>WV>564|
|         5|8d0daef4bf5b56|     252458|      338|       2|       SG>00|
+----------+--------------+-----------+---------+--------+------------+
only showing top 5 rows



##### partition 기준으로 사용할 event_date 컬럼 생성

In [87]:
# Derive event_time and event_date (the partition key) from `timestamp`.
# `timestamp` is treated as a millisecond offset: adding OFFSET gives epoch
# milliseconds, dividing by 1000 gives epoch seconds, and casting that number
# to "timestamp" yields a TimestampType value.
# NOTE: to_date() converts using the Spark session time zone
# (spark.sql.session.timeZone). The sample output (timestamp 61 ->
# 2016-06-14 04:00) corresponds to a UTC session; a different session time
# zone would shift where event_date boundaries fall.
OFFSET = 1465876799998

df_events_processed = (df_events
    .withColumn("event_time", ((F.col("timestamp") + OFFSET) / 1000).cast("timestamp"))
    .withColumn("event_date", F.to_date("event_time"))
)

# Inspect a few processed rows.
print("처리된 데이터 샘플:")
df_events_processed.select("timestamp", "event_time", "event_date", "platform").show(5)


처리된 데이터 샘플:
+---------+--------------------+----------+--------+
|timestamp|          event_time|event_date|platform|
+---------+--------------------+----------+--------+
|       61|2016-06-14 04:00:...|2016-06-14|       3|
|       81|2016-06-14 04:00:...|2016-06-14|       2|
|      182|2016-06-14 04:00:...|2016-06-14|       2|
|      234|2016-06-14 04:00:...|2016-06-14|       2|
|      338|2016-06-14 04:00:...|2016-06-14|       2|
+---------+--------------------+----------+--------+
only showing top 5 rows



##### platform이 누락된 레코드 확인 및 입력

In [None]:
# Show events whose platform is not one of the expected codes "1", "2", "3".
# In this file missing platforms appear as the string "\N", which `~isin`
# catches. A genuine null, however, makes `~isin(...)` evaluate to null and the
# row would be silently dropped, so nulls are included explicitly.
print("🔍 Platform이 1, 2, 3이 아닌 데이터:")
df_events_processed.filter(
    ~F.col("platform").isin(["1", "2", "3"]) | F.col("platform").isNull()
).show()

🔍 Platform이 1, 2, 3이 아닌 데이터:
+----------+--------------+-----------+---------+--------+------------+--------------------+----------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|          event_time|event_date|
+----------+--------------+-----------+---------+--------+------------+--------------------+----------+
|    303066|83e9ec48908c6a|     968149| 28799999|      \N|   US>VA>511|2016-06-14 11:59:...|2016-06-14|
|  11328496|7e8aa06b36db6a|    1827718|752399961|      \N|   US>KY>529|2016-06-22 20:59:...|2016-06-22|
|  13489553|5cda9845a1b9be|    2624774|896400000|      \N|   US>IN>527|2016-06-24 12:59:...|2016-06-24|
|  14004328|f4bb634c3871b9|     635051|921599944|      \N|   US>NY>501|2016-06-24 19:59:...|2016-06-24|
|  15056922|558ba104e8a37c|    2746860|997199999|      \N|   US>MA>506|2016-06-25 16:59:...|2016-06-25|
+----------+--------------+-----------+---------+--------+------------+--------------------+----------+



In [None]:
# Fill in the missing platform values ("\N" in the raw CSV) for the five
# affected events, using platform values looked up in page_views.csv:
#
#   display_id  uuid            document_id  timestamp  geo_location  -> platform
#   303066      83e9ec48908c6a  968149       28799999   US>VA>511     -> 2
#   11328496    7e8aa06b36db6a  1827718      752399961  US>KY>529     -> 1
#   13489553    5cda9845a1b9be  2624774      896400000  US>IN>527     -> 3
#   14004328    f4bb634c3871b9  635051       921599944  US>NY>501     -> 1
#   15056922    558ba104e8a37c  2746860      997199999  US>MA>506     -> 1
recovered_platforms = {
    303066: "2",
    11328496: "1",
    13489553: "3",
    14004328: "1",
    15056922: "1",
}

# Start from the existing value and wrap one CASE WHEN per patched display_id;
# rows matching none of the ids keep their original platform.
platform_fix = F.col("platform")
for fixed_id, fixed_platform in recovered_platforms.items():
    platform_fix = F.when(F.col("display_id") == fixed_id, fixed_platform).otherwise(platform_fix)

df_events_processed = df_events_processed.withColumn("platform", platform_fix)

In [None]:
# Verify that the five patched events now carry a platform value.
patched_ids = [303066, 11328496, 13489553, 14004328, 15056922]
df_events_processed.where(F.col("display_id").isin(patched_ids)).show()

+----------+--------------+-----------+---------+--------+------------+--------------------+----------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|          event_time|event_date|
+----------+--------------+-----------+---------+--------+------------+--------------------+----------+
|    303066|83e9ec48908c6a|     968149| 28799999|       2|   US>VA>511|2016-06-14 11:59:...|2016-06-14|
|  11328496|7e8aa06b36db6a|    1827718|752399961|       1|   US>KY>529|2016-06-22 20:59:...|2016-06-22|
|  13489553|5cda9845a1b9be|    2624774|896400000|       3|   US>IN>527|2016-06-24 12:59:...|2016-06-24|
|  14004328|f4bb634c3871b9|     635051|921599944|       1|   US>NY>501|2016-06-24 19:59:...|2016-06-24|
|  15056922|558ba104e8a37c|    2746860|997199999|       1|   US>MA>506|2016-06-25 16:59:...|2016-06-25|
+----------+--------------+-----------+---------+--------+------------+--------------------+----------+



##### platform - event_date 구조로 나누고 timestamp로 정렬해서 parquet으로 저장

In [95]:
output_path_final = "../rawdata/events_partitioned"

# Every iteration of the loop below filters df_events_processed again. Without
# caching, each of those filters would re-read and re-parse the whole source
# CSV, so the work would scale as (number of partitions) x (full scan).
df_events_processed = df_events_processed.cache()

try:
    # Step 1: collect every (platform, event_date) combination present in the data.
    platforms_dates = df_events_processed.select("platform", "event_date").distinct().collect()

    print(f"총 {len(platforms_dates)}개의 파티션을 개별 정렬해서 저장합니다...")

    for row in platforms_dates:
        platform = row['platform']
        event_date = row['event_date']

        print(f"처리 중: platform={platform}, event_date={event_date}")

        # Step 2: keep only this combination and sort it by timestamp.
        # eqNullSafe lets a null platform/event_date match its own rows; a plain
        # `== None` comparison evaluates to null and would write an empty partition.
        df_partition = (df_events_processed
            .filter(F.col("platform").eqNullSafe(platform) & F.col("event_date").eqNullSafe(event_date))
            .orderBy("timestamp")
            .coalesce(1)  # one output file per partition directory
        )

        # Step 3: write under a platform=.../event_date=... directory so the root
        # path can later be read as a partitioned dataset.
        partition_path = f"{output_path_final}/platform={platform}/event_date={event_date}"
        df_partition.write.mode("overwrite").parquet(partition_path)
finally:
    # Release the cached data whether or not every write succeeded.
    df_events_processed.unpersist()

print("✅ 모든 파티션 정렬 완료!")

총 48개의 파티션을 개별 정렬해서 저장합니다...
처리 중: platform=3, event_date=2016-06-15
처리 중: platform=1, event_date=2016-06-14
처리 중: platform=2, event_date=2016-06-15
처리 중: platform=2, event_date=2016-06-14
처리 중: platform=3, event_date=2016-06-14
처리 중: platform=1, event_date=2016-06-15
처리 중: platform=2, event_date=2016-06-16
처리 중: platform=3, event_date=2016-06-17
처리 중: platform=1, event_date=2016-06-17
처리 중: platform=2, event_date=2016-06-17
처리 중: platform=1, event_date=2016-06-16
처리 중: platform=3, event_date=2016-06-16
처리 중: platform=3, event_date=2016-06-18
처리 중: platform=2, event_date=2016-06-19
처리 중: platform=3, event_date=2016-06-19
처리 중: platform=1, event_date=2016-06-19
처리 중: platform=2, event_date=2016-06-18
처리 중: platform=1, event_date=2016-06-18
처리 중: platform=3, event_date=2016-06-20
처리 중: platform=3, event_date=2016-06-21
처리 중: platform=2, event_date=2016-06-21
처리 중: platform=2, event_date=2016-06-20
처리 중: platform=1, event_date=2016-06-20
처리 중: platform=1, event_date=2016-06-21
처리 중: platf

##### 정렬 확인

In [96]:
print("🔍 정렬 확인:")
target_path = "../rawdata/events_partitioned/platform=2/event_date=2016-06-24/*.parquet"

df_check = spark.read.parquet(target_path)
print("첫 20개 레코드의 timestamp 순서:")
df_check.select("timestamp").show(20)

# Check that timestamps are non-decreasing in file order.
# Rows are streamed to the driver one Spark partition at a time
# (toLocalIterator) instead of collect()-ing the whole partition into driver
# memory, and the scan stops at the first out-of-order pair.
ts_iter = (row.timestamp for row in df_check.select("timestamp").toLocalIterator())
previous_ts = next(ts_iter, None)
is_sorted = True
for current_ts in ts_iter:
    if current_ts < previous_ts:
        is_sorted = False
        break
    previous_ts = current_ts
print(f"✅ 정렬 여부: {is_sorted}")

🔍 정렬 확인:
첫 20개 레코드의 timestamp 순서:
+---------+
|timestamp|
+---------+
|849600071|
|849600117|
|849600157|
|849600161|
|849600206|
|849600250|
|849600303|
|849600504|
|849600723|
|849600748|
|849601085|
|849601221|
|849601498|
|849601587|
|849601697|
|849601871|
|849602045|
|849602374|
|849602453|
|849602660|
+---------+
only showing top 20 rows

✅ 정렬 여부: True


### `page_views.csv` 전처리 (AWS Glue)

In [None]:
# AWS Glue job: applies the same preprocessing as the events.csv cells to
# page_views.csv on S3 (derive event_time/event_date, then write one
# timestamp-sorted Parquet file per platform/event_date directory).
# Commented out so this notebook runs locally; it imports awsglue and is meant
# to run as a Glue job.

# import sys
# from awsglue.transforms import *
# from awsglue.utils import getResolvedOptions
# from pyspark.context import SparkContext
# from awsglue.context import GlueContext
# from awsglue.job import Job

# from pyspark.sql import functions as F

# ## @params: [JOB_NAME]
# args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# SRC = "s3://data-engineering-project-argus/page_views.csv"
# DST = "s3://data-engineering-project-argus/page_views_partitioned"

# sc = SparkContext()
# glueContext = GlueContext(sc)
# spark = glueContext.spark_session

# job = Job(glueContext)
# job.init(args['JOB_NAME'], args)

# df = (spark.read.option("header", "true").option("inferSchema", "true").csv(SRC))

# # Same millisecond offset as the local events.csv processing.
# OFFSET = 1465876799998

# # Derive event_time and event_date from the millisecond offset in `timestamp`.
# df_processed = (df
#     .withColumn("event_time", ((F.col("timestamp") + OFFSET)/1000).cast("timestamp"))
#     .withColumn("event_date", F.to_date("event_time"))
# )

# platforms_dates = df_processed.select("platform", "event_date").distinct().collect()

# # NOTE(review): each iteration filters the full source again, i.e. one scan of
# # page_views.csv per combination. A single pass with
# #   repartition("platform", "event_date")
# #   .sortWithinPartitions("platform", "event_date", "timestamp")
# #   .write.partitionBy("platform", "event_date")
# # is likely much cheaper -- confirm the resulting file layout is acceptable.
# for row in platforms_dates:
#     platform = row['platform']
#     event_date = row['event_date']

#     # Keep one (platform, event_date) combination and sort it by timestamp.
#     df_partition = (df_processed
#         .filter((F.col("platform") == platform) & (F.col("event_date") == event_date))
#         .orderBy("timestamp")
#         .coalesce(1)  # single output file per partition directory
#     )

#     # Write each combination to its own sub-directory of DST. Writing to DST
#     # itself with mode("overwrite") would replace the previous combination on
#     # every iteration, leaving only the last one.
#     partition_path = f"{DST}/platform={platform}/event_date={event_date}"
#     df_partition.write.mode("overwrite").parquet(partition_path)


# job.commit()