Skip to content

Commit

Permalink
Improving platform interfaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Sep 1, 2015
1 parent 0e25f55 commit 32579d4
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 122 deletions.
Expand Up @@ -38,8 +38,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T3; import org.apache.ignite.internal.util.typedef.T3;
Expand Down Expand Up @@ -139,8 +138,8 @@ public GridEventConsumeHandler() {
if (filter != null) if (filter != null)
ctx.resource().injectGeneric(filter); ctx.resource().injectGeneric(filter);


if (filter instanceof PlatformAwareEventFilter) if (filter instanceof PlatformEventFilterListener)
((PlatformAwareEventFilter)filter).initialize(ctx); ((PlatformEventFilterListener)filter).initialize(ctx);


final boolean loc = nodeId.equals(ctx.localNodeId()); final boolean loc = nodeId.equals(ctx.localNodeId());


Expand Down Expand Up @@ -260,16 +259,16 @@ public GridEventConsumeHandler() {
RuntimeException err = null; RuntimeException err = null;


try { try {
if (filter instanceof PlatformAwareEventFilter) if (filter instanceof PlatformEventFilterListener)
((PlatformAwareEventFilter)filter).close(); ((PlatformEventFilterListener)filter).onClose();
} }
catch(RuntimeException ex) { catch(RuntimeException ex) {
err = ex; err = ex;
} }


try { try {
if (cb instanceof PlatformLocalEventListener) if (cb instanceof PlatformEventFilterListener)
((PlatformLocalEventListener)cb).close(); ((PlatformEventFilterListener)cb).onClose();
} }
catch (RuntimeException ex) { catch (RuntimeException ex) {
if (err == null) if (err == null)
Expand Down
Expand Up @@ -17,26 +17,6 @@


package org.apache.ignite.internal.managers.communication; package org.apache.ignite.internal.managers.communication;


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterNode;
Expand All @@ -54,6 +34,7 @@
import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.GridSpinReadWriteLock;
Expand Down Expand Up @@ -83,6 +64,27 @@
import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.ConcurrentLinkedDeque8;


import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
Expand Down Expand Up @@ -1457,8 +1459,8 @@ else if (loc)
public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) { public void addUserMessageListener(@Nullable final Object topic, @Nullable final IgniteBiPredicate<UUID, ?> p) {
if (p != null) { if (p != null) {
try { try {
if (p instanceof GridLifecycleAwareMessageFilter) if (p instanceof PlatformMessageFilter)
((GridLifecycleAwareMessageFilter)p).initialize(ctx); ((PlatformMessageFilter)p).initialize(ctx);
else else
ctx.resource().injectGeneric(p); ctx.resource().injectGeneric(p);


Expand Down Expand Up @@ -1795,8 +1797,8 @@ private void closeListener(GridMessageListener lsnr) {
if (lsnr instanceof GridUserMessageListener) { if (lsnr instanceof GridUserMessageListener) {
GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr; GridUserMessageListener userLsnr = (GridUserMessageListener)lsnr;


if (userLsnr.predLsnr instanceof GridLifecycleAwareMessageFilter) if (userLsnr.predLsnr instanceof PlatformMessageFilter)
((GridLifecycleAwareMessageFilter)userLsnr.predLsnr).close(); ((PlatformMessageFilter)userLsnr.predLsnr).onClose();
} }
} }


Expand Down
Expand Up @@ -47,8 +47,7 @@
import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.processors.platform.PlatformAwareEventFilter; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.processors.platform.PlatformLocalEventListener;
import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.F;
Expand Down Expand Up @@ -681,8 +680,8 @@ public boolean removeLocalEventListener(GridLocalEventListener lsnr, @Nullable i
{ {
IgnitePredicate p = ((UserListenerWrapper)lsnr).listener(); IgnitePredicate p = ((UserListenerWrapper)lsnr).listener();


if (p instanceof PlatformLocalEventListener) if (p instanceof PlatformEventFilterListener)
((PlatformLocalEventListener)p).close(); ((PlatformEventFilterListener)p).onClose();
} }


return found; return found;
Expand Down Expand Up @@ -784,19 +783,20 @@ private void notifyListeners(@Nullable Collection<GridLocalEventListener> set, E
* @param p Grid event predicate. * @param p Grid event predicate.
* @return Collection of grid events. * @return Collection of grid events.
*/ */
@SuppressWarnings("unchecked")
public <T extends Event> Collection<T> localEvents(IgnitePredicate<T> p) { public <T extends Event> Collection<T> localEvents(IgnitePredicate<T> p) {
assert p != null; assert p != null;


if (p instanceof PlatformAwareEventFilter) { if (p instanceof PlatformEventFilterListener) {
PlatformAwareEventFilter p0 = (PlatformAwareEventFilter)p; PlatformEventFilterListener p0 = (PlatformEventFilterListener)p;


p0.initialize(ctx); p0.initialize(ctx);


try { try {
return getSpi().localEvents(p0); return (Collection<T>)getSpi().localEvents(p0);
} }
finally { finally {
p0.close(); p0.onClose();
} }
} }
else else
Expand Down
Expand Up @@ -18,13 +18,10 @@
package org.apache.ignite.internal.processors.platform; package org.apache.ignite.internal.processors.platform;


import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.Event; import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventAdapter;
import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter;
import org.apache.ignite.internal.portable.PortableRawReaderEx; import org.apache.ignite.internal.portable.PortableRawReaderEx;
import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilterEx; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilterEx;
Expand All @@ -39,10 +36,10 @@
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


import java.util.Collection; import java.util.Collection;
import java.util.UUID;


/** /**
* Platform context. Acts as an entry point for platform operations. * Platform context. Acts as an entry point for platform operations.
Expand Down Expand Up @@ -178,7 +175,7 @@ public interface PlatformContext {
* @param ptr Pointer of deployed native filter. * @param ptr Pointer of deployed native filter.
* @return Filter. * @return Filter.
*/ */
public GridLifecycleAwareMessageFilter<UUID, Object> createRemoteMessageFilter(Object filter, long ptr); public PlatformMessageFilter createRemoteMessageFilter(Object filter, long ptr);


/** /**
* Check whether the given event type is supported. * Check whether the given event type is supported.
Expand All @@ -192,17 +189,17 @@ public interface PlatformContext {
* Write event. * Write event.
* *
* @param writer Writer. * @param writer Writer.
* @param event Event. * @param evt Event.
*/ */
public void writeEvent(PortableRawWriterEx writer, EventAdapter event); public void writeEvent(PortableRawWriterEx writer, Event evt);


/** /**
* Create local event filter. * Create local event filter.
* *
* @param hnd Native handle. * @param hnd Native handle.
* @return Filter. * @return Filter.
*/ */
public <E extends Event> PlatformAwareEventFilter<E> createLocalEventFilter(long hnd); public PlatformEventFilterListener createLocalEventFilter(long hnd);


/** /**
* Create remote event filter. * Create remote event filter.
Expand All @@ -211,7 +208,7 @@ public interface PlatformContext {
* @param types Event types. * @param types Event types.
* @return Filter. * @return Filter.
*/ */
public <E extends Event> PlatformAwareEventFilter<E> createRemoteEventFilter(Object pred, final int... types); public PlatformEventFilterListener createRemoteEventFilter(Object pred, final int... types);


/** /**
* Create native exception. * Create native exception.
Expand Down
Expand Up @@ -24,16 +24,16 @@
import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgnitePredicate;


/** /**
* Special version of predicate for events with initialize/close callbacks. * Platform event filter and listener.
*/ */
public interface PlatformAwareEventFilter<E extends Event> extends IgnitePredicate<E>, IgniteBiPredicate<UUID, E> { public interface PlatformEventFilterListener extends IgnitePredicate<Event>, IgniteBiPredicate<UUID, Event> {
/** /**
* Initializes the filter. * Initializes the filter.
*/ */
public void initialize(GridKernalContext ctx); public void initialize(GridKernalContext ctx);


/** /**
* Closes the filter. * Callback invoked when filter is closed.
*/ */
public void close(); public void onClose();
} }

This file was deleted.

Expand Up @@ -15,15 +15,17 @@
* limitations under the License. * limitations under the License.
*/ */


package org.apache.ignite.internal.managers.communication; package org.apache.ignite.internal.processors.platform.message;


import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiPredicate;


import java.util.UUID;

/** /**
* Special version of bi-predicate for messaging with initialize/close callbacks. * Platform message filter.
*/ */
public interface GridLifecycleAwareMessageFilter<K, V> extends IgniteBiPredicate<K, V> { public interface PlatformMessageFilter extends IgniteBiPredicate<UUID, Object> {
/** /**
* Initializes the filter. * Initializes the filter.
* *
Expand All @@ -34,5 +36,5 @@ public interface GridLifecycleAwareMessageFilter<K, V> extends IgniteBiPredicate
/** /**
* Closes the filter. * Closes the filter.
*/ */
public void close(); public void onClose();
} }

0 comments on commit 32579d4

Please sign in to comment.