[GOBBLIN-2013] update guice initialization for 'DagProcEngine enabled' and related classes #3892
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #3892 +/- ##
============================================
+ Coverage 46.53% 46.55% +0.02%
- Complexity 11149 11158 +9
============================================
Files 2231 2231
Lines 87881 87911 +30
Branches 9632 9633 +1
============================================
+ Hits 40893 40929 +36
+ Misses 43288 43287 -1
+ Partials 3700 3695 -5
  metricContext.register(this.totalAddSpecTimeNanos);
  metricContext.register(this.numJobsScheduledDuringStartup);
}
this.dagProcessingEngineEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.DAG_PROCESSING_ENGINE_ENABLED, false);
You can also add this as an injected param; up to you (look at line 183).
if (this.dagProcessingEngineEnabled) {
  Config flowConfig = flowSpec.getConfig();
  String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
  String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
  String flowExecutionId = String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
  DagActionStore.DagAction dagAction =
      new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.LAUNCH);
  this.dagManagement.addDagAction(dagAction);
} else {
  this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis), isReminderEvent);
}
This is not the correct place to add the dag action to the stream. Orchestrate is called in response to the scheduler and is where scheduler lease arbitration will happen. As a result, the launch dagAction is committed to the dagActionStore -> read by the changeMonitor -> then forwarded to the dagManager. It's at that last step that we should add the launch task to the stream (via dagManagement) instead of passing it to the old dagManager. This change should be in DagProcEnabledDagActionStoreChangeMonitor.
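To make that hand-off concrete, here is a minimal sketch of the last hop only. Every type in it (`DagAction`, `LegacyManager`, `StreamSink`, `Monitor`, `DagProcEnabledMonitor`) is a hypothetical stand-in invented for illustration, not one of Gobblin's real classes; the only thing it tries to mirror is that the scheduler path stays untouched and the monitor variant decides where a committed LAUNCH action ends up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; none of these are Gobblin's real classes.
class ChangeMonitorSketch {
  enum ActionType { LAUNCH, KILL, RESUME }

  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final ActionType type;

    DagAction(String flowGroup, String flowName, String flowExecutionId, ActionType type) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.type = type;
    }
  }

  // Stand-in for the old dagManager: just records what it was handed.
  static final class LegacyManager {
    final List<DagAction> received = new ArrayList<>();
    void handleLaunch(DagAction action) { received.add(action); }
  }

  // Stand-in for dagManagement: records actions added to the stream.
  static final class StreamSink {
    final List<DagAction> received = new ArrayList<>();
    void addDagAction(DagAction action) { received.add(action); }
  }

  // Base monitor: forwards a committed LAUNCH action to the old manager.
  static class Monitor {
    protected final LegacyManager legacy;
    Monitor(LegacyManager legacy) { this.legacy = legacy; }

    // Called after the action was committed to the store and read back.
    // Non-LAUNCH actions are ignored in this sketch.
    final void onDagActionCommitted(DagAction action) {
      if (action.type == ActionType.LAUNCH) {
        handleLaunch(action);
      }
    }

    protected void handleLaunch(DagAction action) { legacy.handleLaunch(action); }
  }

  // Variant used when the new engine is enabled: only the last hop changes.
  static final class DagProcEnabledMonitor extends Monitor {
    private final StreamSink sink;

    DagProcEnabledMonitor(LegacyManager legacy, StreamSink sink) {
      super(legacy);
      this.sink = sink;
    }

    @Override
    protected void handleLaunch(DagAction action) { sink.addDagAction(action); }
  }

  public static void main(String[] args) {
    LegacyManager legacy = new LegacyManager();
    StreamSink sink = new StreamSink();
    Monitor monitor = new DagProcEnabledMonitor(legacy, sink);
    monitor.onDagActionCommitted(new DagAction("group", "name", "1", ActionType.LAUNCH));
    System.out.println("stream=" + sink.received.size() + " legacy=" + legacy.received.size());
  }
}
```

With this shape the scheduler side needs no `if (enabled)` branch at all; which monitor gets bound decides the routing.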
Also, wherever you enable this, let's add a test.
  Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
  @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
- Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
+ Optional<FlowTriggerHandler> flowTriggerHandler, DagManagement dagManagement,
It's also clearer to have an optional DagManagement object that is present only when the engine is enabled, but that's open to debate.
Yeah, I actually wanted to create those objects regardless of the enabled flag, so that at least the object creation can be tested in prod.
I'd whole-heartedly urge us to ONLY create required instances. Otherwise, a not-yet-ready-for-prod instance that fails creation can bring down the entire application. Best practice is to guard significant new/reworked functionality behind config, and this includes initialization.
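A rough sketch of what "guard initialization too" could look like. `NewEngine` and `createIfEnabled` are made-up names for illustration, and this uses `java.util.Optional` only to stay self-contained (the code under review may well use a different `Optional`). The point is that the factory is never invoked when the flag is off, so a constructor that would fail cannot take the application down.

```java
import java.util.Optional;
import java.util.function.Supplier;

class GuardedInitSketch {
  // Hypothetical stand-in for a component that is not yet production-ready.
  static final class NewEngine {
    NewEngine(boolean failOnCreate) {
      if (failOnCreate) {
        throw new IllegalStateException("engine failed to initialize");
      }
    }
  }

  // Builds the component only when enabled; otherwise the factory never runs.
  static <T> Optional<T> createIfEnabled(boolean enabled, Supplier<T> factory) {
    return enabled ? Optional.of(factory.get()) : Optional.empty();
  }

  public static void main(String[] args) {
    // Flag off: the failing constructor is never reached.
    Optional<NewEngine> engine = createIfEnabled(false, () -> new NewEngine(true));
    System.out.println("engine present: " + engine.isPresent());
  }
}
```

Callers then branch on `isPresent()` rather than on a separate boolean, which also covers the "optional DagManagement" suggestion earlier in this thread.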
I'm confused: how does this PR relate to #3893 (review)?
There seems to be some, but not complete, overlap. If two PRs relate to one another and have any overlap at all, please have at least one mention the other in the PR description to clarify their relationship.
Separately, but also critical: this PR title really brings very little clarity and seems insufficient (eventually) for a commit message.
  protected final Optional<FlowTriggerHandler> flowTriggerHandler;
  @Getter
- protected final Map<String, Spec> scheduledFlowSpecs;
+ protected final Map<String, FlowSpec> scheduledFlowSpecs;
NBD, this may be for the best... but I can't seem to figure out: where is the initial motivation coming from? E.g. I was looking for a changed method signature, but didn't notice one. Is it entirely a preference to change the type of these various members?
(Again, I'm not calling into question the decision to change, more wanting to understand the motivation for the decision.)
I believe the initial commit was too big and some improvements were missed.
aah, ok, I see - so essentially just following more widely the specific-derived-class (FlowSpec) typing instituted therein.
(I like the increased clarity)
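For anyone skimming, a small sketch of why the narrower value type reads better. `Spec` and `FlowSpec` here are simplified, hypothetical stand-ins (not the real Gobblin types), and `getFlowName` is an invented accessor used only to show the cast disappearing.

```java
import java.util.HashMap;
import java.util.Map;

class TypedMapSketch {
  // Simplified, hypothetical stand-ins for the real spec hierarchy.
  interface Spec {
    String getUri();
  }

  static final class FlowSpec implements Spec {
    private final String uri;
    private final String flowName;

    FlowSpec(String uri, String flowName) {
      this.uri = uri;
      this.flowName = flowName;
    }

    @Override
    public String getUri() { return uri; }

    String getFlowName() { return flowName; }
  }

  // With the base type, each subclass-specific call needs a cast that can fail at runtime.
  static String flowNameViaCast(Map<String, Spec> specs, String key) {
    return ((FlowSpec) specs.get(key)).getFlowName();
  }

  // With the derived type, the compiler guarantees what the map holds.
  static String flowNameTyped(Map<String, FlowSpec> specs, String key) {
    return specs.get(key).getFlowName();
  }

  public static void main(String[] args) {
    Map<String, FlowSpec> typed = new HashMap<>();
    typed.put("a", new FlowSpec("uri:a", "flowA"));
    System.out.println(flowNameTyped(typed, "a"));
  }
}
```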
  @Getter
  private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;

- private final ClassAliasResolver<SpecCompiler> aliasResolver;
good call to rescope as method level!
  ContextAwareGauge<Long> orchestrationDelayMetric = metricContext.newContextAwareGauge
      (ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, orchestrationDelayCounter::get);
- this.metricContext.register(orchestrationDelayMetric);
+ metricContext.register(orchestrationDelayMetric);
Why change how it's referenced... isn't metricContext still an instance member?
Actually it is not; it is static.
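A tiny illustration of why dropping `this.` is reasonable for a static member. The class and field below are invented for the example; the only claim is the general Java behavior that a static field reached through `this` is still shared across all instances.

```java
class StaticAccessSketch {
  // Shared by every instance, the way a static context object would be.
  static int sharedCounter = 0;

  // Legal Java, but `this.` suggests per-instance state that does not exist.
  void incrementViaThis() { this.sharedCounter++; }

  // Same effect; the unqualified form does not imply instance ownership.
  void incrementUnqualified() { sharedCounter++; }

  public static void main(String[] args) {
    new StaticAccessSketch().incrementViaThis();
    new StaticAccessSketch().incrementUnqualified();
    // Both calls touched the same field even though two instances were used.
    System.out.println(sharedCounter);
  }
}
```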
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
  specCompilerClassName = config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
Isn't there a method in ConfigUtils (or a similar utility) that we could leverage to provide such a fallback to a default?
I do not think so.
What about this one: String ConfigUtils::getString(Config config, String path, String def)?
Oh, I misread your ask.
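To show the shape of the simplification being suggested, here is a sketch that uses `java.util.Properties` as a stand-in config so it runs on its own. The `getString(config, path, def)` helper below is a hypothetical local method that only mimics the signature quoted above, and the key and class names are invented.

```java
import java.util.Properties;

class ConfigDefaultSketch {
  // Hypothetical helper shaped like getString(config, path, def):
  // returns the configured value, or `def` when the key is absent.
  static String getString(Properties config, String path, String def) {
    return config.getProperty(path, def);
  }

  // The longer form from the diff: start with the default, then override if present.
  static String getStringVerbose(Properties config, String path, String def) {
    String value = def;
    if (config.containsKey(path)) {
      value = config.getProperty(path);
    }
    return value;
  }

  public static void main(String[] args) {
    Properties config = new Properties();
    String key = "example.flowCompiler.class";      // invented key
    String def = "com.example.DefaultFlowCompiler"; // invented default
    System.out.println(getString(config, key, def));
    config.setProperty(key, "com.example.CustomFlowCompiler");
    System.out.println(getString(config, key, def));
  }
}
```

Both helpers return the same result; the single call just removes the mutable local and the `if`.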
phet left a comment
Nice work turning this one around in such short order!
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Creating instances of the new classes created in PR [GOBBLIN-1910] Add DagProcEngine,DagManagement,DagTask,DagProc, and other abstractions for refactored DAG management #3858
Tests
updated tests
Commits