diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index a11da29f0e6..bc13471a58c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -19,56 +19,60 @@
  * under the License.
  */
 
+import com.google.common.collect.ImmutableMap;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
-import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.util.GZipCompressionUtil;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
-  private static Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
 
-  private static final int DEFAULT_NUM_VERSIONS = 2;
+  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
+  private static final long DEFAULT_VERSION_TTL = TimeUnit.MINUTES.toMillis(1L); // 1 min
   private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
   private static final String DATA_SIZE_KEY = "DATA_SIZE";
-  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
-  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+  private static final String METADATA_KEY = "METADATA";
+  private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
+  private static final String LAST_WRITE_KEY = "LAST_WRITE";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  // Thread pool for deleting stale versions
+  private static final ScheduledExecutorService GC_THREAD = Executors.newScheduledThreadPool(1);
 
-  // 100 KB for default bucket size
-  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024;
   private final int _bucketSize;
-  private final int _numVersions;
+  private final long _versionTTL;
   private ZkSerializer _zkSerializer;
   private HelixZkClient _zkClient;
-  private HelixZkClient _znRecordClient;
-  private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
-  private BaseDataAccessor<ZNRecord> _znRecordBaseDataAccessor;
+  private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
 
   /**
    * Constructor that allows a custom bucket size.
    * @param zkAddr
   * @param bucketSize
-   * @param numVersions number of versions to store in ZK
+   * @param versionTTL in ms
    */
-  public ZkBucketDataAccessor(String zkAddr, int bucketSize, int numVersions) {
-    // There are two HelixZkClients:
-    // 1. _zkBaseDataAccessor for writes of binary data
-    // 2. _znRecordBaseDataAccessor for writes of ZNRecord (metadata)
+  public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
     _zkClient = DedicatedZkClientFactory.getInstance()
         .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     _zkClient.setZkSerializer(new ZkSerializer() {
@@ -85,20 +89,10 @@ public Object deserialize(byte[] data) throws ZkMarshallingError {
         return data;
       }
     });
-    _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
-
-    // TODO: Optimize serialization with Jackson
-    // TODO: Or use a better binary serialization protocol
-    // TODO: Consider making this also binary
-    // TODO: Consider an async write for the metadata as well
-    _znRecordClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
-    _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
-    _znRecordClient.setZkSerializer(new ZNRecordSerializer());
-
+    _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
     _zkSerializer = new ZNRecordJacksonSerializer();
     _bucketSize = bucketSize;
-    _numVersions = numVersions;
+    _versionTTL = versionTTL;
   }
 
   /**
@@ -106,82 +100,109 @@ public Object deserialize(byte[] data) throws ZkMarshallingError {
    * @param zkAddr
    */
   public ZkBucketDataAccessor(String zkAddr) {
-    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
   }
 
   @Override
-  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
+  public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath, T value)
       throws IOException {
-    // Take the ZNrecord and serialize it (get byte[])
+    DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so start with 0
+        return "0".getBytes();
+      }
+      // Last write exists, so increment and write it back
+      // **String conversion is necessary to make it display in ZK (zooinspector)**
+      String lastWriteVersionStr = new String(dataInZk);
+      long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
+      lastWriteVersion++;
+      return String.valueOf(lastWriteVersion).getBytes();
+    };
+
+    // 1. Increment lastWriteVersion using DataUpdater
+    ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
+        rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
+    if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
+      throw new HelixException(
+          String.format("Failed to write the write version at path: %s!", rootPath));
+    }
+
+    // Successfully reserved a version number
+    byte[] binaryVersion = (byte[]) result._updatedValue;
+    String versionStr = new String(binaryVersion);
+    final long version = Long.parseLong(versionStr);
+
+    // 2. Write to the incremented last write version
+    String versionedDataPath = rootPath + "/" + versionStr;
+
+    // Take the ZNRecord and serialize it (get byte[])
     byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
     // Compress the byte[]
     byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
     // Compute N - number of buckets
     int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
 
-    if (tryLock(path)) {
-      try {
-        // Read or initialize metadata and compute the last success version index
-        ZNRecord metadataRecord =
-            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-        if (metadataRecord == null) {
-          metadataRecord = new ZNRecord(extractIdFromPath(path));
-        }
-        int lastSuccessIndex =
-            (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions;
-        String dataPath = path + "/" + lastSuccessIndex;
-
-        List<String> paths = new ArrayList<>();
-        List<byte[]> buckets = new ArrayList<>();
-
-        int ptr = 0;
-        int counter = 0;
-        while (counter < numBuckets) {
-          paths.add(dataPath + "/" + counter);
-          if (counter == numBuckets - 1) {
-            // Special treatment for the last bucket
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
-                ptr + compressedRecord.length % _bucketSize));
-          } else {
-            buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
-          }
-          ptr += _bucketSize;
-          counter++;
-        }
-
-        // Do a cleanup of previous data
-        if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
-          // Clean-up is not critical so upon failure, we log instead of throwing an exception
-          LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath);
-        }
+    List<String> paths = new ArrayList<>();
+    List<byte[]> buckets = new ArrayList<>();
+
+    int ptr = 0;
+    int counter = 0;
+    while (counter < numBuckets) {
+      paths.add(versionedDataPath + "/" + counter);
+      if (counter == numBuckets - 1) {
+        // Special treatment for the last bucket
+        buckets.add(
+            Arrays.copyOfRange(compressedRecord, ptr, ptr + compressedRecord.length % _bucketSize));
+      } else {
+        buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
+      }
+      ptr += _bucketSize;
+      counter++;
+    }
 
-        // Do an async set to ZK
-        boolean[] success =
-            _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
-        // Exception and fail the write if any failed
-        for (boolean s : success) {
-          if (!s) {
-            throw new HelixException(
-                String.format("Failed to write the data buckets for path: %s", path));
-          }
-        }
+    // 3. Include the metadata in the batch write
+    Map metadata = ImmutableMap.of(BUCKET_SIZE_KEY, Integer.toString(_bucketSize),
+        DATA_SIZE_KEY, Integer.toString(compressedRecord.length));
+    byte[] binaryMetadata = OBJECT_MAPPER.writeValueAsBytes(metadata);
+    paths.add(versionedDataPath + "/" + METADATA_KEY);
+    buckets.add(binaryMetadata);
+
+    // Do an async set to ZK
+    boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
+    // Exception and fail the write if any failed
+    for (boolean s : success) {
+      if (!s) {
+        throw new HelixException(
+            String.format("Failed to write the data buckets for path: %s", rootPath));
+      }
+    }
 
-        // Data write completed, so update the metadata with last success index
-        // Note that the metadata ZNodes is written using sync write
-        metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize);
-        metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length);
-        metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex);
-        if (!_znRecordBaseDataAccessor.set(path, metadataRecord, AccessOption.PERSISTENT)) {
-          throw new HelixException(
-              String.format("Failed to write the metadata at path: %s!", path));
-        }
-      } finally {
-        // Critical section for write ends here
-        unlock(path);
+    // 4. Update lastSuccessfulWriteVersion using Updater
+    DataUpdater<byte[]> lastSuccessfulWriteVersionUpdater = dataInZk -> {
+      if (dataInZk == null || dataInZk.length == 0) {
+        // No last write version exists, so write version from this write
+        return versionStr.getBytes();
       }
-      return true;
+      // Last successful write exists so check if it's smaller than my number
+      String lastWriteVersionStr = new String(dataInZk);
+      long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
+      if (lastWriteVersion < version) {
+        // Smaller, so I can overwrite
+        return versionStr.getBytes();
+      } else {
+        // Greater, I have lagged behind. Return null and do not write
+        return null;
+      }
+    };
+    if (!_zkBaseDataAccessor.update(rootPath + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+        lastSuccessfulWriteVersionUpdater, AccessOption.PERSISTENT)) {
+      throw new HelixException(String
+          .format("Failed to write the last successful write metadata at path: %s!", rootPath));
     }
-    throw new HelixException(String.format("Could not acquire lock for write. Path: %s", path));
+
+    // 5. Update the timer for GC
+    updateGCTimer(rootPath, versionStr);
+    return true;
   }
 
   @Override
@@ -202,126 +223,158 @@ public void disconnect() {
     if (!_zkClient.isClosed()) {
       _zkClient.close();
     }
-    if (!_znRecordClient.isClosed()) {
-      _znRecordClient.close();
-    }
   }
 
   private HelixProperty compressedBucketRead(String path) {
-    // TODO: Incorporate parallelism into reads instead of locking the whole thing against other
-    // reads and writes
-    if (tryLock(path)) {
-      try {
-        // Retrieve the metadata
-        ZNRecord metadataRecord =
-            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
-        if (metadataRecord == null) {
-          throw new ZkNoNodeException(
-              String.format("Metadata ZNRecord does not exist for path: %s", path));
-        }
+    // 1. Get the version to read
+    byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
+        null, AccessOption.PERSISTENT);
+    if (binaryVersionToRead == null) {
+      throw new ZkNoNodeException(
+          String.format("Last successful write ZNode does not exist for path: %s", path));
+    }
+    String versionToRead = new String(binaryVersionToRead);
+
+    // 2. Get the metadata map
+    byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
+        null, AccessOption.PERSISTENT);
+    if (binaryMetadata == null) {
+      throw new ZkNoNodeException(
+          String.format("Metadata ZNode does not exist for path: %s", path));
+    }
+    Map metadata;
+    try {
+      metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to deserialize path metadata: %s!", path), e);
+    }
 
-        int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1);
-        int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1);
-        int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, -1);
-        if (lastSuccessIndex == -1) {
-          throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s",
-              LAST_SUCCESS_KEY, path));
-        }
-        if (bucketSize == -1) {
-          throw new HelixException(
-              String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
-        }
-        if (dataSize == -1) {
-          throw new HelixException(
-              String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
-        }
+    // 3. Read the data
+    Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
+    Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
+    if (bucketSizeObj == null) {
+      throw new HelixException(
+          String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
+    }
+    if (dataSizeObj == null) {
+      throw new HelixException(
+          String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
+    }
+    int bucketSize = Integer.parseInt((String) bucketSizeObj);
+    int dataSize = Integer.parseInt((String) dataSizeObj);
 
-        // Compute N - number of buckets
-        int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
-        byte[] compressedRecord = new byte[dataSize];
-        String dataPath = path + "/" + lastSuccessIndex;
+    // Compute N - number of buckets
+    int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize;
+    byte[] compressedRecord = new byte[dataSize];
+    String dataPath = path + "/" + versionToRead;
 
-        List<String> paths = new ArrayList<>();
-        for (int i = 0; i < numBuckets; i++) {
-          paths.add(dataPath + "/" + i);
-        }
+    List<String> paths = new ArrayList<>();
+    for (int i = 0; i < numBuckets; i++) {
+      paths.add(dataPath + "/" + i);
+    }
 
-        // Async get
-        List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
-
-        // Combine buckets into one byte array
-        int copyPtr = 0;
-        for (int i = 0; i < numBuckets; i++) {
-          if (i == numBuckets - 1) {
-            // Special treatment for the last bucket
-            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
-          } else {
-            System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
-            copyPtr += bucketSize;
-          }
-        }
+    // Async get
+    List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
+
+    // Combine buckets into one byte array
+    int copyPtr = 0;
+    for (int i = 0; i < numBuckets; i++) {
+      if (i == numBuckets - 1) {
+        // Special treatment for the last bucket
+        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize);
+      } else {
+        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
+        copyPtr += bucketSize;
+      }
+    }
 
-        // Decompress the byte array
-        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
-        byte[] serializedRecord;
-        try {
-          serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
-        } catch (IOException e) {
-          throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
-        }
+    // Decompress the byte array
+    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
+    byte[] serializedRecord;
+    try {
+      serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
+    } catch (IOException e) {
+      throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
+    }
+
+    // Deserialize the record to retrieve the original
+    ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
+    return new HelixProperty(originalRecord);
+  }
+
+  @Override
+  public void close() {
+    disconnect();
+  }
 
-        // Deserialize the record to retrieve the original
-        ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
-        return new HelixProperty(originalRecord);
-      } finally {
-        // Critical section for read ends here
-        unlock(path);
+  private void updateGCTimer(String rootPath, String currentVersion) {
+    TimerTask gcTask = new TimerTask() {
+      @Override
+      public void run() {
+        deleteStaleVersions(rootPath, currentVersion);
       }
-    }
-    throw new HelixException(String.format("Could not acquire lock for read. Path: %s", path));
+    };
+
+    // Schedule the gc task with TTL
+    GC_THREAD.schedule(gcTask, _versionTTL, TimeUnit.MILLISECONDS);
   }
 
   /**
-   * Returns the last string element in a split String array by /.
-   * @param path
-   * @return
+   * Deletes all stale versions.
+   * @param rootPath
+   * @param currentVersion
    */
-  private String extractIdFromPath(String path) {
-    String[] splitPath = path.split("/");
-    return splitPath[splitPath.length - 1];
+  private void deleteStaleVersions(String rootPath, String currentVersion) {
+    // Get all children names under path
+    List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
+    if (children == null || children.isEmpty()) {
+      // The whole path has been deleted so return immediately
+      return;
+    }
+    filterChildrenNames(children, currentVersion);
+    List<String> pathsToDelete = getPathsToDelete(rootPath, children);
+    for (String pathToDelete : pathsToDelete) {
+      // TODO: Should be batch delete but it doesn't work. It's okay since this runs async
+      _zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
+    }
   }
 
   /**
-   * Acquires the lock (create an ephemeral node) only if it is free (no ephemeral node already
-   * exists) at the time of invocation.
-   * @param path
-   * @return
+   * Filter out non-version children names and non-stale versions.
+   * @param children
    */
-  private boolean tryLock(String path) {
-    // Check if another write is taking place and if not, create an ephemeral node to simulate
-    // acquiring of a lock
-    return !_zkBaseDataAccessor.exists(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)
-        && _zkBaseDataAccessor.set(path + "/" + WRITE_LOCK_KEY, new byte[0],
-            AccessOption.EPHEMERAL);
+  private void filterChildrenNames(List<String> children, String currentVersion) {
+    // Leave out metadata
+    children.remove(LAST_SUCCESSFUL_WRITE_KEY);
+    children.remove(LAST_WRITE_KEY);
+
+    // Leave out currentVersion and above
+    // This is because we want to honor the TTL for newer versions
+    children.remove(currentVersion);
+    long currentVer = Long.parseLong(currentVersion);
+    for (String child : children) {
+      try {
+        long version = Long.parseLong(child);
+        if (version >= currentVer) {
+          children.remove(child);
+        }
+      } catch (Exception e) {
+        // Ignore ZNode names that aren't parseable
+        children.remove(child);
+        LOG.debug("Found an invalid ZNode: {}", child);
+      }
+    }
   }
 
   /**
-   * Releases the lock (removes the ephemeral node).
+   * Generates all stale paths to delete.
    * @param path
+   * @param staleVersions
+   * @return
    */
-  private void unlock(String path) {
-    // Write succeeded, so release the lock
-    if (!_zkBaseDataAccessor.remove(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)) {
-      throw new HelixException(String.format("Could not remove ephemeral node for path: %s", path));
-    }
-    // TODO: In case of remove failure, we risk a lock never getting released.
-    // TODO: Consider two possible improvements
-    // TODO: 1. Use ephemeral owner id for the same connection to reclaim the lock
-    // TODO: 2. Use "lease" - lock with a timeout
-  }
-
-  @Override
-  public void close() throws Exception {
-    disconnect();
+  private List<String> getPathsToDelete(String path, List<String> staleVersions) {
+    List<String> pathsToDelete = new ArrayList<>();
+    staleVersions.forEach(ver -> pathsToDelete.add(path + "/" + ver));
+    return pathsToDelete;
  }
 }
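Taken together, the new write path reserves a version by atomically incrementing LAST_WRITE, writes the data buckets plus a METADATA child under that version, publishes the version through LAST_SUCCESSFUL_WRITE only when it is newer than the value already stored, and schedules stale versions for deletion once the TTL expires. Below is a minimal caller-side sketch of the resulting API; the class name, ZK address, node path, field names, and 30-second TTL are illustrative assumptions, not values taken from this patch.

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;

public class BucketAccessorUsageSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical connection settings: 50KB buckets, 30-second version TTL.
    BucketDataAccessor accessor =
        new ZkBucketDataAccessor("localhost:2181", 50 * 1024, TimeUnit.SECONDS.toMillis(30));

    // Build a property and write it; each write lands under a freshly reserved version.
    HelixProperty property = new HelixProperty("largeProperty");
    property.getRecord().setSimpleField("field", "value");
    accessor.compressedBucketWrite("/largeProperty", property);

    // Readers follow LAST_SUCCESSFUL_WRITE, so they only ever see fully written versions.
    HelixProperty readBack = accessor.compressedBucketRead("/largeProperty", HelixProperty.class);
    System.out.println(readBack.getRecord().getSimpleField("field"));

    // Clean up the bucketed node and release the dedicated ZK connection.
    accessor.compressedBucketDelete("/largeProperty");
    accessor.disconnect();
  }
}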
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
index 4c28835dd9f..c7b5cbfd0d0 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java
@@ -27,35 +27,64 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 public class TestZkBucketDataAccessor extends ZkTestBase {
-  private static final String PATH = "/" + TestHelper.getTestClassName();
   private static final String NAME_KEY = TestHelper.getTestClassName();
-  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
-  private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
-  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+  private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
+  private static final String LAST_WRITE_KEY = "LAST_WRITE";
 
   // Populate list and map fields for content comparison
   private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
   private static final Map<String, String> MAP_FIELD = ImmutableMap.of("1", "2");
 
   private BucketDataAccessor _bucketDataAccessor;
+  private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
+
+  private ZNRecord record = new ZNRecord(NAME_KEY);
 
   @BeforeClass
   public void beforeClass() {
-    _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR);
+    // Initialize ZK accessors for testing
+    _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, 0L);
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+    zkClient.setZkSerializer(new ZkSerializer() {
+      @Override
+      public byte[] serialize(Object data) throws ZkMarshallingError {
+        if (data instanceof byte[]) {
+          return (byte[]) data;
+        }
+        throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
+      }
+
+      @Override
+      public Object deserialize(byte[] data) throws ZkMarshallingError {
+        return data;
+      }
+    });
+    _zkBaseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
+
+    // Fill in some data for the record
+    record.setSimpleField(NAME_KEY, NAME_KEY);
+    record.setListField(NAME_KEY, LIST_FIELD);
+    record.setMapField(NAME_KEY, MAP_FIELD);
   }
 
   @AfterClass
@@ -69,17 +98,42 @@ public void afterClass() {
    */
   @Test
   public void testCompressedBucketWrite() throws IOException {
-    ZNRecord record = new ZNRecord(NAME_KEY);
-    record.setSimpleField(NAME_KEY, NAME_KEY);
-    record.setListField(NAME_KEY, LIST_FIELD);
-    record.setMapField(NAME_KEY, MAP_FIELD);
     Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
   }
 
+  @Test(dependsOnMethods = "testCompressedBucketWrite")
+  public void testMultipleWrites() throws Exception {
+    int count = 50;
+
+    // Write "count" times
+    for (int i = 0; i < count; i++) {
+      _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
+    }
+
+    // Last known good version number should be "count"
+    byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor
+        .get(PATH + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
+    long lastSuccessfulWriteVer = Long.parseLong(new String(binarySuccessfulWriteVer));
+    Assert.assertEquals(lastSuccessfulWriteVer, count);
+
+    // Last write version should be "count"
+    byte[] binaryWriteVer =
+        _zkBaseDataAccessor.get(PATH + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
+    long writeVer = Long.parseLong(new String(binaryWriteVer));
+    Assert.assertEquals(writeVer, count);
+
+    // Test that all previous versions have been deleted
+    // Use Verifier because GC can take ZK delay
+    Assert.assertTrue(TestHelper.verify(() -> {
+      List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
+      return children.size() == 3;
+    }, 60 * 1000L));
+  }
+
   /**
    * The record written in {@link #testCompressedBucketWrite()} is the same record that was written.
    */
-  @Test(dependsOnMethods = "testCompressedBucketWrite")
+  @Test(dependsOnMethods = "testMultipleWrites")
   public void testCompressedBucketRead() {
     HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
     Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
@@ -88,43 +142,10 @@ public void testCompressedBucketRead() {
     _bucketDataAccessor.compressedBucketDelete(PATH);
   }
 
-  /**
-   * Do 10 writes and check that there are 5 versions of the data.
-   */
-  @Test(dependsOnMethods = "testCompressedBucketRead")
-  public void testManyWritesWithVersionCounts() throws IOException {
-    int bucketSize = 50 * 1024;
-    int numVersions = 5;
-    int expectedLastSuccessfulIndex = 4;
-    String path = PATH + "2";
-    ZNRecord record = new ZNRecord(NAME_KEY);
-    record.setSimpleField(NAME_KEY, NAME_KEY);
-    record.setListField(NAME_KEY, LIST_FIELD);
-    record.setMapField(NAME_KEY, MAP_FIELD);
-
-    BucketDataAccessor bucketDataAccessor =
-        new ZkBucketDataAccessor(ZK_ADDR, bucketSize, numVersions);
-    for (int i = 0; i < 10; i++) {
-      bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record));
-    }
-
-    // Check that there are numVersions number of children under path
-    List<String> children = _baseAccessor.getChildNames(path, AccessOption.PERSISTENT);
-    Assert.assertEquals(children.size(), numVersions);
-
-    // Check that last successful index is 4 (since we did 10 writes)
-    ZNRecord metadata = _baseAccessor.get(path, null, AccessOption.PERSISTENT);
-    Assert.assertEquals(metadata.getIntField(LAST_SUCCESS_KEY, -1), expectedLastSuccessfulIndex);
-
-    // Clean up
-    bucketDataAccessor.compressedBucketDelete(path);
-    bucketDataAccessor.disconnect();
-  }
-
   /**
    * Write a HelixProperty with large number of entries using BucketDataAccessor and read it back.
    */
-  @Test(dependsOnMethods = "testManyWritesWithVersionCounts")
+  @Test(dependsOnMethods = "testCompressedBucketRead")
   public void testLargeWriteAndRead() throws IOException {
     String name = "largeResourceAssignment";
     HelixProperty property = createLargeHelixProperty(name, 100000);
@@ -146,71 +167,6 @@ public void testLargeWriteAndRead() throws IOException {
     Assert.assertEquals(readRecord, property);
   }
 
-  /**
-   * Tests that each write cleans up previous bucketed data. This method writes some small amount of
-   * data and checks that the data buckets from the large write performed in the previous test
-   * method have been cleaned up.
-   * @throws IOException
-   */
-  @Test(dependsOnMethods = "testLargeWriteAndRead")
-  public void testCleanupBeforeWrite() throws IOException {
-    // Create a HelixProperty of a very small size with the same name as the large HelixProperty
-    // created from the previous method
-    String name = "largeResourceAssignment";
-    HelixProperty property = new HelixProperty(name);
-    property.getRecord().setIntField("Hi", 10);
-
-    // Get the bucket count from the write performed in the previous method
-    ZNRecord metadata = _baseAccessor.get("/" + name, null, AccessOption.PERSISTENT);
-    int origBucketSize = metadata.getIntField(BUCKET_SIZE_KEY, -1);
-
-    // Perform a write twice to overwrite both versions
-    _bucketDataAccessor.compressedBucketWrite("/" + name, property);
-    _bucketDataAccessor.compressedBucketWrite("/" + name, property);
-
-    // Check that the children count for version 0 (version for the large write) is 1
-    Assert.assertEquals(
-        _baseAccessor.getChildNames("/" + name + "/0", AccessOption.PERSISTENT).size(), 1);
-
-    // Clean up
-    _bucketDataAccessor.compressedBucketDelete("/" + name);
-  }
-
-  /**
-   * Test that no concurrent reads and writes are allowed by triggering multiple operations after
-   * creating an artificial lock.
-   * @throws IOException
-   */
-  @Test(dependsOnMethods = "testCleanupBeforeWrite")
-  public void testFailureToAcquireLock() throws Exception {
-    String name = "acquireLock";
-    // Use a large HelixProperty to simulate a write that keeps the lock for some time
-    HelixProperty property = createLargeHelixProperty(name, 100);
-
-    // Artificially create the ephemeral ZNode
-    _baseAccessor.create("/" + name + "/" + WRITE_LOCK_KEY, new ZNRecord(name),
-        AccessOption.EPHEMERAL);
-
-    // Test write
-    try {
-      _bucketDataAccessor.compressedBucketWrite("/" + name, property);
-      Assert.fail("Should fail due to an already-existing lock ZNode!");
-    } catch (HelixException e) {
-      // Expect an exception
-    }
-
-    // Test read
-    try {
-      _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
-      Assert.fail("Should fail due to an already-existing lock ZNode!");
-    } catch (HelixException e) {
-      // Expect an exception
-    }
-
-    // Clean up
-    _bucketDataAccessor.compressedBucketDelete("/" + name);
-  }
-
   private HelixProperty createLargeHelixProperty(String name, int numEntries) {
     HelixProperty property = new HelixProperty(name);
     for (int i = 0; i < numEntries; i++) {