Poll API, health management for recorders, backend daemon for reporting load, aggregation window management, work assignment scheduling #47
Conversation
anvinjain
commented
Mar 6, 2017
- Poll API, update recorder info and return work assignment in response
- Health management for recorders on poll api request
- Backend daemon for reporting load
- Managing process group associations on backend
- Setting up and expiry of aggregation window
- Building schedule of work assignments on basis of work reported by leader
workid -> profileworkinfo map in aggregated profile
app, cluster, proc fields in window fleshed out
default root nodes in stack tree
running checksum using underlying byte arr
fixed bugs in usage of codedinputstream
parsing profile indexes with update when wse is processed
…ds to aggregation classes
… profiles for same work id, finalizable entities, aggregation entity pojos added in aggregation module, other pull request related fixes
…nment and scheduling all tests were passing at this point
Haven't looked at tests at all (assuming they are good and cover the feature well).
Didn't see any integration tests; assuming you want them done in the e2e work. If not, please cover the basic integration flows (e.g. the backend-defunct flow with a real backend and a real leader has nothing to do with running the broker, so it can perhaps be tested at the integration level in isolation).
Other comments are all inline.
Other than the core logic issues (which need to be fixed, and are mostly direct in terms of analysis and fix; the GC overhead issues are more work, of course, but I mean the other ones), the general soft-issue theme coming across is about:
- naming
- more than necessary indirection
I have called them out in some places, but haven't been very thorough in calling out every single instance. I have called out at least one example of each pattern, though, so after going through the whole thing, maybe take a cursory look at the code changes again (with only readability and maintainability in mind) and see if you find something worth fixing.
@@ -59,7 +64,7 @@ protected Header buildHeaderProto(int version, WorkType workType) {
    return Header.newBuilder()
        .setFormatVersion(version)
        .setWorkType(workType)
-       .setAggregationEndTime(endWithTolerance.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
+       .setAggregationEndTime(endedAt == null ? null : endedAt.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
For all DTOs, we should consider re-using the object for serialization.
They expose a "clear" mechanism to do this. One can clear the DTO and start afresh without any additional GC pressure.
Something worth thinking about.
You mean reusing the builder object, right? Also, clearing the same builder object can only work where concurrent access cannot happen. The above is a candidate, agreed.
No, I meant both the builder and the DTO, but it looks like neither is possible. The API is extremely stupid, it seems; read: https://groups.google.com/forum/#!topic/protobuf/b0gS4wpjuIo and https://groups.google.com/forum/#!topic/protobuf/No9bBRh3Wp0
It seems they wanted to support some "optimizations" by being GC unfriendly, which they didn't get right either. Dumbness rules!
SimultaneousWorkAssignmentCounter simultaneousWorkAssignmentCounter = new SimultaneousWorkAssignmentCounterImpl(configManager.getMaxSimultaneousProfiles());

VerticleDeployer backendHttpVerticleDeployer = new BackendHttpVerticleDeployer(vertx, configManager, leaderStore, aggregationWindowLookupStore, processGroupAssociationStore);
VerticleDeployer backendDaemonVerticleDeployer = new BackendDaemonVerticleDeployer(vertx, configManager, leaderStore, processGroupAssociationStore, aggregationWindowLookupStore, simultaneousWorkAssignmentCounter);
We should have a validation that ensures that only one thread will ever run the backend daemon. Basically one verticle.
Validation is done inside the deployer for backend daemon. Fixed in commit 668398a
//Add process group associations which are returned by leader
for(Recorder.ProcessGroup processGroup: processGroups.getProcessGroupList()) {
    ProcessGroupDetail existingValue = this.processGroupLookup.putIfAbsent(processGroup, new ProcessGroupDetail(processGroup, thresholdForDefunctRecorderInSecs));
This is a little confusing, because there is just one thread calling it.
Calling put as opposed to putIfAbsent makes the intent clear.
putIfAbsent is required here. The leader always returns all the process groups associated with a backend. Process groups which are already present in the lookup should not be associated with a new ProcessGroupDetail instance; only the ones which are new should be. Hence the putIfAbsent semantics.
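The putIfAbsent semantics described above can be shown with a minimal sketch. The `Detail` class and string keys here are simplified stand-ins for the real `ProcessGroupDetail` and proto types:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
    // Simplified stand-in for ProcessGroupDetail
    static final class Detail {
        final String processGroup;
        Detail(String processGroup) { this.processGroup = processGroup; }
    }

    static final ConcurrentMap<String, Detail> lookup = new ConcurrentHashMap<>();

    // The leader reports the full set of associated process groups every time;
    // only genuinely new groups should get a fresh Detail instance.
    static Detail associate(String processGroup) {
        Detail fresh = new Detail(processGroup);
        Detail existing = lookup.putIfAbsent(processGroup, fresh);
        return existing == null ? fresh : existing;
    }

    public static void main(String[] args) {
        Detail first = associate("pg-1");
        Detail second = associate("pg-1"); // repeated report from leader
        // put() would have replaced the instance; putIfAbsent preserves it
        System.out.println(first == second); // prints "true"
    }
}
```

With plain `put`, every leader report would discard accumulated per-group state (such as recorder health) for groups that were already known.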
private void abortOngoingProfiles() {
    ensureEntityIsWriteable();
    try {
        for (Map.Entry<Long, ProfileWorkInfo> entry : workInfoLookup.entrySet()) {
This appears racy at first, until one realizes that put really only happens in the constructor.
We should make this an immutable map, I guess, to make the invariants absolutely obvious?
It needn't even be a concurrent data structure, because the volatile bool orders things anyway. Just a HashMap will do.
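The suggestion above can be sketched as follows; the value type and field names are simplified stand-ins for the real `ProfileWorkInfo` lookup:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// The lookup is filled once at construction time and never mutated afterwards,
// so an unmodifiable wrapper turns the "writes only happen in the constructor"
// convention into an enforced invariant.
public class ImmutableWorkInfoLookup {
    private final Map<Long, String> workInfoLookup;

    public ImmutableWorkInfoLookup(Map<Long, String> workInfo) {
        // Defensive copy into a plain HashMap; no concurrent structure needed
        // because the volatile flag elsewhere provides the ordering.
        this.workInfoLookup = Collections.unmodifiableMap(new HashMap<>(workInfo));
    }

    public Map<Long, String> lookup() {
        return workInfoLookup;
    }
}
```

Any accidental mutation after construction now fails fast with `UnsupportedOperationException` instead of appearing racy.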
Fixed in commit a525979
abortOngoingProfiles();
long[] workIds = this.workInfoLookup.keySet().stream().mapToLong(Long::longValue).toArray();
aggregationWindowLookupStore.deAssociateAggregationWindow(workIds);
this.endedAt = LocalDateTime.now(Clock.systemUTC());
This means we can have profiles that end first and start later (in the worst cases).
Maybe we should completely move to stable clocks here? Or maybe it's worth recording both: we record a start and an end for convenience, but also keep a stable-clock based duration?
start is assigned in the constructor itself (maybe I can avoid accepting that as a param and generate the value in the constructor instead). Unless the clock goes backward, ending first and starting later cannot happen, right?
Yes, clock rewind scenarios. On Varadhi machines we have seen up to 4 minutes of delta across ~70 odd brokers.
But clock rewind happening on the same machine itself?
vertx delegates timer stuff to the schedule methods exposed by netty's event loop. Netty converts this into a ScheduledFutureTask, and the scheduling queue is handled by AbstractScheduledEventExecutor.
Netty uses system nanotime internally (a stable clock), so wall-clock skew on the backend should not affect us. Of course, the end time can still be recorded incorrectly, so I will additionally store the duration in the object as a separate field.
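The record-both approach discussed above can be sketched like this, assuming a simplified window class (the real aggregation window has more state):

```java
import java.time.Clock;
import java.time.LocalDateTime;

// Record wall-clock timestamps for display, plus a monotonic-clock
// duration that is immune to wall-clock skew and rewinds.
public class AggregationWindowTiming {
    private final LocalDateTime startedAt = LocalDateTime.now(Clock.systemUTC());
    private final long startTick = System.nanoTime(); // stable clock
    private LocalDateTime endedAt;
    private long durationNanos = -1;

    public void expire() {
        endedAt = LocalDateTime.now(Clock.systemUTC()); // may be skewed by a rewind
        durationNanos = System.nanoTime() - startTick;  // always non-negative
    }

    public long durationNanos() { return durationNanos; }
}
```

If the wall clock is rewound between start and expiry, `endedAt` can precede `startedAt`, but `durationNanos` still reflects the true elapsed time.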
public class RecorderProtoUtil {

    public static Recorder.ProcessGroup mapRecorderInfoToProcessGroup(Recorder.RecorderInfo recorderInfo) {
All functions in this class are GC provoking. Maybe return closeable-wrapped objects to eventually allow pooling without too much trouble?
Will discuss offline
As you mentioned earlier, proto builders are single-use only and cannot be used to construct other proto objects once build() is called on them.
private ProfHttpClient buildHttpClient() {
    JsonObject httpClientConfig = configManager.getHttpClientConfig();
    ProfHttpClient httpClient = ProfHttpClient.newBuilder()
In cases like this the builder is not useful. It takes 5 params and we are providing all 5, which defeats the point.
I'd rather have a convenience constructor that takes httpClientConfig and funnels to the 5-arg constructor.
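The funneling idea above can be sketched as follows. The field names and config keys here are illustrative, not the actual ProfHttpClient API:

```java
import java.util.Map;

public class HttpClientSketch {
    final String host;
    final int port;
    final int maxAttempts;
    final int connectTimeoutMs;

    // Full constructor: explicit, but callers must match argument order.
    public HttpClientSketch(String host, int port, int maxAttempts, int connectTimeoutMs) {
        this.host = host;
        this.port = port;
        this.maxAttempts = maxAttempts;
        this.connectTimeoutMs = connectTimeoutMs;
    }

    // Convenience constructor: funnels config values to the full constructor,
    // keeping the key-to-parameter mapping in exactly one place.
    public HttpClientSketch(Map<String, Object> cfg) {
        this((String) cfg.getOrDefault("host", "localhost"),
             (int) cfg.getOrDefault("port", 8080),
             (int) cfg.getOrDefault("max.attempts", 3),
             (int) cfg.getOrDefault("connect.timeout.ms", 5000));
    }
}
```

This keeps the self-documenting quality argued for below (keys name the parameters) without a builder's extra object allocation.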
A bloated param list in a constructor is such a readability pain, in my opinion. A builder gives the convenience of self-documenting code: I don't have to keep matching param index with param name in the constructor signature to understand what gets mapped where, and it also reduces the risk of accidentally mixing up argument order (for multiple variables of the same type, like int, there is no visual cue).
Regardless, we have had this discussion earlier; I will fix such instances.
Let us not remove it unless we really agree. Let us discuss this offline.
todo: Default behavior for builder
Fixed in commit eaa57eb
if(ar.result().getStatusCode() == 200) {
    try {
        Recorder.ProcessGroups assignedProcessGroups = ProtoUtil.buildProtoFromBuffer(Recorder.ProcessGroups.parser(), ar.result().getResponse());
        processGroupAssociationStore.updateProcessGroupAssociations(assignedProcessGroups, (processGroupDetail, processGroupAssociationResult) -> {
This is keeping this mapping fresh, but we ignore failures.
In case of an inability to talk to the leader, the leader may assign the same PG to another backend.
We should perhaps configure a threshold after which, if we remain partitioned, we'll drop ownership. Of course, this threshold should be fairly high, so that we survive intermittent network partitions; DC-level issues are usually resolved in the first few hours, so maybe we should go on for an hour or two and then drop the ball.
Of course, we should namespace serialized files then, because it's easy to end up in a situation where two backends (where one is not able to talk to the leader) are aggr-win synchronized and are stepping on each other's toes.
Assignment of a PG on the leader is lazy and only happens in response to an /association request made by some recorder. For multiple agg windows to be synchronized, the leader has to assign the same PG to both backends (B1 and B2). These events are ordered from the leader's perspective. Once a PG is assigned to B1, the leader will not assign it to B2 unless B1 goes defunct (some delay, d seconds on the nanotime clock, has to be observed by the leader). Since the aggregation window determines its start time (wall clock) when it is initialized (which is when the association is received by the backend), for two windows to have the same start time, the clock at B2 would have to be exactly d seconds slower than B1's, and two or more competing requests for /association would have to be made exactly so.
Let's also consider the scenario where the assignment has already been made and an aggr win is in progress: when a n/w partition occurs, some recorders (let's call that set S1) can talk to the previous backend (B1), while others cannot (S2). Recorders in S2 then call /association and get a new backend (B2) assigned to the PG if the leader determines B1 is defunct. If B1 had recovered before that, or is healthy regardless according to the leader, reassignment does not happen and the leader still returns B1.
In the case of the leader returning B2 because, according to the leader, B1 is defunct, recorders in S1 are still talking to B1. That is a problem, yes, but only for the current aggregation window. I can add a check in the /leader/work API to return a work assignment only if B1 is still assigned the PG. In this case two files are written to disk, but as I said, with different start times, and we can live with that: they show up as different profiles, and coverage is also distributed in proportion to the machines observed by the respective backends. In the case where the n/w recovers before the aggregation window's end, the backend is able to report load to the leader, finds out it is not associated with the PG anymore, and expires its own aggregation window.
So we can tweak the /leader/work API such that the backend sends its own details as well in the request, and the leader checks for association before handing out work. Makes sense?
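The /leader/work tweak proposed above amounts to a small ownership gate on the leader side. A minimal sketch, with all names simplified stand-ins for the real leader-side types:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LeaderWorkGate {
    // processGroup -> backend currently associated with it
    private final Map<String, String> associations = new ConcurrentHashMap<>();

    public void associate(String processGroup, String backend) {
        associations.put(processGroup, backend);
    }

    // The backend sends its own identity with the /leader/work request;
    // a backend that was replaced after being deemed defunct gets null
    // instead of a work assignment.
    public String workFor(String processGroup, String requestingBackend) {
        if (!requestingBackend.equals(associations.get(processGroup))) {
            return null; // stale backend: do not hand out work
        }
        return "work-for-" + processGroup;
    }
}
```

This closes the window where a partitioned backend (B1) keeps serving work for a PG the leader has already reassigned to B2.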
Yes, that /leader/work proposal does seem to work.
Fixed in commit 4082bc7
} else {
    logger.error("Error when reporting load to leader", ar.cause());
}
setupTimerForReportingLoad();
I'd do this in a finally for safety. If anything goes wrong here, we lose our heartbeat, which is pretty scary.
applies to the one below too.
This one should be moved to a finally, but the ones below are in the catch clause and the else clause for a reason; otherwise competing load-report timers would be set.
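The try/finally placement being discussed can be sketched as follows; the `Timer` interface is a simplified stand-in for the vertx timer API:

```java
public class LoadReportRescheduling {
    public interface Timer { void schedule(Runnable next); }

    // Reschedule the next load report in finally, so that an unexpected
    // exception in the response handler cannot silently kill the heartbeat.
    public static void onLoadReportResponse(Runnable handleResponse, Timer timer) {
        try {
            handleResponse.run(); // may throw on unexpected responses
        } finally {
            // Always re-arm the timer; losing it means losing the heartbeat.
            timer.schedule(() -> { /* next load report */ });
        }
    }
}
```

The caveat above still applies: this pattern fits only where exactly one reschedule path exists; where reschedules live in separate catch/else branches, a blanket finally would arm competing timers.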
Fixed in commit eaa57eb
String leaderIPAddress;
if((leaderIPAddress = leaderReadContext.getLeaderIPAddress()) != null) {
    try {
        String requestPath = new StringBuilder(ApiPathConstants.LEADER_GET_WORK)
A method for this path generation and a class for a leader-bound http-client would help reduce this repetitive code. E.g. for the http-client, host, port etc. can be constructor params. Maybe separate wrapper objects over httpClient that encapsulate path and verb too?
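The path-generation helper suggested above could look like this; the method name is illustrative, not the actual util added in the fix commit:

```java
public class LeaderPathUtil {
    // One small utility replaces the repeated StringBuilder concatenation
    // at every leader call site.
    public static String leaderPath(String apiPath, String... segments) {
        StringBuilder sb = new StringBuilder(apiPath);
        for (String segment : segments) {
            sb.append('/').append(segment);
        }
        return sb.toString();
    }
}
```

A call site then becomes a single expression, e.g. `leaderPath(ApiPathConstants.LEADER_GET_WORK, ip, port)`.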
path generation abstracted out in url utils. Fixed in commit 4082bc7
…gic for load reporting and backend defunctness
Work assignment schedule does not take max concurrency as ceiling anymore and so does not prevent building schedule
Work slot pool deals with slot pojo instead of integers
Retired work assignment factory and instead work assignment schedule accepts bootstrap config in constructor
Backend health updated if newer tick is received by leader, backend does not send newer ticks if load report fails
…egation window
Making work info lookup non-concurrent and immutable in aggregation window
Adding duration as member in aggregation window
Rename of process group association store
Adding method to build httpclient from json config and associated refactor
Stale check for aggregation window when fetching recording policy
Variable renaming across window assignment schedule
Better error checks in load report from daemon
Rename of aggregation window store