From 7bed750bc2ec095e7da5b7963b231a6e67b9db88 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Thu, 18 Jan 2024 16:22:22 +0300 Subject: [PATCH] [WIP] Orphaned code removal. --- .../processors/cache/GridCacheAdapter.java | 565 ++++++++---------- .../distributed/dht/GridDhtCacheAdapter.java | 2 - 2 files changed, 253 insertions(+), 314 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 29ca8ddbef43a..e5d788ae4be3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1784,7 +1784,7 @@ protected IgniteInternalFuture getAsync( * @return Future for the get operation. * @see GridCacheAdapter#getAllAsync(Collection) */ - protected IgniteInternalFuture> getAllAsync( + protected abstract IgniteInternalFuture> getAllAsync( @Nullable Collection keys, boolean forcePrimary, boolean skipTx, @@ -1794,28 +1794,12 @@ protected IgniteInternalFuture> getAllAsync( ReadRepairStrategy readRepairStrategy, boolean skipVals, final boolean needVer - ) { - CacheOperationContext opCtx = ctx.operationContextPerCall(); - - return getAllAsync(keys, - null, - opCtx == null || !opCtx.skipStore(), - !skipTx, - taskName, - deserializeBinary, - opCtx != null && opCtx.recovery(), - readRepairStrategy, - forcePrimary, - skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), - skipVals, - needVer); - } + ); /** * @param keys Keys. * @param readerArgs Near cache reader will be added if not null. * @param readThrough Read through. - * @param checkTx Check tx. * @param taskName Task name. * @param deserializeBinary Deserialize binary. * @param recovery Recovery flag. @@ -1829,7 +1813,6 @@ protected IgniteInternalFuture> getAllAsync( public final IgniteInternalFuture> getAllAsync(@Nullable final Collection keys, @Nullable final ReaderArguments readerArgs, boolean readThrough, - boolean checkTx, final String taskName, final boolean deserializeBinary, final boolean recovery, @@ -1846,7 +1829,6 @@ public final IgniteInternalFuture> getAllAsync(@Nullable final Collect return getAllAsync0(ctx.cacheKeysView(keys), readerArgs, readThrough, - checkTx, taskName, deserializeBinary, expiry, @@ -1863,7 +1845,6 @@ public final IgniteInternalFuture> getAllAsync(@Nullable final Collect * @param keys Keys. * @param readerArgs Near cache reader will be added if not null. * @param readThrough Read-through flag. - * @param checkTx Check local transaction flag. * @param taskName Task name/ * @param deserializeBinary Deserialize binary flag. * @param expiry Expiry policy. @@ -1878,7 +1859,6 @@ protected final IgniteInternalFuture> getAllAsync0( @Nullable final Collection keys, @Nullable final ReaderArguments readerArgs, final boolean readThrough, - boolean checkTx, final String taskName, final boolean deserializeBinary, @Nullable final IgniteCacheExpiryPolicy expiry, @@ -1893,370 +1873,331 @@ protected final IgniteInternalFuture> getAllAsync0( if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.emptyMap()); - GridNearTxLocal tx = null; + assert mvccSnapshot == null; - if (checkTx) { - try { - checkJta(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - - tx = checkCurrentTx(); - } - - if (tx == null || tx.implicit()) { - assert mvccSnapshot == null; - - Map misses = null; + Map misses = null; - Set newLocalEntries = null; + Set newLocalEntries = null; - final AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : - tx.topologyVersion(); + final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - ctx.shared().database().checkpointReadLock(); + ctx.shared().database().checkpointReadLock(); - try { - int keysSize = keys.size(); + try { + int keysSize = keys.size(); - GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture(); + GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture(); - Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null; + Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null; - if (ex != null) - return new GridFinishedFuture<>(ex); + if (ex != null) + return new GridFinishedFuture<>(ex); - final Map map = keysSize == 1 ? - (Map)new IgniteBiTuple<>() : - U.newHashMap(keysSize); + final Map map = keysSize == 1 ? + (Map)new IgniteBiTuple<>() : + U.newHashMap(keysSize); - final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); + final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); - boolean readNoEntry = ctx.readNoEntry(expiry, readerArgs != null); + boolean readNoEntry = ctx.readNoEntry(expiry, readerArgs != null); - for (KeyCacheObject key : keys) { - while (true) { - try { - EntryGetResult res = null; + for (KeyCacheObject key : keys) { + while (true) { + try { + EntryGetResult res = null; - boolean evt = !skipVals; - boolean updateMetrics = !skipVals; + boolean evt = !skipVals; + boolean updateMetrics = !skipVals; - GridCacheEntryEx entry = null; + GridCacheEntryEx entry = null; - boolean skipEntry = readNoEntry; + boolean skipEntry = readNoEntry; - if (readNoEntry) { - CacheDataRow row = mvccSnapshot != null ? - ctx.offheap().mvccRead(ctx, key, mvccSnapshot) : - ctx.offheap().read(ctx, key); + if (readNoEntry) { + CacheDataRow row = mvccSnapshot != null ? + ctx.offheap().mvccRead(ctx, key, mvccSnapshot) : + ctx.offheap().read(ctx, key); - if (row != null) { - long expireTime = row.expireTime(); + if (row != null) { + long expireTime = row.expireTime(); - if (expireTime != 0) { - if (expireTime > U.currentTimeMillis()) { - res = new EntryGetWithTtlResult(row.value(), - row.version(), - false, - expireTime, - 0); - } - else - skipEntry = false; + if (expireTime != 0) { + if (expireTime > U.currentTimeMillis()) { + res = new EntryGetWithTtlResult(row.value(), + row.version(), + false, + expireTime, + 0); } else - res = new EntryGetResult(row.value(), row.version(), false); + skipEntry = false; } + else + res = new EntryGetResult(row.value(), row.version(), false); + } - if (res != null) { - if (evt) { - ctx.events().readEvent(key, - null, - txLbl, - row.value(), - taskName, - !deserializeBinary); - } - - if (updateMetrics && ctx.statisticsEnabled()) - ctx.cache().metrics0().onRead(true); + if (res != null) { + if (evt) { + ctx.events().readEvent(key, + null, + txLbl, + row.value(), + taskName, + !deserializeBinary); } - else if (storeEnabled) - skipEntry = false; - } - if (!skipEntry) { - boolean isNewLocalEntry = this.map.getEntry(ctx, key) == null; + if (updateMetrics && ctx.statisticsEnabled()) + ctx.cache().metrics0().onRead(true); + } + else if (storeEnabled) + skipEntry = false; + } - entry = entryEx(key); + if (!skipEntry) { + boolean isNewLocalEntry = this.map.getEntry(ctx, key) == null; - if (entry == null) { - if (!skipVals && ctx.statisticsEnabled()) - ctx.cache().metrics0().onRead(false); + entry = entryEx(key); - break; - } + if (entry == null) { + if (!skipVals && ctx.statisticsEnabled()) + ctx.cache().metrics0().onRead(false); - if (isNewLocalEntry) { - if (newLocalEntries == null) - newLocalEntries = new HashSet<>(); + break; + } - newLocalEntries.add(entry); - } + if (isNewLocalEntry) { + if (newLocalEntries == null) + newLocalEntries = new HashSet<>(); - if (storeEnabled) { - res = entry.innerGetAndReserveForLoad(updateMetrics, - evt, - taskName, - expiry, - !deserializeBinary, - readerArgs); + newLocalEntries.add(entry); + } - assert res != null; + if (storeEnabled) { + res = entry.innerGetAndReserveForLoad(updateMetrics, + evt, + taskName, + expiry, + !deserializeBinary, + readerArgs); - if (res.value() == null) { - if (misses == null) - misses = new HashMap<>(); + assert res != null; - misses.put(key, res); + if (res.value() == null) { + if (misses == null) + misses = new HashMap<>(); - res = null; - } - } - else { - res = entry.innerGetVersioned( - null, - null, - updateMetrics, - evt, - null, - taskName, - expiry, - !deserializeBinary, - readerArgs); + misses.put(key, res); - if (res == null) - entry.touch(); + res = null; } } - - if (res != null) { - ctx.addResult(map, - key, - res, - skipVals, - keepCacheObjects, - deserializeBinary, - true, - needVer); - - if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) + else { + res = entry.innerGetVersioned( + null, + null, + updateMetrics, + evt, + null, + taskName, + expiry, + !deserializeBinary, + readerArgs); + + if (res == null) entry.touch(); - - if (keysSize == 1) - // Safe to return because no locks are required in READ_COMMITTED mode. - return new GridFinishedFuture<>(map); } - - break; } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key); + + if (res != null) { + ctx.addResult(map, + key, + res, + skipVals, + keepCacheObjects, + deserializeBinary, + true, + needVer); + + if (entry != null) + entry.touch(); + + if (keysSize == 1) + // Safe to return because no locks are required in READ_COMMITTED mode. + return new GridFinishedFuture<>(map); } + + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key); } } + } - if (storeEnabled && misses != null) { - final Map loadKeys = misses; + if (storeEnabled && misses != null) { + final Map loadKeys = misses; - final IgniteTxLocalAdapter tx0 = tx; + final Collection loaded = new HashSet<>(); - final Collection loaded = new HashSet<>(); + return new GridEmbeddedFuture( + ctx.closures().callLocalSafe(ctx.projectSafe(new GPC>() { + @Override public Map call() throws Exception { + ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2() { + @Override public void apply(KeyCacheObject key, Object val) { + EntryGetResult res = loadKeys.get(key); - return new GridEmbeddedFuture( - ctx.closures().callLocalSafe(ctx.projectSafe(new GPC>() { - @Override public Map call() throws Exception { - ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2() { - @Override public void apply(KeyCacheObject key, Object val) { - EntryGetResult res = loadKeys.get(key); + if (res == null || val == null) + return; - if (res == null || val == null) - return; + loaded.add(key); - loaded.add(key); + CacheObject cacheVal = ctx.toCacheObject(val); - CacheObject cacheVal = ctx.toCacheObject(val); + while (true) { + GridCacheEntryEx entry = null; - while (true) { - GridCacheEntryEx entry = null; + try { + ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); + } + catch (IgniteCheckedException e) { + // Wrap errors (will be unwrapped). + throw new GridClosureException(e); + } - try { - ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); - } - catch (IgniteCheckedException e) { - // Wrap errors (will be unwrapped). - throw new GridClosureException(e); - } + ctx.shared().database().checkpointReadLock(); - ctx.shared().database().checkpointReadLock(); - - try { - entry = entryEx(key); - - entry.unswap(); - - GridCacheVersion newVer = nextVersion(); - - EntryGetResult verVal = entry.versionedValue( - cacheVal, - res.version(), - newVer, - expiry, - readerArgs); - - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry [" + - "oldVer=" + res.version() + - ", newVer=" + verVal.version() + ", " + - "entry=" + entry + ']'); - - // Don't put key-value pair into result map if value is null. - if (verVal.value() != null) { - ctx.addResult(map, - key, - verVal, - skipVals, - keepCacheObjects, - deserializeBinary, - true, - needVer); - } - else { - ctx.addResult( - map, - key, - new EntryGetResult(cacheVal, res.version()), - skipVals, - keepCacheObjects, - deserializeBinary, - false, - needVer - ); - } - - if (tx0 == null || (!tx0.implicit() && - tx0.isolation() == READ_COMMITTED)) - entry.touch(); - - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during getAllAsync (will retry): " + - entry); - } - catch (IgniteCheckedException e) { - // Wrap errors (will be unwrapped). - throw new GridClosureException(e); + try { + entry = entryEx(key); + + entry.unswap(); + + GridCacheVersion newVer = nextVersion(); + + EntryGetResult verVal = entry.versionedValue( + cacheVal, + res.version(), + newVer, + expiry, + readerArgs); + + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry [" + + "oldVer=" + res.version() + + ", newVer=" + verVal.version() + ", " + + "entry=" + entry + ']'); + + // Don't put key-value pair into result map if value is null. + if (verVal.value() != null) { + ctx.addResult(map, + key, + verVal, + skipVals, + keepCacheObjects, + deserializeBinary, + true, + needVer); } - finally { - ctx.shared().database().checkpointReadUnlock(); + else { + ctx.addResult( + map, + key, + new EntryGetResult(cacheVal, res.version()), + skipVals, + keepCacheObjects, + deserializeBinary, + false, + needVer + ); } + + entry.touch(); + + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during getAllAsync (will retry): " + + entry); + } + catch (IgniteCheckedException e) { + // Wrap errors (will be unwrapped). + throw new GridClosureException(e); + } + finally { + ctx.shared().database().checkpointReadUnlock(); } } - }); - - clearReservationsIfNeeded(topVer, loadKeys, loaded, tx0); + } + }); - return map; - } - }), true), - new C2, Exception, IgniteInternalFuture>>() { - @Override public IgniteInternalFuture> apply(Map map, Exception e) { - if (e != null) { - clearReservationsIfNeeded(topVer, loadKeys, loaded, tx0); + clearReservationsIfNeeded(topVer, loadKeys, loaded, null); - return new GridFinishedFuture<>(e); - } + return map; + } + }), true), + new C2, Exception, IgniteInternalFuture>>() { + @Override public IgniteInternalFuture> apply(Map map, Exception e) { + if (e != null) { + clearReservationsIfNeeded(topVer, loadKeys, loaded, null); - if (tx0 == null || (!tx0.implicit() && tx0.isolation() == READ_COMMITTED)) { - Collection notFound = new HashSet<>(loadKeys.keySet()); + return new GridFinishedFuture<>(e); + } - notFound.removeAll(loaded); + Collection notFound = new HashSet<>(loadKeys.keySet()); - // Touch entries that were not found in store. - for (KeyCacheObject key : notFound) { - GridCacheEntryEx entry = peekEx(key); + notFound.removeAll(loaded); - if (entry != null) - entry.touch(); - } - } + // Touch entries that were not found in store. + for (KeyCacheObject key : notFound) { + GridCacheEntryEx entry = peekEx(key); - // There were no misses. - return new GridFinishedFuture<>(Collections.emptyMap()); + if (entry != null) + entry.touch(); } - }, - new C2, Exception, Map>() { - @Override public Map apply(Map loaded, Exception e) { - if (e == null) - map.putAll(loaded); - return map; - } + // There were no misses. + return new GridFinishedFuture<>(Collections.emptyMap()); } - ); - } - else - // Misses can be non-zero only if store is enabled. - assert misses == null; + }, + new C2, Exception, Map>() { + @Override public Map apply(Map loaded, Exception e) { + if (e == null) + map.putAll(loaded); - return new GridFinishedFuture<>(map); - } - catch (RuntimeException | AssertionError e) { - if (misses != null) { - for (KeyCacheObject key0 : misses.keySet()) { - GridCacheEntryEx entry = peekEx(key0); - if (entry != null) - entry.touch(); + return map; + } } - } + ); + } + else + // Misses can be non-zero only if store is enabled. + assert misses == null; - if (newLocalEntries != null) { - for (GridCacheEntryEx entry : newLocalEntries) - removeEntry(entry); + return new GridFinishedFuture<>(map); + } + catch (RuntimeException | AssertionError e) { + if (misses != null) { + for (KeyCacheObject key0 : misses.keySet()) { + GridCacheEntryEx entry = peekEx(key0); + if (entry != null) + entry.touch(); } - - return new GridFinishedFuture<>(e); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); } - finally { - ctx.shared().database().checkpointReadUnlock(); + + if (newLocalEntries != null) { + for (GridCacheEntryEx entry : newLocalEntries) + removeEntry(entry); } + + return new GridFinishedFuture<>(e); } - else { - return asyncOp(tx, new AsyncOp>(keys) { - @Override public IgniteInternalFuture> op(GridNearTxLocal tx, - AffinityTopologyVersion readyTopVer) { - return tx.getAllAsync(ctx, - readyTopVer, - keys, - deserializeBinary, - skipVals, - false, - !readThrough, - recovery, - readRepairStrategy, - needVer); - } - }, ctx.operationContextPerCall(), /*retry*/false); + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + finally { + ctx.shared().database().checkpointReadUnlock(); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index d700ae7122e86..fbe691443f019 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -655,7 +655,6 @@ else if (log.isDebugEnabled()) return getAllAsync(keys, null, opCtx == null || !opCtx.skipStore(), - /*don't check local tx. */false, taskName, deserializeBinary, opCtx != null && opCtx.recovery(), @@ -691,7 +690,6 @@ IgniteInternalFuture> getDhtAllAsync( return getAllAsync0(keys, readerArgs, readThrough, - /*don't check local tx. */false, taskName, false, expiry,