Skip to content

Commit

Permalink
# ignite-299 : Use ConcurrentMap instead of ClusterNodeLocalMap (chan…
Browse files Browse the repository at this point in the history
…ge usages and delete interface)
  • Loading branch information
ashutakGG committed Feb 20, 2015
1 parent be52911 commit 4ba5d0e
Show file tree
Hide file tree
Showing 20 changed files with 50 additions and 74 deletions.
Expand Up @@ -28,6 +28,7 @@

import java.math.*;
import java.util.*;
import java.util.concurrent.*;

/**
* This example demonstrates how to use continuation feature of Ignite by
Expand Down Expand Up @@ -123,7 +124,7 @@ private static class FibonacciClosure implements IgniteClosure<Long, BigInteger>
return n == 0 ? BigInteger.ZERO : BigInteger.ONE;

// Node-local storage.
ClusterNodeLocalMap<Long, IgniteFuture<BigInteger>> locMap = ignite.cluster().nodeLocalMap();
ConcurrentMap<Long, IgniteFuture<BigInteger>> locMap = ignite.cluster().nodeLocalMap();

// Check if value is cached in node-local-map first.
fut1 = locMap.get(n - 1);
Expand Down
Expand Up @@ -25,6 +25,7 @@

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
* {@code GridCluster} also provides a handle on {@link #nodeLocalMap()} which provides map-like functionality
Expand Down Expand Up @@ -60,7 +61,7 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport {
*
* @return Node local storage instance for the local node.
*/
public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap();
public <K, V> ConcurrentMap<K, V> nodeLocalMap();

/**
* Pings a remote node.
Expand Down

This file was deleted.

Expand Up @@ -17,17 +17,27 @@

package org.apache.ignite.internal.cluster;

import org.apache.ignite.cluster.*;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.jdk8.backport.*;

import java.io.*;
import java.util.concurrent.*;

/**
*
* Implementation for node-local storage.
* <p>
* {@code ClusterNodeLocalMapImpl} is similar to {@link ThreadLocal} in a way that its values are not
* distributed and kept only on local node (similar like {@link ThreadLocal} values are attached to the
* current thread only). Node-local values are used primarily by jobs executed from the remote
* nodes to keep intermediate state on the local node between executions.
* <p>
* {@code ClusterNodeLocalMapImpl} is a {@link ConcurrentMap} so it is trivial to use.
* <p>
* You can get an instance of {@code ClusterNodeLocalMapImpl} by calling {@link IgniteCluster#nodeLocalMap()} method.
*/
public class ClusterNodeLocalMapImpl<K, V> extends ConcurrentHashMap8<K, V> implements ClusterNodeLocalMap<K, V>,
public class ClusterNodeLocalMapImpl<K, V> extends ConcurrentHashMap8<K, V> implements ConcurrentMap<K, V>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
Expand Down
Expand Up @@ -27,6 +27,7 @@

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
*
Expand Down Expand Up @@ -66,7 +67,7 @@ public IgniteClusterAsyncImpl(IgniteClusterImpl cluster) {
}

/** {@inheritDoc} */
@Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() {
@Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() {
return cluster.nodeLocalMap();
}

Expand Down
Expand Up @@ -51,7 +51,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus

/** Node local store. */
@GridToStringExclude
private ClusterNodeLocalMap nodeLoc;
private ConcurrentMap nodeLoc;

/**
* Required by {@link Externalizable}.
Expand Down Expand Up @@ -101,7 +101,7 @@ public IgniteClusterImpl(GridKernalContext ctx) {

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> ClusterNodeLocalMap<K, V> nodeLocalMap() {
@Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() {
guard();

try {
Expand Down
Expand Up @@ -19,7 +19,6 @@

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.query.*;
Expand Down Expand Up @@ -331,8 +330,7 @@ else if (req.remoteTransformerClassName() != null)
else
fut = (GridCacheQueryFutureAdapter<?, ?, ?>)qry.execute(req.queryArguments());

ClusterNodeLocalMap<QueryExecutionKey, QueryFutureWrapper> locMap =
g.cluster().nodeLocalMap();
ConcurrentMap<QueryExecutionKey, QueryFutureWrapper> locMap = g.cluster().nodeLocalMap();

QueryFutureWrapper wrapper = new QueryFutureWrapper(fut);

Expand Down Expand Up @@ -367,8 +365,7 @@ private FetchQueryResults(GridRestCacheQueryRequest req) {

/** {@inheritDoc} */
@Override public GridRestResponse call() throws Exception {
ClusterNodeLocalMap<QueryExecutionKey, QueryFutureWrapper> locMap =
g.cluster().nodeLocalMap();
ConcurrentMap<QueryExecutionKey, QueryFutureWrapper> locMap = g.cluster().nodeLocalMap();

return fetchQueryResults(req.queryId(), locMap.get(new QueryExecutionKey(req.queryId())),
locMap, g.cluster().localNode().id());
Expand Down
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.visor.compute;

import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.typedef.internal.*;
Expand All @@ -26,6 +25,7 @@
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.internal.visor.compute.VisorComputeMonitoringHolder.*;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
Expand Down Expand Up @@ -75,7 +75,7 @@ private VisorComputeToggleMonitoringJob(IgniteBiTuple<String, Boolean> arg, bool
if (checkExplicitTaskMonitoring(ignite))
return true;
else {
ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap();
ConcurrentMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap();

VisorComputeMonitoringHolder holder = storage.get(COMPUTE_MONITORING_HOLDER_KEY);

Expand Down
Expand Up @@ -19,7 +19,6 @@

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.igfs.*;
Expand All @@ -33,6 +32,7 @@
import org.apache.ignite.streamer.*;

import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
import static org.apache.ignite.internal.visor.compute.VisorComputeMonitoringHolder.*;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected void events(VisorNodeDataCollectorJobResult res, VisorNodeDataCollecto
res.taskMonitoringEnabled(arg.taskMonitoringEnabled());

if (arg.taskMonitoringEnabled()) {
ClusterNodeLocalMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap();
ConcurrentMap<String, VisorComputeMonitoringHolder> storage = ignite.cluster().nodeLocalMap();

VisorComputeMonitoringHolder holder = storage.get(COMPUTE_MONITORING_HOLDER_KEY);

Expand Down
Expand Up @@ -30,6 +30,7 @@

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
Expand Down Expand Up @@ -263,7 +264,7 @@ private boolean filterByTaskSessionId(Event e, IgniteUuid taskSesId) {
@Override protected Collection<? extends VisorGridEvent> run(final VisorNodeEventsCollectorTaskArg arg) {
final long startEvtTime = arg.timeArgument() == null ? 0L : System.currentTimeMillis() - arg.timeArgument();

final ClusterNodeLocalMap<String, Long> nl = ignite.cluster().nodeLocalMap();
final ConcurrentMap<String, Long> nl = ignite.cluster().nodeLocalMap();

final Long startEvtOrder = arg.keyOrder() != null && nl.containsKey(arg.keyOrder()) ?
nl.get(arg.keyOrder()) : -1L;
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;

Expand Down Expand Up @@ -85,8 +86,7 @@ protected VisorQueryCleanupJob(Collection<String> arg, boolean debug) {

/** {@inheritDoc} */
@Override protected Void run(Collection<String> qryIds) {
ClusterNodeLocalMap<String, VisorQueryTask.VisorFutureResultSetHolder> locMap =
ignite.cluster().nodeLocalMap();
ConcurrentMap<String, VisorQueryTask.VisorFutureResultSetHolder> locMap = ignite.cluster().nodeLocalMap();

for (String qryId : qryIds)
locMap.remove(qryId);
Expand Down
Expand Up @@ -18,14 +18,14 @@
package org.apache.ignite.internal.visor.query;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.visor.*;
import org.apache.ignite.lang.*;

import java.util.*;
import java.util.concurrent.*;

/**
* Task for collecting next page previously executed SQL or SCAN query.
Expand Down Expand Up @@ -71,7 +71,7 @@ private VisorQueryNextPageJob(IgniteBiTuple<String, Integer> arg, boolean debug)
private VisorQueryResult nextSqlPage(IgniteBiTuple<String, Integer> arg) throws IgniteCheckedException {
long start = U.currentTimeMillis();

ClusterNodeLocalMap<String, VisorQueryTask.VisorFutureResultSetHolder<List<?>>> storage =
ConcurrentMap<String, VisorQueryTask.VisorFutureResultSetHolder<List<?>>> storage =
ignite.cluster().nodeLocalMap();

VisorQueryTask.VisorFutureResultSetHolder<List<?>> t = storage.get(arg.get1());
Expand All @@ -95,7 +95,7 @@ private VisorQueryResult nextSqlPage(IgniteBiTuple<String, Integer> arg) throws
private VisorQueryResult nextScanPage(IgniteBiTuple<String, Integer> arg) throws IgniteCheckedException {
long start = U.currentTimeMillis();

ClusterNodeLocalMap<String, VisorQueryTask.VisorFutureResultSetHolder<Map.Entry<Object, Object>>> storage =
ConcurrentMap<String, VisorQueryTask.VisorFutureResultSetHolder<Map.Entry<Object, Object>>> storage =
ignite.cluster().nodeLocalMap();

VisorQueryTask.VisorFutureResultSetHolder<Map.Entry<Object, Object>> t = storage.get(arg.get1());
Expand Down
Expand Up @@ -19,7 +19,6 @@

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.query.*;
Expand All @@ -33,6 +32,7 @@
import java.io.*;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;

import static org.apache.ignite.internal.visor.query.VisorQueryUtils.*;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.*;
Expand Down Expand Up @@ -282,7 +282,7 @@ protected VisorQueryJob(VisorQueryArg arg, boolean debug) {
private void scheduleResultSetHolderRemoval(final String id) {
((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(RMV_DELAY) {
@Override public void onTimeout() {
ClusterNodeLocalMap<String, VisorFutureResultSetHolder> storage = ignite.cluster().nodeLocalMap();
ConcurrentMap<String, VisorFutureResultSetHolder> storage = ignite.cluster().nodeLocalMap();

VisorFutureResultSetHolder<?> t = storage.get(id);

Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.nio.file.*;
import java.text.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static java.lang.System.*;
Expand Down Expand Up @@ -361,7 +362,7 @@ public static Collection<VisorGridEvent> collectEvents(Ignite ignite, String evt
assert ignite != null;
assert evtTypes != null && evtTypes.length > 0;

ClusterNodeLocalMap<String, Long> nl = ignite.cluster().nodeLocalMap();
ConcurrentMap<String, Long> nl = ignite.cluster().nodeLocalMap();

final long lastOrder = getOrElse(nl, evtOrderKey, -1L);
final long throttle = getOrElse(nl, evtThrottleCntrKey, 0L);
Expand Down
Expand Up @@ -18,12 +18,12 @@
package org.apache.ignite.internal;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.junits.common.*;

import java.util.*;
import java.util.concurrent.*;

/**
* This test will test node local storage.
Expand All @@ -49,7 +49,7 @@ public void testNodeLocal() throws Exception {

GridTuple3 key = F.t(keyNum, keyStr, keyDate);

ClusterNodeLocalMap<Object, Object> nl = g.cluster().nodeLocalMap();
ConcurrentMap<Object, Object> nl = g.cluster().nodeLocalMap();

nl.put(keyStr, "Hello world!");
nl.put(key, 12);
Expand Down
Expand Up @@ -432,7 +432,7 @@ public String terminalId() {

/** {@inheritDoc} */
@Override public Object execute() {
ClusterNodeLocalMap<String, T2<AtomicLong, AtomicLong>> nodeLoc = ignite.cluster().nodeLocalMap();
ConcurrentMap<String, T2<AtomicLong, AtomicLong>> nodeLoc = ignite.cluster().nodeLocalMap();

T2<AtomicLong, AtomicLong> cntrs = nodeLoc.get("cntrs");

Expand Down
Expand Up @@ -19,7 +19,6 @@

import org.apache.ignite.*;
import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
Expand Down Expand Up @@ -90,7 +89,7 @@ public GridDsiPerfJob(@Nullable GridDsiMessage msg) {
*/
@SuppressWarnings("ConstantConditions")
@Override public Object execute() {
ClusterNodeLocalMap<String, T2<AtomicLong, AtomicLong>> nodeLoc = ignite.cluster().nodeLocalMap();
ConcurrentMap<String, T2<AtomicLong, AtomicLong>> nodeLoc = ignite.cluster().nodeLocalMap();

T2<AtomicLong, AtomicLong> cntrs = nodeLoc.get("cntrs");

Expand Down
Expand Up @@ -520,13 +520,13 @@ public void testLoggerMarshalling() throws Exception {
}

/**
* Tests marshal {@link org.apache.ignite.cluster.ClusterNodeLocalMap} instance.
* Tests marshal {@link ClusterNodeLocalMapImpl} instance.
*
* @throws Exception If test failed.
*/
@SuppressWarnings("unchecked")
public void testNodeLocalMarshalling() throws Exception {
ClusterNodeLocalMap<String, String> loc = grid().cluster().nodeLocalMap();
ConcurrentMap<String, String> loc = grid().cluster().nodeLocalMap();

String key = "test-key";
String val = "test-val";
Expand All @@ -550,7 +550,7 @@ public void testNodeLocalMarshalling() throws Exception {

outBean.checkNullResources();

loc = (ClusterNodeLocalMap<String, String>)outBean.getObjectField();
loc = (ConcurrentMap<String, String>)outBean.getObjectField();

assert loc.size() == 1;
assert val.equals(loc.get(key));
Expand Down

0 comments on commit 4ba5d0e

Please sign in to comment.