Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,84 +54,97 @@ public String getTaskType() {

@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
String taskType = RefreshSegmentTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();

int tableNumTasks = 0;

for (TableConfig tableConfig : tableConfigs) {
String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType);

// Get the task configs for the table. This is used to restrict the maximum number of allowed tasks per table at
// any given point.
Map<String, String> taskConfigs;
TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
if (tableTaskConfig == null) {
LOGGER.warn("Failed to find task config for table: {}", tableNameWithType);
LOGGER.warn("Failed to find task config for table: {}", tableConfig.getTableName());
continue;
}
taskConfigs = tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE);
Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType);
int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
pinotTaskConfigs.addAll(generateTasksForTable(tableConfig, taskConfigs));
}

return pinotTaskConfigs;
}

@Override
public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
throws Exception {
return generateTasksForTable(tableConfig, taskConfigs);
}

private List<PinotTaskConfig> generateTasksForTable(TableConfig tableConfig, Map<String, String> taskConfigs) {
String tableNameWithType = tableConfig.getTableName();
Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType);


String taskType = RefreshSegmentTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();

LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType);

int tableNumTasks = 0;
int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not introduced in this PR but any rationale for keeping RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE default value as 20.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No particular reason - chose a low value to make sure we don't put stress on deployments that have few minions. This can be overriden with table config.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm one improvement can be to keep it equal to number of partitions in a table and process one segment per partition (or instance) by default. This way the load gets normalized across hosts. We can do this as a follow-up too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Can you please add these thoughts to this issue - #14483 where we can do it as a followup?

String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
}

// Get info about table and schema.
Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());

// Get the running segments for a table.
Set<Segment> runningSegments =
TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor);

// Make a single ZK call to get the segments.
List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);

for (SegmentZKMetadata segmentZKMetadata : allSegments) {
// Skip if we have reached the maximum number of permissible tasks per iteration.
if (tableNumTasks >= tableMaxNumTasks) {
break;
}

// Get info about table and schema.
Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());

// Get the running segments for a table.
Set<Segment> runningSegments =
TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor);

// Make a single ZK call to get the segments.
List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);

for (SegmentZKMetadata segmentZKMetadata : allSegments) {
// Skip if we have reached the maximum number of permissible tasks per iteration.
if (tableNumTasks >= tableMaxNumTasks) {
break;
}

// Skip consuming segments.
if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) {
continue;
}

// Skip segments for which a task is already running.
if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) {
continue;
}

String segmentName = segmentZKMetadata.getSegmentName();

// Skip if the segment is already up-to-date and doesn't have to be refreshed.
if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) {
continue;
}

Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc()));
pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
tableNumTasks++;
// Skip consuming segments.
if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) {
continue;
}

// Skip segments for which a task is already running.
if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) {
continue;
}

String segmentName = segmentZKMetadata.getSegmentName();

// Skip if the segment is already up-to-date and doesn't have to be refreshed.
if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) {
continue;
}

LOGGER.info("Finished generating {} tasks configs for table: {} " + "for task: {}", tableNumTasks,
tableNameWithType, taskType);
Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc()));
pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
tableNumTasks++;
}

LOGGER.info("Finished generating {} tasks configs for table: {} for task: {}", tableNumTasks, tableNameWithType,
taskType);
return pinotTaskConfigs;
}

Expand Down