Skip to content

Commit

Permalink
futures: api cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Yakov Zhdanov committed Mar 5, 2015
1 parent 2d45a47 commit bf01a40
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 494 deletions.
Expand Up @@ -82,14 +82,10 @@ public GridKernalGatewayImpl(String gridName) {

throw illegalState();
}

enterThreadLocals();
}

/** {@inheritDoc} */
@Override public void readUnlock() {
leaveThreadLocals();

rwLock.readUnlock();
}

Expand All @@ -99,8 +95,6 @@ public GridKernalGatewayImpl(String gridName) {
if (stackTrace == null)
stackTrace = stackTrace();

enterThreadLocals();

boolean interrupted = false;

// Busy wait is intentional.
Expand Down Expand Up @@ -129,8 +123,6 @@ public GridKernalGatewayImpl(String gridName) {
if (stackTrace == null)
stackTrace = stackTrace();

enterThreadLocals();

return true;
}

Expand Down Expand Up @@ -161,27 +153,9 @@ private IllegalStateException illegalState() {
", state=" + state + ']');
}

/**
* Enter thread locals.
*/
private void enterThreadLocals() {
GridThreadLocal.enter();
GridThreadLocalEx.enter();
}

/**
* Leave thread locals.
*/
private void leaveThreadLocals() {
GridThreadLocalEx.leave();
GridThreadLocal.leave();
}

/** {@inheritDoc} */
@Override public void writeUnlock() {
rwLock.writeUnlock();

leaveThreadLocals();
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.plugin.extensions.communication.*;
Expand Down Expand Up @@ -112,14 +111,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Lock to sync maps access. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();

/** Message cache. */
private ThreadLocal<IgniteBiTuple<Object, byte[]>> cacheMsg =
new GridThreadLocal<IgniteBiTuple<Object, byte[]>>() {
@Nullable @Override protected IgniteBiTuple<Object, byte[]> initialValue() {
return null;
}
};

/** Fully started flag. When set to true, can send and receive messages. */
private volatile boolean started;

Expand Down Expand Up @@ -416,9 +407,6 @@ public void resetMetrics() {
@Override public void stop(boolean cancel) throws IgniteCheckedException {
stopSpi();

// Clear cache.
cacheMsg.set(null);

if (log.isDebugEnabled())
log.debug(stopInfo());
}
Expand Down Expand Up @@ -1332,14 +1320,14 @@ public void addMessageListener(Object topic, final GridMessageListener lsnr) {
pool(msgSet.policy()).execute(
new Runnable() {
@Override public void run() {
try {
unwindMessageSet(msgSet, lsnrs0);
}
finally {
workersCnt.decrement();
try {
unwindMessageSet(msgSet, lsnrs0);
}
finally {
workersCnt.decrement();
}
}
}
});
});

success = true;
}
Expand Down
Expand Up @@ -71,7 +71,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
private volatile boolean locDepOwner;

/** */
private final GridThreadLocal<Boolean> ignoreOwnership = new GridThreadLocal<Boolean>() {
private final ThreadLocal<Boolean> ignoreOwnership = new ThreadLocal<Boolean>() {
@Override protected Boolean initialValue() {
return false;
}
Expand Down
Expand Up @@ -293,10 +293,6 @@ private IgniteInternalFuture<Object> startFuture(GridCacheMessage<K, V> cacheMsg
private void processMessage(UUID nodeId, GridCacheMessage<K, V> msg,
IgniteBiInClosure<UUID, GridCacheMessage<K, V>> c) {
try {
// Start clean.
if (msg.transactional())
CU.resetTxContext(cctx);

// We will not end up with storing a bunch of new UUIDs
// in each cache entry, since node ID is stored in NIO session
// on handshake.
Expand All @@ -309,8 +305,9 @@ private void processMessage(UUID nodeId, GridCacheMessage<K, V> msg,
U.error(log, "Failed processing message [senderId=" + nodeId + ']', e);
}
finally {
// Clear thread-local tx contexts.
CU.resetTxContext(cctx);
// Reset thread local context.
cctx.tm().txContextReset();
cctx.mvcc().contextReset();

// Unwind eviction notifications.
CU.unwindEvicts(cctx);
Expand Down
Expand Up @@ -86,13 +86,6 @@ public boolean allowForStartup() {
return false;
}

/**
* @return If this is a transactional message.
*/
public boolean transactional() {
return false;
}

/**
* @return {@code True} if class loading errors should be ignored, false otherwise.
*/
Expand Down
Expand Up @@ -51,8 +51,8 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
private static final int MAX_REMOVED_LOCKS = 10240;

/** Pending locks per thread. */
private final GridThreadLocal<Queue<GridCacheMvccCandidate<K>>> pending =
new GridThreadLocal<Queue<GridCacheMvccCandidate<K>>>() {
private final ThreadLocal<Queue<GridCacheMvccCandidate<K>>> pending =
new ThreadLocal<Queue<GridCacheMvccCandidate<K>>>() {
@Override protected Queue<GridCacheMvccCandidate<K>> initialValue() {
return new LinkedList<>();
}
Expand Down Expand Up @@ -724,6 +724,13 @@ public boolean addNext(GridCacheContext<K, V> cacheCtx, GridCacheMvccCandidate<K
return add;
}

/**
* Reset MVCC context.
*/
public void contextReset() {
pending.set(new LinkedList<GridCacheMvccCandidate<K>>());
}

/**
* Adds candidate to the list of near local candidates.
*
Expand Down
Expand Up @@ -1237,15 +1237,6 @@ public static String txString(@Nullable IgniteInternalTx tx) {
", duration=" + (U.currentTimeMillis() - tx.startTime()) + ']';
}

/**
* @param ctx Cache context.
*/
public static void resetTxContext(GridCacheSharedContext ctx) {
assert ctx != null;

ctx.tm().txContextReset();
}

/**
* @param ctx Cache context.
*/
Expand Down
Expand Up @@ -1096,7 +1096,7 @@ private Collection<GridCacheVersion> localDhtPendingVersions(Iterable<IgniteTxEn

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxPrepareFuture.class, this, "super", super.toString());
return S.toString(GridDhtTxPrepareFuture.class, this, "xid", tx.xidVersion(), "super", super.toString());
}

/**
Expand Down
Expand Up @@ -65,7 +65,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100);

/** Committing transactions. */
private final ThreadLocal<IgniteInternalTx> threadCtx = new GridThreadLocalEx<>();
private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>();

/** Per-thread transaction map. */
private final ConcurrentMap<Long, IgniteInternalTx<K, V>> threadMap = newMap();
Expand Down
Expand Up @@ -147,14 +147,14 @@ public class GridJobProcessor extends GridProcessorAdapter {
};

/** Internal task flag. */
private final GridThreadLocal<Boolean> internal = new GridThreadLocal<Boolean>() {
private final ThreadLocal<Boolean> internal = new ThreadLocal<Boolean>() {
@Override protected Boolean initialValue() {
return false;
}
};

/** Current session. */
private final GridThreadLocal<ComputeTaskSession> currentSess = new GridThreadLocal<>();
private final ThreadLocal<ComputeTaskSession> currentSess = new ThreadLocal<>();

/**
* @param ctx Kernal context.
Expand Down

0 comments on commit bf01a40

Please sign in to comment.