[SUPPORT] How to disable clean during write and run it with hudi-cli command

Hi,
I have the following code snippet, which creates 101 commits during the writes:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

case class MyOrder(
    name: String,
    price: String,
    creation_date: String,
    dt: String)

object COWCleanerTest {

  val spark = SparkSession.builder.appName("COWCleanerTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport().getOrCreate()

  val hudi_table = "hudi_hive_read_write_cow_cleaner_1"
  val base_path = s"/data/hudi_demo/$hudi_table"

  // Write one batch: round 0 bootstraps the table, later rounds append (upsert).
  def run(df: DataFrame, round: Int): Unit = {
    val saveMode = round match {
      case 0 => SaveMode.Overwrite
      case _ => SaveMode.Append
    }
    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    val order = MyOrder("Key-0", "Price-0", "2020-11-18 14:43:00", "2020-11-19")
    // Create the table and insert one row
    run(spark.createDataset(Seq(order)).toDF(), 0)
    // Run 100 more times, inserting 100 rows, one row per commit
    (1 to 100).foreach { i =>
      val order = MyOrder("Key-" + i, "Price-" + i, "2020-11-18 14:43:" + i, "2020-11-19")
      val insertData = spark.createDataset(Seq(order)).toDF()
      run(insertData, i)
    }
  }
}
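As far as I understand (hedged, not verified against this exact build), Hudi runs the cleaner inline after commits by default, keeping only the last hoodie.cleaner.commits.retained commits (the default is 10), so clean actions should start appearing in the timeline from roughly the 11th commit onward. The timeline can be watched directly on HDFS while the job runs, e.g.:

    hdfs dfs -ls /data/hudi_demo/hudi_hive_read_write_cow_cleaner_1/.hoodie | grep clean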
When I look at the .hoodie folder on HDFS after the run, cleaning indeed appears to have been done during the 101 writes. I would like to ask whether I can disable clean during the write and run it separately with the hudi-cli command instead.
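Concretely, I am thinking of something like the sketch below. Both pieces are assumptions on my side: that hoodie.clean.automatic (the key behind HoodieCompactionConfig.AUTO_CLEAN_PROP, default true) is the switch for inline cleaning, and that the hudi-cli build ships a cleans run command.

    // Hedged sketch: the same writer as in run() above, with the inline
    // cleaner turned off. "hoodie.clean.automatic" is assumed to be the
    // switch behind HoodieCompactionConfig.AUTO_CLEAN_PROP (default "true").
    df.write.format("hudi")
      // ... all the options from run() unchanged ...
      .option("hoodie.clean.automatic", "false")
      .mode(saveMode)
      .save(base_path)

and then running the cleaner out-of-band from hudi-cli, along these lines:

    connect --path /data/hudi_demo/hudi_hive_read_write_cow_cleaner_1
    cleans show
    cleans run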