From b18d836624f6925453cbd77200801d8d0c299269 Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Tue, 30 Apr 2024 15:09:17 +0530 Subject: [PATCH] [Backport] Check for handoff of upgraded segments (#16162) (#16344) Changes: 1) Check for handoff of upgraded realtime segments. 2) Drop sink only when all associated realtime segments have been abandoned. 3) Delete pending segments upon commit to prevent unnecessary upgrades and partition space exhaustion when a concurrent replace happens. This also prevents potential data duplication. 4) Register pending segment upgrade only on those tasks to which the segment is associated. Co-authored-by: Kashif Faraz --- .../MaterializedViewSupervisor.java | 6 - .../MaterializedViewSupervisorSpecTest.java | 2 - .../druid/msq/indexing/MSQWorkerTask.java | 10 +- .../druid/msq/indexing/MSQWorkerTaskTest.java | 8 - .../SegmentTransactionalReplaceAction.java | 51 +---- .../batch/parallel/SinglePhaseSubTask.java | 9 +- .../druid/indexing/overlord/TaskLockbox.java | 2 +- .../supervisor/SupervisorManager.java | 21 +- .../PendingSegmentVersions.java | 56 ------ .../SeekableStreamIndexTaskClient.java | 11 +- ...eekableStreamIndexTaskClientAsyncImpl.java | 7 +- .../SeekableStreamIndexTaskRunner.java | 14 +- .../supervisor/SeekableStreamSupervisor.java | 57 ++---- .../task/concurrent/ActionsTestTask.java | 6 +- ...ncurrentReplaceAndStreamingAppendTest.java | 63 ++---- .../indexing/overlord/TaskLockboxTest.java | 4 +- .../supervisor/SupervisorManagerTest.java | 51 +++++ .../SeekableStreamSupervisorStateTest.java | 64 +++++- ...TestIndexerMetadataStorageCoordinator.java | 6 +- .../IndexerMetadataStorageCoordinator.java | 6 +- .../overlord/SegmentPublishResult.java | 31 +++ .../supervisor/NoopSupervisorSpec.java | 7 - .../overlord/supervisor/Supervisor.java | 6 - .../IndexerSQLMetadataStorageCoordinator.java | 78 ++++++-- .../druid/metadata/PendingSegmentRecord.java | 18 +- .../metadata/SqlSegmentsMetadataQuery.java | 2 +- .../appenderator/BaseAppenderatorDriver.java | 21 +- .../SegmentsAndCommitMetadata.java | 53 ++++- .../appenderator/SinkQuerySegmentWalker.java | 6 +- .../appenderator/StreamAppenderator.java | 189 +++++++++++------- .../StreamAppenderatorDriver.java | 25 ++- .../StreamAppenderatorDriverFailTest.java | 8 +- .../StreamAppenderatorDriverTest.java | 75 +++++++ 33 files changed, 581 insertions(+), 392 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 7e0eaf60d836..9da665adde46 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -298,12 +298,6 @@ public LagStats computeLagStats() throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor"); } - @Override - public Set getActiveRealtimeSequencePrefixes() - { - throw new UnsupportedOperationException(); - } - @Override public int getActiveTaskGroupsCount() { diff --git 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index 365fb1751eac..14bd59871253 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -207,8 +207,6 @@ public void testMaterializedViewSupervisorSpecCreated() Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount()); - Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes()); - Callable noop = new Callable() { @Override public Integer call() diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index c04948402079..b4d18ea390e9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; -import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; @@ -46,7 +45,7 @@ import java.util.Set; @JsonTypeName(MSQWorkerTask.TYPE) -public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask +public class MSQWorkerTask extends AbstractTask { public static final String TYPE = "query_worker"; @@ -126,13 +125,6 @@ public Set getInputSourceResources() return ImmutableSet.of(); } - @Override - public String getTaskAllocatorId() - { - return getControllerTaskId(); - } - - @Override public boolean isReady(final TaskActionClient taskActionClient) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 482d67d81abe..6eff77184ea7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -108,12 +108,4 @@ public void testGetInputSourceResources() MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); } - - @Test - public void testGetTaskAllocatorId() - { - MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); - Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId()); - } - } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index f2b080cff6ef..df188ac81533 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -34,13 +34,10 @@ import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -155,7 +152,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // failure to upgrade pending segments does not affect success of the commit if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) { try { - registerUpgradedPendingSegmentsOnSupervisor(task, toolbox); + registerUpgradedPendingSegmentsOnSupervisor(task, toolbox, publishResult.getUpgradedPendingSegments()); } catch (Exception e) { log.error(e, "Error while upgrading pending segments for task[%s]", task.getId()); @@ -168,7 +165,11 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) /** * Registers upgraded pending segments on the active supervisor, if any */ - private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox) + private void registerUpgradedPendingSegmentsOnSupervisor( + Task task, + TaskActionToolbox toolbox, + List upgradedPendingSegments + ) { final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); final Optional activeSupervisorIdWithAppendLock = @@ -178,42 +179,10 @@ private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionTo return; } - final Set replaceLocksForTask = toolbox - .getTaskLockbox() - .getAllReplaceLocksForDatasource(task.getDataSource()) - .stream() - .filter(lock -> task.getId().equals(lock.getSupervisorTaskId())) - .collect(Collectors.toSet()); - - - Set pendingSegments = new HashSet<>(); - for (ReplaceTaskLock replaceLock : replaceLocksForTask) { - pendingSegments.addAll( - toolbox.getIndexerMetadataStorageCoordinator() - .getPendingSegments(task.getDataSource(), replaceLock.getInterval()) - ); - } - Map idToPendingSegment = new HashMap<>(); - pendingSegments.forEach(pendingSegment -> idToPendingSegment.put( - pendingSegment.getId().asSegmentId().toString(), - pendingSegment.getId() - )); - Map segmentToParent = new HashMap<>(); - pendingSegments.forEach(pendingSegment -> { - if (pendingSegment.getUpgradedFromSegmentId() != null - && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) { - segmentToParent.put( - pendingSegment.getId(), - idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId()) - ); - } - }); - - segmentToParent.forEach( - (newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + upgradedPendingSegments.forEach( + upgradedPendingSegment -> supervisorManager.registerUpgradedPendingSegmentOnSupervisor( activeSupervisorIdWithAppendLock.get(), - oldId, - newId + upgradedPendingSegment ) ); } diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 0a1f00f90251..b8027fcc5ea1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.BatchAppenderators; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.IndexTaskUtils; -import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch; import org.apache.druid.indexing.common.task.SegmentAllocators; import org.apache.druid.indexing.common.task.TaskResource; @@ -109,7 +108,7 @@ * generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of * publishing on its own. */ -public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask +public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler { public static final String TYPE = "single_phase_sub_task"; public static final String OLD_TYPE_NAME = "index_sub"; @@ -240,12 +239,6 @@ public String getSubtaskSpecId() return subtaskSpecId; } - @Override - public String getTaskAllocatorId() - { - return getGroupId(); - } - @Override public TaskStatus runTask(final TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 7248fcab865e..5d71940d4704 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -1242,7 +1242,7 @@ public void remove(final Task task) idsInSameGroup.remove(task.getId()); if (idsInSameGroup.isEmpty()) { final int pendingSegmentsDeleted - = metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId); + = metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId); log.info( "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.", pendingSegmentsDeleted, taskAllocatorId diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index dd57b560660c..288b2a141564 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -33,9 +33,9 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import javax.annotation.Nullable; @@ -308,16 +308,19 @@ public boolean 
checkPointDataSourceMetadata( * allows the supervisor to include the pending segment in queries fired against * that segment version. */ - public boolean registerNewVersionOfPendingSegmentOnSupervisor( + public boolean registerUpgradedPendingSegmentOnSupervisor( String supervisorId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + PendingSegmentRecord upgradedPendingSegment ) { try { Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null"); - Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null"); - Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "taskAllocatorId cannot be null"); + Preconditions.checkNotNull( + upgradedPendingSegment.getUpgradedFromSegmentId(), + "upgradedFromSegmentId cannot be null" + ); Pair supervisor = supervisors.get(supervisorId); Preconditions.checkNotNull(supervisor, "supervisor could not be found"); @@ -326,12 +329,12 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor( } SeekableStreamSupervisor seekableStreamSupervisor = (SeekableStreamSupervisor) supervisor.lhs; - seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion); + seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment); return true; } catch (Exception e) { - log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed", - basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId); + log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].", + upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId); } return false; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java deleted file mode 100644 index 146b0afc4b9d..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.seekablestream; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; - -/** - * Contains a new version of an existing base pending segment. 
Used by realtime - * tasks to serve queries against multiple versions of the same pending segment. - */ -public class PendingSegmentVersions -{ - private final SegmentIdWithShardSpec baseSegment; - private final SegmentIdWithShardSpec newVersion; - - @JsonCreator - public PendingSegmentVersions( - @JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment, - @JsonProperty("newVersion") SegmentIdWithShardSpec newVersion - ) - { - this.baseSegment = baseSegment; - this.newVersion = newVersion; - } - - @JsonProperty - public SegmentIdWithShardSpec getBaseSegment() - { - return baseSegment; - } - - @JsonProperty - public SegmentIdWithShardSpec getNewVersion() - { - return newVersion; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java index 5e5924249608..7fd282e44ce2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import java.util.List; @@ -158,15 +158,14 @@ ListenableFuture setEndOffsetsAsync( * Update the task state to redirect queries for later versions to the root pending segment. * The task also announces that it is serving the segments belonging to the subsequent versions. * The update is processed only if the task is serving the original pending segment. 
- * @param taskId - task id - * @param basePendingSegment - the pending segment that was originally allocated - * @param newVersionOfSegment - the ids belonging to the versions to which the root segment needs to be updated + * + * @param taskId - task id + * @param pendingSegmentRecord - the ids belonging to the versions to which the root segment needs to be updated * @return true if the update succeeds */ ListenableFuture registerNewVersionOfPendingSegmentAsync( String taskId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newVersionOfSegment + PendingSegmentRecord pendingSegmentRecord ); Class getPartitionType(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java index 40d475909e68..5de1cb50a971 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java @@ -43,6 +43,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; @@ -57,7 +58,6 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -197,13 +197,12 @@ public ListenableFuture> getEndOffsetsA @Override public ListenableFuture registerNewVersionOfPendingSegmentAsync( String taskId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newVersionOfSegment + PendingSegmentRecord pendingSegmentRecord ) { final RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion") - .jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment)); + .jsonContent(jsonMapper, pendingSegmentRecord); return makeRequest(taskId, requestBuilder) .handler(IgnoreHttpResponseHandler.INSTANCE) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 96e1dd401459..94ce367fc847 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionReport; import 
org.apache.druid.segment.incremental.RowIngestionMeters; @@ -1575,18 +1576,15 @@ public Response setEndOffsetsHTTP( @Path("/pendingSegmentVersion") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public Response registerNewVersionOfPendingSegment( - PendingSegmentVersions pendingSegmentVersions, + public Response registerUpgradedPendingSegment( + PendingSegmentRecord upgradedPendingSegment, // this field is only for internal purposes, shouldn't be usually set by users @Context final HttpServletRequest req ) { authorizationCheck(req, Action.WRITE); try { - ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment( - pendingSegmentVersions.getBaseSegment(), - pendingSegmentVersions.getNewVersion() - ); + ((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment); return Response.ok().build(); } catch (DruidException e) { @@ -1598,8 +1596,8 @@ public Response registerNewVersionOfPendingSegment( catch (Exception e) { log.error( e, - "Could not register new version[%s] of pending segment[%s]", - pendingSegmentVersions.getNewVersion(), pendingSegmentVersions.getBaseSegment() + "Could not register pending segment[%s] upgraded from[%s]", + upgradedPendingSegment.getId().asSegmentId(), upgradedPendingSegment.getUpgradedFromSegmentId() ); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f15a975694fd..58a433325a3f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -89,12 +89,12 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import javax.annotation.Nonnull; @@ -178,7 +178,8 @@ public abstract class SeekableStreamSupervisor taskIds() return tasks.keySet(); } + @VisibleForTesting + public String getBaseSequenceName() + { + return baseSequenceName; + } } private class TaskData @@ -1096,42 +1102,23 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); } - /** - * The base sequence name of a seekable stream task group is used as a prefix of the sequence names - * of pending segments published by it. - * This method can be used to identify the active pending segments for a datasource - * by checking if the sequence name begins with any of the active realtime sequence prefixes returned by this method - * @return the set of base sequence names of both active and pending completion task groups.
- */ - @Override - public Set getActiveRealtimeSequencePrefixes() - { - final Set activeBaseSequences = new HashSet<>(); - for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { - activeBaseSequences.add(taskGroup.baseSequenceName); - } - for (List taskGroupList : pendingCompletionTaskGroups.values()) { - for (TaskGroup taskGroup : taskGroupList) { - activeBaseSequences.add(taskGroup.baseSequenceName); - } - } - return activeBaseSequences; - } - public void registerNewVersionOfPendingSegment( - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + PendingSegmentRecord pendingSegmentRecord ) { for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { - for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); + if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + for (String taskId : taskGroup.taskIds()) { + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + } } } for (List taskGroupList : pendingCompletionTaskGroups.values()) { for (TaskGroup taskGroup : taskGroupList) { - for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); + if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + for (String taskId : taskGroup.taskIds()) { + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + } } } } @@ -1548,7 +1535,7 @@ private List getCurrentParseErrors() } @VisibleForTesting - public void addTaskGroupToActivelyReadingTaskGroup( + public TaskGroup addTaskGroupToActivelyReadingTaskGroup( int taskGroupId, ImmutableMap partitionOffsets, Optional minMsgTime, @@ -1572,10 +1559,11 @@ public void addTaskGroupToActivelyReadingTaskGroup( taskGroupId ); } + return group; } @VisibleForTesting - public void addTaskGroupToPendingCompletionTaskGroup( + public TaskGroup addTaskGroupToPendingCompletionTaskGroup( int taskGroupId, ImmutableMap partitionOffsets, Optional minMsgTime, @@ -1595,6 +1583,7 @@ public void addTaskGroupToPendingCompletionTaskGroup( group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData()))); pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new CopyOnWriteArrayList<>()) .add(group); + return group; } @VisibleForTesting @@ -3202,9 +3191,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException // If we received invalid endOffset values, we clear the known offset to refetch the last committed offset // from metadata. If any endOffset values are invalid, we treat the entire set as invalid as a safety measure. 
if (!endOffsetsAreInvalid) { - for (Entry entry : endOffsets.entrySet()) { - partitionOffsets.put(entry.getKey(), entry.getValue()); - } + partitionOffsets.putAll(endOffsets); } else { for (Entry entry : endOffsets.entrySet()) { partitionOffsets.put(entry.getKey(), getNotSetMarker()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java index b80641fe94bf..62b5e48e00b4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java @@ -52,7 +52,7 @@ public class ActionsTestTask extends CommandQueueTask { private final TaskActionClient client; private final AtomicInteger sequenceId = new AtomicInteger(0); - private final Map announcedSegmentsToParentSegments = new HashMap<>(); + private final Map announcedSegmentsToParentSegments = new HashMap<>(); public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory) { @@ -82,7 +82,7 @@ public SegmentPublishResult commitReplaceSegments(DataSegment... segments) ); } - public Map getAnnouncedSegmentsToParentSegments() + public Map getAnnouncedSegmentsToParentSegments() { return announcedSegmentsToParentSegments; } @@ -114,7 +114,7 @@ public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Gr TaskLockType.APPEND ) ); - announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId()); + announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId().toString()); return pendingSegment; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java index 50c318683e8f..7da5a3d19fe8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java @@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -74,7 +75,6 @@ import org.junit.Before; import org.junit.Test; -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -83,7 +83,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -122,10 +121,9 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase private final AtomicInteger groupId = new AtomicInteger(0); private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class); private Capture supervisorId; - private Capture oldPendingSegment; - private Capture newPendingSegment; + private Capture pendingSegment; private Map>> 
versionToIntervalToLoadSpecs; - private Map parentSegmentToLoadSpec; + private Map parentSegmentToLoadSpec; @Override @Before @@ -169,12 +167,10 @@ public void setUpIngestionTestBase() throws IOException groupId.set(0); appendTask = createAndStartTask(); supervisorId = Capture.newInstance(CaptureType.ALL); - oldPendingSegment = Capture.newInstance(CaptureType.ALL); - newPendingSegment = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + pendingSegment = Capture.newInstance(CaptureType.ALL); + EasyMock.expect(supervisorManager.registerUpgradedPendingSegmentOnSupervisor( EasyMock.capture(supervisorId), - EasyMock.capture(oldPendingSegment), - EasyMock.capture(newPendingSegment) + EasyMock.capture(pendingSegment) )).andReturn(true).anyTimes(); replaceTask = createAndStartTask(); EasyMock.replay(supervisorManager); @@ -682,20 +678,6 @@ public void testLockAllocateDayReplaceMonthAllocateAppend() verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12); } - - @Nullable - private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) - { - for (DataSegment segment : segments) { - if (version.equals(segment.getVersion()) - && Objects.equals(segment.getLoadSpec(), loadSpec)) { - return segment; - } - } - - return null; - } - private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment) { final SegmentId id = pendingSegment.asSegmentId(); @@ -739,23 +721,6 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment. } } - private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments) - { - try { - final TaskActionClient taskActionClient = taskActionClientFactory.create(task); - Collection allUsedSegments = taskActionClient.submit( - new RetrieveUsedSegmentsAction( - WIKI, - Collections.singletonList(interval) - ) - ); - Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); - } - catch (IOException e) { - throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval); - } - } - private TaskToolboxFactory createToolboxFactory( TaskConfig taskConfig, TaskActionClientFactory taskActionClientFactory @@ -799,11 +764,10 @@ private void commitReplaceSegments(DataSegment... dataSegments) { replaceTask.commitReplaceSegments(dataSegments); for (int i = 0; i < supervisorId.getValues().size(); i++) { - announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i)); + announceUpgradedPendingSegment(pendingSegment.getValues().get(i)); } supervisorId.reset(); - oldPendingSegment.reset(); - newPendingSegment.reset(); + pendingSegment.reset(); replaceTask.finishRunAndGetStatus(); } @@ -812,19 +776,16 @@ private SegmentPublishResult commitAppendSegments(DataSegment... 
dataSegments) SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments); result.getSegments().forEach(this::unannounceUpgradedPendingSegment); for (DataSegment segment : dataSegments) { - parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values())); + parentSegmentToLoadSpec.put(segment.getId().toString(), Iterables.getOnlyElement(segment.getLoadSpec().values())); } appendTask.finishRunAndGetStatus(); return result; } - private void announceUpgradedPendingSegment( - SegmentIdWithShardSpec oldPendingSegment, - SegmentIdWithShardSpec newPendingSegment - ) + private void announceUpgradedPendingSegment(PendingSegmentRecord pendingSegment) { appendTask.getAnnouncedSegmentsToParentSegments() - .put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId()); + .put(pendingSegment.getId().asSegmentId(), pendingSegment.getUpgradedFromSegmentId()); } private void unannounceUpgradedPendingSegment( @@ -849,7 +810,7 @@ private void verifyVersionIntervalLoadSpecUniqueness() loadSpecs.add(loadSpec); } - for (Map.Entry entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) { + for (Map.Entry entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) { final String version = entry.getKey().getVersion(); final Interval interval = entry.getKey().getInterval(); final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 7c16e2efc240..3af74235e823 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1948,8 +1948,8 @@ public void testCleanupOnUnlock() // Only the replaceTask should attempt a delete on the upgradeSegments table EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once(); // Any task may attempt pending segment clean up - EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once(); - EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once(); EasyMock.replay(coordinator); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 5ffbd4b94608..4a9fccd4663b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -31,7 +31,11 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.MetadataSupervisorManager; +import 
org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -544,6 +548,53 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() verifyAll(); } + @Test + public void testRegisterUpgradedPendingSegmentOnSupervisor() + { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap()); + + NoopSupervisorSpec noopSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS")); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec streamingSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + SeekableStreamSupervisor streamSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes(); + EasyMock.expect(streamingSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(streamingSpec.getDataSources()).andReturn(ImmutableList.of("DS")).anyTimes(); + EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes(); + EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(streamingSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + final PendingSegmentRecord pendingSegment = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.ETERNITY, + "version", + new NumberedShardSpec(0, 0) + ), + "sequenceName", + "prevSegmentId", + "upgradedFromSegmentId", + "taskAllocatorId" + ); + manager.start(); + + manager.createOrUpdateAndStartSupervisor(noopSpec); + Assert.assertFalse(manager.registerUpgradedPendingSegmentOnSupervisor("noop", pendingSegment)); + + manager.createOrUpdateAndStartSupervisor(streamingSpec); + Assert.assertTrue(manager.registerUpgradedPendingSegmentOnSupervisor("sss", pendingSegment)); + + verifyAll(); + } + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 2602f8e5441f..489315cc2495 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -74,11 +74,13 @@ import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.StubServiceEmitter; 
+import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -86,6 +88,10 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.hamcrest.MatcherAssert; @@ -1548,10 +1554,19 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In } @Test - public void testGetActiveRealtimeSequencePrefixes() + public void testRegisterNewVersionOfPendingSegment() { EasyMock.expect(spec.isSuspended()).andReturn(false); + Capture captured0 = Capture.newInstance(CaptureType.FIRST); + Capture captured1 = Capture.newInstance(CaptureType.FIRST); + EasyMock.expect( + indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task0"), EasyMock.capture(captured0)) + ).andReturn(Futures.immediateFuture(true)); + EasyMock.expect( + indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task2"), EasyMock.capture(captured1)) + ).andReturn(Futures.immediateFuture(true)); + replayAll(); final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); @@ -1559,34 +1574,63 @@ public void testGetActiveRealtimeSequencePrefixes() // Spin off two active tasks with each task serving one partition. supervisor.getIoConfig().setTaskCount(3); supervisor.start(); - supervisor.addTaskGroupToActivelyReadingTaskGroup( + + final SeekableStreamSupervisor.TaskGroup taskGroup0 = supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition("0"), ImmutableMap.of("0", "5"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task1"), + ImmutableSet.of("task0"), ImmutableSet.of() ); - - supervisor.addTaskGroupToActivelyReadingTaskGroup( + final SeekableStreamSupervisor.TaskGroup taskGroup1 = supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition("1"), ImmutableMap.of("1", "6"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task2"), + ImmutableSet.of("task1"), ImmutableSet.of() ); - - supervisor.addTaskGroupToPendingCompletionTaskGroup( + final SeekableStreamSupervisor.TaskGroup taskGroup2 = supervisor.addTaskGroupToPendingCompletionTaskGroup( supervisor.getTaskGroupIdForPartition("2"), ImmutableMap.of("2", "100"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task3"), + ImmutableSet.of("task2"), ImmutableSet.of() ); - Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size()); + final PendingSegmentRecord pendingSegmentRecord0 = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.of("2024/2025"), + "2024", + new NumberedShardSpec(1, 0) + ), + taskGroup0.getBaseSequenceName(), + "prevId0", + "someAppendedSegment0", + taskGroup0.getBaseSequenceName() + ); + final PendingSegmentRecord pendingSegmentRecord1 = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.of("2024/2025"), + "2024", + new NumberedShardSpec(2, 0) + ), + taskGroup2.getBaseSequenceName(), + "prevId1", + "someAppendedSegment1", + taskGroup2.getBaseSequenceName() + ); + 
+ supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord0); + supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord1); + + Assert.assertEquals(pendingSegmentRecord0, captured0.getValue()); + Assert.assertEquals(pendingSegmentRecord1, captured1.getValue()); + verifyAll(); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 1de41bb43a0f..6c4f556133ec 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -252,11 +252,11 @@ public SegmentIdWithShardSpec allocatePendingSegment( } @Override - public Map upgradePendingSegmentsOverlappingWith( + public List upgradePendingSegmentsOverlappingWith( Set replaceSegments ) { - return Collections.emptyMap(); + return Collections.emptyList(); } @Override @@ -297,7 +297,7 @@ public int deleteUpgradeSegmentsForTask(final String taskId) } @Override - public int deletePendingSegmentsForTaskGroup(final String taskGroup) + public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 23513c82ad7b..aea2674f6b85 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -389,9 +389,9 @@ SegmentPublishResult commitReplaceSegments( * * * @param replaceSegments Segments being committed by a REPLACE task - * @return Map from originally allocated pending segment to its new upgraded ID. + * @return List of inserted pending segment records */ - Map upgradePendingSegmentsOverlappingWith( + List upgradePendingSegmentsOverlappingWith( Set replaceSegments ); @@ -495,7 +495,7 @@ SegmentPublishResult commitMetadataOnly( * @param taskAllocatorId task id / task group / replica group for an appending task * @return number of pending segments deleted from the metadata store */ - int deletePendingSegmentsForTaskGroup(String taskAllocatorId); + int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId); /** * Fetches all the pending segments of the datasource that overlap with a given interval. 
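The SeekableStreamSupervisor change above is the heart of item (4) in the commit message: an upgraded pending segment is now registered only on the task group that allocated it, because PendingSegmentRecord.getTaskAllocatorId() carries the owning group's base sequence name. The following is a minimal, self-contained Java sketch of that matching, not the patched class itself; PendingSegment, TaskGroup, and TaskClient are simplified stand-ins for the Druid types, and only the taskAllocatorId-to-baseSequenceName comparison is taken from the patch.

import java.util.List;
import java.util.Map;

public class UpgradedPendingSegmentRouter
{
  /** Simplified stand-in for org.apache.druid.metadata.PendingSegmentRecord. */
  public static class PendingSegment
  {
    public final String segmentId;
    public final String upgradedFromSegmentId;
    public final String taskAllocatorId;

    public PendingSegment(String segmentId, String upgradedFromSegmentId, String taskAllocatorId)
    {
      this.segmentId = segmentId;
      this.upgradedFromSegmentId = upgradedFromSegmentId;
      this.taskAllocatorId = taskAllocatorId;
    }
  }

  /** Simplified stand-in for SeekableStreamSupervisor.TaskGroup. */
  public static class TaskGroup
  {
    public final String baseSequenceName;
    public final List<String> taskIds;

    public TaskGroup(String baseSequenceName, List<String> taskIds)
    {
      this.baseSequenceName = baseSequenceName;
      this.taskIds = taskIds;
    }
  }

  /** Simplified stand-in for SeekableStreamIndexTaskClient. */
  public interface TaskClient
  {
    void registerNewVersionOfPendingSegment(String taskId, PendingSegment upgraded);
  }

  // Register the upgraded pending segment only on task groups whose
  // baseSequenceName equals the record's taskAllocatorId; every other
  // group is skipped, unlike the previous broadcast to all groups.
  public static void register(
      Map<Integer, TaskGroup> activelyReadingTaskGroups,
      Map<Integer, List<TaskGroup>> pendingCompletionTaskGroups,
      PendingSegment upgraded,
      TaskClient taskClient
  )
  {
    for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
      if (taskGroup.baseSequenceName.equals(upgraded.taskAllocatorId)) {
        for (String taskId : taskGroup.taskIds) {
          taskClient.registerNewVersionOfPendingSegment(taskId, upgraded);
        }
      }
    }
    for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) {
      for (TaskGroup taskGroup : taskGroupList) {
        if (taskGroup.baseSequenceName.equals(upgraded.taskAllocatorId)) {
          for (String taskId : taskGroup.taskIds) {
            taskClient.registerNewVersionOfPendingSegment(taskId, upgraded);
          }
        }
      }
    }
  }
}

Registration happens for both actively reading and pending-completion groups because a pending segment may still be served by a task that has stopped reading but has not yet published.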
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index 620ff8831b09..e4bc1645f710 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -23,10 +23,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -46,12 +49,19 @@ public class SegmentPublishResult private final boolean success; @Nullable private final String errorMsg; + @Nullable + private final List upgradedPendingSegments; public static SegmentPublishResult ok(Set segments) { return new SegmentPublishResult(segments, true, null); } + public static SegmentPublishResult ok(Set segments, List upgradedPendingSegments) + { + return new SegmentPublishResult(segments, true, null, upgradedPendingSegments); + } + public static SegmentPublishResult fail(String errorMsg) { return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); @@ -63,13 +73,28 @@ private SegmentPublishResult( @JsonProperty("success") boolean success, @JsonProperty("errorMsg") @Nullable String errorMsg ) + { + this(segments, success, errorMsg, null); + } + + private SegmentPublishResult( + Set segments, + boolean success, + @Nullable String errorMsg, + List upgradedPendingSegments + ) { this.segments = Preconditions.checkNotNull(segments, "segments"); this.success = success; this.errorMsg = errorMsg; + this.upgradedPendingSegments = upgradedPendingSegments; if (!success) { Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes"); + Preconditions.checkArgument( + CollectionUtils.isNullOrEmpty(upgradedPendingSegments), + "upgraded pending segments must be null or empty for unsuccessful publishes" + ); } } @@ -92,6 +117,12 @@ public String getErrorMsg() return errorMsg; } + @Nullable + public List getUpgradedPendingSegments() + { + return upgradedPendingSegments; + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 20c102533862..e733ef6c233d 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -31,7 +31,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -186,12 +185,6 @@ public int getActiveTaskGroupsCount() { return -1; } - - @Override - public Set getActiveRealtimeSequencePrefixes() - { - return Collections.emptySet(); - } }; } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 9b9511cbf3da..b1fb439184d4 100644 --- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -29,7 +29,6 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; -import java.util.Set; public interface Supervisor { @@ -103,9 +102,4 @@ default long computeLagForAutoScaler() } int getActiveTaskGroupsCount(); - - /** - * @return active sequence prefixes for reading and pending completion task groups of a seekable stream supervisor - */ - Set getActiveRealtimeSequencePrefixes(); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index e36412e5dc1e..c5a36656c9a2 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -79,6 +79,7 @@ import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; +import org.skife.jdbi.v2.Update; import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -530,9 +531,9 @@ public SegmentPublishResult commitReplaceSegments( segmentSchemaMapping, upgradeSegmentMetadata, Collections.emptyMap() - ) + ), + upgradePendingSegmentsOverlappingWith(segmentsToInsert) ); - upgradePendingSegmentsOverlappingWith(segmentsToInsert); return result; }, 3, @@ -735,12 +736,12 @@ public SegmentIdWithShardSpec allocatePendingSegment( } @Override - public Map upgradePendingSegmentsOverlappingWith( + public List upgradePendingSegmentsOverlappingWith( Set replaceSegments ) { if (replaceSegments.isEmpty()) { - return Collections.emptyMap(); + return Collections.emptyList(); } // Any replace interval has exactly one version of segments @@ -769,16 +770,15 @@ public Map upgradePendingSegment * those versions. * * - * @return Map from original pending segment to the new upgraded ID. 
+ * @return Inserted pending segment records */ - private Map upgradePendingSegments( + private List upgradePendingSegments( Handle handle, String datasource, Map replaceIntervalToMaxId ) throws JsonProcessingException { final List upgradedPendingSegments = new ArrayList<>(); - final Map pendingSegmentToNewId = new HashMap<>(); for (Map.Entry entry : replaceIntervalToMaxId.entrySet()) { final Interval replaceInterval = entry.getKey(); @@ -813,7 +813,6 @@ private Map upgradePendingSegmen overlappingPendingSegment.getTaskAllocatorId() ) ); - pendingSegmentToNewId.put(pendingSegmentId, newId); } } } @@ -831,7 +830,7 @@ private Map upgradePendingSegmen numInsertedPendingSegments, upgradedPendingSegments.size() ); - return pendingSegmentToNewId; + return upgradedPendingSegments; } private boolean shouldUpgradePendingSegment( @@ -1114,8 +1113,15 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); // always insert empty previous sequence id - insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1, - taskAllocatorId + insertPendingSegmentIntoMetastore( + handle, + newIdentifier, + dataSource, + interval, + "", + sequenceName, + sequenceNamePrevIdSha1, + taskAllocatorId ); log.info( @@ -1320,6 +1326,39 @@ public int hashCode() } } + private static void bindColumnValuesToQueryWithInCondition( + final String columnName, + final List values, + final Update query + ) + { + if (values == null) { + return; + } + + for (int i = 0; i < values.size(); i++) { + query.bind(StringUtils.format("%s%d", columnName, i), values.get(i)); + } + } + + private int deletePendingSegmentsById(Handle handle, String datasource, List pendingSegmentIds) + { + if (pendingSegmentIds.isEmpty()) { + return 0; + } + + Update query = handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE dataSource = :dataSource %s", + dbTables.getPendingSegmentsTable(), + SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", pendingSegmentIds) + ) + ).bind("dataSource", datasource); + bindColumnValuesToQueryWithInCondition("id", pendingSegmentIds, query); + + return query.execute(); + } + private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( Set appendSegments, Map appendSegmentToReplaceLock, @@ -1383,7 +1422,6 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( if (metadataUpdateResult.isFailed()) { transactionStatus.setRollbackOnly(); metadataNotUpdated.set(true); - if (metadataUpdateResult.canRetry()) { throw new RetryTransactionException(metadataUpdateResult.getErrorMsg()); } else { @@ -1393,6 +1431,20 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock); + + // Delete the pending segments to be committed in this transaction in batches of at most 100 + final List> pendingSegmentIdBatches = Lists.partition( + allSegmentsToInsert.stream() + .map(pendingSegment -> pendingSegment.getId().toString()) + .collect(Collectors.toList()), + 100 + ); + int numDeletedPendingSegments = 0; + for (List pendingSegmentIdBatch : pendingSegmentIdBatches) { + numDeletedPendingSegments += deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch); + } + log.info("Deleted [%d] entries from pending segments table upon commit.", numDeletedPendingSegments); + return SegmentPublishResult.ok( insertSegments( handle, @@ -2761,7 +2813,7 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) } 
@Override - public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup) + public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup) { return connector.getDBI().inTransaction( (handle, status) -> handle diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java index 44c62bf47ad1..bfbaad18ef1a 100644 --- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java +++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java @@ -19,6 +19,8 @@ package org.apache.druid.metadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -50,12 +52,13 @@ public class PendingSegmentRecord private final String upgradedFromSegmentId; private final String taskAllocatorId; + @JsonCreator public PendingSegmentRecord( - SegmentIdWithShardSpec id, - String sequenceName, - String sequencePrevId, - @Nullable String upgradedFromSegmentId, - @Nullable String taskAllocatorId + @JsonProperty("id") SegmentIdWithShardSpec id, + @JsonProperty("sequenceName") String sequenceName, + @JsonProperty("sequencePrevId") String sequencePrevId, + @JsonProperty("upgradedFromSegmentId") @Nullable String upgradedFromSegmentId, + @JsonProperty("taskAllocatorId") @Nullable String taskAllocatorId ) { this.id = id; @@ -65,16 +68,19 @@ public PendingSegmentRecord( this.taskAllocatorId = taskAllocatorId; } + @JsonProperty public SegmentIdWithShardSpec getId() { return id; } + @JsonProperty public String getSequenceName() { return sequenceName; } + @JsonProperty public String getSequencePrevId() { return sequencePrevId; @@ -85,6 +91,7 @@ public String getSequencePrevId() * Can be null for pending segments allocated before this column was added or for segments that have not been upgraded. */ @Nullable + @JsonProperty public String getUpgradedFromSegmentId() { return upgradedFromSegmentId; @@ -95,6 +102,7 @@ public String getUpgradedFromSegmentId() * Can be null for pending segments allocated before this column was added. */ @Nullable + @JsonProperty public String getTaskAllocatorId() { return taskAllocatorId; diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index fc990e107dd3..f14cc9950505 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -955,7 +955,7 @@ private static int computeNumChangedSegments(List segmentIds, int[] segm * * @implNote JDBI 3.x has better support for binding {@code IN} clauses directly. 
*/ - private static String getParameterizedInConditionForColumn(final String columnName, final List values) + static String getParameterizedInConditionForColumn(final String columnName, final List values) { if (values == null) { return ""; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index d022580f7c19..65df4f56761a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -563,7 +563,8 @@ ListenableFuture dropInBackground(SegmentsAndCommitMe return new SegmentsAndCommitMetadata( segmentsAndCommitMetadata.getSegments(), metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), - segmentsAndCommitMetadata.getSegmentSchemaMapping() + segmentsAndCommitMetadata.getSegmentSchemaMapping(), + segmentsAndCommitMetadata.getUpgradedSegments() ); }, MoreExecutors.directExecutor() @@ -618,9 +619,10 @@ ListenableFuture publishInBackground( return executor.submit( () -> { try { - RetryUtils.retry( + return RetryUtils.retry( () -> { try { + final Set upgradedSegments = new HashSet<>(); final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); final SegmentPublishResult publishResult = publisher.publishSegments( segmentsToBeOverwritten, @@ -629,7 +631,6 @@ ListenableFuture publishInBackground( callerMetadata, segmentsAndCommitMetadata.getSegmentSchemaMapping() ); - if (publishResult.isSuccess()) { log.info( "Published [%s] segments with commit metadata [%s]", @@ -637,6 +638,13 @@ ListenableFuture publishInBackground( callerMetadata ); log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); + // This set must contain only those segments that were upgraded as a result of a concurrent replace. + upgradedSegments.addAll(publishResult.getSegments()); + segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove); + if (!upgradedSegments.isEmpty()) { + log.info("Published [%d] upgraded segments.", upgradedSegments.size()); + log.infoSegments(upgradedSegments, "Upgraded segments"); + } log.info("Published segment schemas: [%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping()); } else { // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active @@ -691,6 +699,7 @@ ListenableFuture publishInBackground( throw new ISE("Failed to publish segments"); } } + return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments); } catch (Exception e) { // Must not remove segments here, we aren't sure if our transaction succeeded or not. @@ -703,9 +712,10 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata; }, - e -> (e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store. The new start metadata is ahead of last commited end state.")), + e -> (e != null && e.getMessage() != null + && e.getMessage().contains("Failed to update the metadata Store." 
+ + " The new start metadata is ahead of last commited end state.")), RetryUtils.DEFAULT_MAX_TRIES ); } @@ -717,7 +727,6 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata; } ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java index 4f0a53398e45..721876880578 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.appenderator; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -28,26 +29,59 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; public class SegmentsAndCommitMetadata { - private static final SegmentsAndCommitMetadata NIL = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null); + private static final SegmentsAndCommitMetadata NIL + = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null, null); private final Object commitMetadata; private final ImmutableList segments; private final SegmentSchemaMapping segmentSchemaMapping; + private final ImmutableSet upgradedSegments; + + public SegmentsAndCommitMetadata( + List segments, + Object commitMetadata + ) + { + this(segments, commitMetadata, null, null); + } + + public SegmentsAndCommitMetadata( + List segments, + Object commitMetadata, + SegmentSchemaMapping segmentSchemaMapping + ) + { + this(segments, commitMetadata, segmentSchemaMapping, null); + } + public SegmentsAndCommitMetadata( List segments, @Nullable Object commitMetadata, - @Nullable SegmentSchemaMapping segmentSchemaMapping + @Nullable SegmentSchemaMapping segmentSchemaMapping, + @Nullable Set upgradedSegments ) { this.segments = ImmutableList.copyOf(segments); this.commitMetadata = commitMetadata; + this.upgradedSegments = upgradedSegments == null ? null : ImmutableSet.copyOf(upgradedSegments); this.segmentSchemaMapping = segmentSchemaMapping; } + public SegmentsAndCommitMetadata withUpgradedSegments(Set upgradedSegments) + { + return new SegmentsAndCommitMetadata( + this.segments, + this.commitMetadata, + this.segmentSchemaMapping, + upgradedSegments + ); + } + @Nullable public Object getCommitMetadata() { @@ -59,6 +93,15 @@ public List getSegments() return segments; } + /** + * @return the set of extra upgraded segments committed due to a concurrent replace. 
+ */ + @Nullable + public Set getUpgradedSegments() + { + return upgradedSegments; + } + public SegmentSchemaMapping getSegmentSchemaMapping() { return segmentSchemaMapping; @@ -75,13 +118,15 @@ public boolean equals(Object o) } SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o; return Objects.equals(commitMetadata, that.commitMetadata) && + Objects.equals(upgradedSegments, that.upgradedSegments) && + Objects.equals(segmentSchemaMapping, that.segmentSchemaMapping) && Objects.equals(segments, that.segments); } @Override public int hashCode() { - return Objects.hash(commitMetadata, segments); + return Objects.hash(commitMetadata, segments, upgradedSegments, segmentSchemaMapping); } @Override @@ -90,6 +135,8 @@ public String toString() return getClass().getSimpleName() + "{" + "commitMetadata=" + commitMetadata + ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + + ", upgradedSegments=" + SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) + + ", segmentSchemaMapping=" + segmentSchemaMapping + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index d02e200cfcbd..aba071de1dfc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -356,13 +356,13 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final } } - public void registerNewVersionOfPendingSegment( + public void registerUpgradedPendingSegment( SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + SegmentIdWithShardSpec upgradedPendingSegment ) { newIdToBasePendingSegment.put( - newSegmentVersion.asSegmentId().toDescriptor(), + upgradedPendingSegment.asSegmentId().toDescriptor(), basePendingSegment.asSegmentId().toDescriptor() ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 35ff42d3daba..1c5dd42dd770 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -51,6 +51,7 @@ import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; @@ -147,6 +148,7 @@ public class StreamAppenderator implements Appenderator * of any thread from {@link #drop}. 
*/ private final ConcurrentMap sinks = new ConcurrentHashMap<>(); + private final ConcurrentMap idToPendingSegment = new ConcurrentHashMap<>(); private final Set droppingSinks = Sets.newConcurrentHashSet(); private final VersionedIntervalTimeline sinkTimeline; private final long maxBytesTuningConfig; @@ -166,8 +168,25 @@ public class StreamAppenderator implements Appenderator private final AtomicBoolean closed = new AtomicBoolean(false); - private final ConcurrentHashMap> - baseSegmentToUpgradedVersions = new ConcurrentHashMap<>(); + /** + * Map from the base segment identifier of a sink to the set of all segment ids associated with it. + * The set contains the base segment itself and its upgraded versions announced as a result of a concurrent replace. + * The map contains all the available sinks' identifiers in its keyset. + */ + private final ConcurrentMap> baseSegmentToUpgradedSegments + = new ConcurrentHashMap<>(); + /** + * Map from the id of an upgraded pending segment to the segment corresponding to its upgradedFromSegmentId. + */ + private final ConcurrentMap upgradedSegmentToBaseSegment + = new ConcurrentHashMap<>(); + /** + * Set of all segment identifiers that have been marked to be abandoned. + * This is used to determine whether all the segments corresponding to a sink have been abandoned, at which point the sink itself can be dropped. + */ + private final ConcurrentHashMap.KeySetView abandonedSegments + = ConcurrentHashMap.newKeySet(); + private final SinkSchemaAnnouncer sinkSchemaAnnouncer; private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig; @@ -527,9 +546,7 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) .emit(); } - sinks.put(identifier, retVal); - metrics.setSinkCount(sinks.size()); - sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal)); + addSink(identifier, retVal); } return retVal; @@ -1058,14 +1075,7 @@ public void closeNow() log.debug("Shutting down immediately..."); for (Map.Entry entry : sinks.entrySet()) { - try { - unannounceAllVersionsOfSegment(entry.getValue().getSegment()); - } - catch (Exception e) { - log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) - .addData("identifier", entry.getKey().toString()) - .emit(); - } + unannounceAllVersionsOfSegment(entry.getValue().getSegment(), entry.getValue()); } try { shutdownExecutors(); @@ -1098,61 +1108,78 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer() /** * Unannounces the given base segment and all its upgraded versions.
*/ - private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOException + private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink) { - segmentAnnouncer.unannounceSegment(baseSegment); + synchronized (sink) { + final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); + if (!baseSegmentToUpgradedSegments.containsKey(baseId)) { + return; + } - final Set upgradedVersionsOfSegment - = baseSegmentToUpgradedVersions.remove(baseSegment.getId()); - if (upgradedVersionsOfSegment == null || upgradedVersionsOfSegment.isEmpty()) { - return; + final Set upgradedVersionsOfSegment = baseSegmentToUpgradedSegments.remove(baseId); + for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { + final DataSegment newSegment = new DataSegment( + newId.getDataSource(), + newId.getInterval(), + newId.getVersion(), + baseSegment.getLoadSpec(), + baseSegment.getDimensions(), + baseSegment.getMetrics(), + newId.getShardSpec(), + baseSegment.getBinaryVersion(), + baseSegment.getSize() + ); + unannounceSegment(newSegment); + upgradedSegmentToBaseSegment.remove(newId); + } } + } - for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { - final DataSegment newSegment = new DataSegment( - newId.getDataSource(), - newId.getInterval(), - newId.getVersion(), - baseSegment.getLoadSpec(), - baseSegment.getDimensions(), - baseSegment.getMetrics(), - newId.getShardSpec(), - baseSegment.getBinaryVersion(), - baseSegment.getSize() - ); - segmentAnnouncer.unannounceSegment(newSegment); + private void unannounceSegment(DataSegment segment) + { + try { + segmentAnnouncer.unannounceSegment(segment); + } + catch (Exception e) { + log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) + .addData("identifier", segment.getId().toString()) + .emit(); } } - public void registerNewVersionOfPendingSegment( - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion - ) throws IOException + public void registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException { + SegmentIdWithShardSpec basePendingSegment = idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId()); + SegmentIdWithShardSpec upgradedPendingSegment = pendingSegmentRecord.getId(); if (!sinks.containsKey(basePendingSegment) || droppingSinks.contains(basePendingSegment)) { return; } // Update query mapping with SinkQuerySegmentWalker - ((SinkQuerySegmentWalker) texasRanger).registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion); + ((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(basePendingSegment, upgradedPendingSegment); // Announce segments final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment(); + final DataSegment newSegment = getUpgradedSegment(baseSegment, upgradedPendingSegment); - final DataSegment newSegment = new DataSegment( - newSegmentVersion.getDataSource(), - newSegmentVersion.getInterval(), - newSegmentVersion.getVersion(), + segmentAnnouncer.announceSegment(newSegment); + baseSegmentToUpgradedSegments.get(basePendingSegment).add(upgradedPendingSegment); + upgradedSegmentToBaseSegment.put(upgradedPendingSegment, basePendingSegment); + } + + private DataSegment getUpgradedSegment(DataSegment baseSegment, SegmentIdWithShardSpec upgradedVersion) + { + return new DataSegment( + upgradedVersion.getDataSource(), + upgradedVersion.getInterval(), + upgradedVersion.getVersion(), baseSegment.getLoadSpec(), 
baseSegment.getDimensions(), baseSegment.getMetrics(), - newSegmentVersion.getShardSpec(), + upgradedVersion.getShardSpec(), baseSegment.getBinaryVersion(), baseSegment.getSize() ); - segmentAnnouncer.announceSegment(newSegment); - baseSegmentToUpgradedVersions.computeIfAbsent(basePendingSegment.asSegmentId(), id -> new HashSet<>()) - .add(newSegmentVersion); } private void lockBasePersistDirectory() @@ -1367,13 +1394,8 @@ private Object bootstrapSinksFromDisk() hydrants ); rowsSoFar += currSink.getNumRows(); - sinks.put(identifier, currSink); - sinkTimeline.add( - currSink.getInterval(), - currSink.getVersion(), - identifier.getShardSpec().createChunk(currSink) - ); + addSink(identifier, currSink); segmentAnnouncer.announceSegment(currSink.getSegment()); } catch (IOException e) { @@ -1396,12 +1418,49 @@ private Object bootstrapSinksFromDisk() return committed.getMetadata(); } + /** + * Update the state of the appenderator when adding a sink. + * + * @param identifier sink identifier + * @param sink sink to be added + */ + private void addSink(SegmentIdWithShardSpec identifier, Sink sink) + { + sinks.put(identifier, sink); + // Associate the base segment of a sink with its string identifier + // Needed to get the base segment using the upgradedFromSegmentId of a pending segment + idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); + + // The base segment is associated with itself in the maps to maintain all the upgraded ids of a sink. + baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); + baseSegmentToUpgradedSegments.get(identifier).add(identifier); + + sinkTimeline.add( + sink.getInterval(), + sink.getVersion(), + identifier.getShardSpec().createChunk(sink) + ); + } + private ListenableFuture abandonSegment( final SegmentIdWithShardSpec identifier, final Sink sink, final boolean removeOnDiskData ) { + abandonedSegments.add(identifier); + final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier); + synchronized (sink) { + if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) { + Set relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier)); + relevantSegments.removeAll(abandonedSegments); + // If any segments associated with the sink have not yet been abandoned, return early + // This may be the case if segments have been upgraded as the result of a concurrent replace + if (!relevantSegments.isEmpty()) { + return Futures.immediateFuture(null); + } + } + } // Ensure no future writes will be made to this sink. if (sink.finishWriting()) { // Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement, @@ -1419,7 +1478,7 @@ private ListenableFuture abandonSegment( } // Mark this identifier as dropping, so no future push tasks will pick it up. - droppingSinks.add(identifier); + droppingSinks.add(baseIdentifier); // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform( @@ -1430,8 +1489,8 @@ private ListenableFuture abandonSegment( @Override public Void apply(@Nullable Object input) { - if (!sinks.remove(identifier, sink)) { - log.error("Sink for segment[%s] no longer valid, not abandoning.", identifier); + if (!sinks.remove(baseIdentifier, sink)) { + log.error("Sink for segment[%s] no longer valid, not abandoning.", baseIdentifier); return null; } @@ -1439,17 +1498,17 @@ public Void apply(@Nullable Object input) if (removeOnDiskData) { // Remove this segment from the committed list. This must be done from the persist thread. - log.debug("Removing commit metadata for segment[%s].", identifier); + log.debug("Removing commit metadata for segment[%s].", baseIdentifier); try { commitLock.lock(); final Committed oldCommit = readCommit(); if (oldCommit != null) { - writeCommit(oldCommit.without(identifier.toString())); + writeCommit(oldCommit.without(baseIdentifier.toString())); } } catch (Exception e) { log.makeAlert(e, "Failed to update committed segments[%s]", schema.getDataSource()) - .addData("identifier", identifier.toString()) + .addData("identifier", baseIdentifier.toString()) .emit(); throw new RuntimeException(e); } @@ -1458,22 +1517,14 @@ public Void apply(@Nullable Object input) } } - // Unannounce the segment. - try { - unannounceAllVersionsOfSegment(sink.getSegment()); - } - catch (Exception e) { - log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) - .addData("identifier", identifier.toString()) - .emit(); - } + unannounceAllVersionsOfSegment(sink.getSegment(), sink); Runnable removeRunnable = () -> { - droppingSinks.remove(identifier); + droppingSinks.remove(baseIdentifier); sinkTimeline.remove( sink.getInterval(), sink.getVersion(), - identifier.getShardSpec().createChunk(sink) + baseIdentifier.getShardSpec().createChunk(sink) ); for (FireHydrant hydrant : sink) { if (cache != null) { @@ -1483,7 +1534,7 @@ public Void apply(@Nullable Object input) } if (removeOnDiskData) { - removeDirectory(computePersistDir(identifier)); + removeDirectory(computePersistDir(baseIdentifier)); } log.info("Dropped segment[%s].", identifier); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 164b81b0c49b..2b5c153d602e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -51,10 +51,12 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -322,10 +324,14 @@ public ListenableFuture registerHandoff(SegmentsAndCo return Futures.immediateFuture(null); } else { - final List waitingSegmentIdList = segmentsAndCommitMetadata.getSegments().stream() - .map( - SegmentIdWithShardSpec::fromDataSegment) - .collect(Collectors.toList()); + final Set segmentsToBeHandedOff = new HashSet<>(segmentsAndCommitMetadata.getSegments()); + if (segmentsAndCommitMetadata.getUpgradedSegments() != null) { + segmentsToBeHandedOff.addAll(segmentsAndCommitMetadata.getUpgradedSegments()); + } + final List waitingSegmentIdList = 
+ segmentsToBeHandedOff.stream() + .map(SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toList()); final Object metadata = Preconditions.checkNotNull(segmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata"); if (waitingSegmentIdList.isEmpty()) { @@ -333,7 +339,8 @@ public ListenableFuture registerHandoff(SegmentsAndCo new SegmentsAndCommitMetadata( segmentsAndCommitMetadata.getSegments(), ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), - segmentsAndCommitMetadata.getSegmentSchemaMapping() + segmentsAndCommitMetadata.getSegmentSchemaMapping(), + segmentsAndCommitMetadata.getUpgradedSegments() ) ); } @@ -365,8 +372,7 @@ public ListenableFuture registerHandoff(SegmentsAndCo public void onSuccess(Object result) { if (numRemainingHandoffSegments.decrementAndGet() == 0) { - List segments = segmentsAndCommitMetadata.getSegments(); - log.info("Successfully handed off [%d] segments.", segments.size()); + log.info("Successfully handed off [%d] segments.", segmentsToBeHandedOff.size()); final long handoffTotalTime = System.currentTimeMillis() - handoffStartTime; metrics.reportMaxSegmentHandoffTime(handoffTotalTime); if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) { @@ -375,9 +381,10 @@ public void onSuccess(Object result) } resultFuture.set( new SegmentsAndCommitMetadata( - segments, + segmentsAndCommitMetadata.getSegments(), ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), - segmentsAndCommitMetadata.getSegmentSchemaMapping() + segmentsAndCommitMetadata.getSegmentSchemaMapping(), + segmentsAndCommitMetadata.getUpgradedSegments() ) ); } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index eb8f9358cef8..5a21a4331fe2 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -201,7 +201,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo expectedException.expect(ExecutionException.class); expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); expectedException.expectMessage( - "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]" + "Fail test while dropping segment" ); driver = new StreamAppenderatorDriver( @@ -221,10 +221,8 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo Assert.assertNull(driver.startJob(null)); - for (int i = 0; i < ROWS.size(); i++) { - committerSupplier.setMetadata(i + 1); - Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk()); - } + committerSupplier.setMetadata(1); + Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier, false, true).isOk()); final SegmentsAndCommitMetadata published = driver.publish( StreamAppenderatorDriverTest.makeOkPublisher(), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 335d1b219fe2..63775e2dc3bb 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -58,6 +59,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,6 +74,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport { private static final String DATA_SOURCE = "foo"; private static final String VERSION = "abc123"; + private static final String UPGRADED_VERSION = "xyz456"; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); private static final int MAX_ROWS_IN_MEMORY = 100; private static final int MAX_ROWS_PER_SEGMENT = 3; @@ -246,6 +249,44 @@ public void testHandoffTimeout() throws Exception driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); } + @Test + public void testHandoffUpgradedSegments() + throws IOException, InterruptedException, TimeoutException, ExecutionException + { + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + + Assert.assertNull(driver.startJob(null)); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk()); + } + + driver.persist(committerSupplier.get()); + + // No rows remain in the driver; publish the persisted segments and register them for handoff + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = driver.publishAndRegisterHandoff( + makeUpgradingPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + Assert.assertNotNull(segmentsAndCommitMetadata.getUpgradedSegments()); + Assert.assertEquals( + segmentsAndCommitMetadata.getSegments().size(), + segmentsAndCommitMetadata.getUpgradedSegments().size() + ); + + Set expectedHandedOffSegments = new HashSet<>(); + for (DataSegment segment : segmentsAndCommitMetadata.getSegments()) { + expectedHandedOffSegments.add(segment.toDescriptor()); + } + for (DataSegment segment : segmentsAndCommitMetadata.getUpgradedSegments()) { + expectedHandedOffSegments.add(segment.toDescriptor()); + } + Assert.assertEquals(expectedHandedOffSegments, segmentHandoffNotifierFactory.getHandedOffSegmentDescriptors()); + } + @Test public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException { @@ -379,6 +420,29 @@ static TransactionalSegmentPublisher makeOkPublisher() SegmentPublishResult.ok(Collections.emptySet()); } + private TransactionalSegmentPublisher makeUpgradingPublisher() + { + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { + Set allSegments = new HashSet<>(segmentsToPublish); + int id = 0; + for (DataSegment segment : segmentsToPublish) { + DataSegment upgradedSegment = new DataSegment( + SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION, id), + segment.getLoadSpec(), + segment.getDimensions(), + segment.getMetrics(), + new NumberedShardSpec(id, 0), + null, + segment.getBinaryVersion(), + segment.getSize() + ); + id++; +
allSegments.add(upgradedSegment); + } + return SegmentPublishResult.ok(allSegments); + }; + } + static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { @@ -459,6 +523,7 @@ static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifier { private boolean handoffEnabled = true; private long handoffDelay; + private final Set handedOffSegmentDescriptors = new HashSet<>(); public void disableHandoff() { @@ -470,6 +535,13 @@ public void setHandoffDelay(long delay) handoffDelay = delay; } + public Set getHandedOffSegmentDescriptors() + { + synchronized (handedOffSegmentDescriptors) { + return ImmutableSet.copyOf(handedOffSegmentDescriptors); + } + } + @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { @@ -494,6 +566,9 @@ public boolean registerSegmentHandoffCallback( } exec.execute(handOffRunnable); + synchronized (handedOffSegmentDescriptors) { + handedOffSegmentDescriptors.add(descriptor); + } } return true; }
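[Illustrative aside, not part of the patch] The driver and test changes above reduce to a small piece of set arithmetic: a publish result may carry extra segment IDs committed by a concurrent replace, and handoff must then be registered for the task's own segments plus those upgrades. A minimal sketch of that bookkeeping, using plain string IDs in place of DataSegment; the class and method names (HandoffBookkeepingSketch, upgradedSegments, segmentsToWaitOn) are hypothetical.

import java.util.HashSet;
import java.util.Set;

class HandoffBookkeepingSketch
{
  // Upgraded segments are whatever the publish result contains beyond the segments we pushed ourselves,
  // mirroring how publishInBackground derives the upgraded set from the publish result.
  static Set<String> upgradedSegments(Set<String> publishResult, Set<String> ourSegments)
  {
    Set<String> upgraded = new HashSet<>(publishResult);
    upgraded.removeAll(ourSegments);
    return upgraded;
  }

  // Handoff waits on the union of pushed segments and their upgraded versions,
  // mirroring how registerHandoff builds segmentsToBeHandedOff.
  static Set<String> segmentsToWaitOn(Set<String> ourSegments, Set<String> upgraded)
  {
    Set<String> waiting = new HashSet<>(ourSegments);
    waiting.addAll(upgraded);
    return waiting;
  }

  public static void main(String[] args)
  {
    Set<String> ours = Set.of("seg_v1_0");
    Set<String> published = Set.of("seg_v1_0", "seg_v2_0"); // v2 committed by a concurrent replace
    Set<String> upgraded = upgradedSegments(published, ours); // [seg_v2_0]
    System.out.println(segmentsToWaitOn(ours, upgraded));     // waits on both versions
  }
}

Only once every ID in that waiting set has been handed off and abandoned can the sink itself be dropped, which is the check the appenderator's abandonSegment performs against its abandonedSegments set.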