Skip to content

Commit

Permalink
NIFI-3214: Added fetch and replace to DistributedMapCache
Browse files Browse the repository at this point in the history
- Using fetch and replace together can provide optimistic locking for
  concurrency control.
- Added fetch to get cache entry with its meta data such as revision
  number.
- Added replace to update cache only if it has not been updated.
- Added Map Cache protocol version 2 for those new operations.
- Existing operations such as get or put can work with protocol version
  1.
  • Loading branch information
ijokarumawak committed Jan 13, 2017
1 parent a794166 commit 42cf830
Show file tree
Hide file tree
Showing 18 changed files with 682 additions and 86 deletions.
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.client;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;

import java.io.IOException;

/**
* <p>This interface defines an API that can be used for interacting with a
* Distributed Cache that functions similarly to a {@link java.util.Map Map}.
*
* <p>In addition to the API defined in {@link DistributedMapCacheClient} super class,
* this class provides methods for concurrent atomic updates those are added since Map Cache protocol version 2.
*
* <p>If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException.
*/
@Tags({"distributed", "client", "cluster", "map", "cache"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows "
+ "multiple nodes to coordinate state with a single remote entity.")
public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient {

interface CacheEntry<K, V> {

long getRevision();

K getKey();

V getValue();

}

/**
* Fetch a CacheEntry with a key.
* @param <K> the key type
* @param <V> the value type
* @param key the key to lookup in the map
* @param keySerializer key serializer
* @param valueDeserializer value deserializer
* @return A CacheEntry instance if one exists, otherwise <cod>null</cod>.
* @throws IOException if unable to communicate with the remote instance
*/
<K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;

/**
* Replace an existing key with new value.
* @param <K> the key type
* @param <V> the value type
* @param key the key to replace
* @param value the new value for the key
* @param keySerializer key serializer
* @param valueSerializer value serializer
* @param revision a revision that was retrieved by a preceding fetch operation, if the key is already updated by other client,
* this doesn't match with the one on server, therefore the replace operation will not be performed.
* If there's no existing entry for the key, any revision can replace the key.
* @return true only if the key is replaced.
* @throws IOException if unable to communicate with the remote instance
*/
<K, V> boolean replace(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long revision) throws IOException;

}
Expand Up @@ -43,4 +43,8 @@ public interface CommsSession extends Closeable {
long getTimeout(TimeUnit timeUnit);

SSLContext getSSLContext();

int getProtocolVersion();

void setProtocolVersion(final int protocolVersion);
}
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.nifi.distributed.cache.client;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -39,16 +41,14 @@
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tags({"distributed", "cache", "state", "map", "cluster"})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
+ "between nodes in a NiFi cluster")
public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient {

private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);

Expand Down Expand Up @@ -216,6 +216,58 @@ public Boolean execute(final CommsSession session) throws IOException {
});
}

@Override
public <K, V> CacheEntry<K, V> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(session -> {
validateProtocolVersion(session, 2);

final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("fetch");

serialize(key, keySerializer, dos);
dos.flush();

// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
final long revision = dis.readLong();
final byte[] responseBuffer = readLengthDelimitedResponse(dis);

if (revision < 0) {
// This indicates that key was not found.
return null;
}

final StandardCacheEntry<K, V> standardCacheEntry = new StandardCacheEntry<>(key, valueDeserializer.deserialize(responseBuffer), revision);
return standardCacheEntry;
});
}

private void validateProtocolVersion(final CommsSession session, final int requiredProtocolVersion) {
if (session.getProtocolVersion() < requiredProtocolVersion) {
throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + requiredProtocolVersion);
}
}

@Override
public <K, V> boolean replace(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final long revision) throws IOException {
return withCommsSession(session -> {
validateProtocolVersion(session, 2);

final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("replace");

serialize(key, keySerializer, dos);
dos.writeLong(revision);
serialize(value, valueSerializer, dos);

dos.flush();

// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
return dis.readBoolean();
});
}

private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
final int responseLength = dis.readInt();
final byte[] responseBuffer = new byte[responseLength];
Expand Down Expand Up @@ -247,9 +299,10 @@ private CommsSession leaseCommsSession() throws IOException {
}

session = createCommsSession(configContext);
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1);
try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
session.setProtocolVersion(versionNegotiator.getVersion());
} catch (final HandshakeException e) {
try {
session.close();
Expand Down
Expand Up @@ -127,6 +127,7 @@ private CommsSession leaseCommsSession() throws IOException {
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
session.setProtocolVersion(versionNegotiator.getVersion());
} catch (final HandshakeException e) {
IOUtils.closeQuietly(session);

Expand Down
Expand Up @@ -42,6 +42,8 @@ public class SSLCommsSession implements CommsSession {
private final SSLSocketChannelOutputStream out;
private final BufferedOutputStream bufferedOut;

private int protocolVersion;

public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);

Expand Down Expand Up @@ -106,4 +108,13 @@ public long getTimeout(final TimeUnit timeUnit) {
return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
}

@Override
public int getProtocolVersion() {
return protocolVersion;
}

@Override
public void setProtocolVersion(final int protocolVersion) {
this.protocolVersion = protocolVersion;
}
}
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.client;

public class StandardCacheEntry<K,V> implements AtomicDistributedMapCacheClient.CacheEntry<K,V> {

private final K key;
private final V value;
private final long revision;


public StandardCacheEntry(final K key, final V value, final long revision) {
this.key = key;
this.value = value;
this.revision = revision;
}

@Override
public long getRevision() {
return revision;
}

@Override
public K getKey() {
return key;
}

@Override
public V getValue() {
return value;
}
}
Expand Up @@ -45,6 +45,8 @@ public class StandardCommsSession implements CommsSession {
private final SocketChannelOutputStream out;
private final InterruptableOutputStream bufferedOut;

private int protocolVersion;

public StandardCommsSession(final String hostname, final int port) throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
socketChannel.configureBlocking(false);
Expand Down Expand Up @@ -122,4 +124,15 @@ public SSLContext getSSLContext() {
public long getTimeout(final TimeUnit timeUnit) {
return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
}

@Override
public int getProtocolVersion() {
return protocolVersion;
}

@Override
public void setProtocolVersion(final int protocolVersion) {
this.protocolVersion = protocolVersion;
}

}
Expand Up @@ -33,6 +33,19 @@ public class ProtocolHandshake {
public static final int DIFFERENT_RESOURCE_VERSION = 21;
public static final int ABORT = 255;

/**
* <p>Initiate handshake to ensure client and server can communicate with the same protocol.
* If the server doesn't support requested protocol version, HandshakeException will be thrown.</p>
*
* <p>DistributedMapCache version histories:<ul>
* <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li>
* <li>1: Initial version.</li>
* </ul></p>
*
* <p>DistributedSetCache version histories:<ul>
* <li>1: Initial version.</li>
* </ul></p>
*/
public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
final DataInputStream dis = new DataInputStream(in);
final DataOutputStream dos = new DataOutputStream(out);
Expand Down Expand Up @@ -85,6 +98,7 @@ private static void initiateVersionNegotiation(final VersionNegotiator negotiato

// Attempt negotiation of resource based on our new preferred version.
initiateVersionNegotiation(negotiator, dis, dos);
return;
case ABORT:
throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
default:
Expand Down
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.nifi.distributed.cache.server;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -30,8 +32,6 @@

import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
Expand Down Expand Up @@ -121,9 +121,9 @@ public void run() {
return;
}
try (final InputStream in = new BufferedInputStream(rawInputStream);
final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
final OutputStream out = new BufferedOutputStream(rawOutputStream)) {

final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
final VersionNegotiator versionNegotiator = getVersionNegotiator();

ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);

Expand Down Expand Up @@ -163,6 +163,14 @@ public void run() {
thread.start();
}

/**
* Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
* for details of each version enhancements.
*/
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(1);
}

@Override
public void stop() throws IOException {
stopped = true;
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server.map;

import java.io.File;
import java.io.IOException;

import javax.net.ssl.SSLContext;

Expand Down Expand Up @@ -69,10 +70,14 @@ protected CacheServer createCacheServer(final ConfigurationContext context) {
try {
final File persistenceDir = persistencePath == null ? null : new File(persistencePath);

return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
return createMapCacheServer(port, maxSize, sslContext, evictionPolicy, persistenceDir);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException {
return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
}

}
Expand Up @@ -31,5 +31,9 @@ public interface MapCache {

ByteBuffer remove(ByteBuffer key) throws IOException;

MapCacheRecord fetch(ByteBuffer key) throws IOException;

MapPutResult replace(MapCacheRecord record) throws IOException;

void shutdown() throws IOException;
}

0 comments on commit 42cf830

Please sign in to comment.