Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Mar 5, 2015
1 parent 814fae3 commit 7253add
Show file tree
Hide file tree
Showing 38 changed files with 628 additions and 423 deletions.
Expand Up @@ -559,6 +559,16 @@ public GridIoMessageFactory(MessageFactory[] ext) {


break; break;


case 103:
msg = new GridCacheRawVersionedEntry<>();

break;

case 104:
msg = new GridCacheVersionEx();

break;

default: default:
if (ext != null) { if (ext != null) {
for (MessageFactory factory : ext) { for (MessageFactory factory : ext) {
Expand Down
Expand Up @@ -32,18 +32,6 @@ public interface CacheObject extends Message {
*/ */
@Nullable public <T> T value(CacheObjectContext ctx, boolean cpy); @Nullable public <T> T value(CacheObjectContext ctx, boolean cpy);


/**
* @param name Field name.
* @return Field value.
*/
@Nullable public <T> T getField(String name);

/**
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException;

/** /**
* @return {@code True} if value is byte array. * @return {@code True} if value is byte array.
*/ */
Expand All @@ -56,6 +44,14 @@ public interface CacheObject extends Message {
*/ */
public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException; public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException;


/**
* Prepares cache object for cache (e.g. copies user-provided object if needed).
*
* @param ctx Cache context.
* @return Instance to store in cache.
*/
public CacheObject prepareForCache(CacheObjectContext ctx);

/** /**
* @param ctx Context. * @param ctx Context.
* @param ldr Class loader. * @param ldr Class loader.
Expand All @@ -64,9 +60,8 @@ public interface CacheObject extends Message {
public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException; public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException;


/** /**
* @param ctx Cache context. * @param ctx Context.
* * @throws IgniteCheckedException If failed.
* @return Instance to store in cache.
*/ */
public CacheObject prepareForCache(CacheObjectContext ctx); public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException;
} }
Expand Up @@ -36,12 +36,6 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
/** */ /** */
protected byte[] valBytes; protected byte[] valBytes;


/** {@inheritDoc} */
@Nullable @Override public <T> T getField(String name) {
// TODO IGNITE-51.
return null;
}

/** /**
* @param ctx Context. * @param ctx Context.
* @return {@code True} need to copy value returned to user. * @return {@code True} need to copy value returned to user.
Expand Down
Expand Up @@ -2570,7 +2570,7 @@ public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final C
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void putAllConflict(final Map<? extends K, GridCacheDrInfo<V>> drMap) @Override public void putAllConflict(final Map<KeyCacheObject, GridCacheDrInfo> drMap)
throws IgniteCheckedException { throws IgniteCheckedException {
if (F.isEmpty(drMap)) if (F.isEmpty(drMap))
return; return;
Expand All @@ -2591,7 +2591,7 @@ public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final C
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteInternalFuture<?> putAllConflictAsync(final Map<? extends K, GridCacheDrInfo<V>> drMap) @Override public IgniteInternalFuture<?> putAllConflictAsync(final Map<KeyCacheObject, GridCacheDrInfo> drMap)
throws IgniteCheckedException { throws IgniteCheckedException {
if (F.isEmpty(drMap)) if (F.isEmpty(drMap))
return new GridFinishedFuture<Object>(ctx.kernalContext()); return new GridFinishedFuture<Object>(ctx.kernalContext());
Expand Down Expand Up @@ -3483,7 +3483,7 @@ public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final C
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void removeAllConflict(final Map<? extends K, GridCacheVersion> drMap) @Override public void removeAllConflict(final Map<KeyCacheObject, GridCacheVersion> drMap)
throws IgniteCheckedException { throws IgniteCheckedException {
ctx.denyOnLocalRead(); ctx.denyOnLocalRead();


Expand All @@ -3504,7 +3504,7 @@ public <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0(@Nullable final C
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public IgniteInternalFuture<?> removeAllConflictAsync(final Map<? extends K, GridCacheVersion> drMap) @Override public IgniteInternalFuture<?> removeAllConflictAsync(final Map<KeyCacheObject, GridCacheVersion> drMap)
throws IgniteCheckedException { throws IgniteCheckedException {
ctx.denyOnLocalRead(); ctx.denyOnLocalRead();


Expand Down Expand Up @@ -3977,10 +3977,10 @@ protected void checkJta() throws IgniteCheckedException {
final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex()); IgniteDataLoaderImpl ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex());


try { try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); ldr.updater(new GridDrDataLoadCacheUpdater());


LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc);


Expand Down Expand Up @@ -4193,10 +4193,10 @@ public void localLoad(Collection<? extends K> keys,
}); });


if (ctx.store().isLocalStore()) { if (ctx.store().isLocalStore()) {
IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex()); IgniteDataLoaderImpl ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex());


try { try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>()); ldr.updater(new GridDrDataLoadCacheUpdater());


LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0);


Expand Down Expand Up @@ -5760,7 +5760,7 @@ protected AsyncInOp(K key) {
/** /**
* @param keys Keys involved. * @param keys Keys involved.
*/ */
protected AsyncInOp(Collection<? extends K> keys) { protected AsyncInOp(Collection<?> keys) {
super(keys); super(keys);
} }


Expand Down Expand Up @@ -6229,7 +6229,7 @@ private class LocalStoreLoadClosure extends CIX3<KeyCacheObject, Object, GridCac
final IgniteBiPredicate<K, V> p; final IgniteBiPredicate<K, V> p;


/** */ /** */
final Collection<Map.Entry<K, V>> col; final Collection<GridCacheRawVersionedEntry> col;


/** */ /** */
final IgniteDataLoaderImpl<K, V> ldr; final IgniteDataLoaderImpl<K, V> ldr;
Expand Down Expand Up @@ -6272,20 +6272,18 @@ else if (ttl == CU.TTL_NOT_CHANGED)
ttl = 0; ttl = 0;
} }


GridCacheRawVersionedEntry e = new GridCacheRawVersionedEntry<>(ctx.toCacheKeyObject(key), GridCacheRawVersionedEntry e = new GridCacheRawVersionedEntry(ctx.toCacheKeyObject(key),
null,
ctx.toCacheObject(val), ctx.toCacheObject(val),
null,
ttl, ttl,
0, 0,
ver); ver);


e.marshal(ctx.marshaller()); e.prepareDirectMarshal(ctx.cacheObjectContext());


col.add(e); col.add(e);


if (col.size() == ldr.perNodeBufferSize()) { if (col.size() == ldr.perNodeBufferSize()) {
ldr.addData(col); ldr.addDataInternal(col);


col.clear(); col.clear();
} }
Expand All @@ -6296,7 +6294,7 @@ else if (ttl == CU.TTL_NOT_CHANGED)
*/ */
void onDone() { void onDone() {
if (!col.isEmpty()) if (!col.isEmpty())
ldr.addData(col); ldr.addDataInternal(col);
} }
} }


Expand Down
Expand Up @@ -124,19 +124,19 @@ public class GridCacheContext<K, V> implements Externalizable {
private CacheDataStructuresManager<K, V> dataStructuresMgr; private CacheDataStructuresManager<K, V> dataStructuresMgr;


/** Eager TTL manager. */ /** Eager TTL manager. */
private GridCacheTtlManager<K, V> ttlMgr; private GridCacheTtlManager ttlMgr;


/** Store manager. */ /** Store manager. */
private GridCacheStoreManager storeMgr; private GridCacheStoreManager storeMgr;


/** Replication manager. */ /** Replication manager. */
private GridCacheDrManager<K, V> drMgr; private GridCacheDrManager drMgr;


/** Serialization manager. */ /** Serialization manager. */
private IgniteCacheSerializationManager<K, V> serMgr; private IgniteCacheSerializationManager<K, V> serMgr;


/** JTA manager. */ /** JTA manager. */
private CacheJtaManagerAdapter<K, V> jtaMgr; private CacheJtaManagerAdapter jtaMgr;


/** Managers. */ /** Managers. */
private List<GridCacheManager<K, V>> mgrs = new LinkedList<>(); private List<GridCacheManager<K, V>> mgrs = new LinkedList<>();
Expand Down Expand Up @@ -238,9 +238,9 @@ public GridCacheContext(
CacheContinuousQueryManager<K, V> contQryMgr, CacheContinuousQueryManager<K, V> contQryMgr,
GridCacheAffinityManager<K, V> affMgr, GridCacheAffinityManager<K, V> affMgr,
CacheDataStructuresManager<K, V> dataStructuresMgr, CacheDataStructuresManager<K, V> dataStructuresMgr,
GridCacheTtlManager<K, V> ttlMgr, GridCacheTtlManager ttlMgr,
GridCacheDrManager<K, V> drMgr, GridCacheDrManager drMgr,
CacheJtaManagerAdapter<K, V> jtaMgr) { CacheJtaManagerAdapter jtaMgr) {
assert ctx != null; assert ctx != null;
assert sharedCtx != null; assert sharedCtx != null;
assert cacheCfg != null; assert cacheCfg != null;
Expand Down Expand Up @@ -839,7 +839,7 @@ public IgniteTxManager tm() {
/** /**
* @return Lock order manager. * @return Lock order manager.
*/ */
public GridCacheVersionManager<K, V> versions() { public GridCacheVersionManager versions() {
return sharedCtx.versions(); return sharedCtx.versions();
} }


Expand Down Expand Up @@ -930,21 +930,21 @@ public CacheDataStructuresManager<K, V> dataStructures() {
/** /**
* @return DR manager. * @return DR manager.
*/ */
public GridCacheDrManager<K, V> dr() { public GridCacheDrManager dr() {
return drMgr; return drMgr;
} }


/** /**
* @return TTL manager. * @return TTL manager.
*/ */
public GridCacheTtlManager<K, V> ttl() { public GridCacheTtlManager ttl() {
return ttlMgr; return ttlMgr;
} }


/** /**
* @return JTA manager. * @return JTA manager.
*/ */
public CacheJtaManagerAdapter<K, V> jta() { public CacheJtaManagerAdapter jta() {
return jtaMgr; return jtaMgr;
} }


Expand All @@ -958,7 +958,7 @@ public boolean putIfAbsentFilter(@Nullable CacheEntryPredicate[] p) {


for (CacheEntryPredicate p0 : p) { for (CacheEntryPredicate p0 : p) {
if ((p0 instanceof CacheEntrySerializablePredicate) && if ((p0 instanceof CacheEntrySerializablePredicate) &&
((CacheEntrySerializablePredicate) p0).predicate() instanceof CacheEntryPredicateNoValue) ((CacheEntrySerializablePredicate)p0).predicate() instanceof CacheEntryPredicateNoValue)
return true; return true;
} }


Expand Down Expand Up @@ -1589,15 +1589,15 @@ public boolean conflictNeedResolve() {
* *
* @param oldEntry Old entry. * @param oldEntry Old entry.
* @param newEntry New entry. * @param newEntry New entry.
* @param atomicVerComparator Whether to use atomic version comparator. * @param atomicVerComp Whether to use atomic version comparator.
* @return Conflict resolution result. * @return Conflict resolution result.
* @throws IgniteCheckedException In case of exception. * @throws IgniteCheckedException In case of exception.
*/ */
public GridCacheVersionConflictContext<K, V> conflictResolve(GridCacheVersionedEntryEx<K, V> oldEntry, public GridCacheVersionConflictContext<K, V> conflictResolve(GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) throws IgniteCheckedException { GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComp) throws IgniteCheckedException {
assert conflictRslvr != null : "Should not reach this place."; assert conflictRslvr != null : "Should not reach this place.";


GridCacheVersionConflictContext<K, V> ctx = conflictRslvr.resolve(oldEntry, newEntry, atomicVerComparator); GridCacheVersionConflictContext<K, V> ctx = conflictRslvr.resolve(oldEntry, newEntry, atomicVerComp);


if (ctx.isManualResolve()) if (ctx.isManualResolve())
drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge()); drMgr.onReceiveCacheConflictResolved(ctx.isUseNew(), ctx.isUseOld(), ctx.isMerge());
Expand Down

0 comments on commit 7253add

Please sign in to comment.