Skip to content
Permalink
Browse files
Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak (#2180)

Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak

Reuse the ZkClient in the verifier and improve the resource-closing logic to avoid resource leaks.
  • Loading branch information
qqu0127 committed Aug 1, 2022
1 parent 49aef7c commit 132715785e90803ad8991da491f4621db1668fb8
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 51 deletions.
@@ -88,10 +88,5 @@ public boolean persistBestPossibleAssignment(
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}

@Override
// BucketDataAccessor will be reused, won't be closed here.
public void close() {
}
}
}
@@ -41,12 +41,13 @@
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
@@ -75,10 +76,11 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {

private final int _bucketSize;
private final long _versionTTLms;
private ZkSerializer _zkSerializer;
private RealmAwareZkClient _zkClient;
private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
private Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
private final ZkSerializer _zkSerializer;
private final RealmAwareZkClient _zkClient;
private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
private final Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
private boolean _usesExternalZkClient = false;

/**
* Constructor that allows a custom bucket size.
@@ -87,25 +89,21 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
_zkClient = createRealmAwareZkClient(zkAddr);
_zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
if (data instanceof byte[]) {
return (byte[]) data;
}
throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
}
this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false);
}

@Override
public Object deserialize(byte[] data) throws ZkMarshallingError {
return data;
}
});
/**
* Constructor that reuses a ZkClient supplied by the caller, with DEFAULT_BUCKET_SIZE and
* DEFAULT_VERSION_TTL.
* The client is marked as external (last argument is true), and disconnect() only closes the
* client when that flag is false, so the caller remains responsible for closing it.
* NOTE(review): no serializer is set on the given client here, whereas
* createRealmAwareZkClient() installs a ByteArraySerializer - presumably the caller must pass a
* client that already handles byte arrays; confirm.
* @param zkClient ZkClient provided (and owned) by the caller
*/
public ZkBucketDataAccessor(RealmAwareZkClient zkClient) {
this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
}

/**
* Private constructor that the public constructors delegate to via this(...).
* It only assigns fields; it does not create a ZkClient or change its configuration.
* @param zkClient ZkClient to store and to wrap in a ZkBaseDataAccessor
* @param bucketSize bucket size to store
* @param versionTTLms in ms
* @param usesExternalZkClient true if the ZkClient was supplied by the caller; disconnect()
*                             closes the client only when this is false
*/
private ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms,
boolean usesExternalZkClient) {
_zkClient = zkClient;
// The data accessor is built on the same client instance, so both share one connection.
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
_versionTTLms = versionTTLms;
// Remembered so that disconnect() can skip closing a client owned by the caller.
_usesExternalZkClient = usesExternalZkClient;
}

/**
@@ -135,6 +133,7 @@ private static RealmAwareZkClient createRealmAwareZkClient(String zkAddr) {
zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
}
zkClient.setZkSerializer(new ByteArraySerializer());
return zkClient;
}

@@ -258,7 +257,7 @@ public void compressedBucketDelete(String path) {

@Override
public void disconnect() {
if (!_zkClient.isClosed()) {
if (!_usesExternalZkClient && _zkClient != null && !_zkClient.isClosed()) {
_zkClient.close();
}
}
@@ -34,6 +34,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.PartitionStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
@@ -91,6 +92,7 @@ public BestPossibleExternalViewVerifier(String zkAddr, String clusterName, Set<S
_resources = resources;
_expectLiveInstances = expectLiveInstances;
_dataProvider = new ResourceControllerDataProvider();
// _zkClient should be closed with BestPossibleExternalViewVerifier
}

/**
@@ -105,7 +107,7 @@ public BestPossibleExternalViewVerifier(String zkAddr, String clusterName, Set<S
public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
Set<String> resources, Map<String, Map<String, String>> errStates,
Set<String> expectLiveInstances) {
this(zkClient, clusterName, resources, errStates, expectLiveInstances, 0);
this(zkClient, clusterName, errStates, resources, expectLiveInstances, 0, true);
}

@Deprecated
@@ -114,11 +116,7 @@ public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clus
Set<String> expectLiveInstances, int waitTillVerify) {
// usesExternalZkClient = true because ZkClient is given by the caller
// at close(), we will not close this ZkClient because it might be being used elsewhere
super(zkClient, clusterName, true, waitTillVerify);
_errStates = errStates;
_resources = resources;
_expectLiveInstances = expectLiveInstances;
_dataProvider = new ResourceControllerDataProvider();
this(zkClient, clusterName, errStates, resources, expectLiveInstances, waitTillVerify, true);
}

private BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String clusterName,
@@ -144,7 +142,6 @@ public static class Builder extends ZkHelixClusterVerifier.Builder<Builder> {
private Set<String> _resources;
private Set<String> _expectLiveInstances;
private RealmAwareZkClient _zkClient;
private boolean _usesExternalZkClient = false; // false by default

public Builder(String clusterName) {
_clusterName = clusterName;
@@ -155,11 +152,12 @@ public BestPossibleExternalViewVerifier build() {
throw new IllegalArgumentException("Cluster name is missing!");
}

// _usesExternalZkClient == true
if (_zkClient != null) {
return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _resources, _errStates,
_expectLiveInstances, _waitPeriodTillVerify);
return new BestPossibleExternalViewVerifier(_zkClient, _clusterName, _errStates, _resources,
_expectLiveInstances, _waitPeriodTillVerify, true);
}

// _usesExternalZkClient == false
if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig == null) {
// For backward-compatibility
return new BestPossibleExternalViewVerifier(_zkAddress, _clusterName, _resources,
@@ -170,7 +168,7 @@ public BestPossibleExternalViewVerifier build() {
return new BestPossibleExternalViewVerifier(
createZkClient(RealmAwareZkClient.RealmMode.SINGLE_REALM, _realmAwareZkConnectionConfig,
_realmAwareZkClientConfig, _zkAddress), _clusterName, _errStates, _resources,
_expectLiveInstances, _waitPeriodTillVerify, _usesExternalZkClient);
_expectLiveInstances, _waitPeriodTillVerify, false);
}

public String getClusterName() {
@@ -210,7 +208,6 @@ public String getZkAddr() {

public Builder setZkClient(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
_usesExternalZkClient = true; // Set the flag since external ZkClient is used
return this;
}
}
@@ -435,18 +432,15 @@ private BestPossibleStateOutput calcBestPossState(ResourceControllerDataProvider

RebalanceUtil.runStage(event, new CurrentStateComputationStage());
// Note the readOnlyWagedRebalancer is just for one time usage
DryrunWagedRebalancer dryrunWagedRebalancer =
new DryrunWagedRebalancer(_zkClient.getServers(), cache.getClusterName(),
cache.getClusterConfig().getGlobalRebalancePreference());
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
try {

try (ZkBucketDataAccessor zkBucketDataAccessor = new ZkBucketDataAccessor(_zkClient);
DryrunWagedRebalancer dryrunWagedRebalancer = new DryrunWagedRebalancer(zkBucketDataAccessor,
cache.getClusterName(), cache.getClusterConfig().getGlobalRebalancePreference())) {
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), dryrunWagedRebalancer);
RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
} finally {
dryrunWagedRebalancer.close();
}

BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
return output;
return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
}

@Override
@@ -456,15 +450,17 @@ public String toString() {
+ (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
}

// TODO: to clean up, finalize is deprecated in Java 9
@Override
public void finalize() {
close();
super.finalize();
}

private class DryrunWagedRebalancer extends org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
public DryrunWagedRebalancer(String metadataStoreAddress, String clusterName,
private static class DryrunWagedRebalancer extends ReadOnlyWagedRebalancer implements AutoCloseable {
public DryrunWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor, String clusterName,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName, preferences);
super(zkBucketDataAccessor, clusterName, preferences);
}

@Override
@@ -44,7 +44,7 @@
import org.slf4j.LoggerFactory;

public abstract class ZkHelixClusterVerifier
implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
implements IZkChildListener, IZkDataListener, HelixClusterVerifier, AutoCloseable {
private static Logger LOG = LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
protected static int DEFAULT_TIMEOUT = 300 * 1000;
protected static int DEFAULT_PERIOD = 500;
@@ -229,6 +229,11 @@ public boolean verifyByPolling() {
return verifyByPolling(DEFAULT_TIMEOUT, DEFAULT_PERIOD);
}

/**
* Implement close() for {@link AutoCloseable} and {@link HelixClusterVerifier}.
* Non-external resources should be closed in this method to prevent resource leak.
*/
@Override
public void close() {
if (_zkClient != null && !_usesExternalZkClient) {
_zkClient.close();
@@ -285,7 +285,7 @@ public ZkBucketDataAccessor getZkBucketDataAccessor() {
if (_zkBucketDataAccessor == null) {
synchronized (this) {
if (_zkBucketDataAccessor == null) {
_zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);
_zkBucketDataAccessor = new ZkBucketDataAccessor(getByteArrayRealmAwareZkClient());
}
}
}

0 comments on commit 1327157

Please sign in to comment.