
How to read/write Iceberg in Spark Structured Streaming #1230

@zhangdove

Description


I did some tests consuming Kafka messages and writing them to an Iceberg table with Spark Structured Streaming, and I'm running into some trouble.

1. My environment

Spark version: 3.0.0
Iceberg version: 0.9.0
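
For reference, the dependencies behind these versions look roughly like this in sbt (a sketch, not my exact build file; everything other than the Spark/Iceberg versions above is illustrative):

    // sbt sketch; Spark/Iceberg versions from the environment above, everything else illustrative
    scalaVersion := "2.12.10" // Spark 3.0.0 is built for Scala 2.12

    libraryDependencies ++= Seq(
      "org.apache.spark"   %% "spark-sql"              % "3.0.0" % "provided",
      "org.apache.spark"   %% "spark-sql-kafka-0-10"   % "3.0.0",
      "org.apache.iceberg"  % "iceberg-spark3-runtime" % "0.9.0"
    )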

2. Create Iceberg table

  import java.util.{ArrayList, List}
  import org.apache.iceberg.{PartitionSpec, Schema}
  import org.apache.iceberg.catalog.TableIdentifier
  import org.apache.iceberg.hadoop.HadoopCatalog
  import org.apache.iceberg.types.Types

  // Creates an unpartitioned table with an (id: int, name: string) schema
  def createPartitionTable(catalog: HadoopCatalog, tableIdentifier: TableIdentifier): Unit = {
    val columns: List[Types.NestedField] = new ArrayList[Types.NestedField]
    columns.add(Types.NestedField.of(1, true, "id", Types.IntegerType.get, "id doc"))
    columns.add(Types.NestedField.of(2, true, "name", Types.StringType.get, "name doc"))

    val schema: Schema = new Schema(columns)
    val table = catalog.createTable(tableIdentifier, schema, PartitionSpec.unpartitioned())
  }
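
For context, the catalog and identifier are built roughly like this before calling the helper (a sketch; the Hadoop Configuration and warehouse path are placeholders that mirror the Spark config in step 3):

    // Sketch: build the HadoopCatalog and create the table.
    // The Configuration and warehouse path are illustrative only.
    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration()
    val catalog = new HadoopCatalog(conf, "hdfs://nameservice1/iceberg/warehouse")
    val tableIdentifier = TableIdentifier.of("testNs", "doveTb")
    createPartitionTable(catalog, tableIdentifier)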

3. The pseudocode is as follows

    val (dbName, tbName, kafkatopic, bootstrapServers) = ("testNs", "doveTb", "topic", "ip:9092....")

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://nameservice1/iceberg/warehouse")
      .getOrCreate()

    // 1. read kafka data
    val streamingDF = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", kafkatopic)
      .load()

    // 2. consume messages per micro-batch and start the streaming query
    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      executorBatchDf(spark, batchDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").toDF("key", "value"), batchId)
    }.start()

    def executorBatchDf(spark: SparkSession, batchDF: DataFrame, batchId: Long): Unit = {
      batchDF.persist()
      val icebergHadoopWarehouse = spark.sparkContext.getConf.get("spark.sql.catalog.hadoop_prod.warehouse")

      val selectArray = Array("database", "table", "type", "data")
      val kafkaSourceDF = batchDF.filter(_.get(1) != null)
        .select(json_tuple(batchDF("value"), selectArray: _*))
        .toDF(selectArray: _*)

      println(s"kafkaSourceDF println(batchId:${batchId})")
      kafkaSourceDF.show(false)

      // case one : read table by spark.table("prod.db.table")
      // val icebergTableDF = spark.table(s"hadoop_prod.${schemaName}.${tableName}")
      // case two : read table by spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
      val icebergTableDF = spark.read.format("iceberg").load(s"${icebergHadoopWarehouse}/${dbName}/${tbName}")

      println(s"icebergTableDF println(batchId:${batchId})")
      icebergTableDF.show(false)

      val insertDf = kafkaSourceDF
        .filter(kafkaSourceDF("type") === "insert")
        .select(from_json(kafkaSourceDF("data"), icebergTableDF.schema))
        .toDF("struct")
        .selectExpr(icebergTableDF.schema.fieldNames.map(row => "struct." + row): _*)

      val df = insertDf.union(icebergTableDF)

      df.writeTo(s"hadoop_prod.${dbName}.${tbName}")
        .overwrite(lit(true))
      batchDF.unpersist()
    }

4. Kafka message

{"database": "testNs","table": "doveTb","type": "insert","data": {"id": 1,"name": "dove1"}}
{"database": "testNs","table": "doveTb","type": "insert","data": {"id": 2,"name": "dove2"}}

5. Result
Case one: read the table with spark.table("prod.db.table").
Result: the Iceberg table still reads as empty on the second batch, even though the first batch wrote a row.

kafkaSourceDF println(batchId:1)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":1,"name":"dove1"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:1)
+---+----+
|id |name|
+---+----+
+---+----+

kafkaSourceDF println(batchId:2)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":2,"name":"dove2"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:2)
+---+----+
|id |name|
+---+----+
+---+----+

Case two: read the table with spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table").
Result: normal; the second batch sees the row written by the first batch.

kafkaSourceDF println(batchId:1)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":1,"name":"dove1"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:1)
+---+----+
|id |name|
+---+----+
+---+----+

kafkaSourceDF println(batchId:2)
+--------+------+------+-----------------------+
|database|table |type  |data                   |
+--------+------+------+-----------------------+
|testNs  |doveTb|insert|{"id":2,"name":"dove2"}|
+--------+------+------+-----------------------+

icebergTableDF println(batchId:2)
+---+-----+
|id |name |
+---+-----+
|1  |dove1|
+---+-----+

6. Question
The phenomenon is that spark.table("prod.db.table") does not refresh the Iceberg table for the next batch, while spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table") refreshes automatically.
Is there a difference between spark.table("prod.db.table") and spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")?

I'm not sure if I'm using it the wrong way.
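
One thing I plan to do to double-check whether each batch really commits a new snapshot is to inspect the table through the Iceberg API directly, independent of Spark (a sketch, reusing the catalog and identifier from step 2):

    // Sketch: look at the table metadata outside of Spark's view of it
    val table = catalog.loadTable(TableIdentifier.of(dbName, tbName))
    table.refresh()                          // re-read the latest metadata from the warehouse
    println(table.currentSnapshot())         // latest snapshot, if any
    table.snapshots().forEach(s => println(s.snapshotId()))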

Link: https://iceberg.apache.org/spark/#querying-with-dataframes
