Skip to content

Commit

Permalink
IGNITE-141 - Marshallers refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Kulichenko committed Feb 28, 2015
1 parent 706938d commit 61908d6
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 120 deletions.
Expand Up @@ -24,8 +24,7 @@
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.resources.*;
import org.jetbrains.annotations.*;

import java.io.*;
Expand Down Expand Up @@ -91,8 +90,9 @@ public class CacheRendezvousAffinityFunction implements CacheAffinityFunction, E
/** Hash ID resolver. */
private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver();

/** Marshaller. */
private Marshaller marshaller = new OptimizedMarshaller(false);
/** Ignite instance. */
@IgniteInstanceResource
private Ignite ignite;

/**
* Empty constructor with all defaults.
Expand Down Expand Up @@ -291,7 +291,7 @@ public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();

byte[] nodeHashBytes = marshaller.marshal(nodeHash);
byte[] nodeHashBytes = ignite.configuration().getMarshaller().marshal(nodeHash);

out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
Expand Down
Expand Up @@ -678,6 +678,8 @@ public void start(final IgniteConfiguration cfg,
igfsExecSvc,
restExecSvc);

cfg.getMarshaller().setContext(new MarshallerContextImpl(ctx));

cluster = new IgniteClusterImpl(ctx);

U.onGridStart();
Expand Down Expand Up @@ -2717,4 +2719,89 @@ protected Object readResolve() throws ObjectStreamException {
@Override public String toString() {
return S.toString(IgniteKernal.class, this);
}

/**
*/
private static class MarshallerContextImpl implements MarshallerContext {
private final GridKernalContext ctx;

/** */
private GridCacheAdapter<Integer, String> cache;

/**
* @param ctx Kernal context.
*/
private MarshallerContextImpl(GridKernalContext ctx) {
this.ctx = ctx;
}

/** {@inheritDoc} */
@Override public void registerClass(int id, String clsName) {
if (cache == null)
cache = ctx.cache().marshallerCache();

// TODO: IGNITE-141 - Do not create thread.
Thread t = new Thread(new MarshallerCacheUpdater(cache, id, clsName));

t.start();

try {
t.join();
}
catch (InterruptedException e) {
throw new IgniteException(e);
}
}

/** {@inheritDoc} */
@Override public String className(int id) {
if (cache == null)
cache = ctx.cache().marshallerCache();

try {
return cache.get(id);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
}

/**
*/
private static class MarshallerCacheUpdater implements Runnable {
/** */
private final GridCacheAdapter<Integer, String> cache;

/** */
private final int typeId;

/** */
private final String clsName;

/**
* @param cache Cache.
* @param typeId Type ID.
* @param clsName Class name.
*/
private MarshallerCacheUpdater(GridCacheAdapter<Integer, String> cache, int typeId, String clsName) {
this.cache = cache;
this.typeId = typeId;
this.clsName = clsName;
}

/** {@inheritDoc} */
@Override public void run() {
try {
String old = cache.putIfAbsent(typeId, clsName);

// TODO: IGNITE-141 - proper message
if (old != null && !old.equals(clsName))
throw new IgniteException("Collision.");
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
}
}
Expand Up @@ -1693,11 +1693,15 @@ else if (marsh instanceof OptimizedMarshaller && !U.isHotSpot()) {
"\" because it is reserved for internal purposes.");

if (CU.isUtilityCache(ccfg.getName()))
throw new IgniteCheckedException("Cache name cannot start with \"" + CU.UTILITY_CACHE_NAME +
"\" because this prefix is reserved for internal purposes.");
throw new IgniteCheckedException("Cache name cannot be \"" + CU.UTILITY_CACHE_NAME +
"\" because it is reserved for internal purposes.");

if (CU.isMarshallerCache(ccfg.getName()))
throw new IgniteCheckedException("Cache name cannot be \"" + CU.MARSH_CACHE_NAME +
"\" because it is reserved for internal purposes.");
}

int addCacheCnt = 1; // Always add utility cache.
int addCacheCnt = 2; // Always add marshaller and utility caches.

if (hasHadoop)
addCacheCnt++;
Expand All @@ -1707,7 +1711,7 @@ else if (marsh instanceof OptimizedMarshaller && !U.isHotSpot()) {

copies = new CacheConfiguration[cacheCfgs.length + addCacheCnt];

int cloneIdx = 1;
int cloneIdx = 2;

if (hasHadoop)
copies[cloneIdx++] = CU.hadoopSystemCache();
Expand All @@ -1719,7 +1723,7 @@ else if (marsh instanceof OptimizedMarshaller && !U.isHotSpot()) {
copies[cloneIdx++] = new CacheConfiguration(ccfg);
}
else {
int cacheCnt = 1; // Always add utility cache.
int cacheCnt = 2; // Always add marshaller and utility caches.

if (hasHadoop)
cacheCnt++;
Expand All @@ -1729,7 +1733,7 @@ else if (marsh instanceof OptimizedMarshaller && !U.isHotSpot()) {

copies = new CacheConfiguration[cacheCnt];

int cacheIdx = 1;
int cacheIdx = 2;

if (hasHadoop)
copies[cacheIdx++] = CU.hadoopSystemCache();
Expand All @@ -1738,24 +1742,14 @@ else if (marsh instanceof OptimizedMarshaller && !U.isHotSpot()) {
copies[cacheIdx] = atomicsSystemCache(cfg.getAtomicConfiguration(), clientDisco);
}

// Always add utility cache.
copies[0] = utilitySystemCache(clientDisco);
// Always add marshaller and utility caches.
copies[0] = marshallerSystemCache(clientDisco);
copies[1] = utilitySystemCache(clientDisco);

myCfg.setCacheConfiguration(copies);

myCfg.setCacheSanityCheckEnabled(cfg.isCacheSanityCheckEnabled());

try {
// Use reflection to avoid loading undesired classes.
Class helperCls = Class.forName("org.apache.ignite.util.GridConfigurationHelper");

helperCls.getMethod("overrideConfiguration", IgniteConfiguration.class, Properties.class,
String.class, IgniteLogger.class).invoke(helperCls, myCfg, System.getProperties(), name, log);
}
catch (Exception ignored) {
// No-op.
}

// Ensure that SPIs support multiple grid instances, if required.
if (!startCtx.single()) {
ensureMultiInstanceSupport(deploySpi);
Expand Down Expand Up @@ -1919,6 +1913,30 @@ private IgniteLogger initLogger(@Nullable IgniteLogger cfgLog, UUID nodeId) thro
}
}

/**
* Creates marshaller system cache configuration.
*
* @param client If {@code true} creates client-only cache configuration.
* @return Marshaller system cache configuration.
*/
private static CacheConfiguration marshallerSystemCache(boolean client) {
CacheConfiguration cache = new CacheConfiguration();

cache.setName(CU.MARSH_CACHE_NAME);
cache.setCacheMode(REPLICATED);
cache.setAtomicityMode(TRANSACTIONAL);
cache.setSwapEnabled(false);
cache.setQueryIndexEnabled(false);
cache.setPreloadMode(SYNC);
cache.setWriteSynchronizationMode(FULL_SYNC);
cache.setAffinity(new CacheRendezvousAffinityFunction(false, 100));

if (client)
cache.setDistributionMode(CLIENT_ONLY);

return cache;
}

/**
* Creates utility system cache configuration.
*
Expand Down
Expand Up @@ -308,7 +308,7 @@ public GridCacheContext(
else
cacheId = 1;

sys = CU.UTILITY_CACHE_NAME.equals(cacheName);
sys = CU.MARSH_CACHE_NAME.equals(cacheName) || CU.UTILITY_CACHE_NAME.equals(cacheName);

plc = sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;

Expand Down
Expand Up @@ -57,14 +57,14 @@

import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.configuration.CacheConfiguration.*;
import static org.apache.ignite.cache.CacheDistributionMode.*;
import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CachePreloadMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
import static org.apache.ignite.configuration.CacheConfiguration.*;
import static org.apache.ignite.configuration.DeploymentMode.*;
import static org.apache.ignite.internal.IgniteNodeAttributes.*;
import static org.apache.ignite.internal.IgniteComponentType.*;
import static org.apache.ignite.internal.IgniteNodeAttributes.*;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;

Expand Down Expand Up @@ -571,8 +571,8 @@ private void cleanup(CacheConfiguration cfg, @Nullable Object rsrc, boolean near
if (IgniteComponentType.HADOOP.inClassPath())
sysCaches.add(CU.SYS_CACHE_HADOOP_MR);

sysCaches.add(CU.MARSH_CACHE_NAME);
sysCaches.add(CU.UTILITY_CACHE_NAME);

sysCaches.add(CU.ATOMICS_CACHE_NAME);

CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration();
Expand Down Expand Up @@ -1575,6 +1575,13 @@ public <K, V> GridCache<K, V> publicCache() {
return publicCache(null);
}

/**
* @return Marshaller system cache.
*/
public GridCacheAdapter<Integer, String> marshallerCache() {
return internalCache(CU.MARSH_CACHE_NAME);
}

/**
* Gets utility cache.
*
Expand Down
Expand Up @@ -69,6 +69,9 @@ public class GridCacheUtils {
/** Atomics system cache name. */
public static final String ATOMICS_CACHE_NAME = "ignite-atomics-sys-cache";

/** Marshaller system cache name. */
public static final String MARSH_CACHE_NAME = "ignite-marshaller-sys-cache";

/** Default mask name. */
private static final String DEFAULT_MASK_NAME = "<default>";

Expand Down Expand Up @@ -1519,15 +1522,23 @@ public static CacheConfiguration hadoopSystemCache() {

/**
* @param cacheName Cache name.
* @return {@code True} if this is security system cache.
* @return {@code True} if this is marshaller system cache.
*/
public static boolean isMarshallerCache(String cacheName) {
return MARSH_CACHE_NAME.equals(cacheName);
}

/**
* @param cacheName Cache name.
* @return {@code True} if this is utility system cache.
*/
public static boolean isUtilityCache(String cacheName) {
return UTILITY_CACHE_NAME.equals(cacheName);
}

/**
* @param cacheName Cache name.
* @return {@code True} if this is security system cache.
* @return {@code True} if this is atomics system cache.
*/
public static boolean isAtomicsCache(String cacheName) {
return ATOMICS_CACHE_NAME.equals(cacheName);
Expand All @@ -1538,7 +1549,8 @@ public static boolean isAtomicsCache(String cacheName) {
* @return {@code True} if system cache.
*/
public static boolean isSystemCache(String cacheName) {
return isUtilityCache(cacheName) || isHadoopSystemCache(cacheName) || isAtomicsCache(cacheName);
return isMarshallerCache(cacheName) || isUtilityCache(cacheName) || isHadoopSystemCache(cacheName) ||
isAtomicsCache(cacheName);
}

/**
Expand Down
Expand Up @@ -809,6 +809,25 @@ private void drainQueue() throws InterruptedException {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
try {
if (!CU.isMarshallerCache(cctx.name())) {
if (log.isDebugEnabled())
log.debug("Waiting for marshaller cache preload [cacheName=" + cctx.name() + ']');

try {
cctx.kernalContext().cache().marshallerCache().preloader().syncFuture().get();
}
catch (IgniteInterruptedCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to wait for marshaller cache preload future (grid is stopping): " +
"[cacheName=" + cctx.name() + ']');

return;
}
catch (IgniteCheckedException e) {
throw new Error("Ordered preload future should never fail: " + e.getMessage(), e);
}
}

int preloadOrder = cctx.config().getPreloadOrder();

if (preloadOrder > 0) {
Expand Down
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.processors.rest.client.message;

import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.internal.*;

import java.io.*;
Expand Down
Expand Up @@ -33,6 +33,14 @@ public abstract class AbstractMarshaller implements Marshaller {
/** Default initial buffer size for the {@link GridByteArrayOutputStream}. */
public static final int DFLT_BUFFER_SIZE = 512;

/** Context. */
protected MarshallerContext ctx;

/** {@inheritDoc} */
@Override public void setContext(MarshallerContext ctx) {
this.ctx = ctx;
}

/** {@inheritDoc} */
@Override public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException {
GridByteArrayOutputStream out = null;
Expand Down
Expand Up @@ -65,6 +65,13 @@
* For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
*/
public interface Marshaller {
/**
* Sets marshaller context.
*
* @param ctx Marshaller context.
*/
public void setContext(MarshallerContext ctx);

/**
* Marshals object to the output stream. This method should not close
* given output stream.
Expand Down

0 comments on commit 61908d6

Please sign in to comment.