Skip to content

Commit

Permalink
16361 default lock is exclusive; replace for concurrent locks if it's…
Browse files Browse the repository at this point in the history
… not with markAsUnused on

feature-13324 better granularity for lock type in kill task, removing markAsUnused  and all it's traces
  • Loading branch information
IgorBerman committed May 5, 2024
1 parent 2a638d7 commit 05014be
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand All @@ -66,9 +66,6 @@
* The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link
* ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
* <p>
* The field {@link #isMarkAsUnused()} is now deprecated.
* </p>
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
Expand All @@ -91,8 +88,6 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
@Nullable
private final List<String> versions;

@Deprecated
private final boolean markAsUnused;
/**
* Split processing to try and keep each nuke operation relatively short, in the case that either
* the database or the storage layer is particularly slow.
Expand All @@ -117,7 +112,6 @@ public KillUnusedSegmentsTask(
@JsonProperty("interval") Interval interval,
@JsonProperty("versions") @Nullable List<String> versions,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
Expand All @@ -129,22 +123,13 @@ public KillUnusedSegmentsTask(
interval,
context
);
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
if (this.batchSize <= 0) {
throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
}
if (limit != null && limit <= 0) {
throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
}
if (Boolean.TRUE.equals(markAsUnused)) {
if (limit != null) {
throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
}
if (!CollectionUtils.isNullOrEmpty(versions)) {
throw InvalidInput.exception("versions[%s] cannot be provided when markAsUnused is enabled.", versions);
}
}
this.versions = versions;
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
Expand All @@ -158,21 +143,6 @@ public List<String> getVersions()
return versions;
}

/**
* This field has been deprecated as "kill" tasks should not be responsible for
* marking segments as unused. Instead, users should call the Coordinator API
* {@code /{dataSourceName}/markUnused} to explicitly mark segments as unused.
* Segments may also be marked unused by the Coordinator if they become overshadowed
* or have a {@code DropRule} applied to them.
*/
@Deprecated
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isMarkAsUnused()
{
return markAsUnused;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getBatchSize()
Expand Down Expand Up @@ -214,16 +184,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// Track stats for reporting
int numSegmentsKilled = 0;
int numBatchesProcessed = 0;
final int numSegmentsMarkedAsUnused;
if (markAsUnused) {
numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
);
LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.",
numSegmentsMarkedAsUnused, getDataSource(), getInterval());
} else {
numSegmentsMarkedAsUnused = 0;
}
final int numSegmentsMarkedAsUnused = 0;

// List unused segments
int nextBatchSize = computeNextBatchSize(numSegmentsKilled);
Expand Down Expand Up @@ -346,4 +307,32 @@ public LookupLoadingSpec getLookupLoadingSpec()
{
return LookupLoadingSpec.NONE;
}

@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
final boolean useConcurrentLocks = Boolean.TRUE.equals(
getContextValue(
Tasks.USE_CONCURRENT_LOCKS,
Tasks.DEFAULT_USE_CONCURRENT_LOCKS
)
);

TaskLockType defaultLockType = useConcurrentLocks ? TaskLockType.REPLACE : TaskLockType.EXCLUSIVE;

TaskLockType actualLockType = getContextValue(Tasks.TASK_LOCK_TYPE, defaultLockType);

final TaskLock lock = taskActionClient.submit(
new TimeChunkLockTryAcquireAction(
actualLockType,
getInterval()
)
);
if (lock == null) {
return false;
}
lock.assertNotRevoked();
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
false,
99,
5,
DateTimes.nowUtc()
Expand All @@ -65,7 +64,6 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertNull(taskQuery.getVersions());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());
Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime());
Expand All @@ -79,7 +77,6 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
true,
null,
null,
null
Expand All @@ -90,7 +87,6 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
Assert.assertEquals(taskQuery.getDataSource(), fromJson.getDataSource());
Assert.assertEquals(taskQuery.getInterval(), fromJson.getInterval());
Assert.assertNull(taskQuery.getVersions());
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
Expand All @@ -105,7 +101,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Intervals.of("2020-01-01/P1D"),
null,
null,
true,
99,
null,
null
Expand All @@ -119,7 +114,6 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertNull(taskQuery.getVersions());
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
Expand All @@ -134,7 +128,6 @@ public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegment
Intervals.of("2020-01-01/P1D"),
ImmutableList.of("v1", "v2"),
null,
null,
99,
100,
DateTimes.nowUtc()
Expand All @@ -148,7 +141,6 @@ public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegment
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertEquals(task.getVersions(), taskQuery.getVersions());
Assert.assertNull(taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime());
Expand Down

0 comments on commit 05014be

Please sign in to comment.