Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -138,4 +140,29 @@ public static HashMap<String, String> getFileIdWithoutSuffixAndRelativePaths(Map
}
return fileIdToPath;
}

/**
 * Process previous commits metadata in the timeline to determine the checkpoint given a checkpoint key.
 * NOTE: This is very similar in intent to DeltaSync#getLatestCommitMetadataWithValidCheckpointInfo except that
 * different deployment models (deltastreamer or spark structured streaming) could have different checkpoint keys.
 *
 * @param timeline      completed commits in active timeline.
 * @param checkpointKey the checkpoint key in the extra metadata of the commit.
 * @return An optional commit metadata with latest checkpoint.
 */
public static Option<HoodieCommitMetadata> getLatestCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline, String checkpointKey) {
  // Walk instants newest-first and return the first commit whose extra metadata
  // carries a non-empty value for the checkpoint key.
  return timeline.getReverseOrderedInstants().map(instant -> {
    try {
      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
          .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
      if (StringUtils.nonEmpty(commitMetadata.getMetadata(checkpointKey))) {
        return Option.of(commitMetadata);
      } else {
        // Explicit type witness pins the stream element type to Option<HoodieCommitMetadata>,
        // so no unchecked cast is needed on the whole pipeline result.
        return Option.<HoodieCommitMetadata>empty();
      }
    } catch (IOException e) {
      // Corrupt/unreadable instant details are surfaced as an unchecked Hudi IO failure.
      throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
    }
  }).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.hudi

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.async.{AsyncClusteringService, AsyncCompactService, SparkStreamingAsyncClusteringService, SparkStreamingAsyncCompactService}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
Expand All @@ -24,12 +28,13 @@ import org.apache.hudi.common.table.marker.MarkerType
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.{ClusteringUtils, CompactionUtils}
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.common.util.{ClusteringUtils, CommitUtils, CompactionUtils, StringUtils}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieCorruptedDataException
import org.apache.hudi.exception.{HoodieCorruptedDataException, HoodieException, TableNotFoundException}
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.execution.streaming.{Sink, StreamExecution}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

Expand All @@ -44,16 +49,34 @@ class HoodieStreamingSink(sqlContext: SQLContext,
outputMode: OutputMode)
extends Sink
with Serializable {
@volatile private var latestBatchId = -1L
@volatile private var latestCommittedBatchId = -1L

private val log = LogManager.getLogger(classOf[HoodieStreamingSink])

private val retryCnt = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_CNT.key,
DataSourceWriteOptions.STREAMING_RETRY_CNT.defaultValue).toInt
private val retryIntervalMs = options.getOrDefault(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.key,
DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
private val ignoreFailedBatch = options.getOrDefault(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key,
DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
private val tablePath = options.get("path")
if (tablePath.isEmpty || tablePath.get == null) {
throw new HoodieException(s"'path' must be specified.")
}
private var metaClient: Option[HoodieTableMetaClient] = {
try {
Some(HoodieTableMetaClient.builder()
.setConf(sqlContext.sparkContext.hadoopConfiguration)
.setBasePath(tablePath.get)
.build())
} catch {
case _: TableNotFoundException =>
log.warn("Ignore TableNotFoundException as it is first microbatch.")
Option.empty
}
}
private val retryCnt = options.getOrDefault(STREAMING_RETRY_CNT.key,
STREAMING_RETRY_CNT.defaultValue).toInt
private val retryIntervalMs = options.getOrDefault(STREAMING_RETRY_INTERVAL_MS.key,
STREAMING_RETRY_INTERVAL_MS.defaultValue).toLong
private val ignoreFailedBatch = options.getOrDefault(STREAMING_IGNORE_FAILED_BATCH.key,
STREAMING_IGNORE_FAILED_BATCH.defaultValue).toBoolean
// This constant serves as the checkpoint key for streaming sink so that each microbatch is processed exactly-once.
private val SINK_CHECKPOINT_KEY = "_hudi_streaming_sink_checkpoint"

private var isAsyncCompactorServiceShutdownAbnormally = false
private var isAsyncClusteringServiceShutdownAbnormally = false
Expand All @@ -65,10 +88,10 @@ class HoodieStreamingSink(sqlContext: SQLContext,
SaveMode.Overwrite
}

private var asyncCompactorService : AsyncCompactService = _
private var asyncCompactorService: AsyncCompactService = _
private var asyncClusteringService: AsyncClusteringService = _
private var writeClient : Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
private var writeClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
private var hoodieTableConfig: Option[HoodieTableConfig] = Option.empty

override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
if (isAsyncCompactorServiceShutdownAbnormally) {
Expand All @@ -78,26 +101,50 @@ class HoodieStreamingSink(sqlContext: SQLContext,
log.error("Async clustering service shutdown unexpectedly")
throw new IllegalStateException("Async clustering service shutdown unexpectedly")
}

val queryId = sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
checkArgument(queryId != null, "queryId is null")
if (metaClient.isDefined && canSkipBatch(batchId, options.getOrDefault(OPERATION.key, UPSERT_OPERATION_OPT_VAL))) {
log.warn(s"Skipping already completed batch $batchId in query $queryId")
return
}

// Override to use direct markers. In Structured streaming, timeline server is closed after
// first micro-batch and subsequent micro-batches do not have timeline server running.
// Thus, we can't use timeline-server-based markers.
var updatedOptions = options.updated(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.DIRECT.name())
// we need auto adjustment enabled for streaming sink since async table services are feasible within the same JVM.
updatedOptions = updatedOptions.updated(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key, "true")
// Add batchId as checkpoint to the extra metadata. To enable same checkpoint metadata structure for multi-writers,
// SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId), e.g.
// "_hudi_streaming_sink_checkpoint" : "{\"$batchId\":\"${sqlContext.sparkContext.applicationId}-$queryId\"}"
// NOTE: In case of multi-writers, this map should be mutable and sorted by key to facilitate merging of batchIds.
// HUDI-4432 tracks the implementation of checkpoint management for multi-writer.
val checkpointMap = Map(batchId.toString -> s"${sqlContext.sparkContext.applicationId}-$queryId")
updatedOptions = updatedOptions.updated(SINK_CHECKPOINT_KEY, HoodieSinkCheckpoint.toJson(checkpointMap))

retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
) match {
)
match {
case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
log.info(s"Micro batch id=$batchId succeeded"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s" with no new commits"
}))
log.info(s"Current value of latestCommittedBatchId: $latestCommittedBatchId. Setting latestCommittedBatchId to batchId $batchId.")
latestCommittedBatchId = batchId
writeClient = Some(client)
hoodieTableConfig = Some(tableConfig)
if (client != null) {
metaClient = Some(HoodieTableMetaClient.builder()
.setConf(sqlContext.sparkContext.hadoopConfiguration)
.setBasePath(client.getConfig.getBasePath)
.build())
}
if (compactionInstantOps.isPresent) {
asyncCompactorService.enqueuePendingAsyncServiceInstant(
new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
Expand All @@ -112,7 +159,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
// clean up persist rdds in the write process
data.sparkSession.sparkContext.getPersistentRDDs
.foreach {
case (id, rdd) =>
case (_, rdd) =>
try {
rdd.unpersist()
} catch {
Expand All @@ -122,28 +169,29 @@ class HoodieStreamingSink(sqlContext: SQLContext,
log.error(s"Micro batch id=$batchId threw following exception: ", e)
if (ignoreFailedBatch) {
log.warn(s"Ignore the exception and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
s"${STREAMING_IGNORE_FAILED_BATCH.key} configuration")
Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
Failure(e)
}
case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
case Success((false, commitOps, _, _, _, _)) =>
log.error(s"Micro batch id=$batchId ended up with errors"
+ (commitOps.isPresent match {
case true => s" for commit=${commitOps.get()}"
case _ => s""
}))
case true => s" for commit=${commitOps.get()}"
case _ => s""
}))
if (ignoreFailedBatch) {
log.info(s"Ignore the errors and move on streaming as per " +
s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
s"${STREAMING_IGNORE_FAILED_BATCH.key} configuration")
Success((true, None, None))
} else {
if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
if (retryCnt > 1) log.warn(s"Retrying the failed micro batch id=$batchId ...")
Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
}
}
) match {
)
match {
case Failure(e) =>
if (!ignoreFailedBatch) {
log.error(s"Micro batch id=$batchId threw following expections," +
Expand Down Expand Up @@ -198,14 +246,14 @@ class HoodieStreamingSink(sqlContext: SQLContext,
// First time, scan .hoodie folder and get all pending compactions
val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration)
.setBasePath(client.getConfig.getBasePath).build()
val pendingInstants :java.util.List[HoodieInstant] =
val pendingInstants: java.util.List[HoodieInstant] =
CompactionUtils.getPendingCompactionInstantTimes(metaClient)
pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h))
pendingInstants.foreach((h: HoodieInstant) => asyncCompactorService.enqueuePendingAsyncServiceInstant(h))
}
}

protected def triggerAsyncClustering(client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
if (null == asyncClusteringService) {
if (null == asyncClusteringService) {
log.info("Triggering async clustering!")
asyncClusteringService = new SparkStreamingAsyncClusteringService(new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
client)
Expand All @@ -226,12 +274,12 @@ class HoodieStreamingSink(sqlContext: SQLContext,
// First time, scan .hoodie folder and get all pending clustering instants
val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration)
.setBasePath(client.getConfig.getBasePath).build()
val pendingInstants :java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
pendingInstants.foreach((h : HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h))
val pendingInstants: java.util.List[HoodieInstant] = ClusteringUtils.getPendingClusteringInstantTimes(metaClient)
pendingInstants.foreach((h: HoodieInstant) => asyncClusteringService.enqueuePendingAsyncServiceInstant(h))
}
}

private def reset(force: Boolean) : Unit = this.synchronized {
private def reset(force: Boolean): Unit = this.synchronized {
if (asyncCompactorService != null) {
asyncCompactorService.shutdown(force)
asyncCompactorService = null
Expand All @@ -247,4 +295,45 @@ class HoodieStreamingSink(sqlContext: SQLContext,
writeClient = Option.empty
}
}

private def canSkipBatch(incomingBatchId: Long, operationType: String): Boolean = {
if (!DELETE_OPERATION_OPT_VAL.equals(operationType)) {
// get the latest checkpoint from the commit metadata to check if the microbatch has already been prcessed or not
val commitMetadata = CommitUtils.getLatestCommitMetadataWithValidCheckpointInfo(
metaClient.get.getActiveTimeline.getCommitsTimeline, SINK_CHECKPOINT_KEY)
if (commitMetadata.isPresent) {
val lastCheckpoint = commitMetadata.get.getMetadata(SINK_CHECKPOINT_KEY)
if (!StringUtils.isNullOrEmpty(lastCheckpoint)) {
latestCommittedBatchId = HoodieSinkCheckpoint.fromJson(lastCheckpoint).keys.head.toLong
}
}
latestCommittedBatchId >= incomingBatchId
} else {
// In case of DELETE_OPERATION_OPT_VAL the incoming batch id is sentinel value (-1)
false
}
}
}

/**
 * SINK_CHECKPOINT_KEY holds a map of batchId to writer context (composed of applicationId and queryId).
 * This is a util object to serialize/deserialize map to/from json.
 */
object HoodieSinkCheckpoint {

  // Shared Jackson mapper, built lazily on first use: Scala module registered,
  // lenient on unknown properties, and absent values skipped when serializing.
  lazy val mapper: ObjectMapper = {
    val om = new ObjectMapper
    om.registerModule(DefaultScalaModule)
    om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    om.setSerializationInclusion(Include.NON_ABSENT)
    om
  }

  /** Serializes the checkpoint map (batchId -> writer context) to a JSON string. */
  def toJson(checkpoint: Map[String, String]): String = mapper.writeValueAsString(checkpoint)

  /** Deserializes a JSON string back into the checkpoint map. */
  def fromJson(json: String): Map[String, String] = mapper.readValue(json, classOf[Map[String, String]])
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, Strin
}

if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
numExpCommits += 1;
if (inputDF2 != null) {
numExpCommits += 1;
}
// Wait for compaction to also finish and track latest timestamp as commit timestamp
waitTillNCommits(fs, numExpCommits, 180, 3);
commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {
success = true
}
} catch {
case te: TableNotFoundException =>
case _: TableNotFoundException =>
log.info("Got table not found exception. Retrying")
} finally {
if (!success) {
Expand Down Expand Up @@ -312,7 +312,7 @@ class TestStructuredStreaming extends HoodieClientTestBase {

@throws[InterruptedException]
private def waitTillHasCompletedReplaceInstant(tablePath: String,
timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
val beginTime = System.currentTimeMillis
var currTime = beginTime
val timeoutMsecs = timeoutSecs * 1000
Expand Down
2 changes: 1 addition & 1 deletion style/scalastyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
</check>
<check level="error" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"/>
<check level="error" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"/>
<check level="error" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"/>
<check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"/>
<check level="error" class="org.scalastyle.scalariform.NullChecker" enabled="false"/>
<check level="error" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"/>
<check level="error" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"/>
Expand Down