
Optimize overlord GET /tasks memory usage #12404

Conversation

@AmatyaAvadhanula (Contributor) commented Apr 6, 2022

The web-console (indirectly) calls the Overlord’s GET /tasks API to fetch task summaries, which in turn queries the metadata tasks table. This query fetches several columns, including the payload, for all rows at once. This introduces a significant memory overhead and can cause unresponsiveness or Overlord failure when the ingestion tab is opened multiple times (due to several parallel calls to this API).

Another thing to note is that the task table (the payload column in particular) can be very large. Extracting large payloads from such tables can be very slow, leading to a slow UI. While we are fixing the memory pressure in the Overlord, we can also fix the UI slowness caused by fetching large payloads from the table. Fetching large payloads also puts pressure on the metadata store, as reported in the community (apache/druid#12318: Metadata store query performance degrades as the tasks in druid_tasks table grows).

The task summaries returned in the API response are several times smaller and fit comfortably in memory. So there is an opportunity here to fix the memory usage, the slow UI, and the under-pressure metadata store by removing the need to handle large payloads in every layer we can. Of course, the solution becomes more complex as we try to fix more layers. With that in mind, this description captures two approaches. They vary in complexity and in the degree to which they fix the aforementioned problems.

Approach 1:
A simple solution that mitigates the memory usage is to use streaming SQL queries and process each payload blob immediately to extract the necessary fields. This removes the current logic in the Overlord that buffers the heavy payloads in memory, so at any point in time only the minimum required data is kept in Overlord memory.
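A minimal sketch of this streaming read, assuming JDBI v2 (org.skife.jdbi) and Jackson as used in Druid; the query shape, column names, and the toTaskStatusPlus helper are illustrative rather than the PR's exact code:

// Stream rows and deserialize each payload immediately, keeping only the
// small summary object; the heavy payload blob becomes garbage right away.
List<TaskStatusPlus> summaries = handle
    .createQuery("SELECT payload, status_payload FROM druid_tasks")
    .setFetchSize(100) // hint the driver to stream rather than materialize the full result set
    .map((index, resultSet, ctx) -> {
      try {
        Task task = objectMapper.readValue(resultSet.getBytes("payload"), Task.class);
        TaskStatus status = objectMapper.readValue(resultSet.getBytes("status_payload"), TaskStatus.class);
        return toTaskStatusPlus(task, status); // the summary is several times smaller than the payload
      }
      catch (IOException e) {
        throw new SQLException(e);
      }
    })
    .list();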

PROS:

Mitigates the memory usage problem without “complex” changes.

CONS:

The SQL calls and the processing overhead to deserialize the payload remain the same as before.

While this mitigates the memory problem, the Overlord can still be unresponsive when several tasks need to be fetched in parallel.

It also doesn’t help with the overhead seen on the metadata store side.

Final approach:
This approach adds two new columns, task type and task group id, to the metadata tasks table without modifying the data in the existing columns. These are the only two fields we need in the API result, and we currently extract them from the payload blob. Since we fetch only the minimal required data, calls to the metadata store become faster and network throughput is reduced as well.

Since this approach involves a schema change, existing clusters need to migrate the task history already stored at the time of upgrade. We also need to ensure that new tasks use these columns to store the task type and group id. And of course, the new query for this API uses these fields instead of the payload column.

Local testing confirms that it is significantly more responsive when there are multiple concurrent calls to a sizeable task set.

Please find the detailed changes below.

Schema changes
Alter the table to add the two new columns to the schema if they are not already present. This alteration will be executed from the Java code. It is assumed that the Druid services have permission to execute DDL statements in the database.
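A sketch of the conditional schema change, mirroring the pattern visible in the diff below; tableContainsColumn is the PR's helper that checks the JDBC metadata for an existing column:

// Add each column only if it is missing, so the ALTER is idempotent across restarts.
if (!tableContainsColumn(handle, tableName, "type")) {
  log.info("Adding column: type to table[%s]", tableName);
  handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
}
if (!tableContainsColumn(handle, tableName, "group_id")) {
  log.info("Adding column: group_id to table[%s]", tableName);
  handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
}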

Task history migration
After the schema changes are done, we need to migrate the task history; these two steps are done in sequence.

Migration runs in a separate thread, kicked off at startup. This thread keeps checking for completed tasks in a loop until all completed tasks have been migrated (a sketch follows the list below):

  • Fetch completed task rows with a limit (say 100)

  • Parse the payload to obtain type and groupId

  • Update the group id and task type in the rows corresponding to these tasks

  • Repeat

We need to ensure that we do not repeat this process for new tasks that are already populating the new fields. We also need to ensure that already migrated tasks are not visited again.
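A minimal sketch of the loop; fetchCompletedTasksNeedingMigration and updateTypeAndGroupId are hypothetical helper names, and (per the review discussion below) the read and the update run in separate transactions:

boolean migrateCompletedTasks()
{
  String lastId = "";
  while (true) {
    // Read a small batch of completed tasks whose new columns are still null,
    // keyed past the last migrated id so already-migrated rows are skipped.
    List<TaskIdentifier> batch = fetchCompletedTasksNeedingMigration(lastId, 100);
    if (batch.isEmpty()) {
      return true; // nothing left to migrate
    }
    // Write back type and group_id for the batch in a separate transaction.
    updateTypeAndGroupId(batch);
    lastId = batch.get(batch.size() - 1).getId();
    log.info("Migrated tasks up to id[%s]", lastId);
  }
}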

Task persistence
Any new task must now persist the new columns to the table.
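An illustrative shape of the insert, assuming the existing druid_tasks columns plus the two new ones (binding names and helper calls are assumptions, not the PR's exact code):

handle.createStatement(StringUtils.format(
    "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) "
    + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)",
    tasksTable
))
    .bind("id", task.getId())
    .bind("created_date", createdDate.toString())
    .bind("datasource", task.getDataSource())
    .bind("payload", objectMapper.writeValueAsBytes(task))
    .bind("type", task.getType())        // new column
    .bind("group_id", task.getGroupId()) // new column
    .bind("active", true)
    .bind("status_payload", objectMapper.writeValueAsBytes(status))
    .execute();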

Task history query
The new SQL query ignores the payload and has a much smaller footprint. Instead of passing the entire Task in the TaskInfo wrapper, we now pass a map of strings with keys corresponding to the new columns, which are consumed by the parsing utility in OverlordResource. While the migration is ongoing, we use the query from Approach 1 to avoid memory issues while still populating the new columns. Once the migration succeeds, a local state is maintained to ensure that the new query is used.
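An illustrative shape of the lightweight query for completed tasks (column names per the PR); note that the payload column is no longer selected at all:

String sql = StringUtils.format(
    "SELECT id, type, group_id, datasource, created_date, status_payload FROM %s WHERE active = FALSE",
    tasksTable
);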

PROS:

Mitigates both the memory and responsiveness issues in a holistic way.

CONS:

Requires schema changes and data migration.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@AmatyaAvadhanula AmatyaAvadhanula marked this pull request as draft April 6, 2022 14:29
lgtm-com bot commented Apr 14, 2022

This pull request introduces 2 alerts when merging c784c3a into 5824ab9 - view on LGTM.com

new alerts:

  • 2 for Potential database resource leak

@AmatyaAvadhanula AmatyaAvadhanula marked this pull request as ready for review April 20, 2022 11:02
@@ -170,6 +172,25 @@ public List<Task> getActiveTasksByDatasource(String datasource)
return listBuilder.build();
}

private TaskStatusPlus toTaskStatusPlus(TaskInfo<Task, TaskStatus> taskInfo)
Contributor:

Why not make this a static method on TaskStatusPlus? Is it really important for it to be private to this class?

Contributor (Author):

Done

Contributor (Author):

Fixed the API by moving the entire migration logic to SQLMetadataActionHandler

: completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()),
datasource
).stream()
.map(taskInfo -> toTaskStatusPlus(taskInfo))
Contributor:

nit: might as well just pass toTaskStatusPlus it can be a lambda on its own.

Contributor (Author):

Done

{
if (!tableContainsColumn(handle, tableName, "type")) {
log.info("Adding column: type to table[%s]", tableName);
handle.execute(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
Contributor:

We likely want to explicitly default the column to null. This tends to avoid locks during the DDL: if there is a default value (or if the default that the ALTER TABLE uses is non-null), the ALTER TABLE can end up locking the table, and that can end up causing other sadness.

Contributor (Author):

done

Contributor (Author):

From the documentation, it appears that new columns without constraints are nullable by default, and that the default value of a nullable column is always NULL.

Adding NULL explicitly is not supported in certain cases (Derby, for example).

ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class);
resultSet.updateString("type", payload.get("type").asText());
resultSet.updateString("group_id", payload.get("groupId").asText());
resultSet.updateRow();
Contributor:

Doing updates like this in the same transaction is overkill and might get into weird locking behavior on the table. It's best not to do it. Each update should effectively be two queries: one to fetch the task payloads, which exits and returns all DB resources, and a second one that actually issues an UPDATE command to set the extra fields based on the taskId.

When the first query stops returning results, that's when you know the migration is complete.
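A sketch of this two-step pattern; dbi, objectMapper, and the SQL strings are illustrative assumptions, not the PR's code:

// Step 1: read one batch and close all DB resources before updating anything.
List<String[]> batch = dbi.withHandle(
    handle -> handle.createQuery(
        "SELECT id, payload FROM druid_tasks WHERE active = FALSE AND type IS NULL LIMIT 100")
        .map((index, rs, ctx) -> {
          try {
            ObjectNode payload = objectMapper.readValue(rs.getBytes("payload"), ObjectNode.class);
            return new String[]{rs.getString("id"), payload.get("type").asText(), payload.get("groupId").asText()};
          }
          catch (IOException e) {
            throw new SQLException(e);
          }
        })
        .list() // materializes only small (id, type, groupId) triples, then the cursor closes
);

// Step 2: issue the UPDATEs keyed by task id in a separate transaction.
dbi.withHandle(handle -> {
  for (String[] row : batch) {
    handle.execute("UPDATE druid_tasks SET type = ?, group_id = ? WHERE id = ?", row[1], row[2], row[0]);
  }
  return null;
});
// When step 1 stops returning rows, the migration is complete.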

Contributor:

Fwiw, you probably don't even need to set a "limit" clause on the query anymore if you follow what I suggested above. Basically, if you just tell JDBI to only give you the first 100 things from the ResultSet, it should be able to just return them and then when you close the cursor, the DB will close things and be happy-ish.

Contributor (Author):

Separated the read and update into separate transactions during migration; resources are released after each iteration.

Contributor (Author):

Added logs and a sleep after each iteration

public List<TaskStatusPlus> getTaskStatusPlusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String dataSource,
boolean fetchPayload
Contributor:

This boolean here about fetching the payload is weird, especially on a public method. This class should already know if it should be using the payload or not. Once you move the migration logic into this class, you won't need this anymore.

Contributor (Author):

Done

Comment on lines 126 to 128
taskMigrationCompleteFuture = executorService.submit(() -> {
return metadataStorageConnector.migrateTaskTable();
});
Contributor:

Nit: can be lambda simplified.

Contributor (Author):

Done


// New columns based fetch before migration is complete.
// type and group_id are not null for completed tasks but are still null for active ones since they aren't migrated
// An active task will eventually be updated on its own due to insertion
Contributor:

A task that was ACTIVE before the migration continues to have the columns type and groupId as null even after the migration (at least until the task completes and is finally updated in the DB).

How is this handled?

Contributor (Author):

The web console will show empty values now. Possible solutions:

  1. A simple check could be to just use the payload-based query for both cases. Inefficient.
  2. The class holds a boolean state indicating which active-task query to use. It checks if there is at least one active task with null type / groupId, in which case it fetches the payload. If not, it updates the boolean state and only uses the new query for active tasks.

Contributor (Author):

This means that there will be two boolean states for the query / mapper combination.

  1. Which query to use for completed tasks, determined by completion of the migration of completed tasks.
  2. Which query to use for active tasks, determined by whether there exist any active tasks with null type. Once set to true, this check need not happen again. This should be far more efficient than using the payload query for every call for active tasks, since the number of active tasks can itself be high.

Contributor:

As soon as any state update happens on the active task, it will end up having the fields. The time-lag to when that will happen is on the order of minutes. As long as the console doesn't explode on these being null (or we can inject an "unknown" to work around any potential explosions), it should be fine.

Contributor (Author):

There seems to be a state update method called modifyState during which the type and groupId need to be altered.
If there are any other state update operations, the type / groupId update has to happen alongside each of them.

Contributor (Author):

What are the potential issues with updating the type and groupId of active tasks during the migration as well?
The tasks will be processed in small batches, and the resources will be returned after each read / update, as you had suggested.

PROS:

  1. Fetching and updating after (reverse) sorting by created_time ensures that we utilize the index, while also enabling storage of the timestamp as internal state in SQLMetadataStorageActionHandler. This timestamp can then be used to query all tasks before it using the payload query, and the ones after it with the new query.
  2. The above also means that we don't have to fetch the payload for all tasks during the migration, only for those which have yet to be migrated.
  3. Active tasks with empty type / groupId will not appear in the web console.
  4. We only need to handle type / groupId insertion during task insertion. No new or existing method that updates the task in the DB has to be handled.

CONS:
?

Contributor (Author):

Active tasks have been migrated as a simple fix

@kfaraz (Contributor) left a comment:

Thanks for the fixes, @AmatyaAvadhanula ! I have requested some more changes.

I am a little uncomfortable with the method getTaskMetadataInfos which returns a List<TaskInfo<TaskMetadata, TaskStatus>>. This seems like a very roundabout way of just fetching task statuses. But I don't have a better suggestion right now. I will let you know if something better comes to mind.

It would be nice if you could please share some test results and also how these changes would affect an upgrade.

@Override
public Void withHandle(Handle handle)
{
if (!tableContainsColumn(handle, tableName, "type")) {
Contributor:

I am not sure if this is done yet. The code still seems to be checking tableContainsColumn() twice, once for group_id and once for type. They will either both be there or neither.

import java.util.Objects;

/**
* Model class containing the fields relevant to view tasks in the ingestion tab.
Contributor:

This comment seems too specific to web-console. This javadoc should just explain the contents of this model class.

Contributor (Author):

done

* Model class containing the fields relevant to view tasks in the ingestion tab.
* These fields are extracted from the task payload for the new schema and this model can be used for migration as well.
*/
public class TaskMetadata
Contributor:

TaskMetadata is a little misleading. A better name for this class would be TaskIdentifier or simply TaskId.

Contributor (Author):

done

@@ -252,4 +254,29 @@ public String toString()
", errorMsg='" + errorMsg + '\'' +
'}';
}

/**
* Convert a TaskInfo pair of TaskMetadata and TaskStatus to a TaskStatusPlus
Contributor:

Nit: TaskInfo is not exactly a Pair even though there are similarities.

Contributor (Author):

fixed

* @param tasksTable
* @return
*/
boolean migrateTaskTable(String tasksTable);
Contributor:

Method name is too vague. Please rename to what it exactly does, e.g. populateTaskTypeAndGroupId

Why should tasksTable be passed here? You can use the entryTable field inside SQLMetadataStorageActionHandler if needed.

Contributor (Author):

done

}

@Override
public String getTaskTableName()
Contributor:

Shouldn't be needed.

Contributor (Author):

Thanks, removed

}

private Stream<TaskInfo<Task, TaskStatus>> getTaskInfoStreamFromTaskStorage(
private Stream<TaskStatusPlus> getTaskStatusPlusList(
Contributor:

This should just return a List and all the filtering should happen in the calling method.

Contributor (Author):

Filtering was already happening before my changes; I haven't made semantic changes to this method.

final Query<Map<String, Object>> query;
switch (entry.getKey()) {
case ACTIVE:
query = !fetchPayload
Contributor:

Nit: use condition on fetchPayload rather than !fetchPayload

Contributor (Author):

done

CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue();
DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo();
Integer limit = completeTaskLookup.getMaxTaskStatuses();
query = !fetchPayload
Contributor:

Nit: use condition on fetchPayload rather than !fetchPayload

Contributor (Author):

done

log.warn(e, "Task migration failed while updating entries in task table");
return false;
}
id = taskMetadatas.get(taskMetadatas.size() - 1).getId();
Contributor:

In every iteration of the loop, it might be useful to add a debug log which prints the id up to which processing has been done so far.

Contributor (Author):

Added an info log indicating the total number of tasks migrated after each iteration. Also added a sleep of 1 second.

@kfaraz (Contributor) left a comment:

Added some more comments.

A question regarding migration:
It seems that the migration method would be called on every startup.
In batches of 100, it would go through all taskIds to see if anything has the type column as null and try to populate them.

Shouldn't this behaviour stop once everything is migrated?

Maybe fire just one query at the start to get the total count of all tasks that need migrating. And only if that count is non-zero, do we proceed with the actual batch-wise migration.
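A sketch of the suggested pre-check, with the same illustrative table and column names as above:

// Fire one cheap COUNT query at startup; only run the batch-wise loop if needed.
int pending = handle
    .createQuery("SELECT COUNT(*) FROM druid_tasks WHERE active = FALSE AND type IS NULL")
    .map(IntegerMapper.FIRST)
    .first();
if (pending == 0) {
  return true; // nothing to migrate; skip the batch-wise migration entirely
}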

@@ -391,6 +571,109 @@ private String getWhereClauseForActiveStatusesQuery(String dataSource)
return sql;
}

class TaskMetadataInfoMapperFromPayload implements ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
Contributor:

I think this can be private.

Contributor (Author):

Done

}
}

class TaskMetadataInfoMapper implements ResultSetMapper<TaskInfo<TaskMetadata, StatusType>>
Contributor:

This can be private.

Contributor (Author):

Done

Comment on lines 608 to 611
private TaskInfo<TaskMetadata, StatusType> toTaskMetadataInfo(ObjectMapper objectMapper,
ResultSet resultSet,
boolean usePayload
) throws SQLException
Contributor:

Nit: formatting.

Contributor (Author):

Done

* @param taskLookups task lookup type and filters.
* @param datasource datasource filter
*/
List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfos(
Contributor:

These new methods should be called getTaskStatus so that the intent is clear. getTaskMetadataInfos is a little vague.

Contributor (Author):

used getTaskStatusList

* @param taskLookups task lookup type and filters.
* @param datasource datasource filter
*/
List<TaskInfo<TaskMetadata, StatusType>> getTaskMetadataInfosFromPayload(
Contributor:

Mark this as deprecated.
Rename to getTaskStatusWithPayload

Contributor (Author):

Renamed. I'm not sure if it should be marked as deprecated; this method must be used before migration for all use cases.

Comment on lines 131 to 144
* Please use this method to fetch task for viewing on ingestion tab only before task migration
* This deserializes the payload column to get the required fields, and has a greater overhead
* Returns a list of TaskInfo for the tasks corresponding to the given filters
* The TaskInfo comprises the TaskMetadata which is significantly smaller than a Task, and the TaskStatus
* These are sufficient to create the TaskStatusPlus for a given Task, and prevent unnecessary memory usage
*
* If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
* If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
* store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
* All lookups should be processed atomically if more than one lookup is given.
*
* fetchPayload determines the query used to fetch from the tasks table
* If true, fetch the payload and deserialize it to obtain the above fields
* Else, use the newly created type and group_id columns in the query for task summaries
Contributor:

Suggested change
- * Please use this method to fetch task for viewing on ingestion tab only before task migration
- * This deserializes the payload column to get the required fields, and has a greater overhead
- * Returns a list of TaskInfo for the tasks corresponding to the given filters
- * The TaskInfo comprises the TaskMetadata which is significantly smaller than a Task, and the TaskStatus
- * These are sufficient to create the TaskStatusPlus for a given Task, and prevent unnecessary memory usage
- *
- * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
- * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
- * store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
- * All lookups should be processed atomically if more than one lookup is given.
- *
- * fetchPayload determines the query used to fetch from the tasks table
- * If true, fetch the payload and deserialize it to obtain the above fields
- * Else, use the newly created type and group_id columns in the query for task summaries
+ * Returns the statuses of the specified tasks. Implementations of this method may
+ * read from the corresponding task payloads to retrieve task information.
+ *
+ * This method is deprecated and {@link #getTaskStatus} should be used instead.
+ *
+ * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
+ * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
+ * store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
+ * All lookups should be processed atomically if more than one lookup is given.

Contributor (Author):

Thanks for the suggestion. Has been applied

Comment on lines 107 to 120
* This is the recommended method to fetch Tasks for the task view
* This utilizes the new type and group_id columns and should be utilized after migration
* Returns a list of TaskInfo for the tasks corresponding to the given filters
* The TaskInfo comprises the TaskMetadata which is significantly smaller than a Task, and the TaskStatus
* These are sufficient to create the TaskStatusPlus for a given Task, and prevent unnecessary memory usage
*
* If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
* If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
* store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
* All lookups should be processed atomically if more than one lookup is given.
*
* fetchPayload determines the query used to fetch from the tasks table
* If true, fetch the payload and deserialize it to obtain the above fields
* Else, use the newly created type and group_id columns in the query for task summaries
Contributor:

Suggested change
- * This is the recommended method to fetch Tasks for the task view
- * This utilizes the new type and group_id columns and should be utilized after migration
- * Returns a list of TaskInfo for the tasks corresponding to the given filters
- * The TaskInfo comprises the TaskMetadata which is significantly smaller than a Task, and the TaskStatus
- * These are sufficient to create the TaskStatusPlus for a given Task, and prevent unnecessary memory usage
- *
- * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
- * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
- * store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
- * All lookups should be processed atomically if more than one lookup is given.
- *
- * fetchPayload determines the query used to fetch from the tasks table
- * If true, fetch the payload and deserialize it to obtain the above fields
- * Else, use the newly created type and group_id columns in the query for task summaries
+ * Returns the statuses of the specified tasks. Implementations of this method must not
+ * read the task payload from the underlying storage as it may increase memory usage.
+ *
+ * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
+ * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
+ * store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
+ * All lookups should be processed atomically if more than one lookup is given.

Contributor (Author):

done

Comment on lines +56 to +57
@NotNull String type,
@NotNull String groupId
Contributor:

Nit: These arguments should probably come earlier, maybe right after id.

* @param dataSource datasource to which the tasks belong. null if we don't want to filter
* @return Query object for completed TaskInfos of interest
*/
protected Query<Map<String, Object>> createCompletedTaskStreamingQuery(
Contributor:

These query builder methods can probably be private.
Also see if it is possible to combine any of these methods as it seems that the queries are mostly repeated.

Contributor (Author):

Done


public void prepareTaskEntryTable(final String tableName)
{
createEntryTable(tableName);
@capistrant (Contributor) commented Jun 1, 2022:

why not update createEntryTable to create the table how we want it going forward so no alert needs to be run every time a new cluster is coming online?

Contributor (Author):

@capistrant it was left as is since the subsequent update would have the same effect.
But yes, I'll modify the createEntryTable method. Thanks for the feedback.

Contributor (Author):

The new flow to avoid additional db calls would be the following, right?

If table exists and column does not exist:
  Update schema
Else:
  Create table with new schema

Contributor (Author):

Also wanted to confirm if you meant avoiding additional db calls to check and update the schema by "no alert needs to be run". Could you please clarify that?

Contributor:

oops, I meant to type "alter" not "alert". I was just suggesting that newly built clusters only run CREATE TABLE and not create + ALTER TABLE. More of a nit comment than anything
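A sketch of what this nit suggests, assuming the existing druid_tasks schema plus the two new nullable columns (column list illustrative; getPayloadType mirrors the helper visible in a later diff hunk):

// Create new clusters with the final schema directly, so no ALTER runs afterwards.
handle.execute(StringUtils.format(
    "CREATE TABLE %1$s (\n"
    + "  id VARCHAR(255) NOT NULL,\n"
    + "  created_date VARCHAR(255) NOT NULL,\n"
    + "  datasource VARCHAR(255) NOT NULL,\n"
    + "  payload %2$s NOT NULL,\n"
    + "  type VARCHAR(255),\n"
    + "  group_id VARCHAR(255),\n"
    + "  active BOOLEAN NOT NULL DEFAULT FALSE,\n"
    + "  status_payload %2$s,\n"
    + "  PRIMARY KEY (id)\n"
    + ")",
    tableName, getPayloadType()
));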

@AmatyaAvadhanula (Contributor Author):

@kfaraz thank you for the review!
I've addressed your comments and was hoping that you could provide feedback.

@kfaraz (Contributor) left a comment:

Minor nitpicks, otherwise LGTM 🚀

private TaskInfo<TaskMetadata, StatusType> toTaskMetadataInfo(ObjectMapper objectMapper,
ResultSet resultSet,
boolean usePayload
private TaskInfo<TaskIdentifier, StatusType> toTaskIdentifierInfo(ObjectMapper objectMapper,
Contributor:

Nit: start the args in a newline as they are not going to fit in the same line anyway.

@@ -241,4 +243,20 @@ default <ContextValueType> ContextValueType getContextValue(String key, ContextV
final ContextValueType value = getContextValue(key);
return value == null ? defaultValue : value;
}

default TaskIdentifier getMetadata()
Contributor:

Nit: Rename to getTaskIdentifier.

@@ -327,6 +331,29 @@ tableName, getPayloadType()
)
);
}

public boolean tableContainsColumn(Handle handle, String table, String column)
Contributor:

Still seems to be public.

log.info("Adding column: group_id to table[%s]", tableName);
batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
}
batch.execute();
@kfaraz (Contributor) commented Jun 16, 2022:

In this case, it seems to be, but it's generally better to avoid having to guess how the underlying library behaves in such corner cases.

public int[] execute()
{
    // short circuit empty batch
    if (parts.size() == 0) {
        return new int[] {};
    }

    ...
}

@abhishekagarwal87 (Contributor):

Thank you @AmatyaAvadhanula for this major contribution. This is going to help a lot of users.

@abhishekagarwal87 (Contributor):

CI failed because of the low code coverage bot on the equals and hashCode methods. I have ignored it for the merge since all the tests are passing.

@abhishekagarwal87 added this to the 24.0.0 milestone Aug 26, 2022
anishanagarajan pushed a commit to twitter-forks/druid that referenced this pull request Sep 23, 2022