From f402f7a3f3403dbd856602aaf90fd8bbad396fc5 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 9 Jun 2017 16:28:45 +0900 Subject: [PATCH 1/2] NIFI-4049: Refactor AtomicDistributedMapCacheClient To be used with cache engines that does not have revision number. --- .../standard/WaitNotifyProtocol.java | 16 +++-- .../nifi/processors/standard/TestNotify.java | 23 ++++++-- .../standard/TestWaitNotifyProtocol.java | 59 +++++++++---------- .../cache/client/AtomicCacheEntry.java} | 39 ++++++++---- .../AtomicDistributedMapCacheClient.java | 34 ++++++++++- .../DistributedMapCacheClientService.java | 21 ++++--- .../cache/server/TestServerAndClient.java | 4 +- 7 files changed, 133 insertions(+), 63 deletions(-) rename nifi-nar-bundles/nifi-standard-services/{nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java => nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java} (58%) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index 1c891089609e..cd55d27a6124 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -18,8 +18,8 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; @@ -59,7 +59,7 @@ public static class Signal { */ transient private String identifier; - transient private long revision = -1; + transient private AtomicCacheEntry cachedEntry; private Map counts = new HashMap<>(); private Map attributes = new HashMap<>(); private int releasableCount = 0; @@ -225,9 +225,10 @@ public Signal notify(final String signalId, final String counterName, final int * @throws IOException thrown when it failed interacting with the cache engine * @throws DeserializationException thrown if the cache found is not in expected serialized format */ + @SuppressWarnings("unchecked") public Signal getSignal(final String signalId) throws IOException, DeserializationException { - final CacheEntry entry = cache.fetch(signalId, stringSerializer, stringDeserializer); + final AtomicCacheEntry entry = (AtomicCacheEntry) cache.fetch(signalId, stringSerializer, stringDeserializer); if (entry == null) { // No signal found. @@ -239,7 +240,7 @@ public Signal getSignal(final String signalId) throws IOException, Deserializati try { final Signal signal = objectMapper.readValue(value, Signal.class); signal.identifier = signalId; - signal.revision = entry.getRevision(); + signal.cachedEntry = entry; return signal; } catch (final JsonParseException jsonE) { // Try to read it as FlowFileAttributes for backward compatibility. @@ -270,7 +271,12 @@ public void complete(final String signalId) throws IOException { public boolean replace(final Signal signal) throws IOException { final String signalJson = objectMapper.writeValueAsString(signal); - return cache.replace(signal.identifier, signalJson, stringSerializer, stringSerializer, signal.revision); + if (signal.cachedEntry == null) { + signal.cachedEntry = new AtomicCacheEntry<>(signal.identifier, signalJson, null); + } else { + signal.cachedEntry.setValue(signalJson); + } + return cache.replace(signal.cachedEntry, stringSerializer, stringSerializer); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index 2c5dbc12a084..fe62d2dce389 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -17,10 +17,10 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.StandardCacheEntry; import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; @@ -310,8 +310,8 @@ public void testFailingCacheService() throws InitializationException, IOExceptio } - static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { - private final ConcurrentMap values = new ConcurrentHashMap<>(); + static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { + private final ConcurrentMap> values = new ConcurrentHashMap<>(); private boolean failOnCalls = false; void setFailOnCalls(boolean failOnCalls){ @@ -412,7 +412,22 @@ public boolean replace(K key, V value, Serializer keySerializer, Seria return false; } - values.put(key, new StandardCacheEntry<>(key, value, revision + 1)); + values.put(key, new AtomicCacheEntry<>(key, value, revision + 1)); + + return true; + } + + @Override + public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { + verifyNotFail(); + + final K key = entry.getKey(); + final AtomicCacheEntry existing = values.get(key); + if (existing != null && !existing.getCachedRevision().equals(entry.getCachedRevision())) { + return false; + } + + values.put(key, new AtomicCacheEntry<>(key, entry.getValue(), entry.getCachedRevision().orElse(0L) + 1)); return true; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index 13b4346859fc..ebdcd051196b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -17,9 +17,8 @@ package org.apache.nifi.processors.standard; import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry; -import org.apache.nifi.distributed.cache.client.StandardCacheEntry; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; @@ -46,33 +45,29 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; public class TestWaitNotifyProtocol { - private final Map> cacheEntries = new HashMap<>(); + private final Map> cacheEntries = new HashMap<>(); - private AtomicDistributedMapCacheClient cache; + private AtomicDistributedMapCacheClient cache; + @SuppressWarnings("unchecked") private final Answer successfulReplace = invocation -> { - final String key = invocation.getArgumentAt(0, String.class); - final String value = invocation.getArgumentAt(1, String.class); - final Long revision = invocation.getArgumentAt(4, Long.class); - cacheEntries.put(key, new StandardCacheEntry<>(key, value, revision + 1)); + final AtomicCacheEntry entry = invocation.getArgumentAt(0, AtomicCacheEntry.class); + cacheEntries.put(entry.getKey(), new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), entry.getCachedRevision().orElse(0L) + 1)); return true; }; @Before + @SuppressWarnings("unchecked") public void before() throws Exception { cacheEntries.clear(); // Default mock implementations. cache = mock(AtomicDistributedMapCacheClient.class); - doAnswer(invocation -> { - final CacheEntry entry = cacheEntries.get(invocation.getArguments()[0]); - return entry; - }).when(cache).fetch(any(), any(), any()); + doAnswer(invocation -> cacheEntries.get(invocation.getArguments()[0])).when(cache).fetch(any(), any(), any()); } @Test @@ -80,7 +75,7 @@ public void testNotifyRetryFailure() throws Exception { // replace always return false. doAnswer(invocation -> false) - .when(cache).replace(any(), any(), any(), any(), anyLong()); + .when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -95,7 +90,7 @@ public void testNotifyRetryFailure() throws Exception { @Test public void testNotifyFirst() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -106,16 +101,16 @@ public void testNotifyFirst() throws Exception { assertEquals(Long.valueOf(1), signal.getCounts().get("a")); assertTrue(cacheEntries.containsKey("signal-id")); - final CacheEntry cacheEntry = cacheEntries.get("signal-id"); + final AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(0, cacheEntry.getRevision()); + assertEquals(1, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @Test public void testNotifyCounters() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -124,21 +119,21 @@ public void testNotifyCounters() throws Exception { protocol.notify(signalId, "a", 1, null); protocol.notify(signalId, "a", 1, null); - CacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(1, cacheEntry.getRevision()); + AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); + assertEquals(2, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "a", 10, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(2, cacheEntry.getRevision()); + assertEquals(3, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "b", 2, null); protocol.notify(signalId, "c", 3, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(4, cacheEntry.getRevision()); + assertEquals(5, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); final Map deltas = new HashMap<>(); @@ -147,20 +142,20 @@ public void testNotifyCounters() throws Exception { protocol.notify("signal-id", deltas, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(5, cacheEntry.getRevision()); + assertEquals(6, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); // Zero clear 'b'. protocol.notify("signal-id", "b", 0, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(6, cacheEntry.getRevision()); + assertEquals(7, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @Test public void testNotifyAttributes() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -172,8 +167,8 @@ public void testNotifyAttributes() throws Exception { protocol.notify(signalId, "a", 1, attributeA1); - CacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(0, cacheEntry.getRevision()); + AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); + assertEquals(1L, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue()); final Map attributeA2 = new HashMap<>(); @@ -184,7 +179,7 @@ public void testNotifyAttributes() throws Exception { protocol.notify(signalId, "a", 1, attributeA2); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(1, cacheEntry.getRevision()); + assertEquals(2L, cacheEntry.getCachedRevision().orElse(-1L).longValue()); assertEquals("Updated attributes should be merged correctly", "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue()); @@ -192,7 +187,7 @@ public void testNotifyAttributes() throws Exception { @Test public void testSignalCount() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -233,7 +228,7 @@ public void testSignalCount() throws Exception { */ @Test public void testNiFiVersionUpgrade() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); // Simulate old cache entry. final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer(); @@ -245,7 +240,7 @@ public void testNiFiVersionUpgrade() throws Exception { attributesSerializer.serialize(cachedAttributes, bos); final String signalId = "old-entry"; - cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0)); + cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0L)); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final Signal signal = protocol.getSignal(signalId); @@ -255,7 +250,7 @@ public void testNiFiVersionUpgrade() throws Exception { assertEquals("value2", signal.getAttributes().get("key2")); assertEquals("value3", signal.getAttributes().get("key3")); - cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0)); + cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0L)); try { protocol.getSignal(signalId); fail("Should fail since cached value was not in expected format."); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java similarity index 58% rename from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java rename to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java index b4949d50f399..f70b01a1c70f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java @@ -16,31 +16,50 @@ */ package org.apache.nifi.distributed.cache.client; -public class StandardCacheEntry implements AtomicDistributedMapCacheClient.CacheEntry { +import java.util.Optional; - private final K key; - private final V value; - private final long revision; +public class AtomicCacheEntry implements AtomicDistributedMapCacheClient.CacheEntry { + private final K key; + private V value; + private final R revision; - public StandardCacheEntry(final K key, final V value, final long revision) { + /** + * Create new cache entry. + * @param key cache key + * @param value cache value + * @param revision cache revision, can be null with a brand new entry + */ + public AtomicCacheEntry(final K key, final V value, final R revision) { this.key = key; this.value = value; this.revision = revision; } - @Override - public long getRevision() { - return revision; + /** + * @return the latest revision stored in a cache server + */ + public Optional getCachedRevision() { + return Optional.ofNullable(revision); } - @Override public K getKey() { return key; } - @Override public V getValue() { return value; } + + public void setValue(V value) { + this.value = value; + } + + /** + * @deprecated use {@link #getCachedRevision()} instead. + */ + @Override + public long getRevision() { + return revision instanceof Long ? (Long) revision : -1; + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java index d0b77e116532..1372e83eb18d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java @@ -29,14 +29,24 @@ * this class provides methods for concurrent atomic updates those are added since Map Cache protocol version 2. * *

If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException. + * @param The revision type. + * If the underlying cache storage supports the concept of revision to implement optimistic locking, then a client implementation should use that. + * Otherwise set the cached value and check if the key is not updated at {@link #replace(AtomicCacheEntry, Serializer, Serializer)} */ @Tags({"distributed", "client", "cluster", "map", "cache"}) @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows " + "multiple nodes to coordinate state with a single remote entity.") -public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient { +public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient { + /** + * @deprecated use {@link AtomicCacheEntry} instead. + */ interface CacheEntry { + /** + * @deprecated use {@link AtomicCacheEntry#getCachedRevision()} instead. + * @return the latest revision stored in a cache server + */ long getRevision(); K getKey(); @@ -53,6 +63,9 @@ interface CacheEntry { * @param keySerializer key serializer * @param valueDeserializer value deserializer * @return A CacheEntry instance if one exists, otherwise null. + * Although the return type is {@link CacheEntry}, it should be able to cast to {@link AtomicCacheEntry}. + * This is only for keeping the old method signature. In the future, return value will be changed to AtomicCacheEntry. + * Implementation of this method should return AtomicCacheEntry. * @throws IOException if unable to communicate with the remote instance */ CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException; @@ -70,7 +83,24 @@ interface CacheEntry { * If there's no existing entry for the key, any revision can replace the key. * @return true only if the key is replaced. * @throws IOException if unable to communicate with the remote instance + * @deprecated use {@link #replace(AtomicCacheEntry, Serializer, Serializer)} instead + */ + default boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Replace an existing key with new value. + * @param the key type + * @param the value type + * @param entry should provide the new value for {@link AtomicCacheEntry#getValue()}, + * and the same revision in the cache storage for {@link AtomicCacheEntry#getCachedRevision()}, + * if the revision does not match with the one in the cache storage, value will not be replaced. + * @param keySerializer key serializer + * @param valueSerializer value serializer + * @return true only if the key is replaced. + * @throws IOException if unable to communicate with the remote instance */ - boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException; + boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException; } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index f197bace70eb..423e2248ffaa 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -49,7 +49,7 @@ @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " + "between nodes in a NiFi cluster") -public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { +public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class); @@ -237,7 +237,8 @@ public long removeByPattern(String regex) throws IOException { } @Override - public CacheEntry fetch(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + @SuppressWarnings("unchecked") + public CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { return withCommsSession(session -> { validateProtocolVersion(session, 2); @@ -257,8 +258,7 @@ public CacheEntry fetch(final K key, final Serializer keySeriali return null; } - final StandardCacheEntry standardCacheEntry = new StandardCacheEntry<>(key, valueDeserializer.deserialize(responseBuffer), revision); - return standardCacheEntry; + return new AtomicCacheEntry(key, valueDeserializer.deserialize(responseBuffer), revision); }); } @@ -269,16 +269,21 @@ private void validateProtocolVersion(final CommsSession session, final int requi } @Override - public boolean replace(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final long revision) throws IOException { + public boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException { + return replace(new AtomicCacheEntry<>(key, value, revision), keySerializer, valueSerializer); + } + + @Override + public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { return withCommsSession(session -> { validateProtocolVersion(session, 2); final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); dos.writeUTF("replace"); - serialize(key, keySerializer, dos); - dos.writeLong(revision); - serialize(value, valueSerializer, dos); + serialize(entry.getKey(), keySerializer, dos); + dos.writeLong(entry.getCachedRevision().orElse(0L)); + serialize(entry.getValue(), valueSerializer, dos); dos.flush(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index b08e69d2e9c1..31ec4401839b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -403,7 +403,7 @@ public void testNonPersistentMapServerAndClient() throws InitializationException runner.addControllerService("server", server); runner.enableControllerService(server); - DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + DistributedMapCacheClientService client = new DistributedMapCacheClientService<>(); MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); client.initialize(clientInitContext); @@ -500,7 +500,7 @@ public void testClientTermination() throws InitializationException, IOException, final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); server.startServer(serverContext); - DistributedMapCacheClientService client = new DistributedMapCacheClientService(); + DistributedMapCacheClientService client = new DistributedMapCacheClientService<>(); MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); client.initialize(clientInitContext); From d42aafa0e1365eb5e6b0436a0b86183a609ec5ec Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Fri, 9 Jun 2017 16:45:37 +0900 Subject: [PATCH 2/2] NIFI-4049: Refactor AtomicDistributedMapCacheClient Removed old methods completely. --- .../nifi/processors/standard/TestNotify.java | 24 +++-------- .../standard/TestWaitNotifyProtocol.java | 18 ++++---- .../cache/client/AtomicCacheEntry.java | 11 +---- .../AtomicDistributedMapCacheClient.java | 43 +------------------ .../DistributedMapCacheClientService.java | 9 +--- 5 files changed, 20 insertions(+), 85 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index fe62d2dce389..3e0cd689d690 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -359,7 +359,7 @@ public void put(final K key, final V value, final Serializer keySerial public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { verifyNotFail(); - final CacheEntry entry = values.get(key); + final AtomicCacheEntry entry = values.get(key); if (entry == null) { return null; } @@ -397,24 +397,10 @@ public long removeByPattern(String regex) throws IOException { @Override @SuppressWarnings("unchecked") - public CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { verifyNotFail(); - return values.get(key); - } - - @Override - public boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException { - verifyNotFail(); - - final CacheEntry existing = values.get(key); - if (existing != null && existing.getRevision() != revision) { - return false; - } - - values.put(key, new AtomicCacheEntry<>(key, value, revision + 1)); - - return true; + return (AtomicCacheEntry) values.get(key); } @Override @@ -423,11 +409,11 @@ public boolean replace(AtomicCacheEntry entry, Serializer final K key = entry.getKey(); final AtomicCacheEntry existing = values.get(key); - if (existing != null && !existing.getCachedRevision().equals(entry.getCachedRevision())) { + if (existing != null && !existing.getRevision().equals(entry.getRevision())) { return false; } - values.put(key, new AtomicCacheEntry<>(key, entry.getValue(), entry.getCachedRevision().orElse(0L) + 1)); + values.put(key, new AtomicCacheEntry<>(key, entry.getValue(), entry.getRevision().orElse(0L) + 1)); return true; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index ebdcd051196b..e3f982c8e128 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -56,7 +56,7 @@ public class TestWaitNotifyProtocol { @SuppressWarnings("unchecked") private final Answer successfulReplace = invocation -> { final AtomicCacheEntry entry = invocation.getArgumentAt(0, AtomicCacheEntry.class); - cacheEntries.put(entry.getKey(), new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), entry.getCachedRevision().orElse(0L) + 1)); + cacheEntries.put(entry.getKey(), new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), entry.getRevision().orElse(0L) + 1)); return true; }; @@ -103,7 +103,7 @@ public void testNotifyFirst() throws Exception { final AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(1, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(1, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @@ -120,20 +120,20 @@ public void testNotifyCounters() throws Exception { protocol.notify(signalId, "a", 1, null); AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(2, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(2, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "a", 10, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(3, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(3, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "b", 2, null); protocol.notify(signalId, "c", 3, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(5, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(5, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); final Map deltas = new HashMap<>(); @@ -142,13 +142,13 @@ public void testNotifyCounters() throws Exception { protocol.notify("signal-id", deltas, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(6, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(6, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); // Zero clear 'b'. protocol.notify("signal-id", "b", 0, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(7, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(7, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @@ -168,7 +168,7 @@ public void testNotifyAttributes() throws Exception { protocol.notify(signalId, "a", 1, attributeA1); AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(1L, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(1L, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue()); final Map attributeA2 = new HashMap<>(); @@ -179,7 +179,7 @@ public void testNotifyAttributes() throws Exception { protocol.notify(signalId, "a", 1, attributeA2); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(2L, cacheEntry.getCachedRevision().orElse(-1L).longValue()); + assertEquals(2L, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("Updated attributes should be merged correctly", "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java index f70b01a1c70f..42d03dd0b19b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java @@ -18,7 +18,7 @@ import java.util.Optional; -public class AtomicCacheEntry implements AtomicDistributedMapCacheClient.CacheEntry { +public class AtomicCacheEntry { private final K key; private V value; @@ -39,7 +39,7 @@ public AtomicCacheEntry(final K key, final V value, final R revision) { /** * @return the latest revision stored in a cache server */ - public Optional getCachedRevision() { + public Optional getRevision() { return Optional.ofNullable(revision); } @@ -55,11 +55,4 @@ public void setValue(V value) { this.value = value; } - /** - * @deprecated use {@link #getCachedRevision()} instead. - */ - @Override - public long getRevision() { - return revision instanceof Long ? (Long) revision : -1; - } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java index 1372e83eb18d..080d666fa1da 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java @@ -38,23 +38,6 @@ + "multiple nodes to coordinate state with a single remote entity.") public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient { - /** - * @deprecated use {@link AtomicCacheEntry} instead. - */ - interface CacheEntry { - - /** - * @deprecated use {@link AtomicCacheEntry#getCachedRevision()} instead. - * @return the latest revision stored in a cache server - */ - long getRevision(); - - K getKey(); - - V getValue(); - - } - /** * Fetch a CacheEntry with a key. * @param the key type @@ -63,38 +46,16 @@ interface CacheEntry { * @param keySerializer key serializer * @param valueDeserializer value deserializer * @return A CacheEntry instance if one exists, otherwise null. - * Although the return type is {@link CacheEntry}, it should be able to cast to {@link AtomicCacheEntry}. - * This is only for keeping the old method signature. In the future, return value will be changed to AtomicCacheEntry. - * Implementation of this method should return AtomicCacheEntry. - * @throws IOException if unable to communicate with the remote instance - */ - CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException; - - /** - * Replace an existing key with new value. - * @param the key type - * @param the value type - * @param key the key to replace - * @param value the new value for the key - * @param keySerializer key serializer - * @param valueSerializer value serializer - * @param revision a revision that was retrieved by a preceding fetch operation, if the key is already updated by other client, - * this doesn't match with the one on server, therefore the replace operation will not be performed. - * If there's no existing entry for the key, any revision can replace the key. - * @return true only if the key is replaced. * @throws IOException if unable to communicate with the remote instance - * @deprecated use {@link #replace(AtomicCacheEntry, Serializer, Serializer)} instead */ - default boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException { - throw new UnsupportedOperationException(); - } + AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException; /** * Replace an existing key with new value. * @param the key type * @param the value type * @param entry should provide the new value for {@link AtomicCacheEntry#getValue()}, - * and the same revision in the cache storage for {@link AtomicCacheEntry#getCachedRevision()}, + * and the same revision in the cache storage for {@link AtomicCacheEntry#getRevision()}, * if the revision does not match with the one in the cache storage, value will not be replaced. * @param keySerializer key serializer * @param valueSerializer value serializer diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 423e2248ffaa..9651c26cf438 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -238,7 +238,7 @@ public long removeByPattern(String regex) throws IOException { @Override @SuppressWarnings("unchecked") - public CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { return withCommsSession(session -> { validateProtocolVersion(session, 2); @@ -268,11 +268,6 @@ private void validateProtocolVersion(final CommsSession session, final int requi } } - @Override - public boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException { - return replace(new AtomicCacheEntry<>(key, value, revision), keySerializer, valueSerializer); - } - @Override public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { return withCommsSession(session -> { @@ -282,7 +277,7 @@ public boolean replace(AtomicCacheEntry entry, Serializer dos.writeUTF("replace"); serialize(entry.getKey(), keySerializer, dos); - dos.writeLong(entry.getCachedRevision().orElse(0L)); + dos.writeLong(entry.getRevision().orElse(0L)); serialize(entry.getValue(), valueSerializer, dos); dos.flush();