Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1983] Remove Optionals to make DagManager, EventSubmitter, and TopologyCatalog required for GaaS operation #3855

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS = "org.apache.gobblin.service.modules.orchestration.Orchestrator";

// Gobblin Service Manager Keys
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
public static final String GOBBLIN_SERVICE_INSTANCE_NAME = GOBBLIN_SERVICE_PREFIX + "instance.name";

public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "multiActiveScheduler.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nonnull;

import org.apache.commons.lang3.StringUtils;

import com.codahale.metrics.Meter;
Expand All @@ -41,6 +39,8 @@
import com.google.common.collect.Lists;
import com.google.common.io.Closer;

import javax.annotation.Nonnull;
Copy link
Contributor

@phet phet Jan 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I see:

Import order: java, org, com, gobblin.

here - https://gobblin.apache.org/docs/developer-guide/CodingStyle/

I presumed javax would follow java, not pop up between com and gobblin. are you sure this is supposed to move here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not do any import changes manually, intelliJ does it, hopefully based on the provided codestyle


import org.apache.gobblin.Constructs;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
Expand Down Expand Up @@ -247,6 +247,10 @@ public Timer apply(@Nonnull Timer input) {
});
}

public static void updateTimer(Timer timer, final long duration, final TimeUnit unit) {
updateTimer(Optional.of(timer), duration, unit);
}

/**
* Marks a meter only if it is defined.
* @param meter an Optional<{@link com.codahale.metrics.Meter}>
Expand All @@ -255,6 +259,10 @@ public static void markMeter(Optional<Meter> meter) {
markMeter(meter, 1);
}

public static void markMeter(Meter meter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this being used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my related Q is whether we still require the Optional version or could replace them w/ this one

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many non-gaas callers to this method with Optional parameter, which may be providing parameter that is truly optional.

markMeter(Optional.of(meter), 1);
}

/**
* Marks a meter only if it is defined.
* @param meter an Optional&lt;{@link com.codahale.metrics.Meter}&gt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void submit(String name, Map<String, String> additionalMetadata) {
}

// Timestamp is set by metric context.
this.metricContext.get().submitEvent(new GobblinTrackingEvent(0l, this.namespace, name, finalMetadata));
this.metricContext.get().submitEvent(new GobblinTrackingEvent(0L, this.namespace, name, finalMetadata));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class FlowStatusTest {
class TestJobStatusRetriever extends JobStatusRetriever {

protected TestJobStatusRetriever(MultiContextIssueRepository issueRepository) {
super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED, issueRepository);
super(issueRepository);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public FlowStatus getFlowStatus(String flowName, String flowGroup, long flowExec
List<JobStatus> jobStatuses = ImmutableList.copyOf(retainStatusOfAnyFlowOrJobMatchingTag(
jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId), tag));
ExecutionStatus flowExecutionStatus =
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator());
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator());
return jobStatuses.iterator().hasNext()
? new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(), flowExecutionStatus) : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Supplier;
Expand Down Expand Up @@ -60,15 +58,12 @@ public abstract class JobStatusRetriever implements LatestFlowExecutionIdTracker

@Getter
protected final MetricContext metricContext;
@Getter
protected final Boolean dagManagerEnabled;

private final MultiContextIssueRepository issueRepository;

protected JobStatusRetriever(boolean dagManagerEnabled, MultiContextIssueRepository issueRepository) {
protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.issueRepository = Objects.requireNonNull(issueRepository);
this.dagManagerEnabled = dagManagerEnabled;
}

public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String flowName, String flowGroup,
Expand Down Expand Up @@ -186,7 +181,7 @@ protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> fl
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
).collect(Collectors.toList())));
return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(), jobStatuses.iterator(),
getFlowStatusFromJobStatuses(dagManagerEnabled, jobStatuses.iterator()));
getFlowStatusFromJobStatuses(jobStatuses.iterator()));
}).collect(Collectors.toList());
}

Expand Down Expand Up @@ -227,31 +222,15 @@ public static boolean isFlowStatus(org.apache.gobblin.service.monitoring.JobStat
&& jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) && jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
}

public static ExecutionStatus getFlowStatusFromJobStatuses(boolean dagManagerEnabled, Iterator<JobStatus> jobStatusIterator) {
public static ExecutionStatus getFlowStatusFromJobStatuses(Iterator<JobStatus> jobStatusIterator) {
ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;

if (dagManagerEnabled) {
while (jobStatusIterator.hasNext()) {
JobStatus jobStatus = jobStatusIterator.next();
// Check if this is the flow status instead of a single job status
if (JobStatusRetriever.isFlowStatus(jobStatus)) {
flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
}
}
} else {
Set<ExecutionStatus> jobStatuses = new HashSet<>();
while (jobStatusIterator.hasNext()) {
JobStatus jobStatus = jobStatusIterator.next();
// because in absence of DagManager we do not get all flow level events, we will ignore the flow level events
// we actually get and purely calculate flow status based on flow statuses.
if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
jobStatuses.add(ExecutionStatus.valueOf(jobStatus.getEventName()));
}
while (jobStatusIterator.hasNext()) {
JobStatus jobStatus = jobStatusIterator.next();
// Check if this is the flow status instead of a single job status
if (JobStatusRetriever.isFlowStatus(jobStatus)) {
flowExecutionStatus = ExecutionStatus.valueOf(jobStatus.getEventName());
}

List<ExecutionStatus> statusesInDescendingSalience = ImmutableList.of(ExecutionStatus.FAILED, ExecutionStatus.CANCELLED,
ExecutionStatus.RUNNING, ExecutionStatus.ORCHESTRATED, ExecutionStatus.COMPLETE);
flowExecutionStatus = statusesInDescendingSalience.stream().filter(jobStatuses::contains).findFirst().orElse(ExecutionStatus.$UNKNOWN);
}

return flowExecutionStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void testIsFlowRunning() {
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3, flowStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
when(jobStatusRetriever.getDagManagerEnabled()).thenReturn(true);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
}

Expand Down Expand Up @@ -109,9 +108,9 @@ public void testGetFlowStatusesAcrossGroup() {

// IMPORTANT: result invariants to honor - ordered by ascending flowName, all of same flowName adjacent, therein descending flowExecutionId
// NOTE: Three copies of FlowStatus are needed for repeated use, due to mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1, flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions("myFlowGroup", 2))
.thenReturn(Collections.singletonList(flowStatus), Collections.singletonList(flowStatus2), Collections.singletonList(flowStatus3)); // (for three invocations)

Expand All @@ -138,9 +137,9 @@ public void testGetFlowStatusesAcrossGroup() {
Arrays.asList(f0jsmDep2)));
}

private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses, JobStatusRetriever jobStatusRetriever) {
private FlowStatus createFlowStatus(String flowGroup, String flowName, long flowExecutionId, List<JobStatus> jobStatuses) {
return new FlowStatus(flowName, flowGroup, flowExecutionId, jobStatuses.iterator(),
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(), jobStatuses.iterator()));
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()));
}

private JobStatus createFlowJobStatus(String flowGroup, String flowName, long flowExecutionId, ExecutionStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean isMultiActiveSchedulerEnabled;

@Getter
private final boolean isTopologyCatalogEnabled;

@Getter
private final boolean isFlowCatalogEnabled;

Expand All @@ -64,9 +61,6 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean isGitConfigMonitorEnabled;

@Getter
private final boolean isDagManagerEnabled;

@Getter
private final boolean isJobStatusMonitorEnabled;

Expand All @@ -93,8 +87,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
this.innerConfig = Objects.requireNonNull(config, "Config cannot be null");
this.serviceWorkDir = serviceWorkDir;

isTopologyCatalogEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
isFlowCatalogEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);

Expand All @@ -113,8 +105,6 @@ public GobblinServiceConfiguration(String serviceName, String serviceId, Config
this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);

this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
this.isDagManagerEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED);
this.isJobStatusMonitorEnabled =
ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true);
this.isSchedulerEnabled =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,6 @@

import java.util.Objects;

import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,13 +37,18 @@
import javax.inject.Singleton;

import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
Expand All @@ -75,17 +68,24 @@
import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
Expand Down Expand Up @@ -190,20 +190,15 @@ public void configure(Binder binder) {
binder.bind(SharedFlowMetricsSingleton.class);

OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
if (serviceConfig.isTopologyCatalogEnabled()) {
binder.bind(TopologyCatalog.class);
}
binder.bind(TopologyCatalog.class);

if (serviceConfig.isTopologySpecFactoryEnabled()) {
binder.bind(TopologySpecFactory.class)
.to(getClassByNameOrAlias(TopologySpecFactory.class, serviceConfig.getInnerConfig(),
ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
}

OptionalBinder.newOptionalBinder(binder, DagManager.class);
if (serviceConfig.isDagManagerEnabled()) {
binder.bind(DagManager.class);
}
binder.bind(DagManager.class);

OptionalBinder.newOptionalBinder(binder, HelixManager.class);
if (serviceConfig.isHelixManagerEnabled()) {
Expand Down
Loading
Loading