In [15]:
import duckdb, glob, os

home        = os.path.expanduser("~")
raw         = os.path.join(home, "dev/RETrend")
parquet_dir = os.path.join(raw, "parquet")
os.makedirs(parquet_dir, exist_ok=True)

con = duckdb.connect()

for csv_fp in glob.glob(os.path.join(raw, "trade_history/", "trade_history_*.csv")):
    fn        = os.path.basename(csv_fp)
    complexNo = int(fn.split("_")[2].split(".")[0])
    pq_fp     = os.path.join(parquet_dir, f"trade_history_{complexNo}.parquet")

    # 1) 변환 실패 행만 먼저 뽑아서 출력
    bad_rows = con.execute(f"""
      SELECT *
      FROM read_csv_auto('{csv_fp}')
      WHERE TRY_CAST(tradeDate AS BIGINT) IS NULL
         OR TRY_CAST(tradeYear AS BIGINT) IS NULL
         OR TRY_CAST(tradeMonth AS BIGINT) IS NULL
         OR TRY_CAST(dealPrice AS BIGINT) IS NULL
         OR TRY_CAST(floor AS BIGINT) IS NULL
         OR TRY_CAST(representativeArea AS DOUBLE) IS NULL
         OR TRY_CAST(exclusiveArea AS DOUBLE) IS NULL
         OR TRY_CAST(areaNo AS BIGINT) IS NULL
    """).fetchall()

    for row in bad_rows:
        print(f"⚠️ 건너뛸 행 ({fn}):", row)

    # 2) 유효한 행만 Parquet으로 저장
    con.execute(f"""
      COPY (
        SELECT
          tradeType::VARCHAR                              AS tradeType,
          CAST(tradeYear       AS BIGINT)                 AS tradeYear,
          CAST(tradeMonth      AS BIGINT)                 AS tradeMonth,
          CAST(tradeDate       AS BIGINT)                 AS tradeDate,
          CAST(dealPrice       AS BIGINT)                 AS dealPrice,
          CAST(floor           AS BIGINT)                 AS floor,
          CAST(representativeArea AS DOUBLE)              AS representativeArea,
          CAST(exclusiveArea   AS DOUBLE)                 AS exclusiveArea,
          formattedPrice::VARCHAR                         AS formattedPrice,
          formattedTradeYearMonth::VARCHAR                AS formattedTradeYearMonth,
          CAST(areaNo         AS BIGINT)                  AS areaNo,
          CAST({complexNo}    AS BIGINT)                  AS complexNo,
          CAST(
            lpad(tradeYear::VARCHAR,4,'0') || '-' ||
            lpad(tradeMonth::VARCHAR,2,'0') || '-' ||
            lpad(tradeDate::VARCHAR,2,'0')
            AS TIMESTAMP
          )                                              AS date
        FROM read_csv_auto('{csv_fp}')
        WHERE TRY_CAST(tradeDate       AS BIGINT) IS NOT NULL
          AND TRY_CAST(tradeYear       AS BIGINT) IS NOT NULL
          AND TRY_CAST(tradeMonth      AS BIGINT) IS NOT NULL
          AND TRY_CAST(dealPrice       AS BIGINT) IS NOT NULL
          AND TRY_CAST(floor           AS BIGINT) IS NOT NULL
          AND TRY_CAST(representativeArea AS DOUBLE) IS NOT NULL
          AND TRY_CAST(exclusiveArea   AS DOUBLE) IS NOT NULL
          AND TRY_CAST(areaNo         AS BIGINT) IS NOT NULL
      )
      TO '{pq_fp}' (FORMAT PARQUET, COMPRESSION 'snappy');
    """)
    print("✅ CSV→Parquet:", fn)

con.close()


✅ CSV→Parquet: trade_history_1040.csv
✅ CSV→Parquet: trade_history_113997.csv
✅ CSV→Parquet: trade_history_102642.csv
✅ CSV→Parquet: trade_history_144617.csv
✅ CSV→Parquet: trade_history_785.csv
✅ CSV→Parquet: trade_history_143178.csv
✅ CSV→Parquet: trade_history_132781.csv
✅ CSV→Parquet: trade_history_22573.csv
✅ CSV→Parquet: trade_history_8702.csv
✅ CSV→Parquet: trade_history_3657.csv
✅ CSV→Parquet: trade_history_160979.csv
✅ CSV→Parquet: trade_history_108409.csv
✅ CSV→Parquet: trade_history_110246.csv
✅ CSV→Parquet: trade_history_4138.csv
✅ CSV→Parquet: trade_history_2549.csv
✅ CSV→Parquet: trade_history_150662.csv
✅ CSV→Parquet: trade_history_26715.csv
✅ CSV→Parquet: trade_history_116637.csv
✅ CSV→Parquet: trade_history_26073.csv
✅ CSV→Parquet: trade_history_4886.csv
✅ CSV→Parquet: trade_history_116151.csv
✅ CSV→Parquet: trade_history_109071.csv
✅ CSV→Parquet: trade_history_106342.csv
✅ CSV→Parquet: trade_history_5540.csv
✅ CSV→Parquet: trade_history_110520.csv
✅ CSV→Parquet: trade

In [26]:
import duckdb, glob, os

# ─── 환경 설정 ─────────────────────────────────────────────────
home        = os.path.expanduser("~")
raw         = os.path.join(home, "dev/RETrend/tmp/raw")
parquet_dir = os.path.join(home, "dev/RETrend/parquet")
os.makedirs(parquet_dir, exist_ok=True)

con = duckdb.connect()

# ─── (B) complex_list.csv → Parquet ────────────────────────────
complex_csv = os.path.join(raw, "complex_list.csv")
complex_pq  = os.path.join(parquet_dir, "complex_list.parquet")

con.execute(f"""
COPY (
  SELECT
    CAST(complexNo              AS BIGINT)  AS complexNo,
    complexName::VARCHAR                       AS complexName,
    CAST(cortarNo               AS BIGINT)    AS cortarNo,
    realEstateTypeCode::VARCHAR                AS realEstateTypeCode,
    realEstateTypeName::VARCHAR                AS realEstateTypeName,
    detailAddress::VARCHAR                     AS detailAddress,
    CAST(latitude              AS DOUBLE)     AS latitude,
    CAST(longitude             AS DOUBLE)     AS longitude,
    CAST(totalHouseholdCount   AS BIGINT)    AS totalHouseholdCount,
    CAST(totalBuildingCount    AS BIGINT)    AS totalBuildingCount,
    CAST(highFloor             AS BIGINT)    AS highFloor,
    CAST(lowFloor              AS BIGINT)    AS lowFloor,
    useApproveYmd::VARCHAR                     AS useApproveYmd,
    CAST(dealCount             AS BIGINT)    AS dealCount,
    CAST(leaseCount            AS BIGINT)    AS leaseCount,
    CAST(rentCount             AS BIGINT)    AS rentCount,
    CAST(shortTermRentCount    AS BIGINT)    AS shortTermRentCount,
    CAST(isInterest            AS BOOLEAN)    AS isInterest,
    cortarAddress::VARCHAR                      AS cortarAddress,
    CAST(tourExist             AS BOOLEAN)    AS tourExist,
    CAST(eupmeandongCortarNo   AS BIGINT)     AS eupmeandongCortarNo,
    eupmeandongCortarName::VARCHAR              AS eupmeandongCortarName
  FROM read_csv_auto('{complex_csv}')
)
TO '{complex_pq}' (FORMAT PARQUET, COMPRESSION 'snappy');
""")
print("✅ CSV→Parquet: complex_list.csv →", complex_pq)

con.close()


✅ CSV→Parquet: complex_list.csv → /Users/dave/dev/RETrend/parquet/complex_list.parquet


In [35]:
from pyspark.sql import SparkSession
import os

home = os.path.expanduser("~")
warehouse = f"file://{os.path.join(home, 'dev/RETrend/tmp/raw/iceberg/phase2_default')}"

spark = (
    SparkSession.builder
    .appName("IcebergHiveCatalogPhase2")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hive.catalog-impl", "org.apache.iceberg.hive.HiveCatalog")
    .config("spark.sql.catalog.hive.uri", "thrift://localhost:9083")
    .config("spark.sql.catalog.hive.warehouse", warehouse)
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .config("spark.sql.iceberg.vectorization.enabled", "false")
    .getOrCreate()
)

In [36]:
#spark.sql("DROP DATABASE IF EXISTS hive.phase2 CASCADE")

In [37]:
spark.sql("""
CREATE DATABASE IF NOT EXISTS hive.phase2_1
LOCATION 'file:/Users/dave/dev/RETrend/tmp/raw/iceberg/phase2_1_default'
""")

DataFrame[]

In [38]:
spark.sql("DESCRIBE DATABASE EXTENDED hive.phase2_1").show(truncate=False)

+--------------+---------------------------------------------------------------------------------+
|info_name     |info_value                                                                       |
+--------------+---------------------------------------------------------------------------------+
|Catalog Name  |hive                                                                             |
|Namespace Name|phase2_1                                                                         |
|Location      |file:/Users/dave/dev/RETrend/tmp/raw/iceberg/phase2_1_default                    |
|Owner         |dave                                                                             |
|Properties    |((hive.metastore.database.owner,dave), (hive.metastore.database.owner-type,USER))|
+--------------+---------------------------------------------------------------------------------+



In [39]:
from pyspark.sql.types import *

# Parquet 경로
parquet_dir = os.path.join(home, "dev/RETrend/parquet")

# trade_history 스키마
trade_schema = StructType([
    StructField("tradeType", StringType(), True),
    StructField("tradeYear", LongType(), True),
    StructField("tradeMonth", LongType(), True),
    StructField("tradeDate", LongType(), True),
    StructField("dealPrice", LongType(), True),
    StructField("floor", LongType(), True),
    StructField("representativeArea", DoubleType(), True),
    StructField("exclusiveArea", DoubleType(), True),
    StructField("formattedPrice", StringType(), True),
    StructField("formattedTradeYearMonth", StringType(), True),
    StructField("areaNo", LongType(), True),
    StructField("complexNo", LongType(), True),
    StructField("date", TimestampType(), True),
])

trade_df = (
    spark.read.schema(trade_schema)
        .parquet(f"{parquet_dir}/trade_history_*.parquet")
)

# 테이블 생성
spark.sql("DROP TABLE IF EXISTS hive.phase2_1.trade_history")
trade_df.writeTo("hive.phase2_1.trade_history") \
        .using("iceberg") \
        .createOrReplace()


                                                                                

In [40]:
# complex_list 스키마
complex_schema = StructType([
    StructField("complexNo", LongType(), True),
    StructField("complexName", StringType(), True),
    StructField("cortarNo", LongType(), True),
    StructField("realEstateTypeCode", StringType(), True),
    StructField("realEstateTypeName", StringType(), True),
    StructField("detailAddress", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("totalHouseholdCount", LongType(), True),
    StructField("totalBuildingCount", LongType(), True),
    StructField("highFloor", LongType(), True),
    StructField("lowFloor", LongType(), True),
    StructField("useApproveYmd", StringType(), True),
    StructField("dealCount", LongType(), True),
    StructField("leaseCount", LongType(), True),
    StructField("rentCount", LongType(), True),
    StructField("shortTermRentCount", LongType(), True),
    StructField("isInterest", BooleanType(), True),
    StructField("cortarAddress", StringType(), True),
    StructField("tourExist", BooleanType(), True),
    StructField("eupmeandongCortarNo", LongType(), True),
    StructField("eupmeandongCortarName", StringType(), True),
])

complex_df = (
    spark.read.schema(complex_schema)
        .parquet(f"{parquet_dir}/complex_list.parquet")
)

spark.sql("DROP TABLE IF EXISTS hive.phase2_1.complex_info")
complex_df.writeTo("hive.phase2_1.complex_info") \
          .using("iceberg") \
          .createOrReplace()

In [41]:
spark.sql("SHOW TABLES IN hive.phase2_1").show()

+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
| phase2_1|trade_history|      false|
| phase2_1| complex_info|      false|
+---------+-------------+-----------+



In [43]:
joined_df = spark.sql("""
  SELECT
    t.date,
    t.tradeType,
    t.dealPrice,
    t.floor,
    t.representativeArea,
    t.exclusiveArea,
    t.formattedPrice,
    t.formattedTradeYearMonth,
    c.complexName,
    c.latitude,
    c.longitude
  FROM hive.phase2_1.trade_history t
  LEFT JOIN hive.phase2_1.complex_info c
    ON t.complexNo = c.complexNo
  ORDER BY t.date DESC
  LIMIT 20
""")
joined_df.show(truncate=False)



+-------------------+---------+---------+-----+------------------+-------------+--------------+-----------------------+---------------------------+---------+----------+
|date               |tradeType|dealPrice|floor|representativeArea|exclusiveArea|formattedPrice|formattedTradeYearMonth|complexName                |latitude |longitude |
+-------------------+---------+---------+-----+------------------+-------------+--------------+-----------------------+---------------------------+---------+----------+
|2025-07-19 09:00:00|A1       |12000    |15   |0.0               |0.0          |1억 2,000     |2025-07-19             |삼익세라믹                 |36.624597|127.478865|
|2025-07-18 09:00:00|A1       |16000    |9    |0.0               |0.0          |1억 6,000     |2025-07-18             |나이스타운12차(도시형)     |36.358901|127.34855 |
|2025-07-18 09:00:00|A1       |49600    |5    |0.0               |0.0          |4억 9,600     |2025-07-18             |운암자이포레나퍼스티체1단지|35.181913|126.877026|
|2025-07-18 09:0

                                                                                


---

1. Write MERGE into Iceberg 개선
2. Airflow 용 Dag

In [None]:
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import col
# import argparse

# parser = argparse.ArgumentParser()
# parser.add_argument("--date", required=True)
# args = parser.parse_args()
# process_date = args.date

# spark = (
#     SparkSession.builder
#     .appName("ParquetToIcebergMerge")
#     .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
#     .config("spark.sql.catalog.my_catalog.type", "hive")
#     .config("spark.sql.catalog.my_catalog.uri", "thrift://hive-metastore:9083")
#     .getOrCreate()
# )

# # Read Parquet data
# parquet_df = spark.read.parquet(f"s3a://your-bucket/parquet/dt={process_date}")

# # Write MERGE into Iceberg
# (
#     parquet_df
#     .writeTo("my_catalog.db.estate_price")
#     .option("overwrite-mode", "dynamic")  # For MERGE semantics
#     .overwritePartitions()
# )

In [1]:
# from airflow import DAG
# from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
# from datetime import datetime

# default_args = {
#     'owner': 'airflow',
#     'start_date': datetime(2024, 1, 1),
# }

# with DAG(
#     dag_id='iceberg_parquet_merge',
#     default_args=default_args,
#     schedule_interval='@daily',
#     catchup=False,
# ) as dag:

#     merge_task = SparkSubmitOperator(
#         task_id='spark_merge_parquet_to_iceberg',
#         application='/mnt/nfs/spark-jobs/parquet_to_iceberg.py',
#         conn_id='spark_default',
#         conf={
#             'spark.sql.catalog.my_catalog': 'org.apache.iceberg.spark.SparkCatalog',
#             'spark.sql.catalog.my_catalog.type': 'hive',
#             'spark.sql.catalog.my_catalog.uri': 'thrift://hive-metastore:9083',
#             'spark.sql.iceberg.vectorization.enabled': 'true',
#             'spark.executor.memory': '4g',
#             'spark.driver.memory': '2g'
#         },
#         master='spark://spark-master:7077',
#         application_args=[
#             '--date', '{{ ds }}'
#         ],
#         verbose=True
#     )