[SUPPORT] Hoodie clean operation result explanation #2304

Closed
bithw1 opened this issue Dec 8, 2020 · 2 comments

bithw1 commented Dec 8, 2020

Hi,
I have the following simple code that does an upsert 100 times (the code is at the end of the issue description), and I disable auto clean during the writes. When the writes are done, there are about 100 parquet files in the Hoodie table folder.

After I run `cleans run` in the hudi-cli, there are only 11 parquet files left in the Hoodie table folder, which means the old file versions have been cleaned up.

Then I run `select count(1) from hudi_hive_read_write_cow_disable_cleaner_1` and the result is still 101, so it looks like no data is lost after the clean. I would like to make sure that no data is lost after cleaning old file versions. By data loss, in my case, I mean: I have written 101 records since the table was created, and I still get back 101 records after the clean.
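
For reference, a minimal sketch of the same check from spark-shell (assuming the same SparkSession and the Hive-synced table/database names configured in the code at the end):

// Sketch: count the surviving records and the distinct _hoodie_commit_time values
// carried in Hudi's per-record metadata column.
spark.sql("select count(1) from xyz.hudi_hive_read_write_cow_disable_cleaner_1").show()
spark.sql("select count(distinct _hoodie_commit_time) from xyz.hudi_hive_read_write_cow_disable_cleaner_1").show()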

Another observation is:

When I run `select _hoodie_file_name from hudi_hive_read_write_cow_disable_cleaner_1;`, the result shows the parquet file names, but some of those files have been deleted by the clean. I am not sure whether it is a bug that the query shows file names that have already been deleted (I think it should show the file name where the data currently resides). This also leads to another question: since the corresponding file has been deleted, where does the data in the Hive query result come from?
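
A sketch of the metadata-column query described above (same assumptions as before; _hoodie_record_key, _hoodie_commit_time and _hoodie_file_name are Hudi's per-record metadata columns):

spark.sql(
  """select _hoodie_record_key, _hoodie_commit_time, _hoodie_file_name
    |from xyz.hudi_hive_read_write_cow_disable_cleaner_1
    |order by _hoodie_commit_time""".stripMargin).show(100, false)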

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
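
// MyOrder is not defined in the posted snippet; this is an assumed definition
// whose fields match the constructor arguments used in main() below.
case class MyOrder(name: String, price: String, creation_date: String, dt: String)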

object COWDisableCleanerTest {
  val spark = SparkSession.builder.appName("COWDisableCleanerTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport().getOrCreate()

  val hudi_table = "hudi_hive_read_write_cow_disable_cleaner_1"

  val base_path = s"/data/hudi_demo/$hudi_table"

  def run(df: DataFrame, round: Int) = {
    val saveMode = round match {
      case 0 => SaveMode.Overwrite
      case _ => SaveMode.Append
    }

    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")

      // Do not auto clean during the 100 commits
      .option(HoodieCompactionConfig.AUTO_CLEAN_PROP, "false")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }


  def main(args: Array[String]): Unit = {
    import spark.implicits._
    val order = MyOrder("Key-0", "Price-0", "2020-11-18 14:43:00", "2020-11-19")

    // Round 0: create the table with SaveMode.Overwrite and insert 1 row
    run(spark.createDataset(Seq(order)).toDF(), 0)

    // Run 100 more times and insert 100 rows, one row per commit (SaveMode.Append)
    (1 to 100).foreach {
      i =>
        val order = MyOrder("Key-" + i, "Price-" + i, "2020-11-18 14:43:" + i, "2020-11-19")
        val insertData = spark.createDataset(Seq(order)).toDF()
        run(insertData, i)
    }

  }
}

bvaradar commented Dec 8, 2020

Yeah, cleans are supposed to only delete old versions of the files safely. No data loss is expected as you have noticed.

The observation regarding `_hoodie_file_name` is expected. We usually use it to see the file-id and not the physical file name. For records that are untouched by later commits, the row retains the file name from the last commit in which the record was changed.
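
For example, if you mainly care about the file-id part: a Hudi base file is named <fileId>_<writeToken>_<instantTime>.parquet, so a rough sketch for pulling the file-id out of the metadata column (table/database names taken from the code above) could look like this:

import org.apache.spark.sql.functions.{col, split}

// Sketch: split _hoodie_file_name on "_" and keep the first piece, which is the
// file-id (the file group identifier that stays stable across file versions).
spark.table("xyz.hudi_hive_read_write_cow_disable_cleaner_1")
  .select(
    col("_hoodie_record_key"),
    col("_hoodie_file_name"),
    split(col("_hoodie_file_name"), "_").getItem(0).as("file_id"))
  .show(false)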


bithw1 commented Dec 8, 2020

Thanks @bvaradar for the great answer. I am closing this.

bithw1 closed this as completed Dec 8, 2020