In [9]:
import itertools
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, create_map, expr

# -------------------- Create the SparkSession --------------------
# The S3A connector is pointed at a MinIO endpoint over plain HTTP (path-style access,
# SSL disabled). Endpoint and credentials are read from environment variables so that
# secrets are not committed with the notebook; the fallbacks are the previously
# hard-coded values, so behaviour is unchanged when the variables are unset.
# NOTE(review): getOrCreate() returns an already-running session if this kernel has one;
# Spark then warns that only runtime SQL configs take effect, so changes to the s3a
# settings below may need a kernel restart to apply — confirm.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")

spark = (
    SparkSession.builder
    .appName("ReadExchangeRate")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

# ------------------------ Load JSON ------------------------
# Source object: s3a://<bucket_name>/<prefix>/exchange_rate_<date>.json.
# multiLine=True lets Spark parse a JSON document that spans several lines
# (the default reader expects one JSON object per line).
bucket_name = "exchange.rate"
prefix = "raw_json"
date = "2025-06-09"  # snapshot to process; also used to name the output file below
object_path = "s3a://" + "/".join([bucket_name, prefix, f"exchange_rate_{date}.json"])

df = spark.read.json(object_path, multiLine=True)

# ------------------ Convert the 'rates' struct into a map ------------------
# 'rates' is a struct column whose field names are the currency codes; turn it into a
# map<currency, rate> so it can be exploded into rows below.
rates_type = df.schema["rates"].dataType
rate_fields = rates_type.names

# create_map expects a flat argument list: key1, value1, key2, value2, ...
kv_pairs = []
for field_name in rate_fields:
    kv_pairs.extend([lit(field_name), col("rates").getField(field_name)])

df_with_map = df.select("date", "base", create_map(*kv_pairs).alias("rates_map"))

# ------------------------ Explode map ------------------------
# One output row per map entry: key -> `currency`, value -> `rate`,
# with `date` and `base` repeated on every row.
exploded_df = df_with_map.selectExpr(
    "date",
    "base",
    "explode(rates_map) as (currency, rate)",
)

# ------------------------ Save to MinIO ------------------------
# Despite the ".csv" suffix, Spark writes a *directory* at this path containing
# part files, not a single CSV object. mode("overwrite") replaces any output
# already at that path, so re-running the cell is safe.
output_path = f"s3a://{bucket_name}/cleaned/exchange_rate_{date}.csv"
csv_writer = exploded_df.write.option("header", "true").mode("overwrite")
csv_writer.csv(output_path)


In [1]:
# Debug check: list processes whose command line contains "spark" (e.g. a Spark
# driver JVM). The grep process itself also matches and appears in the output.
# NOTE(review): leftover debugging cell — its execution count (In [1]) is lower than
# the Spark cell above (In [9]), so notebook order does not match run order.
!ps aux | grep spark

jovyan     750  4.0  3.9 9280040 318888 pts/2  Tl   15:34   0:11 /usr/lib/jvm/java-17-openjdk-amd64/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/* -Xmx1g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit --conf spark.hadoop.fs.s3a.im