Skip to content

Commit

Permalink
[HUDI-415] Get commit time when Spark starts (#1113)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyli1019 authored and vinothchandar committed Dec 20, 2019
1 parent 14881e9 commit 36b3b6f
Showing 1 changed file with 4 additions and 3 deletions.
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
Expand Down Expand Up @@ -74,11 +75,11 @@ private[hudi] object HoodieSparkSqlWriter {
}

var writeSuccessful: Boolean = false
var commitTime: String = null
var writeStatuses: JavaRDD[WriteStatus] = null

val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(parameters("path"))
val commitTime = HoodieActiveTimeline.createNewInstantTime();
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))

Expand Down Expand Up @@ -145,7 +146,7 @@ private[hudi] object HoodieSparkSqlWriter {
log.info("new batch has no new records, skipping...")
return (true, common.util.Option.empty())
}
commitTime = client.startCommit()
client.startCommitWithTime(commitTime)
writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
// Check for errors and commit the write.
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
Expand Down Expand Up @@ -223,7 +224,7 @@ private[hudi] object HoodieSparkSqlWriter {
)

// Issue deletes
commitTime = client.startCommit()
client.startCommitWithTime(commitTime)
writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
writeSuccessful =
Expand Down

0 comments on commit 36b3b6f

Please sign in to comment.