In [1]:

import boto3

# AWS 세션을 사용하여 자격 증명 가져오기
session = boto3.Session(profile_name="dev-mina")
credentials = session.get_credentials()

# 액세스 키, 비밀 키, 세션 토큰 가져오기
aws_access_key = credentials.access_key
aws_secret_key = credentials.secret_key
aws_token = credentials.token

# S3 리소스를 생성하여 S3 객체에 접근
s3_resource = session.resource(
    "s3",
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    aws_session_token=aws_token,  
)

from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings('ignore')

jars_path = "/Users/mina/nx-mina/test"  # JAR 파일들이 있는 디렉토리
jars = [
    f"{jars_path}/hadoop-aws-3.3.1.jar",
    f"{jars_path}/aws-java-sdk-bundle-1.12.781.jar",
    f"{jars_path}/iceberg-spark-runtime-3.5_2.12-1.9.0.jar",
    f"{jars_path}/spark-avro_2.12-3.5.5.jar", #spark version과 맞추기 
]
jars_str = ",".join(jars)

spark = SparkSession.builder \
    .appName("test") \
    .config("spark.sql.session.timeZone", "UTC") \
    .config("spark.jars", jars_str) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.access.key", aws_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key) \
    .config("spark.hadoop.fs.s3a.session.token", aws_token) \
    .config("spark.sql.catalog.s3cat", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.s3cat.type", "hadoop") \
    .config("spark.sql.catalog.s3cat.warehouse", "s3a://emr-data-pipeline-test/iceberg-warehouse/") \
    .config("spark.sql.catalog.s3cat.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")\
    .getOrCreate()
    # .config("spark.sql.catalog.s3cat.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \


25/05/26 16:53:34 WARN Utils: Your hostname, minaui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.9.60 instead (on interface en7)
25/05/26 16:53:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/05/26 16:53:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# !spark-submit --version

In [2]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS s3cat.db.users (
        id INT,
        name STRING,
        email STRING
    )
    USING iceberg
""")

25/05/26 16:37:21 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/05/26 16:37:22 WARN VersionInfoUtils: The AWS SDK for Java 1.x entered maintenance mode starting July 31, 2024 and will reach end of support on December 31, 2025. For more information, see https://aws.amazon.com/blogs/developer/the-aws-sdk-for-java-1-x-is-in-maintenance-mode-effective-july-31-2024/
You can print where on the file system the AWS SDK for Java 1.x core runtime is located by setting the AWS_JAVA_V1_PRINT_LOCATION environment variable or aws.java.v1.printLocation system property to 'true'.
This message can be disabled by setting the AWS_JAVA_V1_DISABLE_DEPRECATION_ANNOUNCEMENT environment variable or aws.java.v1.disableDeprecationAnnouncement system property to 'true'.
The AWS SDK for Java 1.x is being used here:
at java.base/java.lang.Thread.getStackTrace(Thread.java:1602)
at com.amazonaws.util.VersionInfoUtils.printDeprecationAn

DataFrame[]

In [3]:
spark.sql("""
    INSERT INTO s3cat.db.users VALUES
    (1, 'Alice', 'alice@example.com'),
    (2, 'Bob', 'bob@example.com')
""")

25/05/26 16:38:31 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to iceberg-warehouse/db/users/data/00000-0-d594af11-9a44-4b90-94fa-12402fca49b2-0-00001.parquet. This is unsupported
                                                                                

DataFrame[]

In [4]:
df = spark.sql("SELECT * FROM s3cat.db.users")
df.show()

                                                                                

+---+-----+-----------------+
| id| name|            email|
+---+-----+-----------------+
|  1|Alice|alice@example.com|
|  2|  Bob|  bob@example.com|
+---+-----+-----------------+



In [None]:
'''
S3에 데이터는 .avro 파일로 저장됨
Apache Avro는 Apache Hadoop의 데이터 직렬화 프레임워크이며 데이터 + 스키마가 함께 저장되므로 스키마가 없는 환경에서도 읽을 수 있는 자체 기술서 포맷임

iceberg-warehouse/db/users/
├── data/                    ← row data (.parquet)
├── metadata/
│   ├── snap-00001-xyz.avro        ← snapshot manifest list
│   ├── 00000-abc.avro             ← manifest file (파일별 정보)
│   ├── v1.metadata.json           ← 테이블 스키마/이력 전체 요약
│   └── version-hint.text
'''

In [6]:
'''
MERGE TEST
'''

from pyspark.sql import Row

# 예제 업데이트 데이터
merge_data = [
    Row(id=1, name='Alice Updated', email='alice_updated@example.com'),  # 업데이트 대상
    Row(id=3, name='Charlie', email='charlie@example.com')               # 새로 삽입될 데이터
]

merge_df = spark.createDataFrame(merge_data)
merge_df.createOrReplaceTempView("updates")

In [7]:
spark.sql("""
    MERGE INTO s3cat.db.users AS target
    USING updates AS source
    ON target.id = source.id

    WHEN MATCHED THEN
    UPDATE SET *
    
    WHEN NOT MATCHED THEN
    INSERT *
          """)

                                                                                

DataFrame[]

In [8]:
df = spark.sql("SELECT * FROM s3cat.db.users")
df.show()

+---+-------------+--------------------+
| id|         name|               email|
+---+-------------+--------------------+
|  1|Alice Updated|alice_updated@exa...|
|  3|      Charlie| charlie@example.com|
|  2|          Bob|     bob@example.com|
+---+-------------+--------------------+



In [4]:
'''
argo file 읽기
'''
df = spark.read.format("avro").load("s3a://emr-data-pipeline-test/iceberg-warehouse/db/users/metadata/14d4221a-bbc8-4c90-b1c2-d04c307dcf5a-m0.avro")
df.show()

+------+-------------------+---------------+--------------------+--------------------+
|status|        snapshot_id|sequence_number|file_sequence_number|           data_file|
+------+-------------------+---------------+--------------------+--------------------+
|     1|9022688425129316256|           NULL|                NULL|{0, s3a://emr-dat...|
|     1|9022688425129316256|           NULL|                NULL|{0, s3a://emr-dat...|
+------+-------------------+---------------+--------------------+--------------------+



In [None]:
'''
data 읽기 
'''
df = spark.read.parquet("s3a://emr-data-pipeline-test/iceberg-warehouse/db/users/data/")
df.show()

                                                                                

+---+-------------+--------------------+
| id|         name|               email|
+---+-------------+--------------------+
|  1|Alice Updated|alice_updated@exa...|
|  3|      Charlie| charlie@example.com|
|  1|        Alice|   alice@example.com|
|  2|          Bob|     bob@example.com|
+---+-------------+--------------------+

