Improve concurrency between DruidSchema and BrokerServerView #11457
Conversation
This pull request fixes 1 alert when merging 1efbe13 into 8729b40 - view on LGTM.com

This pull request fixes 1 alert when merging 700f31a into 8729b40 - view on LGTM.com
if (segmentsMap.remove(segment.getId()) == null) {
  log.warn("Unknown segment[%s] was removed from the cluster. Ignoring this event.", segment.getId());
}
totalSegments--;
is it better to change the count only when it's a known segment?
Good catch. I will fix it.
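The agreed fix, decrementing the counter only for known segments, can be sketched roughly as follows. This is a minimal model with made-up names (`SegmentCounter`, plain `String` values); it is not the actual `DruidSchema` code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: only change totalSegments when the segment was
// actually known, instead of decrementing unconditionally.
class SegmentCounter
{
  private final Map<String, String> segmentsMap = new ConcurrentHashMap<>();
  private int totalSegments = 0;

  void addSegment(String id, String metadata)
  {
    if (segmentsMap.put(id, metadata) == null) {
      totalSegments++;
    }
  }

  void removeSegment(String id)
  {
    if (segmentsMap.remove(id) == null) {
      // Unknown segment: log and ignore; do NOT touch the counter.
      System.out.println("Unknown segment[" + id + "] was removed from the cluster. Ignoring this event.");
    } else {
      totalSegments--;
    }
  }

  int getTotalSegments()
  {
    return totalSegments;
  }
}
```

With this shape, a spurious removal event for an unknown segment no longer drives the counter negative.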
This pull request fixes 1 alert when merging 5facb33 into 280c080 - view on LGTM.com
👍
final Optional<DruidServerMetadata> historicalServer = servers
    .stream()
    .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL))
    .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL)
            || metadata.getType().equals(ServerType.BROKER))
This is correct, but currently broker segments are not tracked in segment metadata, on the assumption that any segment a broker has will also be on some historical; if it isn't on any historical, then either it will be soon, or it will be dropped from the broker soon. It doesn't look like that behavior has changed in this PR.
Ignoring them isn't great, but it does save the potentially complicated logic of issuing segment metadata queries only to other brokers and not to ourselves (we would probably want to get that information in a different way locally?).
Anyway, it might be worth adding a comment about this, even though it is mentioned in a few other places, like addSegment.
Yeah, I knew the current behavior but thought this could be better. On second thought, though, it's probably better to keep the logic consistent, so I reverted this change and added a comment.
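For illustration, the reverted behavior (pick only historicals for segment metadata queries and deliberately ignore broker-served segments) might be sketched like this, with a simplified `ServerType` enum standing in for the real `DruidServerMetadata`:

```java
import java.util.List;
import java.util.Optional;

// Simplified stand-in for Druid's ServerType; only the values used here.
enum ServerType { HISTORICAL, BROKER, REALTIME }

class ServerPicker
{
  // Hypothetical sketch: choose a server to issue segment metadata queries to.
  // Brokers are intentionally filtered out; any segment a broker serves is
  // assumed to also appear on (or soon be dropped from) a historical.
  static Optional<ServerType> pickMetadataServer(List<ServerType> servers)
  {
    return servers.stream()
                  .filter(type -> type == ServerType.HISTORICAL)
                  .findAny();
  }
}
```

A segment served only by a broker yields an empty `Optional`, i.e. no metadata query is issued for it, matching the pre-existing behavior the reviewer described.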
protected DruidTable buildDruidTable(final String dataSource)

/**
 * This is a helper method for unit tests to emulate heavy work done with {@link #lock}.
 * It must be used only in unit tests.
 */
nit: maybe nice to move all of these 'only for testing' methods to the end of this file or something to get them out of the way.
@GuardedBy("lock")
private boolean isServerViewInitialized = false;

private int totalSegments = 0;
At first I was wondering if this should be volatile, but I don't think it matters since it is only read when a sys.segments scan runs, not in a loop or anywhere else where it would matter. Maybe we should add a comment that it is OK for it to be neither guarded by the lock, volatile, nor a concurrent type?
Sure, added.
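A comment along the lines the reviewer asked for might look like this. This is a hypothetical sketch, not the exact code added in the PR:

```java
// Hypothetical sketch of the counter field with the suggested comment.
class SegmentCounts
{
  // A plain int is fine here: this counter is only read when a
  // sys.segments scan runs, so it does not need to be guarded by
  // the lock, declared volatile, or stored in a concurrent type.
  // Slightly stale values are acceptable for that use.
  private int totalSegments = 0;

  void increment()
  {
    totalSegments++;
  }

  int get()
  {
    return totalSegments;
  }
}
```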
This pull request fixes 1 alert when merging 5c3d8a3 into 257bc5c - view on LGTM.com
@clintropolis thank you for the review!
Description
A concurrency issue was recently found between `DruidSchema` and `BrokerServerView`: refreshing a `DruidTable` in `DruidSchema` can block query processing. This can happen in the scenario described below.

1. The `DruidSchema-Cache` thread locks `DruidSchema#lock` and calls `DruidSchema.buildDruidTable()` to rebuild the rowSignature of a druidTable.
2. A new segment is added to `BrokerServerView`. The `BrokerServerView` thread locks `BrokerServerView#lock`, processes the new segment, and calls the timelineCallbacks of `DruidSchema`.
3. The timelineCallback of `DruidSchema` is executed by the same thread used in step 2. This thread waits until `DruidSchema.buildDruidTable()` from step 1 is done and `DruidSchema#lock` is released.
4. A query calls `BrokerServerView.getTimeline()`. This call waits until the timelineCallbacks from step 2 are done and `BrokerServerView#lock` is released.

The following flame graphs show what those threads were doing when this happened in our cluster. The metrics for these flame graphs were collected for 30 seconds.
`DruidSchema` was calling `refreshSegmentsForDataSource()` and `buildDruidTable()`. `refreshSegmentsForDataSource` issues a `SegmentMetadataQuery` per segment and locks `DruidSchema#lock` per row in the result to update segment metadata in memory. `buildDruidTable` locks `DruidSchema#lock` while it iterates over all columns of all segments in the datasource. When the datasource has 481200 segments with 774 columns per segment, `buildDruidTable` took about 25 seconds (!!) on my desktop.

`BrokerServerView` was blocked in the `DruidSchema.addSegment` callback.

Finally, there were 2 timeseries queries that were blocked in `BrokerServerView.getTimeline()` for 30 seconds.

Currently, this can happen whenever
`DruidSchema` needs to refresh a `DruidTable`, which is whenever a new segment is added to the cluster or a segment is completely removed from the cluster. Moving segments should not cause this issue because a new segment is always loaded on the new server before it is removed from the previous server. As a result, moving segments does not require updating the `RowSignature` of a `DruidTable`.

To fix this issue, this PR improves the concurrency of `DruidSchema` by not holding `DruidSchema#lock` while processing expensive operations such as refreshing `DruidTable`s.

- A new executor, `DruidSchema-Callback`, is added in `DruidSchema` to asynchronously process the timeline callbacks.
- The `segmentMetadataInfo` map is changed to `ConcurrentHashMap<String, ConcurrentSkipListMap<SegmentId, AvailableSegmentMetadata>>`, so that updating the map no longer has to hold `DruidSchema#lock`. Instead, concurrency control is delegated to the `ConcurrentMap`s. This could also make querying the segments table faster because `getSegmentMetadataSnapshot()` no longer needs to acquire `DruidSchema#lock`.

Finally, this PR does not fix the expensive logic in `buildDruidTable()`. Incremental updates on `DruidTable` would be better, but they require that, for each column, we be able to fall back to the column type in the second most recent segment when the most recent segment disappears. How to track those column types of segments efficiently can be researched as a follow-up.

Key changed/added classes in this PR
- `DruidSchema`

This PR has:
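The `segmentMetadataInfo` change described above can be sketched as follows. This is a simplified model, with `String` standing in for both `SegmentId` and `AvailableSegmentMetadata`:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch of delegating concurrency control to the concurrent maps themselves,
// so neither writers nor snapshot readers need an outer lock.
class SegmentMetadataInfoSketch
{
  private final ConcurrentMap<String, ConcurrentSkipListMap<String, String>> segmentMetadataInfo =
      new ConcurrentHashMap<>();

  void addSegment(String dataSource, String segmentId, String metadata)
  {
    // computeIfAbsent and put are each atomic on the concurrent maps,
    // so no outer lock is required for the update.
    segmentMetadataInfo
        .computeIfAbsent(dataSource, ds -> new ConcurrentSkipListMap<>())
        .put(segmentId, metadata);
  }

  int snapshotSize()
  {
    // Snapshot-style reads can also proceed without any outer lock.
    return segmentMetadataInfo.values()
                              .stream()
                              .mapToInt(ConcurrentSkipListMap::size)
                              .sum();
  }
}
```

The inner `ConcurrentSkipListMap` keeps segments sorted by key, which matches the per-datasource ordering the real map maintains by `SegmentId`.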