Skip to content

Commit

Permalink
Add new index on datasource and task_allocator_id for pending segments (
Browse files Browse the repository at this point in the history
#16355)

* Add pending segments index on datasource and task_allocator_id

* Use both datasource and task_allocator_id in queries
  • Loading branch information
AmatyaAvadhanula committed Apr 30, 2024
1 parent e695e52 commit 42e99bf
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,10 @@ public void remove(final Task task)
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
task.getDataSource(),
taskAllocatorId
);
log.info(
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1948,8 +1948,10 @@ public void testCleanupOnUnlock()
// Only the replaceTask should attempt a delete on the upgradeSegments table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
// Any task may attempt pending segment clean up
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getDataSource(), replaceTask.getId()))
.andReturn(0).once();
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getDataSource(), appendTask.getId()))
.andReturn(0).once();
EasyMock.replay(coordinator);

final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public int deleteUpgradeSegmentsForTask(final String taskId)
}

@Override
public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup)
public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId)
{
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,10 +492,11 @@ SegmentPublishResult commitMetadataOnly(

/**
* Delete pending segment for a give task group after all the tasks belonging to it have completed.
* @param datasource datasource of the task
* @param taskAllocatorId task id / task group / replica group for an appending task
* @return number of pending segments deleted from the metadata store
*/
int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
int deletePendingSegmentsForTaskAllocatorId(String datasource, String taskAllocatorId);

/**
* Fetches all the pending segments of the datasource that overlap with a given interval.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2813,17 +2813,18 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
}

@Override
public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup)
public int deletePendingSegmentsForTaskAllocatorId(final String datasource, final String taskAllocatorId)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE task_allocator_id = :task_allocator_id",
"DELETE FROM %s WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id",
dbTables.getPendingSegmentsTable()
)
)
.bind("task_allocator_id", pendingSegmentsGroup)
.bind("dataSource", datasource)
.bind("task_allocator_id", taskAllocatorId)
.execute()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ tableName, getPayloadType(), getQuoteString(), getCollation()
)
)
);
alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
alterPendingSegmentsTable(tableName);
}

public void createDataSourceTable(final String tableName)
Expand Down Expand Up @@ -481,7 +481,16 @@ private void alterEntryTableAddTypeAndGroupId(final String tableName)
}
}

private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
/**
* Adds the following columns to the pending segments table to clean up unused records,
* and to faciliatate concurrent append and replace.
* 1) task_allocator_id -> The task id / task group id / task replica group id of the task that allocated it.
* 2) upgraded_from_segment_id -> The id of the segment from which the entry was upgraded upon concurrent replace.
*
* Also, adds an index on (dataSource, task_allocator_id)
* @param tableName name of the pending segments table
*/
private void alterPendingSegmentsTable(final String tableName)
{
List<String> statements = new ArrayList<>();
if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
Expand All @@ -499,6 +508,14 @@ private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String table
if (!statements.isEmpty()) {
alterTable(tableName, statements);
}

final Set<String> createdIndexSet = getIndexOnTable(tableName);
createIndex(
tableName,
StringUtils.format("idx_%1$s_datasource_task_allocator_id", tableName),
ImmutableList.of("dataSource", "task_allocator_id"),
createdIndexSet
);
}

public void createLogTable(final String tableName, final String entryTypeName)
Expand Down

0 comments on commit 42e99bf

Please sign in to comment.