Skip to content

Commit

Permalink
Simplify gossip protocol interface.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 22, 2017
1 parent ead0f44 commit 5287f53
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 167 deletions.
Expand Up @@ -79,7 +79,7 @@ public class AntiEntropyService<K, V> extends AbstractListenerManager<GossipEven
private final SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE); private final SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);


public AntiEntropyService( public AntiEntropyService(
AntiEntropyProtocol protocol, AntiEntropyProtocol<Identifier> protocol,
Supplier<Collection<Identifier>> peerProvider, Supplier<Collection<Identifier>> peerProvider,
Executor eventExecutor, Executor eventExecutor,
ScheduledExecutorService communicationExecutor, ScheduledExecutorService communicationExecutor,
Expand All @@ -91,7 +91,7 @@ public AntiEntropyService(
this.eventExecutor = checkNotNull(eventExecutor, "eventExecutor cannot be null"); this.eventExecutor = checkNotNull(eventExecutor, "eventExecutor cannot be null");
this.communicationExecutor = checkNotNull(communicationExecutor, "communicationExecutor cannot be null"); this.communicationExecutor = checkNotNull(communicationExecutor, "communicationExecutor cannot be null");
this.tombstonesDisabled = tombstonesDisabled; this.tombstonesDisabled = tombstonesDisabled;
protocol.listener().registerGossipListener(this::update); protocol.registerGossipListener(this::update);
updateFuture = communicationExecutor.scheduleAtFixedRate(this::performAntiEntropy, 0, antiEntropyInterval.toMillis(), TimeUnit.MILLISECONDS); updateFuture = communicationExecutor.scheduleAtFixedRate(this::performAntiEntropy, 0, antiEntropyInterval.toMillis(), TimeUnit.MILLISECONDS);
purgeFuture = !tombstonesDisabled ? communicationExecutor.scheduleAtFixedRate(this::purgeTombstones, 0, purgeInterval.toMillis(), TimeUnit.MILLISECONDS) : null; purgeFuture = !tombstonesDisabled ? communicationExecutor.scheduleAtFixedRate(this::purgeTombstones, 0, purgeInterval.toMillis(), TimeUnit.MILLISECONDS) : null;
} }
Expand Down Expand Up @@ -206,7 +206,7 @@ private void sendAdvertisementToPeer(Identifier peer) {
long updateTime = System.currentTimeMillis(); long updateTime = System.currentTimeMillis();
AntiEntropyAdvertisement<K> advertisement = new AntiEntropyAdvertisement<>( AntiEntropyAdvertisement<K> advertisement = new AntiEntropyAdvertisement<>(
ImmutableMap.copyOf(Maps.transformValues(updates, GossipUpdate::digest))); ImmutableMap.copyOf(Maps.transformValues(updates, GossipUpdate::digest)));
protocol.dispatcher().advertise(peer, advertisement).whenComplete((response, error) -> { protocol.advertise(peer, advertisement).whenComplete((response, error) -> {
if (error != null) { if (error != null) {
log.debug("Failed to send anti-entropy advertisement to {}: {}", peer, error.getMessage()); log.debug("Failed to send anti-entropy advertisement to {}: {}", peer, error.getMessage());
} else if (response.status() == AntiEntropyResponse.Status.PROCESSED) { } else if (response.status() == AntiEntropyResponse.Status.PROCESSED) {
Expand Down Expand Up @@ -277,7 +277,7 @@ private synchronized void purgeTombstones() {
@Override @Override
public void close() { public void close() {
open = false; open = false;
protocol.listener().unregisterGossipListener(); protocol.unregisterGossipListener();
updateFuture.cancel(false); updateFuture.cancel(false);
if (purgeFuture != null) { if (purgeFuture != null) {
purgeFuture.cancel(false); purgeFuture.cancel(false);
Expand Down Expand Up @@ -314,7 +314,7 @@ public void processItems(List<GossipUpdate<K, V>> items) {
item.timestamp().isNewerThan(existing.timestamp()) ? item : existing)); item.timestamp().isNewerThan(existing.timestamp()) ? item : existing));
communicationExecutor.execute(() -> { communicationExecutor.execute(() -> {
try { try {
protocol.dispatcher().gossip(peer, new GossipMessage<>(logicalClock.increment(), map.values())); protocol.gossip(peer, new GossipMessage<>(logicalClock.increment(), map.values()));
} catch (Exception e) { } catch (Exception e) {
log.warn("Failed to send to {}", peer, e); log.warn("Failed to send to {}", peer, e);
} }
Expand Down
Expand Up @@ -71,7 +71,7 @@ public static <K, V> Builder<K, V> builder() {
private final Map<Identifier, LogicalTimestamp> peerTimestamps = Maps.newHashMap(); private final Map<Identifier, LogicalTimestamp> peerTimestamps = Maps.newHashMap();


public DisseminationService( public DisseminationService(
GossipProtocol protocol, GossipProtocol<?> protocol,
Supplier<Collection<Identifier>> peerProvider, Supplier<Collection<Identifier>> peerProvider,
Executor eventExecutor, Executor eventExecutor,
ScheduledExecutorService communicationExecutor, ScheduledExecutorService communicationExecutor,
Expand All @@ -84,7 +84,7 @@ public DisseminationService(
this.eventExecutor = checkNotNull(eventExecutor, "eventExecutor cannot be null"); this.eventExecutor = checkNotNull(eventExecutor, "eventExecutor cannot be null");
this.fastConvergence = fastConvergence; this.fastConvergence = fastConvergence;
this.tombstonesDisabled = tombstonesDisabled; this.tombstonesDisabled = tombstonesDisabled;
protocol.listener().registerGossipListener(this::update); protocol.registerGossipListener(this::update);
updateFuture = communicationExecutor.scheduleAtFixedRate(this::gossip, 0, updateInterval.toMillis(), TimeUnit.MILLISECONDS); updateFuture = communicationExecutor.scheduleAtFixedRate(this::gossip, 0, updateInterval.toMillis(), TimeUnit.MILLISECONDS);
purgeFuture = !tombstonesDisabled ? communicationExecutor.scheduleAtFixedRate(this::purgeTombstones, 0, purgeInterval.toMillis(), TimeUnit.MILLISECONDS) : null; purgeFuture = !tombstonesDisabled ? communicationExecutor.scheduleAtFixedRate(this::purgeTombstones, 0, purgeInterval.toMillis(), TimeUnit.MILLISECONDS) : null;
} }
Expand Down Expand Up @@ -190,7 +190,7 @@ private synchronized void updatePeer(Identifier peer) {
.collect(Collectors.toList()); .collect(Collectors.toList());


// Send the gossip message. // Send the gossip message.
protocol.dispatcher().gossip(peer, new GossipMessage<>(updateTimestamp, filteredUpdates)); protocol.gossip(peer, new GossipMessage<>(updateTimestamp, filteredUpdates));


// Set the peer's update time. // Set the peer's update time.
peerTimestamps.put(peer, updateTimestamp); peerTimestamps.put(peer, updateTimestamp);
Expand All @@ -216,7 +216,7 @@ private synchronized void purgeTombstones() {


@Override @Override
public void close() { public void close() {
protocol.listener().unregisterGossipListener(); protocol.unregisterGossipListener();
updateFuture.cancel(false); updateFuture.cancel(false);
if (purgeFuture != null) { if (purgeFuture != null) {
purgeFuture.cancel(false); purgeFuture.cancel(false);
Expand Down
Expand Up @@ -17,25 +17,33 @@


import io.atomix.utils.Identifier; import io.atomix.utils.Identifier;


import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** /**
* Anti-entropy protocol. * Anti-entropy protocol.
*/ */
public interface AntiEntropyProtocol<T extends Identifier> extends GossipProtocol { public interface AntiEntropyProtocol<T extends Identifier> extends GossipProtocol<T> {


/** /**
* Returns the anti-entropy protocol listener. * Sends an anti-entropy advertisement.
* *
* @return the anti-entropy protocol listener * @param identifier the location to which to send the advertisement
* @param advertisement the anti-entropy advertisement to send
* @return a future to be completed with the advertisement response
*/ */
@Override <K> CompletableFuture<AntiEntropyResponse<K>> advertise(T identifier, AntiEntropyAdvertisement<K> advertisement);
AntiEntropyProtocolListener listener();


/** /**
* Returns the anti-entropy protocol dispatcher. * Registers an anti-entropy advertisement handler.
* *
* @return the anti-entropy protocol dispatcher * @param handler the anti-entropy advertisement handler to register
*/
<K> void registerAdvertisementHandler(Function<AntiEntropyAdvertisement<K>, AntiEntropyResponse<K>> handler);

/**
* Unregisters the anti-entropy advertisement handler.
*/ */
@Override void unregisterAdvertisementHandler();
AntiEntropyProtocolDispatcher<T> dispatcher();


} }

This file was deleted.

This file was deleted.

Expand Up @@ -17,23 +17,31 @@


import io.atomix.utils.Identifier; import io.atomix.utils.Identifier;


import java.util.function.Consumer;

/** /**
* Gossip protocol. * Gossip protocol.
*/ */
public interface GossipProtocol<T extends Identifier> { public interface GossipProtocol<T extends Identifier> {


/** /**
* Returns the gossip protocol listener. * Sends a gossip message to the given node.
* *
* @return the gossip protocol listener * @param identifier the location to which to send the gossip message
* @param message the gossip message to send
*/ */
GossipProtocolListener listener(); <K, V> void gossip(T identifier, GossipMessage<K, V> message);


/** /**
* Returns the gossip protocol dispatcher. * Registers a gossip message listener.
* *
* @return the gossip protocol dispatcher * @param listener the gossip message listener to register
*/
<K, V> void registerGossipListener(Consumer<GossipMessage<K, V>> listener);

/**
* Unregisters the gossip message listener.
*/ */
GossipProtocolDispatcher<T> dispatcher(); void unregisterGossipListener();


} }

This file was deleted.

This file was deleted.

0 comments on commit 5287f53

Please sign in to comment.