Poll API, health management for recorders, backend daemon for reporting load, aggregation window management, work assignment scheduling #47

Merged

94 commits merged on Mar 15, 2017

Changes from 84 commits

Commits (94)
e17d4c4
adding AsyncStorage and IO stream for ser/de.
gauravAshok Dec 22, 2016
ce400f4
Added data model for cpu sampling stacktraces
Dec 22, 2016
10d904e
parsing recording header in profile api
Dec 26, 2016
bc48ebb
removing proto generated recorder classes
Dec 26, 2016
746f76e
updated gitignore to include proto generated recorder class
Dec 26, 2016
d2777f2
changed tab spacing to 2 characters
Dec 26, 2016
c730e9b
wse parser and refactoring code
Dec 26, 2016
e9c6539
in middle of some changes
Dec 27, 2016
abd2539
in middle of some changes
Dec 27, 2016
13832dd
using codedinputstream for parsing protobuf
Dec 28, 2016
b871ba6
see description
Dec 29, 2016
e4970ad
Added fields to profileworkinfo and unit tests for happy path
Jan 2, 2017
b29bbe4
formatting changes
Jan 2, 2017
5904319
added more unit-tests and other fixes
Jan 3, 2017
4bffaf1
moved location of profileapitest
Jan 3, 2017
a849797
formatting and removed storage related code
Jan 4, 2017
106f949
removed file created by mistake
Jan 4, 2017
af64c73
Added unit tests, refactored into aggregation module, added some fiel…
Jan 5, 2017
58870db
Merge branch 'master' into aggregator
Jan 5, 2017
91a3d74
refactored packages of aggregation module
Jan 5, 2017
b93a114
added aggregationprofilemodel to gitignore
Jan 5, 2017
4c573ad
cleanup
Jan 5, 2017
08ba7dd
setup logging properties, metrics integration, config loading
Jan 9, 2017
0303532
setup rolling logs for file
Jan 9, 2017
7295ef5
Merge branch 'master' into aggregator
Jan 12, 2017
3a878f7
Refactoring code throughout, added support for non-concurrent workid…
Jan 19, 2017
9f33a4e
Merge branch 'master' into aggregator
Jan 19, 2017
b2c1bb7
reworked buffer handling in profile api, non-concurrent processing of…
Jan 22, 2017
47dc7b7
Minor updates
Jan 23, 2017
3bd73f3
Fixed jackson version issues
Jan 23, 2017
fca8684
discarding read bytes in compositebytebuf input stream, other minor f…
Jan 25, 2017
2db9090
Leader election and watch worker verticles with hooks for listening a…
Jan 25, 2017
36e25dc
Added ongoing and terminal indicators in aggregation state
Feb 6, 2017
a8fb922
Merge branch 'master' into aggregator
Feb 6, 2017
7bdea7c
Updated packages and moved files around
Feb 6, 2017
f9fbd28
Merge branch 'master' into aggregator
Feb 6, 2017
8a504d8
Removed extra dependencies in aggregation pom.xml
Feb 6, 2017
7e36028
Refactored spacings
Feb 6, 2017
b7507ba
pom updates
Feb 6, 2017
435385d
Merge branch 'aggregator' into leaderelection
Feb 6, 2017
1c8bacf
Merge branch 'master' into leaderelection
Feb 6, 2017
74e650e
removed httpverticle class
Feb 6, 2017
70e3c0b
Added method in leader discovery store to determine if self is leader
Feb 7, 2017
2aa1352
formatting
Feb 7, 2017
a981f6c
moved files around
Feb 7, 2017
212ceca
moved file locations and comments
Feb 7, 2017
0ef1eff
rename variables
Feb 7, 2017
04ecbb3
zk based backend association store implementation
Feb 9, 2017
2894ab1
added tests for zk based backend association store
Feb 9, 2017
d382a31
skeleton for leader http verticle and refactoring across the board
Feb 10, 2017
fef0ec9
Replace process group set with separate proto message
Feb 10, 2017
b78aa5d
moved processgroups message to recorder proto and removed processgrou…
Feb 11, 2017
feaaff5
leader apis for association and load, including tests
Feb 11, 2017
e7060e5
leader proxy verticle and refactoring across the board
Feb 12, 2017
2179443
removing sharing of http server across verticles and separating ports…
Feb 13, 2017
a3fafd2
moved classes around
Feb 13, 2017
58556b6
removed some files
Feb 13, 2017
58d1cab
Merge branch 'master' into leaderelection
Feb 13, 2017
19b1989
temp changes
Feb 17, 2017
586f04e
optimising how proto message is converted to vertx buffer
Feb 17, 2017
ad9dc6b
clear child watcher when leaderelectionwatcher verticle is undeployed
Feb 17, 2017
5b72c59
snowwhite sleeps forever
Feb 17, 2017
590a9e7
Storing process group in individual nodes in ZK for association and o…
Feb 21, 2017
8b88f2f
defunct behaviour control using system nanotime and fixing race issue…
Feb 21, 2017
a23385f
PR #13 related fixes
Feb 21, 2017
60d8e54
Refactoring deployment process of verticles
Feb 22, 2017
88d96e7
sending prev and curr tick from backend as part of load report request
Feb 23, 2017
e10c931
removed prevtick from load report request and simplified invariants w…
Feb 23, 2017
1a6b7d3
Merge branch 'leaderelection' into work_assignment
Feb 23, 2017
9d85a50
resolved compilation issues post merge
Feb 23, 2017
3848013
Fixed race-conditions and other minor fixes suggested in PR #13
Feb 28, 2017
464f0b1
Comparator for backends compares ip lexicographically if same process…
Feb 28, 2017
76ee26b
Merge branch 'leaderelection' into work_assignment
Feb 28, 2017
31b274c
Merge branch 'master' into work_assignment
anvinjain Feb 28, 2017
a28a452
log4j2 support added in backend and unfinished changes for work assig…
anvinjain Mar 1, 2017
4fac1f8
building schedule for work assignment and starting aggregation window
anvinjain Mar 1, 2017
46e8d2a
Merge branch 'master' into work_assignment
anvinjain Mar 2, 2017
b0bbee3
blind code complete, at least compiles now
anvinjain Mar 3, 2017
62d1d81
added backend daemon deployer
anvinjain Mar 4, 2017
c06b54d
minor refactoring, logs, comments across the board
anvinjain Mar 5, 2017
417b771
self-review fixes and existing unit tests passing
anvinjain Mar 6, 2017
e3199ea
minor bug fixes and added some unit tests
anvinjain Mar 6, 2017
7fda302
Added unit tests for poll, load api, other fixes
anvinjain Mar 6, 2017
d77e4e2
update recorder info when sending assignment on poll
anvinjain Mar 6, 2017
14c6b77
Refactoring of workassignmentschedule and work slot pool, reworked lo…
anvinjain Mar 14, 2017
668398a
Hard-coding verticle count to 1 for backend daemon in its deployer
anvinjain Mar 14, 2017
a525979
daemon deployment hardcoded to single verticle, modifications to aggr…
anvinjain Mar 14, 2017
a5eb038
Utility to attach multiple handlers to vertx route
anvinjain Mar 14, 2017
650134a
cleaned up exceptions
anvinjain Mar 14, 2017
da0053a
Stickiness of work assignment to a recorder in same aggregation window
anvinjain Mar 14, 2017
91da467
Rename of WorkProfile proto message to RecordingPolicy
anvinjain Mar 14, 2017
8ae803b
backend id picked from config
anvinjain Mar 14, 2017
eaa57eb
Multiple fixes as reported in PR #47
anvinjain Mar 15, 2017
4082bc7
Added check of backend association in /leader/work API
anvinjain Mar 15, 2017
@@ -12,22 +12,22 @@ public class FinalizedAggregationWindow {
private final String clusterId;
private final String procId;
private final LocalDateTime start;
- private final LocalDateTime endWithTolerance;
+ private final LocalDateTime endedAt;
protected final Map<Long, FinalizedProfileWorkInfo> workInfoLookup;
protected final FinalizedCpuSamplingAggregationBucket cpuSamplingAggregationBucket;

public FinalizedAggregationWindow(String appId,
String clusterId,
String procId,
LocalDateTime start,
- LocalDateTime endWithTolerance,
+ LocalDateTime endedAt,
Map<Long, FinalizedProfileWorkInfo> workInfoLookup,
FinalizedCpuSamplingAggregationBucket cpuSamplingAggregationBucket) {
this.appId = appId;
this.clusterId = clusterId;
this.procId = procId;
this.start = start;
- this.endWithTolerance = endWithTolerance;
+ this.endedAt = endedAt;
this.workInfoLookup = workInfoLookup;
this.cpuSamplingAggregationBucket = cpuSamplingAggregationBucket;
}
@@ -36,6 +36,11 @@ public FinalizedProfileWorkInfo getDetailsForWorkId(long workId) {
return this.workInfoLookup.get(workId);
}

//NOTE: This is computed on expiry of aggregation window, null otherwise. Having a getter here to make this testable
public LocalDateTime getEndedAt() {
return this.endedAt;
}

@Override
public boolean equals(Object o) {
if (o == this) {
@@ -50,7 +55,7 @@ public boolean equals(Object o) {
&& this.clusterId.equals(other.clusterId)
&& this.procId.equals(other.procId)
&& this.start.equals(other.start)
- && this.endWithTolerance.equals(other.endWithTolerance)
+ && this.endedAt == null ? other.endedAt == null : this.endedAt.equals(other.endedAt)
&& this.workInfoLookup.equals(other.workInfoLookup)
&& this.cpuSamplingAggregationBucket.equals(other.cpuSamplingAggregationBucket);
}
@@ -59,7 +64,7 @@ protected Header buildHeaderProto(int version, WorkType workType) {
return Header.newBuilder()
.setFormatVersion(version)
.setWorkType(workType)
- .setAggregationEndTime(endWithTolerance.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
+ .setAggregationEndTime(endedAt == null ? null : endedAt.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
Review comment:

For all DTOs, we should consider re-using the object for serialization.

They expose a "clear" mechanism to do this. One can clear the DTO and start afresh without any additional GC pressure.

Something worth thinking about.

Author's reply:

You mean reusing the builder object, right? Also, clearing the same builder object can work only where concurrent access cannot happen. The above is a candidate, agreed.

Reviewer's reply:

No, I meant both builder and DTO, but looks like neither is possible. The API is extremely stupid, it seems; read: https://groups.google.com/forum/#!topic/protobuf/b0gS4wpjuIo and https://groups.google.com/forum/#!topic/protobuf/No9bBRh3Wp0

It seems they wanted to support some "optimizations" by being GC unfriendly, which they didn't get right either. Dumbness rules!

.setAggregationStartTime(start.atOffset(ZoneOffset.UTC).format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
.setAppId(appId)
.setClusterId(clusterId)
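For context on the "clear" mechanism discussed in the thread above, here is a minimal sketch of protobuf builder reuse, assuming the generated Header type used in buildHeaderProto. The field values are illustrative, and buildPartial() is used only so the sketch does not depend on which fields are required; as the thread concludes, each build still allocates a fresh message, so only the builder object is reused.

// Sketch only, not code from this PR: reusing one protobuf builder via clear().
// Assumes the generated Header type from the aggregation proto; values are illustrative.
static void reuseBuilderExample() {
  Header.Builder builder = Header.newBuilder();

  Header first = builder
      .setFormatVersion(1)
      .setAppId("app-a")
      .buildPartial();   // buildPartial() so the sketch ignores required-field checks

  builder.clear();       // resets every field, allowing the same builder to be reused

  Header second = builder
      .setFormatVersion(1)
      .setAppId("app-b")
      .buildPartial();   // still allocates a new Header; only the builder is reused
}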
@@ -7,6 +7,9 @@
import java.time.temporal.ChronoUnit;
import java.util.Map;

/**
* TODO: Add recorder info(aggregated proto) to this class, accept it in constructor which is called from ProfileWorkInfo#buildFinalizedEntity method
*/
public class FinalizedProfileWorkInfo {
private final int recorderVersion;
private final AggregationState state;
@@ -40,6 +43,10 @@ public LocalDateTime getEndedAt() {
return endedAt;
}

public AggregationState getState() {
return state;
}

@Override
public boolean equals(Object o) {
if (o == this) {
47 changes: 45 additions & 2 deletions backend/src/idl/backend.proto
@@ -4,6 +4,49 @@ option java_outer_classname = "BackendDTO";

message LoadReportRequest {
required string ip = 1;
- required float load = 2;
- required int64 curr_tick = 3;
+ required uint32 port = 2;
+ required float load = 3;
+ required int64 curr_tick = 4;
}

message WorkProfile {
required uint32 duration = 1;
required uint32 coverage_pct = 2;
repeated Work work = 3;
required string description = 4;
}

enum WorkType {
cpu_sample_work = 0;
thread_sample_work = 1;
monitor_contention_work = 2;
monitor_wait_work = 3;
}

message Work {
required WorkType w_type = 1;
optional CpuSampleWork cpu_sample = 2;
optional ThreadSampleWork thd_sample = 3;
optional MonitorContentionWork monitor_block = 4;
optional MonitorWaitWork monitor_wait = 5;
}

message CpuSampleWork {
required uint32 frequency = 1;
required uint32 max_frames = 2;
}

message ThreadSampleWork {
required uint32 frequency = 1;
required uint32 max_frames = 2;
}

message MonitorContentionWork {
required uint32 max_monitors = 1;
required uint32 max_frames = 2;
}

message MonitorWaitWork {
required uint32 max_monitors = 1;
required uint32 max_frames = 2;
}
19 changes: 17 additions & 2 deletions backend/src/main/conf/local.json
@@ -1,7 +1,11 @@
{
"ip.address": "127.0.0.1",
"backend.version": 1,
"backend.http.port": 2491,
"leader.http.port": 2496,
"load.report.interval.secs": 60,
"recorder.defunct.threshold.secs": 120,
"max.simultaneous.profiles": 100,
"backend.http.server": {
"idle.timeout.secs": 120
},
@@ -16,7 +20,7 @@
"compression": true
},
"vertxOptions": {
"blockedThreadCheckInterval": 1000
"blockedThreadCheckInterval": 1000000
},
"backendHttpOptions": {
"config": {
@@ -41,11 +45,22 @@
"aggregation.enabled": false,
"leader.watching.path": "/backends",
"leader.mutex.path": "/leader",
"leader.sleep.ms": 60000
"leader.sleep.ms": 60000,
"kill.behavior": "KILL"
}
},
"leaderHttpOptions": {
"backend.association.path": "/association",
"load.miss.tolerance": 1
},
"daemonOptions": {
"worker": true,
"config": {
"aggregation.window.duration.mins": 30,
"aggregation.window.end.tolerance.secs": 120,
"workprofile.refresh.offset.secs": 300,
"scheduling.buffer.secs": 30,
"work.assignment.max.delay.secs": 120
}
}
}
13 changes: 12 additions & 1 deletion backend/src/main/java/fk/prof/backend/BackendApplication.java
@@ -1,8 +1,13 @@
package fk.prof.backend;

import fk.prof.backend.worker.BackendDaemon;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import org.apache.commons.cli.*;

public class BackendApplication {
private static Logger logger = LoggerFactory.getLogger(BackendDaemon.class);

public static void main(String[] args) throws Exception {
ConfigManager.setDefaultSystemProperties();
CommandLineParser parser = new DefaultParser();
@@ -20,6 +25,12 @@ public static void main(String[] args) throws Exception {
String confPath = cmd.getOptionValue("c");

BackendManager backendManager = new BackendManager(confPath);
- backendManager.launch();
+ backendManager.launch().setHandler(ar -> {
+   if(ar.succeeded()) {
+     logger.info("Backend launched");
+   } else {
+     logger.error("Error launching backend: ", ar.cause());
+   }
+ });
}
}
35 changes: 24 additions & 11 deletions backend/src/main/java/fk/prof/backend/BackendManager.java
@@ -2,16 +2,19 @@

import com.google.common.base.Preconditions;
import fk.prof.backend.deployer.VerticleDeployer;
- import fk.prof.backend.deployer.impl.BackendHttpVerticleDeployer;
- import fk.prof.backend.deployer.impl.LeaderElectionParticipatorVerticleDeployer;
- import fk.prof.backend.deployer.impl.LeaderElectionWatcherVerticleDeployer;
- import fk.prof.backend.deployer.impl.LeaderHttpVerticleDeployer;
+ import fk.prof.backend.deployer.impl.*;
import fk.prof.backend.leader.election.LeaderElectedTask;
import fk.prof.backend.model.aggregation.AggregationWindowLookupStore;
import fk.prof.backend.model.assignment.ProcessGroupAssociationStore;
import fk.prof.backend.model.assignment.SimultaneousWorkAssignmentCounter;
import fk.prof.backend.model.assignment.impl.ProcessGroupAssociationStoreImpl;
import fk.prof.backend.model.assignment.impl.SimultaneousWorkAssignmentCounterImpl;
import fk.prof.backend.model.association.BackendAssociationStore;
import fk.prof.backend.model.association.ProcessGroupCountBasedBackendComparator;
import fk.prof.backend.model.association.impl.ZookeeperBasedBackendAssociationStore;
import fk.prof.backend.model.election.impl.InMemoryLeaderStore;
- import fk.prof.backend.service.ProfileWorkService;
+ import fk.prof.backend.model.aggregation.impl.AggregationWindowLookupStoreImpl;
+ import fk.prof.backend.model.policy.PolicyStore;
import io.vertx.core.*;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
@@ -23,8 +26,10 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* TODO: Deployment process is liable to changes later
@@ -76,16 +81,24 @@ public Future<Void> close() {
public Future<Void> launch() {
Future result = Future.future();
InMemoryLeaderStore leaderStore = new InMemoryLeaderStore(configManager.getIPAddress());
- ProfileWorkService profileWorkService = new ProfileWorkService();

- VerticleDeployer backendHttpVerticleDeployer = new BackendHttpVerticleDeployer(vertx, configManager, leaderStore, profileWorkService);
- backendHttpVerticleDeployer.deploy().setHandler(backendDeployResult -> {
+ AggregationWindowLookupStore aggregationWindowLookupStore = new AggregationWindowLookupStoreImpl();
+ ProcessGroupAssociationStore processGroupAssociationStore = new ProcessGroupAssociationStoreImpl(configManager.getRecorderDefunctThresholdInSeconds());
+ SimultaneousWorkAssignmentCounter simultaneousWorkAssignmentCounter = new SimultaneousWorkAssignmentCounterImpl(configManager.getMaxSimultaneousProfiles());

+ VerticleDeployer backendHttpVerticleDeployer = new BackendHttpVerticleDeployer(vertx, configManager, leaderStore, aggregationWindowLookupStore, processGroupAssociationStore);
+ VerticleDeployer backendDaemonVerticleDeployer = new BackendDaemonVerticleDeployer(vertx, configManager, leaderStore, processGroupAssociationStore, aggregationWindowLookupStore, simultaneousWorkAssignmentCounter);
Review comment:

We should have a validation that ensures that only one thread will ever run the backend daemon. Basically 1 verticle.

Author's reply (anvinjain, Mar 14, 2017):

Validation is done inside the deployer for the backend daemon. Fixed in commit 668398a.

+ CompositeFuture backendDeploymentFuture = CompositeFuture.all(backendHttpVerticleDeployer.deploy(), backendDaemonVerticleDeployer.deploy());
+ backendDeploymentFuture.setHandler(backendDeployResult -> {
if (backendDeployResult.succeeded()) {
try {
- List<String> backendDeployments = backendDeployResult.result().list();
+ List<String> backendDeployments = backendDeployResult.result().list().stream()
+     .flatMap(fut -> ((CompositeFuture)fut).list().stream())
+     .map(deployment -> (String)deployment)
+     .collect(Collectors.toList());

BackendAssociationStore backendAssociationStore = createBackendAssociationStore(vertx, curatorClient);
- VerticleDeployer leaderHttpVerticleDeployer = new LeaderHttpVerticleDeployer(vertx, configManager, backendAssociationStore);
+ PolicyStore policyStore = new PolicyStore();
+ VerticleDeployer leaderHttpVerticleDeployer = new LeaderHttpVerticleDeployer(vertx, configManager, backendAssociationStore, policyStore);
Runnable leaderElectedTask = createLeaderElectedTask(vertx, leaderHttpVerticleDeployer, backendDeployments);

VerticleDeployer leaderElectionParticipatorVerticleDeployer = new LeaderElectionParticipatorVerticleDeployer(
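As a side note on the single-verticle discussion above, here is a minimal sketch of a deployer that pins the backend daemon to one verticle instance. Class and method names are illustrative assumptions, not the repository's actual BackendDaemonVerticleDeployer; only standard Vert.x 3 APIs are used.

import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;

// Illustrative sketch only: hard-codes the verticle count to 1 so the backend
// daemon can never run concurrently, mirroring the intent of commit 668398a.
public class SingleInstanceDaemonDeployer {
  private final Vertx vertx;
  private final DeploymentOptions deploymentOptions;

  public SingleInstanceDaemonDeployer(Vertx vertx, DeploymentOptions deploymentOptions) {
    this.vertx = vertx;
    // Ignore whatever instance count the config asked for and force a single instance
    this.deploymentOptions = deploymentOptions.setInstances(1);
  }

  public Future<String> deploy(Verticle daemonVerticle) {
    Future<String> deployed = Future.future();
    vertx.deployVerticle(daemonVerticle, deploymentOptions, deployed.completer());
    return deployed;
  }
}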
20 changes: 20 additions & 0 deletions backend/src/main/java/fk/prof/backend/ConfigManager.java
@@ -15,17 +15,21 @@
*/
public class ConfigManager {
private static final String IP_ADDRESS_KEY = "ip.address";
private static final String BACKEND_VERSION_KEY = "backend.version";
private static final String BACKEND_HTTP_PORT_KEY = "backend.http.port";
private static final String LEADER_HTTP_PORT_KEY = "leader.http.port";
private static final String BACKEND_HTTP_SERVER_OPTIONS_KEY = "backend.http.server";
private static final String LEADER_HTTP_SERVER_OPTIONS_KEY = "leader.http.server";
private static final String HTTP_CLIENT_OPTIONS_KEY = "http.client";
private static final String LOAD_REPORT_INTERVAL_KEY = "load.report.interval.secs";
private static final String RECORDER_DEFUNCT_THRESHOLD_KEY = "recorder.defunct.threshold.secs";
private static final String MAX_SIMULTANEOUS_PROFILES_KEY = "max.simultaneous.profiles";
private static final String VERTX_OPTIONS_KEY = "vertxOptions";
private static final String BACKEND_HTTP_DEPLOYMENT_OPTIONS_KEY = "backendHttpOptions";
private static final String CURATOR_OPTIONS_KEY = "curatorOptions";
private static final String LEADER_ELECTION_DEPLOYMENT_OPTIONS_KEY = "leaderElectionOptions";
private static final String LEADER_HTTP_DEPLOYMENT_OPTIONS_KEY = "leaderHttpOptions";
private static final String BACKEND_DAEMON_OPTIONS_KEY = "daemonOptions";
private static final String LOGFACTORY_SYSTEM_PROPERTY_KEY = "vertx.logger-delegate-factory-class-name";
private static final String LOGFACTORY_SYSTEM_PROPERTY_DEFAULT_VALUE = "io.vertx.core.logging.SLF4JLogDelegateFactory";

@@ -48,6 +52,10 @@ public String getIPAddress() {
return config.getString(IP_ADDRESS_KEY, "127.0.0.1");
}

public int getBackendVersion() {
return config.getInteger(BACKEND_VERSION_KEY);
}

public int getBackendHttpPort() {
return config.getInteger(BACKEND_HTTP_PORT_KEY, 2491);
}
@@ -60,6 +68,14 @@ public int getLoadReportIntervalInSeconds() {
return config.getInteger(LOAD_REPORT_INTERVAL_KEY, 60);
}

public int getRecorderDefunctThresholdInSeconds() {
return config.getInteger(RECORDER_DEFUNCT_THRESHOLD_KEY, 120);
}

public int getMaxSimultaneousProfiles() {
return config.getInteger(MAX_SIMULTANEOUS_PROFILES_KEY, 10);
}

public JsonObject getBackendHttpServerConfig() {
return config.getJsonObject(BACKEND_HTTP_SERVER_OPTIONS_KEY, new JsonObject());
}
@@ -92,6 +108,10 @@ public JsonObject getLeaderHttpDeploymentConfig() {
return enrichDeploymentConfig(config.getJsonObject(LEADER_HTTP_DEPLOYMENT_OPTIONS_KEY, new JsonObject()));
}

public JsonObject getBackendDaemonDeploymentConfig() {
return enrichDeploymentConfig(config.getJsonObject(BACKEND_DAEMON_OPTIONS_KEY, new JsonObject()));
}

private JsonObject enrichDeploymentConfig(JsonObject deploymentConfig) {
if(deploymentConfig.getJsonObject("config") == null) {
deploymentConfig.put("config", new JsonObject());