Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sboikov committed Jan 26, 2015
1 parent c10a6c7 commit c1c60c7
Show file tree
Hide file tree
Showing 43 changed files with 681 additions and 566 deletions.
Expand Up @@ -54,7 +54,7 @@ public static void main(String[] args) throws IgniteCheckedException {
final String seqName = UUID.randomUUID().toString(); final String seqName = UUID.randomUUID().toString();


// Initialize atomic sequence in grid. // Initialize atomic sequence in grid.
IgniteAtomicSequence seq = g.cache(CACHE_NAME).dataStructures().atomicSequence(seqName, 0, true); IgniteAtomicSequence seq = g.atomicSequence(seqName, 0, true);


// First value of atomic sequence on this node. // First value of atomic sequence on this node.
long firstVal = seq.get(); long firstVal = seq.get();
Expand Down Expand Up @@ -95,8 +95,7 @@ private static class SequenceClosure implements IgniteRunnable {
/** {@inheritDoc} */ /** {@inheritDoc} */
@Override public void run() { @Override public void run() {
try { try {
IgniteAtomicSequence seq = Ignition.ignite().cache(cacheName).dataStructures(). IgniteAtomicSequence seq = Ignition.ignite().atomicSequence(seqName, 0, true);
atomicSequence(seqName, 0, true);


for (int i = 0; i < RETRIES; i++) for (int i = 0; i < RETRIES; i++)
System.out.println("Sequence [currentValue=" + seq.get() + ", afterIncrement=" + System.out.println("Sequence [currentValue=" + seq.get() + ", afterIncrement=" +
Expand Down
Expand Up @@ -19,6 +19,8 @@


import org.apache.ignite.*; import org.apache.ignite.*;


import java.io.*;

/** /**
* This interface provides a rich API for working with distributed atomic sequence. * This interface provides a rich API for working with distributed atomic sequence.
* <p> * <p>
Expand Down Expand Up @@ -52,12 +54,12 @@
* <h1 class="header">Creating Distributed Atomic Sequence</h1> * <h1 class="header">Creating Distributed Atomic Sequence</h1>
* Instance of distributed atomic sequence can be created by calling the following method: * Instance of distributed atomic sequence can be created by calling the following method:
* <ul> * <ul>
* <li>{@link org.apache.ignite.cache.datastructures.CacheDataStructures#atomicSequence(String, long, boolean)}</li> * <li>{@link Ignite#atomicSequence(String, long, boolean)}</li>
* </ul> * </ul>
* @see org.apache.ignite.cache.datastructures.CacheDataStructures#atomicSequence(String, long, boolean) * @see org.apache.ignite.cache.datastructures.CacheDataStructures#atomicSequence(String, long, boolean)
* @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeAtomicSequence(String) * @see org.apache.ignite.cache.datastructures.CacheDataStructures#removeAtomicSequence(String)
*/ */
public interface IgniteAtomicSequence { public interface IgniteAtomicSequence extends Closeable {
/** /**
* Name of atomic sequence. * Name of atomic sequence.
* *
Expand Down Expand Up @@ -127,4 +129,9 @@ public interface IgniteAtomicSequence {
* @return {@code true} if atomic sequence was removed from cache, {@code false} otherwise. * @return {@code true} if atomic sequence was removed from cache, {@code false} otherwise.
*/ */
public boolean removed(); public boolean removed();

/**
* Removes atomic sequence.
*/
@Override void close();
} }
Expand Up @@ -46,9 +46,6 @@
*/ */
@SuppressWarnings("RedundantFieldInitialization") @SuppressWarnings("RedundantFieldInitialization")
public class CacheConfiguration extends MutableConfiguration { public class CacheConfiguration extends MutableConfiguration {
/** Default atomic sequence reservation size. */
public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000;

/** Default size of preload thread pool. */ /** Default size of preload thread pool. */
public static final int DFLT_PRELOAD_THREAD_POOL_SIZE = 2; public static final int DFLT_PRELOAD_THREAD_POOL_SIZE = 2;


Expand Down Expand Up @@ -179,9 +176,6 @@ public class CacheConfiguration extends MutableConfiguration {
/** Cache name. */ /** Cache name. */
private String name; private String name;


/** Default batch size for all cache's sequences. */
private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE;

/** Preload thread pool size. */ /** Preload thread pool size. */
private int preloadPoolSize = DFLT_PRELOAD_THREAD_POOL_SIZE; private int preloadPoolSize = DFLT_PRELOAD_THREAD_POOL_SIZE;


Expand Down Expand Up @@ -414,7 +408,6 @@ public CacheConfiguration(CompleteConfiguration cfg) {
qryCfg = cc.getQueryConfiguration(); qryCfg = cc.getQueryConfiguration();
qryIdxEnabled = cc.isQueryIndexEnabled(); qryIdxEnabled = cc.isQueryIndexEnabled();
readFromBackup = cc.isReadFromBackup(); readFromBackup = cc.isReadFromBackup();
seqReserveSize = cc.getAtomicSequenceReserveSize();
startSize = cc.getStartSize(); startSize = cc.getStartSize();
storeFactory = cc.getCacheStoreFactory(); storeFactory = cc.getCacheStoreFactory();
storeValBytes = cc.isStoreValueBytes(); storeValBytes = cc.isStoreValueBytes();
Expand Down Expand Up @@ -1348,31 +1341,6 @@ public void setCloner(CacheCloner cloner) {
this.cloner = cloner; this.cloner = cloner;
} }


/**
* Gets default number of sequence values reserved for {@link org.apache.ignite.IgniteAtomicSequence} instances. After
* a certain number has been reserved, consequent increments of sequence will happen locally,
* without communication with other nodes, until the next reservation has to be made.
* <p>
* Default value is {@link #DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE}.
*
* @return Atomic sequence reservation size.
*/
public int getAtomicSequenceReserveSize() {
return seqReserveSize;
}

/**
* Sets default number of sequence values reserved for {@link org.apache.ignite.IgniteAtomicSequence} instances. After a certain
* number has been reserved, consequent increments of sequence will happen locally, without communication with other
* nodes, until the next reservation has to be made.
*
* @param seqReserveSize Atomic sequence reservation size.
* @see #getAtomicSequenceReserveSize()
*/
public void setAtomicSequenceReserveSize(int seqReserveSize) {
this.seqReserveSize = seqReserveSize;
}

/** /**
* Gets size of preloading thread pool. Note that size serves as a hint and implementation * Gets size of preloading thread pool. Note that size serves as a hint and implementation
* may create more threads for preloading than specified here (but never less threads). * may create more threads for preloading than specified here (but never less threads).
Expand Down
Expand Up @@ -17,21 +17,32 @@


package org.apache.ignite.configuration; package org.apache.ignite.configuration;


import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.cache.*;

import static org.apache.ignite.cache.CacheMode.*;

/** /**
* * Configuration for atomic data structures.
*/ */
public class IgniteAtomicConfiguration { public class IgniteAtomicConfiguration {
/** */ /** */
public static final int DFLT_BACKUPS = 0; public static final int DFLT_BACKUPS = 0;


/** */ /** */
public static final boolean DFLT_REPLICATED = false; public static final CacheMode DFLT_CACHE_MODE = PARTITIONED;


/** */ /** Default atomic sequence reservation size. */
private int backups = DFLT_BACKUPS; public static final int DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE = 1000;

/** Default batch size for all cache's sequences. */
private int seqReserveSize = DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE;

/** Cache mode. */
private CacheMode cacheMode = DFLT_CACHE_MODE;


/** */ /** */
private boolean replicated = DFLT_REPLICATED; private int backups = DFLT_BACKUPS;


public int getBackups() { public int getBackups() {
return backups; return backups;
Expand All @@ -41,11 +52,36 @@ public void setBackups(int backups) {
this.backups = backups; this.backups = backups;
} }


public boolean isReplicated() { public CacheMode getCacheMode() {
return replicated; return cacheMode;
}

public void setCacheMode(CacheMode cacheMode) {
this.cacheMode = cacheMode;
}

/**
* Gets default number of sequence values reserved for {@link IgniteAtomicSequence} instances. After
* a certain number has been reserved, consequent increments of sequence will happen locally,
* without communication with other nodes, until the next reservation has to be made.
* <p>
* Default value is {@link #DFLT_ATOMIC_SEQUENCE_RESERVE_SIZE}.
*
* @return Atomic sequence reservation size.
*/
public int getAtomicSequenceReserveSize() {
return seqReserveSize;
} }


public void setReplicated(boolean replicated) { /**
this.replicated = replicated; * Sets default number of sequence values reserved for {@link IgniteAtomicSequence} instances. After a certain
* number has been reserved, consequent increments of sequence will happen locally, without communication with other
* nodes, until the next reservation has to be made.
*
* @param seqReserveSize Atomic sequence reservation size.
* @see #getAtomicSequenceReserveSize()
*/
public void setAtomicSequenceReserveSize(int seqReserveSize) {
this.seqReserveSize = seqReserveSize;
} }
} }
Expand Up @@ -1425,6 +1425,7 @@ private void start0(GridStartContext startCtx) throws IgniteCheckedException {
myCfg.setPluginConfigurations(cfg.getPluginConfigurations()); myCfg.setPluginConfigurations(cfg.getPluginConfigurations());
myCfg.setTransactionsConfiguration(new TransactionsConfiguration(cfg.getTransactionsConfiguration())); myCfg.setTransactionsConfiguration(new TransactionsConfiguration(cfg.getTransactionsConfiguration()));
myCfg.setQueryConfiguration(cfg.getQueryConfiguration()); myCfg.setQueryConfiguration(cfg.getQueryConfiguration());
myCfg.setAtomicConfiguration(cfg.getAtomicConfiguration());


ClientConnectionConfiguration clientCfg = cfg.getClientConnectionConfiguration(); ClientConnectionConfiguration clientCfg = cfg.getClientConnectionConfiguration();


Expand Down Expand Up @@ -2125,13 +2126,21 @@ private static CacheConfiguration atomicsSystemCache(IgniteAtomicConfiguration c
CacheConfiguration ccfg = new CacheConfiguration(); CacheConfiguration ccfg = new CacheConfiguration();


ccfg.setName(CU.ATOMICS_CACHE_NAME); ccfg.setName(CU.ATOMICS_CACHE_NAME);
ccfg.setAtomicityMode(TRANSACTIONAL);
ccfg.setSwapEnabled(false);
ccfg.setQueryIndexEnabled(false);
ccfg.setPreloadMode(SYNC);
ccfg.setWriteSynchronizationMode(FULL_SYNC);


ccfg.setCacheMode(cfg.isReplicated() ? REPLICATED : PARTITIONED); ccfg.setCacheMode(cfg.getCacheMode());


if (!cfg.isReplicated()) if (cfg.getCacheMode() == PARTITIONED) {
ccfg.setBackups(cfg.getBackups()); ccfg.setBackups(cfg.getBackups());


ccfg.setDistributionMode(client ? NEAR_ONLY : NEAR_PARTITIONED); ccfg.setDistributionMode(client ? NEAR_ONLY : NEAR_PARTITIONED);
}
else
ccfg.setDistributionMode(client ? NEAR_ONLY : PARTITIONED_ONLY);


return ccfg; return ccfg;
} }
Expand Down
Expand Up @@ -752,6 +752,7 @@ public void start(final IgniteConfiguration cfg, ExecutorService utilityCachePoo
IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled. IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)), attrs); IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)), attrs);
startProcessor(ctx, new GridServiceProcessor(ctx), attrs); startProcessor(ctx, new GridServiceProcessor(ctx), attrs);
startProcessor(ctx, new CacheDataStructuresProcessor(ctx), attrs);


// Start plugins. // Start plugins.
for (PluginProvider provider : ctx.plugins().allProviders()) { for (PluginProvider provider : ctx.plugins().allProviders()) {
Expand Down Expand Up @@ -2235,18 +2236,18 @@ private boolean isDaemon() {


/** /**
* Whether or not SMTP is configured. Note that SMTP is considered configured if * Whether or not SMTP is configured. Note that SMTP is considered configured if
* SMTP host is provided in configuration (see {@link org.apache.ignite.configuration.IgniteConfiguration#getSmtpHost()}. * SMTP host is provided in configuration (see {@link IgniteConfiguration#getSmtpHost()}.
* <p> * <p>
* If SMTP is not configured all emails notifications will be disabled. * If SMTP is not configured all emails notifications will be disabled.
* *
* @return {@code True} if SMTP is configured - {@code false} otherwise. * @return {@code True} if SMTP is configured - {@code false} otherwise.
* @see org.apache.ignite.configuration.IgniteConfiguration#getSmtpFromEmail() * @see IgniteConfiguration#getSmtpFromEmail()
* @see org.apache.ignite.configuration.IgniteConfiguration#getSmtpHost() * @see IgniteConfiguration#getSmtpHost()
* @see org.apache.ignite.configuration.IgniteConfiguration#getSmtpPassword() * @see IgniteConfiguration#getSmtpPassword()
* @see org.apache.ignite.configuration.IgniteConfiguration#getSmtpPort() * @see IgniteConfiguration#getSmtpPort()
* @see org.apache.ignite.configuration.IgniteConfiguration#getSmtpUsername() * @see IgniteConfiguration#getSmtpUsername()
* @see org.apache.ignite.configuration.IgniteConfiguration#isSmtpSsl() * @see IgniteConfiguration#isSmtpSsl()
* @see org.apache.ignite.configuration.IgniteConfiguration#isSmtpStartTls() * @see IgniteConfiguration#isSmtpStartTls()
* @see #sendAdminEmailAsync(String, String, boolean) * @see #sendAdminEmailAsync(String, String, boolean)
*/ */
@Override public boolean isSmtpEnabled() { @Override public boolean isSmtpEnabled() {
Expand Down Expand Up @@ -2529,7 +2530,7 @@ private Iterable<Object> lifecycleAwares(IgniteConfiguration cfg) {
* completes ok and its result value is {@code true} email was successfully sent. In all * completes ok and its result value is {@code true} email was successfully sent. In all
* other cases - sending process has failed. * other cases - sending process has failed.
* @see #isSmtpEnabled() * @see #isSmtpEnabled()
* @see org.apache.ignite.configuration.IgniteConfiguration#getAdminEmails() * @see IgniteConfiguration#getAdminEmails()
*/ */
@Override public IgniteFuture<Boolean> sendAdminEmailAsync(String subj, String body, boolean html) { @Override public IgniteFuture<Boolean> sendAdminEmailAsync(String subj, String body, boolean html) {
A.notNull(subj, "subj"); A.notNull(subj, "subj");
Expand Down Expand Up @@ -3229,37 +3230,72 @@ private boolean runNextNodeCallable(final ConcurrentLinkedQueue<GridNodeCallable
/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create) @Nullable @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create)
throws IgniteCheckedException { throws IgniteCheckedException {
return null; guard();

try {
return ctx.dataStructures().sequence(name, initVal, create);
}
finally {
unguard();
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) @Nullable @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create)
throws IgniteCheckedException { throws IgniteCheckedException {
return null; guard();

try {
return ctx.dataStructures().atomicLong(name, initVal, create);
}
finally {
unguard();
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public <T> IgniteAtomicReference<T> atomicReference(String name, @Nullable @Override public <T> IgniteAtomicReference<T> atomicReference(String name,
@Nullable T initVal, @Nullable T initVal,
boolean create) boolean create)
throws IgniteCheckedException { throws IgniteCheckedException {
return null; guard();

try {
return ctx.dataStructures().atomicReference(name, initVal, create);
}
finally {
unguard();
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name,
@Nullable T initVal, @Nullable T initVal,
@Nullable S initStamp, @Nullable S initStamp,
boolean create) throws IgniteCheckedException { boolean create) throws IgniteCheckedException {
return null; guard();

try {
return ctx.dataStructures().atomicStamped(name, initVal, initStamp, create);
}
finally {
unguard();
}
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Nullable @Override public IgniteCountDownLatch countDownLatch(String name, @Nullable @Override public IgniteCountDownLatch countDownLatch(String name,
int cnt, int cnt,
boolean autoDel, boolean autoDel,
boolean create) throws IgniteCheckedException { boolean create) throws IgniteCheckedException {
return null; guard();

try {
return ctx.dataStructures().countDownLatch(name, cnt, autoDel, create);
}
finally {
unguard();
}
} }


/** /**
Expand All @@ -3270,7 +3306,8 @@ private boolean runNextNodeCallable(final ConcurrentLinkedQueue<GridNodeCallable
* @return Created component. * @return Created component.
* @throws IgniteCheckedException If failed to create component. * @throws IgniteCheckedException If failed to create component.
*/ */
private static <T extends GridComponent> T createComponent(Class<T> cls, GridKernalContext ctx) throws IgniteCheckedException { private static <T extends GridComponent> T createComponent(Class<T> cls, GridKernalContext ctx)
throws IgniteCheckedException {
assert cls.isInterface() : cls; assert cls.isInterface() : cls;


T comp = ctx.plugins().createComponent(cls); T comp = ctx.plugins().createComponent(cls);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.*; import org.apache.ignite.*;
import org.apache.ignite.configuration.*; import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.fs.*; import org.apache.ignite.internal.processors.fs.*;
import org.apache.ignite.internal.processors.portable.*; import org.apache.ignite.internal.processors.portable.*;
import org.apache.ignite.plugin.*; import org.apache.ignite.plugin.*;
Expand Down Expand Up @@ -439,6 +440,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
*/ */
public GridClockSource timeSource(); public GridClockSource timeSource();


/**
* Gets data structures processor.
*
* @return Data structures processor.
*/
public CacheDataStructuresProcessor dataStructures();

/** /**
* Sets segmented flag to {@code true} when node is stopped due to segmentation issues. * Sets segmented flag to {@code true} when node is stopped due to segmentation issues.
*/ */
Expand Down

0 comments on commit c1c60c7

Please sign in to comment.