In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

# --- 설정 (Configuration) ---
# MinIO 접속 정보 & 데이터 경로
MINIO_ENDPOINT = "http://titan-minio:9000"
ACCESS_KEY = "minioadmin"
SECRET_KEY = "minioadmin"

# 1. Spark 세션 생성 (여기가 제일 중요!)
# spark.jars.packages: S3(MinIO)와 통신하기 위한 필수 라이브러리를 Maven에서 다운로드
spark = SparkSession.builder \
    .appName("TitanLog-ETL-BronzeToSilver") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.533") \
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT) \
    .config("spark.hadoop.fs.s3a.access.key", ACCESS_KEY) \
    .config("spark.hadoop.fs.s3a.secret.key", SECRET_KEY) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

print("Spark Session 생성 완료! (MinIO 연결 준비 끝)")

# 2. Bronze 데이터 읽기 (Extract)
# MinIO의 bronze 버킷에 있는 모든 JSON 파일을 읽어옵니다.
print("Bronze 데이터 읽는 중...")
bronze_df = spark.read.json("s3a://bronze/*/*.json")

# 데이터 구조(Schema)와 내용 살짝 확인
bronze_df.printSchema()
bronze_df.show(5)

# 3. 데이터 정제 (Transform)
# - timestamp 컬럼이 문자열(String)이라서 실제 시간(Timestamp) 타입으로 변환
# - user_id, item_id가 혹시 null인 데이터는 삭제 (Clean)
print("⚙데이터 정제 중...")
silver_df = bronze_df \
    .withColumn("event_time", to_timestamp(col("timestamp"))) \
    .drop("timestamp") \
    .dropna(subset=["user_id", "item_id"])

# 변환된 스키마 확인
silver_df.printSchema()

# 4. Silver 데이터 저장 (Load)
# - 포맷: Parquet (압축률 좋음)
# - 모드: overwrite (기존 거 지우고 덮어쓰기)
print("Silver(Parquet)로 저장 중...")
silver_df.write \
    .mode("overwrite") \
    .parquet("s3a://silver/log_v1")

print("Bronze -> Silver 변환 및 저장 완료!")

Spark Session 생성 완료! (MinIO 연결 준비 끝)
Bronze 데이터 읽는 중...
root
 |-- device_os: string (nullable = true)
 |-- event_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- item_id: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- user_id: long (nullable = true)

+---------+--------------------+----------+--------------+-------+--------------------+-------+
|device_os|            event_id|event_type|    ip_address|item_id|           timestamp|user_id|
+---------+--------------------+----------+--------------+-------+--------------------+-------+
|      Web|210dea62-200c-414...|      cart|148.246.56.162|     26|2026-01-25T11:52:...|   1042|
|      iOS|9ff446f8-dad6-40e...|      view|  2.225.195.47|     12|2026-01-25T11:52:...|   1073|
|      Web|a747981b-a664-4f6...|      view|  222.4.31.144|     10|2026-01-25T11:52:...|   1085|
|      iOS|2655f429-6441-435...|      view|33.238.140.232|      6|2026-01-25T11:52

In [7]:
# Silver 데이터 검증
check_df = spark.read.parquet("s3a://silver/log_v1")
print(f"총 데이터 개수: {check_df.count()}개")
check_df.show(3)

총 데이터 개수: 5000개
+---------+--------------------+----------+--------------+-------+-------+--------------------+
|device_os|            event_id|event_type|    ip_address|item_id|user_id|          event_time|
+---------+--------------------+----------+--------------+-------+-------+--------------------+
|  Android|bbf06d30-1a25-439...|      cart|  28.110.36.39|     30|   1098|2026-01-25 11:53:...|
|      iOS|a0f5cdc4-e268-4e6...|      view| 186.245.14.56|     39|   1086|2026-01-25 11:53:...|
|      Web|fbe8d977-2783-4b6...|  purchase|220.118.164.46|     36|   1040|2026-01-25 11:53:...|
+---------+--------------------+----------+--------------+-------+-------+--------------------+
only showing top 3 rows

