Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -637,12 +637,10 @@ else if (!crd && !fetchFuts.containsKey(grp.groupId())) {
/**
* @param msg Change request.
* @param topVer Current topology version.
* @param crd Coordinator flag.
* @return Closed caches IDs.
*/
private Set<Integer> processCacheCloseRequests(
ClientCacheChangeDummyDiscoveryMessage msg,
boolean crd,
AffinityTopologyVersion topVer
) {
Set<String> cachesToClose = msg.cachesToClose();
Expand Down Expand Up @@ -704,7 +702,7 @@ void processClientCachesRequests(ClientCacheChangeDummyDiscoveryMessage msg) {

// Check and close caches via dummy message.
if (msg.cachesToClose() != null)
closedCaches = processCacheCloseRequests(msg, crd, topVer);
closedCaches = processCacheCloseRequests(msg, topVer);

// Shedule change message.
if (startedCaches != null || closedCaches != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ public void cacheConfigurationEnrichment(CacheConfigurationEnrichment cacheCfgEn
", clientStartOnly=" + clientStartOnly +
", stop=" + stop +
", destroy=" + destroy +
", disabledAfterStart" + disabledAfterStart +
", disabledAfterStart=" + disabledAfterStart +
']';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ public boolean clientOnlyExchange() {
* @return New caches start requests.
*/
public Collection<CacheActionData> cacheStartRequests() {
return cachesToStart != null ? cachesToStart.values() : Collections.<CacheActionData>emptyList();
return cachesToStart != null ? cachesToStart.values() : Collections.emptyList();
}

/**
* @return Stop cache requests.
*/
public Collection<CacheActionData> cacheStopRequests() {
return cachesToStop != null ? cachesToStop.values() : Collections.<CacheActionData>emptyList();
return cachesToStop != null ? cachesToStop.values() : Collections.emptyList();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1599,7 +1599,7 @@ private GridCacheGateway<K, V> checkProxyIsValid(@Nullable GridCacheGateway<K, V
IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate;

try {
IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().<K, V>publicJCache(context().name());
IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().publicJCache(context().name());

if (proxy != null) {
proxyImpl.opportunisticRestart(proxy.internalProxy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1577,13 +1577,17 @@ else if (interceptorVal != val0)

recordNodeId(affNodeId, topVer);

if (metrics && cctx.statisticsEnabled()) {
if (metrics && cctx.statisticsEnabled() && tx != null) {
cctx.cache().metrics0().onWrite();

T2<GridCacheOperation, CacheObject> entryProcRes = tx.entry(txKey()).entryProcessorCalculatedValue();
IgniteTxEntry txEntry = tx.entry(txKey());

if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
cctx.cache().metrics0().onInvokeUpdate(old != null);
if (txEntry != null) {
T2<GridCacheOperation, CacheObject> entryProcRes = txEntry.entryProcessorCalculatedValue();

if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
cctx.cache().metrics0().onInvokeUpdate(old != null);
}
}

if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ public GridDhtPartitionTopology clientTopology(int grpId, DiscoCache discoCache)

CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId);

assert grpDesc != null : grpId;
assert grpDesc != null : "grpId=" + grpId;

CacheConfiguration<?, ?> ccfg = grpDesc.config();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/** Transaction interface implementation. */
private IgniteTransactionsImpl transactions;

/** Pending cache starts. */
/** Pending cache operations. */
private ConcurrentMap<UUID, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>();

/** Template configuration add futures. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3202,7 +3202,8 @@ private void collectInfo() {
for (Map.Entry<GridCacheMapEntry, Integer> info : store.entrySet()) {
GridCacheAdapter<Object, Object> cacheCtx = info.getKey().context().cache();

metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info);
if (cacheCtx != null)
metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info);
}

store.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2742,14 +2742,11 @@ void reset(
PendingMessage pm = new PendingMessage(msg);

this.msgs.add(pm);

if (pm.customMsg && pm.id.equals(customDiscardId))
this.customDiscardId = customDiscardId;

if (!pm.customMsg && pm.id.equals(discardId))
this.discardId = discardId;
}
}

this.discardId = discardId;
this.customDiscardId = customDiscardId;
}

/**
Expand Down Expand Up @@ -6218,32 +6215,8 @@ private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) {
if (isLocalNodeCoordinator()) {
boolean delayMsg;

assert ring.minimumNodeVersion() != null : ring;

boolean joiningEmpty;

synchronized (mux) {
joiningEmpty = joiningNodes.isEmpty();
}

delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;

if (delayMsg) {
if (log.isDebugEnabled()) {
synchronized (mux) {
log.debug("Delay custom message processing, there are joining nodes [msg=" + msg +
", joiningNodes=" + joiningNodes + ']');
}
}

synchronized (mux) {
pendingCustomMsgs.add(msg);
}

if (posponeUndeliveredMessages(msg))
return;
}

if (!msg.verified()) {
msg.verify(getLocalNodeId());
Expand Down Expand Up @@ -6327,6 +6300,36 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
}
}

/**
* If new node is in the progress of being added we must store and resend undelivered messages.
*
* @param msg Processed message.
* @return {@code true} If message was appended to pending queue.
*/
private boolean posponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) {
boolean joiningEmpty;

synchronized (mux) {
joiningEmpty = joiningNodes.isEmpty();

if (log.isDebugEnabled())
log.debug("Delay custom message processing, there are joining nodes [msg=" + msg +
", joiningNodes=" + joiningNodes + ']');
}

boolean delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;

if (delayMsg) {
synchronized (mux) {
pendingCustomMsgs.add(msg);
}

return true;
}

return false;
}

/**
* Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node is still in the
* ring and node detected failure left ring.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionOptimisticException;
import org.junit.Ignore;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -186,7 +185,7 @@ private void txStreamerLoad(boolean allowOverwrite) throws Exception {
private void txStreamerLoad(Ignite ignite,
Integer key,
String cacheName,
boolean allowOverwrite) throws Exception {
boolean allowOverwrite) {
IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);

log.info("Test key: " + key);
Expand Down Expand Up @@ -2824,7 +2823,6 @@ public void testReadWriteTxConflict() throws Exception {
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
@Test
public void testReadWriteTransactionsNoDeadlock() throws Exception {
checkReadWriteTransactionsNoDeadlock(false);
Expand All @@ -2833,7 +2831,6 @@ public void testReadWriteTransactionsNoDeadlock() throws Exception {
/**
* @throws Exception If failed.
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-9226")
@Test
public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception {
checkReadWriteTransactionsNoDeadlock(true);
Expand All @@ -2844,8 +2841,6 @@ public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception {
* @throws Exception If failed.
*/
private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-9226");

final Ignite ignite0 = ignite(0);

for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
Expand Down Expand Up @@ -4140,7 +4135,7 @@ private void accountTx(final boolean getAll,

if (nonSer) {
nonSerFut = runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
@Override public Void call() {
int nodeIdx = idx.getAndIncrement() % clients.size();

Ignite node = clients.get(nodeIdx);
Expand Down Expand Up @@ -4198,7 +4193,7 @@ private void accountTx(final boolean getAll,
}

final IgniteInternalFuture<?> fut = runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
@Override public Void call() {
int nodeIdx = idx.getAndIncrement() % clients.size();

Ignite node = clients.get(nodeIdx);
Expand All @@ -4210,8 +4205,8 @@ private void accountTx(final boolean getAll,
final IgniteTransactions txs = node.transactions();

final IgniteCache<Integer, Account> cache =
nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) :
node.<Integer, Account>cache(cacheName);
nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<>()) :
node.cache(cacheName);

assertNotNull(cache);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,10 +1353,8 @@ protected List<Integer> backupKeys(IgniteCache<?, ?> cache, int cnt, int startFr
* @param cnt Keys count.
* @param startFrom Start value for keys search.
* @return Collection of keys for which given cache is neither primary nor backup.
* @throws IgniteCheckedException If failed.
*/
protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom)
throws IgniteCheckedException {
protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) {
return findKeys(cache, cnt, startFrom, 2);
}

Expand Down Expand Up @@ -1543,10 +1541,8 @@ protected Integer backupKey(IgniteCache<?, ?> cache)
/**
* @param cache Cache.
* @return Key for which given cache is neither primary nor backup.
* @throws IgniteCheckedException If failed.
*/
protected Integer nearKey(IgniteCache<?, ?> cache)
throws IgniteCheckedException {
protected Integer nearKey(IgniteCache<?, ?> cache) {
return nearKeys(cache, 1, 1).get(0);
}

Expand Down