Skip to content

Commit

Permalink
IGNITE-4030 Streamline PlatformTarget operation methods
Browse files Browse the repository at this point in the history
This closes #1167
  • Loading branch information
ptupitsyn committed Oct 18, 2016
1 parent a863eee commit 7290d88
Show file tree
Hide file tree
Showing 37 changed files with 114 additions and 326 deletions.
Expand Up @@ -98,16 +98,6 @@ protected PlatformAbstractTarget(PlatformContext platformCtx) {
}
}

/** {@inheritDoc} */
@Override public long outLong(int type) throws Exception {
try {
return processOutLong(type);
}
catch (Exception e) {
throw convertException(e);
}
}

/** {@inheritDoc} */
@Override public void outStream(int type, long memPtr) throws Exception {
try (PlatformMemory mem = platformCtx.memory().get(memPtr)) {
Expand Down Expand Up @@ -154,26 +144,6 @@ protected PlatformAbstractTarget(PlatformContext platformCtx) {
}
}

/** {@inheritDoc} */
@Override public void inObjectStreamOutStream(int type, Object arg, long inMemPtr, long outMemPtr) throws Exception {
try (PlatformMemory inMem = platformCtx.memory().get(inMemPtr)) {
BinaryRawReaderEx reader = platformCtx.reader(inMem);

try (PlatformMemory outMem = platformCtx.memory().get(outMemPtr)) {
PlatformOutputStream out = outMem.output();

BinaryRawWriterEx writer = platformCtx.writer(out);

processInObjectStreamOutStream(type, arg, reader, writer);

out.synchronize();
}
}
catch (Exception e) {
throw convertException(e);
}
}

/** {@inheritDoc} */
@Override public Object inObjectStreamOutObjectStream(int type, Object arg, long inMemPtr, long outMemPtr)
throws Exception {
Expand Down Expand Up @@ -329,20 +299,6 @@ protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) th
return throwUnsupported(type);
}

/**
* Process IN-OUT operation.
*
* @param type Type.
* @param arg Argument.
* @param reader Binary reader.
* @param writer Binary writer.
* @throws IgniteCheckedException In case of exception.
*/
protected void processInObjectStreamOutStream(int type, @Nullable Object arg, BinaryRawReaderEx reader,
BinaryRawWriterEx writer) throws IgniteCheckedException {
throwUnsupported(type);
}

/**
* Process IN-OUT operation.
*
Expand All @@ -357,16 +313,6 @@ protected Object processInObjectStreamOutObjectStream(int type, @Nullable Object
return throwUnsupported(type);
}

/**
* Process OUT operation.
*
* @param type Type.
* @throws IgniteCheckedException In case of exception.
*/
protected long processOutLong(int type) throws IgniteCheckedException {
return throwUnsupported(type);
}

/**
* Process OUT operation.
*
Expand Down
Expand Up @@ -66,17 +66,6 @@ public interface PlatformTarget {
*/
public void inStreamOutStream(int type, long inMemPtr, long outMemPtr) throws Exception;

/**
* Operation accepting an object and a memory stream and returning result to another memory stream.
*
* @param type Operation type.
* @param arg Argument (optional).
* @param inMemPtr Input memory pointer.
* @param outMemPtr Output memory pointer.
* @throws Exception In case of failure.
*/
public void inObjectStreamOutStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr) throws Exception;

/**
* Operation accepting an object and a memory stream and returning result to another memory stream and an object.
*
Expand All @@ -90,15 +79,6 @@ public interface PlatformTarget {
public Object inObjectStreamOutObjectStream(int type, @Nullable Object arg, long inMemPtr, long outMemPtr)
throws Exception;

/**
* Operation returning long result.
*
* @param type Operation type.
* @return Result.
* @throws Exception In case of failure.
*/
public long outLong(int type) throws Exception;

/**
* Operation returning result to memory stream.
*
Expand Down
Expand Up @@ -273,23 +273,6 @@ public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean kee
this.keepBinary = keepBinary;
}

/** {@inheritDoc} */
@Override protected long processOutLong(int type) throws IgniteCheckedException {
switch (type) {
case OP_CLEAR_CACHE:
cache.clear();

return TRUE;

case OP_REMOVE_ALL2:
cache.removeAll();

return TRUE;
}

return super.processOutLong(type);
}

/** {@inheritDoc} */
@Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException {
switch (type) {
Expand Down Expand Up @@ -785,6 +768,16 @@ private void loadCache0(BinaryRawReaderEx reader, boolean loc) {

return TRUE;
}

case OP_CLEAR_CACHE:
cache.clear();

return TRUE;

case OP_REMOVE_ALL2:
cache.removeAll();

return TRUE;
}
return super.processInLongOutLong(type, val);
}
Expand Down
Expand Up @@ -292,10 +292,10 @@ public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx
}

/** {@inheritDoc} */
@Override public long outLong(int type) throws Exception {
@Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
if (type == OP_PARTITIONS)
return aff.partitions();

return super.outLong(type);
return super.processInLongOutLong(type, val);
}
}
Expand Up @@ -136,7 +136,7 @@ public PlatformAbstractQueryCursor(PlatformContext platformCtx, QueryCursorEx<T>
}

/** {@inheritDoc} */
@Override protected long processOutLong(int type) throws IgniteCheckedException {
@Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ITERATOR:
iter = cursor.iterator();
Expand All @@ -154,7 +154,7 @@ public PlatformAbstractQueryCursor(PlatformContext platformCtx, QueryCursorEx<T>
return iter.hasNext() ? TRUE : FALSE;
}

return super.processOutLong(type);
return super.processInLongOutLong(type, val);
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.platform.cache.query;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;

Expand Down Expand Up @@ -45,7 +46,7 @@ public PlatformContinuousQueryProxy(PlatformContext platformCtx, PlatformContinu
}

/** {@inheritDoc} */
@Override public long outLong(int type) throws Exception {
@Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException {
qry.close();

return 0;
Expand Down
Expand Up @@ -322,7 +322,7 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) {
}

/** {@inheritDoc} */
@Override protected long processOutLong(int type) throws IgniteCheckedException {
@Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_RESET_METRICS: {
assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
Expand All @@ -333,7 +333,7 @@ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) {
}
}

return super.processOutLong(type);
return super.processInLongOutLong(type, val);
}

/**
Expand Down
Expand Up @@ -135,14 +135,7 @@ public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp, String pla

return TRUE;
}
}

return super.processInLongOutLong(type, val);
}

/** {@inheritDoc} */
@Override protected long processOutLong(int type) throws IgniteCheckedException {
switch (type) {
case OP_WITH_NO_FAILOVER: {
compute.withNoFailover();
computeForPlatform.withNoFailover();
Expand All @@ -151,7 +144,7 @@ public PlatformCompute(PlatformContext platformCtx, ClusterGroup grp, String pla
}
}

return super.processOutLong(type);
return super.processInLongOutLong(type, val);
}

/**
Expand Down
Expand Up @@ -217,14 +217,7 @@ else if (plc == PLC_FLUSH)

return TRUE;
}
}

return super.processInLongOutLong(type, val);
}

/** {@inheritDoc} */
@Override public long processOutLong(int type) throws IgniteCheckedException {
switch (type) {
case OP_ALLOW_OVERWRITE:
return ldr.allowOverwrite() ? TRUE : FALSE;

Expand All @@ -238,7 +231,7 @@ else if (plc == PLC_FLUSH)
return ldr.perNodeParallelOperations();
}

return super.processOutLong(type);
return super.processInLongOutLong(type, val);
}

/**
Expand Down
Expand Up @@ -99,8 +99,17 @@ public PlatformAtomicLong(PlatformContext ctx, GridCacheAtomicLongImpl atomicLon
}

/** {@inheritDoc} */
@Override protected long processOutLong(int type) throws IgniteCheckedException {
@Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicLong.addAndGet(val);

case OP_GET_AND_ADD:
return atomicLong.getAndAdd(val);

case OP_GET_AND_SET:
return atomicLong.getAndSet(val);

case OP_CLOSE:
atomicLong.close();

Expand All @@ -125,22 +134,6 @@ public PlatformAtomicLong(PlatformContext ctx, GridCacheAtomicLongImpl atomicLon
return atomicLong.removed() ? TRUE : FALSE;
}

return super.processOutLong(type);
}

/** {@inheritDoc} */
@Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicLong.addAndGet(val);

case OP_GET_AND_ADD:
return atomicLong.getAndAdd(val);

case OP_GET_AND_SET:
return atomicLong.getAndSet(val);
}

return super.processInLongOutLong(type, val);
}
}
Expand Up @@ -24,7 +24,6 @@
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.omg.CORBA.TRANSACTION_REQUIRED;

/**
* Platform atomic reference wrapper.
Expand Down Expand Up @@ -135,7 +134,7 @@ private PlatformAtomicReference(PlatformContext ctx, GridCacheAtomicReferenceImp
}

/** {@inheritDoc} */
@Override protected long processOutLong(int type) throws IgniteCheckedException {
@Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_CLOSE:
atomicRef.close();
Expand All @@ -146,7 +145,6 @@ private PlatformAtomicReference(PlatformContext ctx, GridCacheAtomicReferenceImp
return atomicRef.removed() ? TRUE : FALSE;
}

return super.processOutLong(type);
return super.processInLongOutLong(type, val);
}
}

}
Expand Up @@ -71,8 +71,19 @@ public PlatformAtomicSequence(PlatformContext ctx, IgniteAtomicSequence atomicSe


/** {@inheritDoc} */
@Override protected long processOutLong(int type) throws IgniteCheckedException {
@Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicSeq.addAndGet(val);

case OP_GET_AND_ADD:
return atomicSeq.getAndAdd(val);

case OP_SET_BATCH_SIZE:
atomicSeq.batchSize((int)val);

return TRUE;

case OP_CLOSE:
atomicSeq.close();

Expand All @@ -94,24 +105,6 @@ public PlatformAtomicSequence(PlatformContext ctx, IgniteAtomicSequence atomicSe
return atomicSeq.batchSize();
}

return super.processOutLong(type);
}

/** {@inheritDoc} */
@Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException {
switch (type) {
case OP_ADD_AND_GET:
return atomicSeq.addAndGet(val);

case OP_GET_AND_ADD:
return atomicSeq.getAndAdd(val);

case OP_SET_BATCH_SIZE:
atomicSeq.batchSize((int)val);

return TRUE;
}

return super.processInLongOutLong(type, val);
}
}

0 comments on commit 7290d88

Please sign in to comment.