Skip to content

Commit

Permalink
Platforms: WIP on better interfaces.
Browse files Browse the repository at this point in the history
  • Loading branch information
vozerov-gridgain committed Sep 1, 2015
1 parent 39da853 commit 66d46ec
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 24 deletions.
Expand Up @@ -17,8 +17,6 @@


package org.apache.ignite.internal.processors.platform; package org.apache.ignite.internal.processors.platform;


import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterMetrics;
Expand All @@ -33,15 +31,18 @@
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter;
import org.apache.ignite.internal.processors.platform.compute.PlatformJob; import org.apache.ignite.internal.processors.platform.compute.PlatformJob;
import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiver;
import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


import java.util.Collection;
import java.util.UUID;

/** /**
* Platform context. Acts as an entry point for platform operations. * Platform context. Acts as an entry point for platform operations.
*/ */
Expand Down Expand Up @@ -266,13 +267,13 @@ public interface PlatformContext {
* @param keepPortable Keep portable flag. * @param keepPortable Keep portable flag.
* @return Stream receiver. * @return Stream receiver.
*/ */
public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable); public PlatformStreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable);


/** /**
* Create cluster node filter. * Create cluster node filter.
* *
* @param filter Native filter. * @param filter Native filter.
* @return Cluster node filter. * @return Cluster node filter.
*/ */
public IgnitePredicate<ClusterNode> createClusterNodeFilter(Object filter); public PlatformClusterNodeFilter createClusterNodeFilter(Object filter);
} }
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.platform.cluster;

import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgnitePredicate;

/**
* Platform cluster node filter marker interface.
*/
public interface PlatformClusterNodeFilter extends IgnitePredicate<ClusterNode> {
// No-op.
}
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.platform.datastreamer;

import org.apache.ignite.stream.StreamReceiver;

/**
* Platform data streamer filter marker interface.
*/
public interface PlatformStreamReceiver extends StreamReceiver<Object, Object> {
// No-op.
}
Expand Up @@ -25,20 +25,19 @@
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.IgniteInstanceResource;


/** /**
* Interop cluster node filter. * Interop cluster node filter.
*/ */
public class PlatformClusterNodeFilter extends PlatformAbstractPredicate implements IgnitePredicate<ClusterNode> { public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate implements PlatformClusterNodeFilter {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


/** /**
* {@link java.io.Externalizable} support. * {@link java.io.Externalizable} support.
*/ */
public PlatformClusterNodeFilter() { public PlatformClusterNodeFilterImpl() {
// No-op. // No-op.
} }


Expand All @@ -48,7 +47,7 @@ public PlatformClusterNodeFilter() {
* @param pred .Net portable predicate. * @param pred .Net portable predicate.
* @param ctx Kernal context. * @param ctx Kernal context.
*/ */
public PlatformClusterNodeFilter(Object pred, PlatformContext ctx) { public PlatformClusterNodeFilterImpl(Object pred, PlatformContext ctx) {
super(pred, 0, ctx); super(pred, 0, ctx);
} }


Expand Down
Expand Up @@ -17,11 +17,6 @@


package org.apache.ignite.internal.processors.platform.datastreamer; package org.apache.ignite.internal.processors.platform.datastreamer;


import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.Ignite; import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteException;
Expand All @@ -33,12 +28,17 @@
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.stream.StreamReceiver;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Map;


/** /**
* Interop receiver. * Interop receiver.
*/ */
public class PlatformStreamReceiver<K, V> extends PlatformAbstractPredicate implements StreamReceiver<K, V> { public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implements PlatformStreamReceiver {
/** */ /** */
private static final long serialVersionUID = 0L; private static final long serialVersionUID = 0L;


Expand All @@ -48,7 +48,7 @@ public class PlatformStreamReceiver<K, V> extends PlatformAbstractPredicate impl
/** /**
* Constructor. * Constructor.
*/ */
public PlatformStreamReceiver() public PlatformStreamReceiverImpl()
{ {
super(); super();
} }
Expand All @@ -60,7 +60,7 @@ public PlatformStreamReceiver()
* @param ptr Pointer to receiver in the native platform. * @param ptr Pointer to receiver in the native platform.
* @param ctx Kernal context. * @param ctx Kernal context.
*/ */
public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) { public PlatformStreamReceiverImpl(Object pred, long ptr, boolean keepPortable, PlatformContext ctx) {
super(pred, ptr, ctx); super(pred, ptr, ctx);


assert pred != null; assert pred != null;
Expand All @@ -69,7 +69,7 @@ public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, Platf
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> collection) @Override public void receive(IgniteCache<Object, Object> cache, Collection<Map.Entry<Object, Object>> collection)
throws IgniteException { throws IgniteException {
assert ctx != null; assert ctx != null;


Expand All @@ -82,15 +82,15 @@ public PlatformStreamReceiver(Object pred, long ptr, boolean keepPortable, Platf


writer.writeInt(collection.size()); writer.writeInt(collection.size());


for (Map.Entry<K, V> e : collection) { for (Map.Entry<Object, Object> e : collection) {
writer.writeObject(e.getKey()); writer.writeObject(e.getKey());
writer.writeObject(e.getValue()); writer.writeObject(e.getValue());
} }


out.synchronize(); out.synchronize();


ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepPortable),
new PlatformCache(ctx, cache, keepPortable), mem.pointer(), keepPortable); mem.pointer(), keepPortable);
} }
} }


Expand All @@ -116,5 +116,4 @@ public void setIgniteInstance(Ignite ignite) {


keepPortable = in.readBoolean(); keepPortable = in.readBoolean();
} }

} }

0 comments on commit 66d46ec

Please sign in to comment.