Skip to content

Commit

Permalink
[GOBBLIN-1575] use reference count in helix manager, so that connect/…
Browse files Browse the repository at this point in the history
…disconnect are called once and at the right time (#3427)

* make a copy of helix manager, so that each job can keep using it without worrying about disconnect

* address review comments

* address review comments

* use reference count to connect/disconnect only once

* address review comments
  • Loading branch information
arjun4084346 committed Nov 29, 2021
1 parent 374fd6f commit 89497c6
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher
@Getter
private DistributeJobMonitor jobMonitor;

public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception {
if (builder.taskDriverHelixManager.isPresent()) {
this.planningJobHelixManager = builder.taskDriverHelixManager.get();
} else {
this.planningJobHelixManager = builder.jobHelixManager;
}
public GobblinHelixDistributeJobExecutionLauncher(Builder builder) {
this.planningJobHelixManager = builder.planningJobHelixManager;

this.helixTaskDriver = new TaskDriver(this.planningJobHelixManager);
this.sysProps = builder.sysProps;
this.jobPlanningProps = builder.jobPlanningProps;
Expand Down Expand Up @@ -151,7 +148,7 @@ private void executeCancellation() {
// The workflow should never be deleted explicitly because it has an expiry time
// If cancellation is requested, we should set the job state to CANCELLED/ABORT
this.helixTaskDriver.waitToStop(planningJobId, this.helixJobStopTimeoutSeconds * 1000);
log.info("Stopped the workflow ", planningJobId);
log.info("Stopped the workflow {}", planningJobId);
}
} catch (HelixException e) {
// Cancellation may throw an exception, but Helix sets the job state to STOP and it should eventually stop
Expand All @@ -168,8 +165,7 @@ private void executeCancellation() {
public static class Builder {
Properties sysProps;
Properties jobPlanningProps;
HelixManager jobHelixManager;
Optional<HelixManager> taskDriverHelixManager;
HelixManager planningJobHelixManager;
Path appWorkDir;
GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics;
GobblinHelixMetrics helixMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.gobblin.cluster;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
Expand All @@ -44,6 +40,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.io.Closer;
import com.typesafe.config.Config;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
Expand Down Expand Up @@ -143,10 +144,12 @@ private GobblinHelixJobLauncher createJobLauncher()
/**
* Launch the actual {@link GobblinHelixJobLauncher}.
*/
@SneakyThrows
@Override
public TaskResult run() {
log.info("Running planning job {} [{} {}]", this.planningJobId, this.applicationName, this.instanceName);
this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig);
this.jobHelixManager.connect();

try (Closer closer = Closer.create()) {
Optional<String> planningIdFromStateStore = this.jobsMapping.getPlanningJobId(jobUri);
Expand Down Expand Up @@ -203,6 +206,7 @@ public TaskResult run() {
return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils
.getFullStackTrace(e));
} finally {
this.jobHelixManager.disconnect();
// always cleanup the job mapping for current job name.
try {
this.jobsMapping.deleteMapping(jobUri);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

import org.apache.helix.InstanceType;


/**
 * Factory for the Helix managers used by Gobblin cluster components.
 * It hands out {@link GobblinReferenceCountingZkHelixManager} instances.
 */
public class GobblinHelixManagerFactory {

  /**
   * Builds a new {@link GobblinReferenceCountingZkHelixManager}. A fresh instance is constructed on every call;
   * nothing is cached by this factory.
   *
   * @param clusterName name of the Helix cluster, passed through to the manager's constructor
   * @param instanceName name of this Helix instance, passed through to the manager's constructor
   * @param type the {@link InstanceType} of the manager, passed through to the manager's constructor
   * @param zkAddr ZooKeeper address, passed through to the manager's constructor
   * @return the newly constructed manager
   */
  public static GobblinReferenceCountingZkHelixManager getZKHelixManager(String clusterName, String instanceName,
      InstanceType type, String zkAddr) {
    GobblinReferenceCountingZkHelixManager referenceCountingManager =
        new GobblinReferenceCountingZkHelixManager(clusterName, instanceName, type, zkAddr);
    return referenceCountingManager;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
Expand Down Expand Up @@ -160,30 +159,28 @@ protected void addLeadershipChangeAwareComponent (LeadershipChangeAwareComponent
/**
* Build the {@link HelixManager} for the Application Master.
*/
protected static HelixManager buildHelixManager(Config config, String zkConnectionString, String clusterName, InstanceType type) {
protected static HelixManager buildHelixManager(Config config, String clusterName, InstanceType type) {
Preconditions.checkArgument(config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
String zkConnectionString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
log.info("Using ZooKeeper connection string: " + zkConnectionString);

String helixInstanceName = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
GobblinClusterManager.class.getSimpleName());
return HelixManagerFactory.getZKHelixManager(
return GobblinHelixManagerFactory.getZKHelixManager(
config.getString(clusterName), helixInstanceName, type, zkConnectionString);
}

public void initialize() {
Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY));
String zkConnectionString = this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
log.info("Using ZooKeeper connection string: " + zkConnectionString);

if (this.dedicatedManagerCluster) {
Preconditions.checkArgument(this.config.hasPath(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY));
log.info("We will use separate clusters to manage GobblinClusterManager and job distribution.");
// This will create and register a Helix controller in ZooKeeper
this.managerClusterHelixManager = buildHelixManager(this.config,
zkConnectionString,
GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY,
InstanceType.CONTROLLER);

// This will create a Helix administrator to dispatch jobs to ZooKeeper
this.jobClusterHelixManager = buildHelixManager(this.config,
zkConnectionString,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
InstanceType.ADMINISTRATOR);

Expand All @@ -195,14 +192,13 @@ public void initialize() {

if (this.dedicatedJobClusterController) {
this.jobClusterController = Optional.of(GobblinHelixMultiManager
.buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
.buildHelixManager(this.config, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
InstanceType.CONTROLLER));
}

if (this.dedicatedTaskDriverCluster) {
// This will create a Helix administrator to dispatch jobs to ZooKeeper
this.taskDriverHelixManager = Optional.of(buildHelixManager(this.config,
zkConnectionString,
GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
InstanceType.ADMINISTRATOR));

Expand All @@ -216,7 +212,7 @@ public void initialize() {
// This will create a dedicated controller for planning job distribution
if (dedicatedTaskDriverClusterController) {
this.taskDriverClusterController = Optional.of(GobblinHelixMultiManager
.buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
.buildHelixManager(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY,
InstanceType.CONTROLLER));
}
}
Expand All @@ -226,7 +222,6 @@ public void initialize() {
boolean isHelixClusterManaged = ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.IS_HELIX_CLUSTER_MANAGED,
GobblinClusterConfigurationKeys.DEFAULT_IS_HELIX_CLUSTER_MANAGED);
this.managerClusterHelixManager = buildHelixManager(this.config,
zkConnectionString,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY,
isHelixClusterManaged ? InstanceType.PARTICIPANT : InstanceType.CONTROLLER);
this.jobClusterHelixManager = this.managerClusterHelixManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.cluster;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.ZKHelixManager;

import lombok.extern.slf4j.Slf4j;


/**
 * A {@link ZKHelixManager} which keeps a reference count of its users.
 * Every user should call {@link #connect()} before using the manager and {@link #disconnect()} when done, so that
 * the count is increased and decreased accordingly.
 * Calls to connect and disconnect on the underlying {@link ZKHelixManager} are made only for the first and last
 * usage respectively.
 *
 * <p>The count itself is updated atomically, but updating the count and connecting/disconnecting the underlying
 * manager are not performed as one atomic step: a caller that is not the first user returns from
 * {@link #connect()} immediately, even if the first user's underlying connect is still in progress.
 */
@Slf4j
public class GobblinReferenceCountingZkHelixManager extends ZKHelixManager {
  /** Number of users that have called {@link #connect()} and not yet called {@link #disconnect()}; never negative. */
  private final AtomicInteger usageCount = new AtomicInteger(0);

  /**
   * Creates a manager with a usage count of zero. All arguments are passed unchanged to the
   * {@link ZKHelixManager} constructor.
   *
   * @param clusterName name of the Helix cluster
   * @param instanceName name of this Helix instance
   * @param instanceType type of this Helix instance
   * @param zkAddress ZooKeeper address
   */
  public GobblinReferenceCountingZkHelixManager(String clusterName, String instanceName, InstanceType instanceType, String zkAddress) {
    super(clusterName, instanceName, instanceType, zkAddress);
  }

  /**
   * Registers one more user. The underlying manager is connected only when the count goes from zero to one.
   *
   * @throws Exception if the underlying connect fails; in that case the usage is not counted, so a later
   *                   call will attempt the underlying connect again
   */
  @Override
  public void connect() throws Exception {
    if (usageCount.incrementAndGet() != 1) {
      // Some other user already holds a reference; nothing more to do.
      return;
    }

    boolean connected = false;
    try {
      super.connect();
      connected = true;
    } finally {
      if (!connected) {
        // Roll back the reference taken above. Otherwise a retry would see a count greater than one and
        // never connect the underlying manager.
        usageCount.decrementAndGet();
      }
    }
  }

  /**
   * Releases one user. The underlying manager is disconnected when the last user is released. A call that is not
   * matched by a previous {@link #connect()} still disconnects the underlying manager, but the count is kept at
   * zero instead of going negative, so that the next {@link #connect()} connects again.
   */
  @Override
  public void disconnect() {
    // Decrement, but never below zero: a negative count would make the next connect() skip the underlying connect.
    int usersBeforeRelease = usageCount.getAndUpdate(count -> count > 0 ? count - 1 : 0);
    if (usersBeforeRelease <= 1) {
      super.disconnect();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
Expand Down Expand Up @@ -285,18 +284,18 @@ private void initHelixManager() {

if (this.isTaskDriver && this.dedicatedTaskDriverCluster) {
// This will create a Helix manager to receive the planning job
this.taskDriverHelixManager = Optional.of(HelixManagerFactory.getZKHelixManager(
this.taskDriverHelixManager = Optional.of(GobblinHelixManagerFactory.getZKHelixManager(
ConfigUtils.getString(this.clusterConfig, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""),
this.helixInstanceName,
InstanceType.PARTICIPANT,
zkConnectionString));
this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
this.jobHelixManager = GobblinHelixManagerFactory.getZKHelixManager(
this.clusterName,
this.helixInstanceName,
InstanceType.ADMINISTRATOR,
zkConnectionString);
} else {
this.jobHelixManager = HelixManagerFactory.getZKHelixManager(
this.jobHelixManager = GobblinHelixManagerFactory.getZKHelixManager(
this.clusterName,
this.helixInstanceName,
InstanceType.PARTICIPANT,
Expand All @@ -305,8 +304,7 @@ private void initHelixManager() {
}

private HelixManager getReceiverManager() {
return taskDriverHelixManager.isPresent()?taskDriverHelixManager.get()
: this.jobHelixManager;
return taskDriverHelixManager.isPresent() ? taskDriverHelixManager.get() : this.jobHelixManager;
}

private TaskStateModelFactory createTaskStateModelFactory(Map<String, TaskFactory> taskFactoryMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ static GobblinHelixJobLauncher buildJobLauncherForCentralizedMode(GobblinHelixJo
*/
private void runJobLauncherLoop() throws JobException {
try {
this.jobHelixManager.connect();
while (true) {
currentJobLauncher = buildJobLauncherForCentralizedMode(jobScheduler, jobProps);
// in "run once" case, job scheduler will remove current job from the scheduler
Expand All @@ -233,6 +234,7 @@ private void runJobLauncherLoop() throws JobException {
log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
} finally {
this.jobHelixManager.disconnect();
currentJobLauncher = null;
}
}
Expand All @@ -249,8 +251,8 @@ private void runJobExecutionLauncher() throws JobException {
String newPlanningId;
Closer closer = Closer.create();
try {
HelixManager planningJobManager = this.taskDriverHelixManager.isPresent()?
this.taskDriverHelixManager.get() : this.jobHelixManager;
HelixManager planningJobHelixManager = this.taskDriverHelixManager.orElse(this.jobHelixManager);
planningJobHelixManager.connect();

String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER,
GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName());
Expand All @@ -263,7 +265,7 @@ private void runJobExecutionLauncher() throws JobException {
jobLock.lock();

try {
if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobManager)) {
if (planningJobIdFromStore.isPresent() && !canRun(planningJobIdFromStore.get(), planningJobHelixManager)) {
planningJobLauncherMetrics.skippedPlanningJobs.mark();
return;
}
Expand All @@ -284,8 +286,7 @@ private void runJobExecutionLauncher() throws JobException {

builder.setSysProps(this.sysProps);
builder.setJobPlanningProps(jobPlanningProps);
builder.setJobHelixManager(this.jobHelixManager);
builder.setTaskDriverHelixManager(this.taskDriverHelixManager);
builder.setPlanningJobHelixManager(planningJobHelixManager);
builder.setAppWorkDir(this.appWorkDir);
builder.setJobsMapping(this.jobsMapping);
builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics);
Expand All @@ -310,8 +311,9 @@ private void runJobExecutionLauncher() throws JobException {
// make sure the planning job is initialized (or visible) to other parallel running threads,
// so that the same critical section check (querying Helix for job completeness)
// can be applied.
HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId);
HelixUtils.waitJobInitialization(planningJobHelixManager, newPlanningId, newPlanningId);
} finally {
planningJobHelixManager.disconnect();
// end of the critical section to check if a job with same job name is running
jobLock.unlock();
}
Expand Down

0 comments on commit 89497c6

Please sign in to comment.