Skip to content

Commit

Permalink
ignite-971 Fixed offheap to swap eviction, added failover tests with …
Browse files Browse the repository at this point in the history
…swap/offheap, added retries for tx 'check backup' rollback.
  • Loading branch information
sboikov committed Sep 15, 2015
1 parent eb7d2b0 commit a7490a6
Show file tree
Hide file tree
Showing 62 changed files with 1,584 additions and 421 deletions.
Expand Up @@ -843,7 +843,7 @@ public void onOffHeapEvict() {
offHeapEvicts.incrementAndGet(); offHeapEvicts.incrementAndGet();


if (delegate != null) if (delegate != null)
delegate.onOffHeapRemove(); delegate.onOffHeapEvict();
} }


/** /**
Expand Down
Expand Up @@ -4096,21 +4096,22 @@ public void awaitLastFut() {


return t; return t;
} }
catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException e) {
IgniteTxRollbackCheckedException e) {
throw e; throw e;
} }
catch (IgniteCheckedException e) { catch (IgniteCheckedException e) {
try { if (!(e instanceof IgniteTxRollbackCheckedException)) {
tx.rollback(); try {
tx.rollback();


e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " + e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
tx.xid(), e); tx.xid(), e);
} }
catch (IgniteCheckedException | AssertionError | RuntimeException e1) { catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1); U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);


U.addLastCause(e, e1, log); U.addLastCause(e, e1, log);
}
} }


if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) { if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) {
Expand Down
Expand Up @@ -866,6 +866,19 @@ public Collection<GridCacheMvccCandidate> localCandidates(@Nullable GridCacheVer
*/ */
public void updateTtl(@Nullable GridCacheVersion ver, long ttl); public void updateTtl(@Nullable GridCacheVersion ver, long ttl);


/**
* Tries to do offheap -> swap eviction.
*
* @param entry Serialized swap entry.
* @param evictVer Version when entry was selected for eviction.
* @param obsoleteVer Obsolete version.
* @throws IgniteCheckedException If failed.
* @throws GridCacheEntryRemovedException If entry was removed.
* @return {@code True} if entry was obsoleted and written to swap.
*/
public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
throws IgniteCheckedException, GridCacheEntryRemovedException;

/** /**
* @return Value. * @return Value.
* @throws IgniteCheckedException If failed to read from swap storage. * @throws IgniteCheckedException If failed to read from swap storage.
Expand Down
Expand Up @@ -958,7 +958,7 @@ public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVe


List<GridCacheEntryEx> locked = new ArrayList<>(keys.size()); List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());


Set<GridCacheEntryEx> notRemove = null; Set<GridCacheEntryEx> notRmv = null;


Collection<GridCacheBatchSwapEntry> swapped = new ArrayList<>(keys.size()); Collection<GridCacheBatchSwapEntry> swapped = new ArrayList<>(keys.size());


Expand Down Expand Up @@ -990,10 +990,10 @@ public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVe
locked.add(entry); locked.add(entry);


if (entry.obsolete()) { if (entry.obsolete()) {
if (notRemove == null) if (notRmv == null)
notRemove = new HashSet<>(); notRmv = new HashSet<>();


notRemove.add(entry); notRmv.add(entry);


continue; continue;
} }
Expand All @@ -1004,11 +1004,19 @@ public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVe
GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer); GridCacheBatchSwapEntry swapEntry = entry.evictInBatchInternal(obsoleteVer);


if (swapEntry != null) { if (swapEntry != null) {
assert entry.obsolete() : entry;

swapped.add(swapEntry); swapped.add(swapEntry);


if (log.isDebugEnabled()) if (log.isDebugEnabled())
log.debug("Entry was evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']'); log.debug("Entry was evicted [entry=" + entry + ", localNode=" + cctx.nodeId() + ']');
} }
else if (!entry.obsolete()) {
if (notRmv == null)
notRmv = new HashSet<>();

notRmv.add(entry);
}
} }


// Batch write to swap. // Batch write to swap.
Expand All @@ -1025,7 +1033,7 @@ public void batchEvict(Collection<?> keys, @Nullable GridCacheVersion obsoleteVe


// Remove entries and fire events outside the locks. // Remove entries and fire events outside the locks.
for (GridCacheEntryEx entry : locked) { for (GridCacheEntryEx entry : locked) {
if (entry.obsolete() && (notRemove == null || !notRemove.contains(entry))) { if (entry.obsolete() && (notRmv == null || !notRmv.contains(entry))) {
entry.onMarkedObsolete(); entry.onMarkedObsolete();


cache.removeEntry(entry); cache.removeEntry(entry);
Expand Down
Expand Up @@ -432,6 +432,41 @@ public boolean isStartVersion() {
return info; return info;
} }


/** {@inheritDoc} */
@Override public boolean offheapSwapEvict(byte[] entry, GridCacheVersion evictVer, GridCacheVersion obsoleteVer)
throws IgniteCheckedException, GridCacheEntryRemovedException {
assert cctx.swap().swapEnabled() && cctx.swap().offHeapEnabled() : this;

boolean obsolete;

synchronized (this) {
checkObsolete();

if (hasReaders() || !isStartVersion())
return false;

GridCacheMvcc mvcc = mvccExtras();

if (mvcc != null && !mvcc.isEmpty(obsoleteVer))
return false;

if (cctx.swap().offheapSwapEvict(key, entry, partition(), evictVer)) {
assert !hasValueUnlocked() : this;

obsolete = markObsolete0(obsoleteVer, false);

assert obsolete : this;
}
else
obsolete = false;
}

if (obsolete)
onMarkedObsolete();

return obsolete;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public CacheObject unswap() throws IgniteCheckedException, GridCacheEntryRemovedException { @Override public CacheObject unswap() throws IgniteCheckedException, GridCacheEntryRemovedException {
return unswap(true); return unswap(true);
Expand Down Expand Up @@ -536,7 +571,7 @@ private void swap() throws IgniteCheckedException {
log.debug("Value did not change, skip write swap entry: " + this); log.debug("Value did not change, skip write swap entry: " + this);


if (cctx.swap().offheapEvictionEnabled()) if (cctx.swap().offheapEvictionEnabled())
cctx.swap().enableOffheapEviction(key()); cctx.swap().enableOffheapEviction(key(), partition());


return; return;
} }
Expand Down Expand Up @@ -2988,7 +3023,7 @@ protected boolean hasValueUnlocked() {
synchronized (this) { synchronized (this) {
checkObsolete(); checkObsolete();


if (isNew() || (!preload && deletedUnlocked())) { if ((isNew() && !cctx.swap().containsKey(key, partition())) || (!preload && deletedUnlocked())) {
long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime;


val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
Expand Down Expand Up @@ -3643,6 +3678,9 @@ private synchronized <K, V> CacheEntryImplEx<K, V> wrapVersionedWithValue() {
try { try {
if (F.isEmptyOrNulls(filter)) { if (F.isEmptyOrNulls(filter)) {
synchronized (this) { synchronized (this) {
if (obsoleteVersionExtras() != null)
return true;

CacheObject prev = saveValueForIndexUnlocked(); CacheObject prev = saveValueForIndexUnlocked();


if (!hasReaders() && markObsolete0(obsoleteVer, false)) { if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
Expand Down Expand Up @@ -3684,6 +3722,9 @@ private synchronized <K, V> CacheEntryImplEx<K, V> wrapVersionedWithValue() {
return false; return false;


synchronized (this) { synchronized (this) {
if (obsoleteVersionExtras() != null)
return true;

if (!v.equals(ver)) if (!v.equals(ver))
// Version has changed since entry passed the filter. Do it again. // Version has changed since entry passed the filter. Do it again.
continue; continue;
Expand Down Expand Up @@ -3768,6 +3809,13 @@ private void evictFailed(@Nullable CacheObject prevVal) throws IgniteCheckedExce
try { try {
if (!hasReaders() && markObsolete0(obsoleteVer, false)) { if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
if (!isStartVersion() && hasValueUnlocked()) { if (!isStartVersion() && hasValueUnlocked()) {
if (cctx.offheapTiered() && hasOffHeapPointer()) {
if (cctx.swap().offheapEvictionEnabled())
cctx.swap().enableOffheapEviction(key(), partition());

return null;
}

IgniteUuid valClsLdrId = null; IgniteUuid valClsLdrId = null;
IgniteUuid keyClsLdrId = null; IgniteUuid keyClsLdrId = null;


Expand Down
Expand Up @@ -126,9 +126,9 @@ public static long expireTime(byte[] bytes) {
* @return Version. * @return Version.
*/ */
public static GridCacheVersion version(byte[] bytes) { public static GridCacheVersion version(byte[] bytes) {
int off = VERSION_OFFSET; // Skip ttl, expire time. long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.


boolean verEx = bytes[off++] != 0; boolean verEx = UNSAFE.getByte(bytes, off++) != 0;


return U.readVersion(bytes, off, verEx); return U.readVersion(bytes, off, verEx);
} }
Expand Down Expand Up @@ -157,26 +157,6 @@ public static GridCacheVersion version(byte[] bytes) {
return new IgniteBiTuple<>(valBytes, type); return new IgniteBiTuple<>(valBytes, type);
} }


/**
* @param bytes Entry bytes.
* @return Value bytes offset.
*/
public static int valueOffset(byte[] bytes) {
assert bytes.length > 40 : bytes.length;

int off = VERSION_OFFSET; // Skip ttl, expire time.

boolean verEx = bytes[off++] != 0;

off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;

off += 5; // Byte array flag + array size.

assert bytes.length >= off;

return off;
}

/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public byte[] valueBytes() { @Override public byte[] valueBytes() {
if (valBytes != null) { if (valBytes != null) {
Expand Down

0 comments on commit a7490a6

Please sign in to comment.