Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -69,18 +69,19 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
Executors.newSingleThreadScheduledExecutor();

private final int _bucketSize;
private final long _versionTTL;
private final long _versionTTLms;
private ZkSerializer _zkSerializer;
private RealmAwareZkClient _zkClient;
private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
private ScheduledFuture _gcTaskFuture = null;

/**
* Constructor that allows a custom bucket size.
* @param zkAddr
* @param bucketSize
* @param versionTTL in ms
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
try {
// Create realm-aware ZkClient.
Expand Down Expand Up @@ -114,7 +115,7 @@ public Object deserialize(byte[] data) throws ZkMarshallingError {
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
_versionTTL = versionTTL;
_versionTTLms = versionTTLms;
}

/**
Expand Down Expand Up @@ -223,7 +224,7 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
}

// 5. Update the timer for GC
updateGCTimer(rootPath, versionStr);
updateGCTimer(rootPath, version);
return true;
}

Expand Down Expand Up @@ -329,32 +330,34 @@ public void close() {
disconnect();
}

private void updateGCTimer(String rootPath, String currentVersion) {
TimerTask gcTask = new TimerTask() {
@Override
public void run() {
private synchronized void updateGCTimer(String rootPath, long currentVersion) {
if (_gcTaskFuture != null) {
_gcTaskFuture.cancel(false);
}
Comment on lines +334 to +336
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to cancel? What if that causes incomplete deletion of stale versions? I think we should let the tasks queue up instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call cancels only pending tasks; a GC task that is already running is not interrupted. If the pending tasks were left queued up instead, they would only cause extra reads of the children list; the subsequent deletion would be the same. And if there is a risk of incomplete deletion, that concern exists whether or not we let the tasks queue up. That issue (if it exists) should be addressed separately.

// Schedule the gc task with TTL
_gcTaskFuture = GC_THREAD.schedule(() -> {
try {
deleteStaleVersions(rootPath, currentVersion);
} catch (Exception ex) {
LOG.error("Failed to delete the stale versions.", ex);
}
};

// Schedule the gc task with TTL
GC_THREAD.schedule(gcTask, _versionTTL, TimeUnit.MILLISECONDS);
}, _versionTTLms, TimeUnit.MILLISECONDS);
}

/**
* Deletes all stale versions.
* @param rootPath
* @param currentVersion
*/
private void deleteStaleVersions(String rootPath, String currentVersion) {
private void deleteStaleVersions(String rootPath, long currentVersion) {
// Get all children names under path
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
if (children == null || children.isEmpty()) {
// The whole path has been deleted so return immediately
return;
}
filterChildrenNames(children, currentVersion);
List<String> pathsToDelete = getPathsToDelete(rootPath, children);
List<String> pathsToDelete =
getPathsToDelete(rootPath, filterChildrenNames(children, currentVersion));
for (String pathToDelete : pathsToDelete) {
// TODO: Should be batch delete but it doesn't work. It's okay since this runs async
_zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
Expand All @@ -363,29 +366,32 @@ private void deleteStaleVersions(String rootPath, String currentVersion) {

/**
* Filter out non-version children names and non-stale versions.
* @param childrenToRemove
* @param childrenNodes
* @param currentVersion
* @return The filtered child node names to be removed
*/
private void filterChildrenNames(List<String> childrenToRemove, String currentVersion) {
// Leave out metadata
childrenToRemove.remove(LAST_SUCCESSFUL_WRITE_KEY);
childrenToRemove.remove(LAST_WRITE_KEY);

// Leave out currentVersion and above
// This is because we want to honor the TTL for newer versions
childrenToRemove.remove(currentVersion);
long currentVer = Long.parseLong(currentVersion);
for (String child : childrenToRemove) {
private List<String> filterChildrenNames(List<String> childrenNodes, long currentVersion) {
List<String> childrenToRemove = new ArrayList<>();
for (String child : childrenNodes) {
// Leave out metadata
if (child.equals(LAST_SUCCESSFUL_WRITE_KEY) || child.equals(LAST_WRITE_KEY)) {
continue;
}
long childVer;
try {
long version = Long.parseLong(child);
if (version >= currentVer) {
childrenToRemove.remove(child);
}
} catch (Exception e) {
childVer = Long.parseLong(child);
} catch (NumberFormatException ex) {
LOG.warn("Found an invalid ZNode: {}", child);
// Ignore ZNode names that aren't parseable
childrenToRemove.remove(child);
LOG.debug("Found an invalid ZNode: {}", child);
continue;
}
if (childVer < currentVersion) {
childrenToRemove.add(child);
}
// Leave out currentVersion and above
// This is because we want to honor the TTL for newer versions
}
return childrenToRemove;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,25 @@
* under the License.
*/

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.TestHelper;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.testng.Assert;
Expand All @@ -49,6 +50,7 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
private static final String NAME_KEY = TestHelper.getTestClassName();
private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
private static final String LAST_WRITE_KEY = "LAST_WRITE";
private static final long VERSION_TTL_MS = 1000L;

// Populate list and map fields for content comparison
private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
Expand All @@ -62,7 +64,7 @@ public class TestZkBucketDataAccessor extends ZkTestBase {
@BeforeClass
public void beforeClass() {
// Initialize ZK accessors for testing
_bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, 0L);
_bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, VERSION_TTL_MS);
HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
zkClient.setZkSerializer(new ZkSerializer() {
Expand Down Expand Up @@ -103,8 +105,13 @@ public void testCompressedBucketWrite() throws IOException {

@Test(dependsOnMethods = "testCompressedBucketWrite")
public void testMultipleWrites() throws Exception {
int count = 50;
// Note: use a count < 10 for testing.
// Otherwise the nodes named with version numbers are ordered differently when sorted alphabetically,
// which might hide some bugs in the GC code.
int count = 5;

Assert.assertTrue(VERSION_TTL_MS > 100,
"This test should be executed with the TTL more than 100ms.");
// Write "count" times
for (int i = 0; i < count; i++) {
_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
Expand All @@ -126,8 +133,17 @@ public void testMultipleWrites() throws Exception {
// Use Verifier because GC can take ZK delay
Assert.assertTrue(TestHelper.verify(() -> {
List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
return children.size() == 3;
}, 60 * 1000L));
return children.size() == 3 && children.containsAll(ImmutableList
.of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
new Long(lastSuccessfulWriteVer).toString()));
}, VERSION_TTL_MS * 2));

// Wait one more TTL to ensure that the GC has been done.
Thread.sleep(VERSION_TTL_MS);
List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
Assert.assertTrue(children.size() == 3 && children.containsAll(ImmutableList
.of(LAST_SUCCESSFUL_WRITE_KEY, LAST_WRITE_KEY,
new Long(lastSuccessfulWriteVer).toString())));
}

/**
Expand Down