Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] Add new index on datasource and task_allocator_id for pending segments (#16355) #16357

Merged
merged 1 commit into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,10 @@ public void remove(final Task task)
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
task.getDataSource(),
taskAllocatorId
);
log.info(
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1948,8 +1948,10 @@ public void testCleanupOnUnlock()
// Only the replaceTask should attempt a delete on the upgradeSegments table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
// Any task may attempt pending segment clean up
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getDataSource(), replaceTask.getId()))
.andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getDataSource(), appendTask.getId()))
.andReturn(0).once();
EasyMock.replay(coordinator);

final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public int deleteUpgradeSegmentsForTask(final String taskId)
}

@Override
public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup)
public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,10 +492,11 @@ SegmentPublishResult commitMetadataOnly(

/**
* Delete pending segments for a given task group after all the tasks belonging to it have completed.
* @param datasource datasource of the task
* @param taskAllocatorId task id / task group / replica group for an appending task
* @return number of pending segments deleted from the metadata store
*/
int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
int deletePendingSegmentsForTaskAllocatorId(String datasource, String taskAllocatorId);

/**
* Fetches all the pending segments of the datasource that overlap with a given interval.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2813,17 +2813,18 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
}

@Override
public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup)
public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE task_allocator_id = :task_allocator_id",
"DELETE FROM %s WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id",
dbTables.getPendingSegmentsTable()
)
)
.bind("task_allocator_id", pendingSegmentsGroup)
.bind("dataSource", datasource)
.bind("task_allocator_id", taskAllocatorId)
.execute()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ tableName, getPayloadType(), getQuoteString(), getCollation()
)
)
);
alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
alterPendingSegmentsTable(tableName);
}

public void createDataSourceTable(final String tableName)
Expand Down Expand Up @@ -481,7 +481,16 @@ private void alterEntryTableAddTypeAndGroupId(final String tableName)
}
}

private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
/**
* Adds the following columns to the pending segments table to clean up unused records,
* and to facilitate concurrent append and replace.
* 1) task_allocator_id -> The task id / task group id / task replica group id of the task that allocated it.
* 2) upgraded_from_segment_id -> The id of the segment from which the entry was upgraded upon concurrent replace.
*
* Also, adds an index on (dataSource, task_allocator_id)
* @param tableName name of the pending segments table
*/
private void alterPendingSegmentsTable(final String tableName)
{
List<String> statements = new ArrayList<>();
if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
Expand All @@ -499,6 +508,14 @@ private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String table
if (!statements.isEmpty()) {
alterTable(tableName, statements);
}

final Set<String> createdIndexSet = getIndexOnTable(tableName);
createIndex(
tableName,
StringUtils.format("idx_%1$s_datasource_task_allocator_id", tableName),
ImmutableList.of("dataSource", "task_allocator_id"),
createdIndexSet
);
}

public void createLogTable(final String tableName, final String entryTypeName)
Expand Down