Skip to content

Commit

Permalink
IGNITE-11660 Add more tests into DmlStatementsProcessorTest
Browse files Browse the repository at this point in the history
  • Loading branch information
gromtech committed Apr 1, 2019
1 parent 9a9c817 commit 0551cca
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.ignite.internal.processors.query.h2;

import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;

import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
Expand All @@ -30,6 +34,10 @@
*/
@SuppressWarnings({"Anonymous2MethodRef", "PublicInnerClass", "unused"})
public class DmlStatementsProcessor {
/** The version which changed the anonymous class position of REMOVE closure. */
private static final IgniteProductVersion RMV_ANON_CLS_POS_CHANGED_SINCE =
IgniteProductVersion.fromString("2.7.0");

/** */
public static final class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
/** Value to set. */
Expand Down Expand Up @@ -112,21 +120,40 @@ public ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object,
};

/** Remove updater for compatibility with < 2.7.0. Must not be moved around to keep at anonymous position 4. */
public static final IgniteInClosure<MutableEntry<Object, Object>> RMV_OLD =
private static final IgniteInClosure<MutableEntry<Object, Object>> RMV_OLD =
new IgniteInClosure<MutableEntry<Object, Object>>() {
@Override public void apply(MutableEntry<Object, Object> e) {
e.remove();
}
};

/** Remove updater. Must not be moved around to keep at anonymous position 5. */
public static final IgniteInClosure<MutableEntry<Object, Object>> RMV =
private static final IgniteInClosure<MutableEntry<Object, Object>> RMV =
new IgniteInClosure<MutableEntry<Object, Object>>() {
@Override public void apply(MutableEntry<Object, Object> e) {
e.remove();
}
};

/**
* Returns the remove closure based on the version of the primary node.
*
* @param node Primary node.
* @param key Key.
* @return Remove closure.
*/
public static IgniteInClosure<MutableEntry<Object, Object>> getRemoveClosure(ClusterNode node, Object key) {
assert node != null;
assert key != null;

IgniteInClosure<MutableEntry<Object, Object>> rmvC = RMV;

if (node.version().compareTo(RMV_ANON_CLS_POS_CHANGED_SINCE) < 0)
rmvC = RMV_OLD;

return rmvC;
}

/**
* Entry value updater.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,10 @@ public void add(Object key, EntryProcessor<Object, Object, Boolean> proc, int ro
throws IgniteCheckedException {
assert key != null;
assert proc != null;

ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);

if (node == null)
throw new IgniteCheckedException("Failed to map key to node.");

assert rowNum < cntPerRow.length;

ClusterNode node = primaryNodeByKey(key);

UUID nodeId = node.id();

Batch batch = batches.get(nodeId);
Expand All @@ -126,6 +122,20 @@ public void add(Object key, EntryProcessor<Object, Object, Boolean> proc, int ro
sendBatch(batch);
}

/**
* @param key Key.
* @return Primary node for given key.
* @throws IgniteCheckedException If primary node is not found.
*/
public ClusterNode primaryNodeByKey(Object key) throws IgniteCheckedException {
ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);

if (node == null)
throw new IgniteCheckedException("Failed to map key to node.");

return node;
}

/**
* Flush any remaining entries.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import java.util.List;
import java.util.Map;

import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.CacheOperationContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
Expand All @@ -46,6 +48,7 @@
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.h2.util.DateTimeUtils;
import org.h2.util.LocalDateTimeUtils;
Expand Down Expand Up @@ -340,9 +343,16 @@ private static UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cu
if (row.size() != 2)
continue;

Object key = row.get(0);

ClusterNode node = sender.primaryNodeByKey(key);

IgniteInClosure<MutableEntry<Object, Object>> rmvC =
DmlStatementsProcessor.getRemoveClosure(node, key);

sender.add(
row.get(0),
new DmlStatementsProcessor.ModifyingEntryProcessor(row.get(1), DmlStatementsProcessor.RMV),
key,
new DmlStatementsProcessor.ModifyingEntryProcessor(row.get(1), rmvC),
0
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.ignite.internal.processors.query.h2;

import javax.cache.processor.MutableEntry;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.testframework.GridTestNode;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -48,6 +51,44 @@ public void testRemoveEntryModifierCompatibilityNew() throws Exception {
checkRemoveClosureByAnonymousPosition(5);
}

/**
* Checks that the old remove-closure is used if the remote node version is less than 2.7.0.
*/
@Test
public void testRemoveEntryModifierClassName() {
String oldClsName = DmlStatementsProcessor.class.getName() + "$" + 4;
String newClsName = DmlStatementsProcessor.class.getName() + "$" + 5;

checkRemoveEntryClassName("2.4.0", oldClsName);
checkRemoveEntryClassName("2.5.0", oldClsName);
checkRemoveEntryClassName("2.6.0", oldClsName);

checkRemoveEntryClassName("2.7.0", newClsName);
checkRemoveEntryClassName("2.8.0", newClsName);
}

/**
* Checks remove-closure class name.
*
* @param ver The version of the remote node.
* @param expClsName Expected class name.
*/
private void checkRemoveEntryClassName(final String ver, String expClsName) {
ClusterNode node = new GridTestNode() {
@Override public IgniteProductVersion version() {
return IgniteProductVersion.fromString(ver);
}
};

IgniteInClosure<MutableEntry<Object, Object>> rmvC =
DmlStatementsProcessor.getRemoveClosure(node, 0);

Assert.assertNotNull("Check remove-closure", rmvC);

Assert.assertEquals("Check remove-closure class name for version " + ver,
expClsName, rmvC.getClass().getName());
}

/**
* Checks that remove-closure is available by anonymous class position.
*/
Expand Down

0 comments on commit 0551cca

Please sign in to comment.