Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignite 5052 #1958

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
aa48bab
IGNITE-5052 CREATE TABLE first steps
alexpaschenko Apr 21, 2017
7f04ca5
IGNITE-5052 CREATE TABLE parsing contd
alexpaschenko Apr 24, 2017
9ba38c9
Merge remote-tracking branch 'apache/ignite-2.0' into ignite-5052
alexpaschenko Apr 25, 2017
d6cdb6a
IGNITE-5052 CREATE TABLE parsing contd
alexpaschenko Apr 25, 2017
0be9b08
IGNITE-5052 Dynamic local CREATE TABLE
alexpaschenko Apr 26, 2017
6569d53
IGNITE-5052 CREATE/DROP TABLE + few tests
alexpaschenko Apr 26, 2017
a7e409b
IGNITE-5052 GridSqlCreateTable.toSql
alexpaschenko Apr 27, 2017
7d3325e
Merge remote-tracking branch 'apache/ignite-2.0' into ignite-5052
alexpaschenko Apr 28, 2017
b740701
Post merge fix
alexpaschenko Apr 28, 2017
e650908
Merge branch 'master' into ignite-5052
May 5, 2017
afda222
Merge branch 'master' into ignite-5052
alexpaschenko May 10, 2017
dc5f68b
IGNITE-5052 Review fixes
alexpaschenko May 10, 2017
888eb16
IGNITE-5052 Review fixes - contd
alexpaschenko May 11, 2017
9a26a67
Merge remote-tracking branch 'apache/master' into ignite-5052
alexpaschenko May 11, 2017
7f377c4
Merge branch 'master' into ignite-5052
May 16, 2017
41e2f64
Minors.
May 16, 2017
588093f
IGNITE-5052 Review fixes.
alexpaschenko May 16, 2017
e2bb7bd
Merge branch 'master' into ignite-5052
alexpaschenko May 16, 2017
e88a287
Merge branch 'master' into ignite-5052
alexpaschenko May 17, 2017
eef106f
IGNITE-5052 Review fixes.
alexpaschenko May 17, 2017
f9fa7e7
Merge branch 'master' into ignite-5052
alexpaschenko May 18, 2017
813dd68
IGNITE-5052 Post merge and review fixes.
alexpaschenko May 18, 2017
e3f05f2
IGNITE-5052 CREATE TABLE tests.
alexpaschenko May 18, 2017
a905de9
IGNITE-5052 CREATE TABLE tests.
alexpaschenko May 18, 2017
bf34a80
Merge branch 'master' into ignite-5052
alexpaschenko May 19, 2017
53c2f28
IGNITE-5052 Removed odd stuff
alexpaschenko May 19, 2017
307bee4
IGNITE-5052 Additional checks for unsupported stuff in CREATE TABLE
alexpaschenko May 19, 2017
68fa5b3
Merge branch 'master' into ignite-5052
May 22, 2017
a99e674
Merge branch 'master' into ignite-5052
alexpaschenko May 22, 2017
84d0895
Minors.
May 22, 2017
eec88c6
Merge remote-tracking branch 'upstream/ignite-5052' into ignite-5052
May 22, 2017
32abecb
IGNITE-5052 Review fixes.
alexpaschenko May 22, 2017
49a14d1
IGNITE-5052 Javadocs fixed.
alexpaschenko May 23, 2017
06af283
IGNITE-5052 Preserve existing protocol for getorcreate and destcache …
alexpaschenko May 24, 2017
299fd0a
Merge branch 'master' into ignite-5052
alexpaschenko May 24, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1794,9 +1794,12 @@ public CacheConfiguration<K, V> setPartitionLossPolicy(PartitionLossPolicy partL
* @return {@code this} for chaining.
*/
public CacheConfiguration<K, V> setQueryEntities(Collection<QueryEntity> qryEntities) {
if (this.qryEntities == null)
if (this.qryEntities == null) {
this.qryEntities = new ArrayList<>(qryEntities);

return this;
}

for (QueryEntity entity : qryEntities) {
boolean found = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
package org.apache.ignite.internal;

import java.util.Collection;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -59,6 +63,29 @@ public interface IgniteEx extends Ignite {
*/
public Collection<IgniteInternalCache<?, ?>> cachesx(@Nullable IgnitePredicate<? super IgniteInternalCache<?, ?>>... p);

/**
* Gets existing cache with the given name or creates new one with the given configuration.
* <p>
* If a cache with the same name already exists, this method will not check that the given
* configuration matches the configuration of existing cache and will return an instance
* of the existing cache.
*
* @param cacheCfg Cache configuration to use.
* @return Tuple [Existing or newly created cache; {@code true} if cache was newly crated, {@code false} otherwise]
* @throws CacheException If error occurs.
*/
public <K, V> IgniteBiTuple<IgniteCache<K, V>, Boolean> getOrCreateCache0(CacheConfiguration<K, V> cacheCfg)
throws CacheException;

/**
* Stops dynamically started cache.
*
* @param cacheName Cache name to stop.
* @return {@code true} if cache has been stopped as the result of this call, {@code false} otherwise.
* @throws CacheException If error occurs.
*/
public boolean destroyCache0(String cacheName) throws CacheException;

/**
* Checks if the event type is user-recordable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import javax.management.JMException;
import javax.management.ObjectName;
import org.apache.ignite.IgniteAtomicLong;
Expand Down Expand Up @@ -163,6 +164,7 @@
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
Expand Down Expand Up @@ -2789,6 +2791,13 @@ public <K, V> IgniteInternalCache<K, V> getCache(String name) {

/** {@inheritDoc} */
@Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg) {
return getOrCreateCache0(cacheCfg).get1();
}

/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K, V> IgniteBiTuple<IgniteCache<K, V>, Boolean> getOrCreateCache0(
CacheConfiguration<K, V> cacheCfg) {
A.notNull(cacheCfg, "cacheCfg");
CU.validateCacheName(cacheCfg.getName());

Expand All @@ -2797,16 +2806,18 @@ public <K, V> IgniteInternalCache<K, V> getCache(String name) {
try {
checkClusterState();

Boolean res = false;

if (ctx.cache().cache(cacheCfg.getName()) == null) {
ctx.cache().dynamicStartCache(cacheCfg,
res = ctx.cache().dynamicStartCache(cacheCfg,
cacheCfg.getName(),
null,
false,
true,
true).get();
}

return ctx.cache().publicJCache(cacheCfg.getName());
return new IgniteBiTuple<>((IgniteCache<K, V>)ctx.cache().publicJCache(cacheCfg.getName()), res);
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
Expand Down Expand Up @@ -3006,12 +3017,17 @@ private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) throws IgniteCh

/** {@inheritDoc} */
@Override public void destroyCache(String cacheName) {
destroyCache0(cacheName);
}

/** {@inheritDoc} */
@Override public boolean destroyCache0(String cacheName) throws CacheException {
CU.validateCacheName(cacheName);

IgniteInternalFuture stopFut = destroyCacheAsync(cacheName, true);
IgniteInternalFuture<Boolean> stopFut = destroyCacheAsync(cacheName, true);

try {
stopFut.get();
return stopFut.get();
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
Expand All @@ -3037,7 +3053,7 @@ private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) throws IgniteCh
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Ignite future.
*/
public IgniteInternalFuture<?> destroyCacheAsync(String cacheName, boolean checkThreadTx) {
public IgniteInternalFuture<Boolean> destroyCacheAsync(String cacheName, boolean checkThreadTx) {
CU.validateCacheName(cacheName);

guard();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVe
if (req.start()) {
if (desc == null) {
if (req.clientStartOnly()) {
ctx.cache().completeCacheStartFuture(req, new IgniteCheckedException("Failed to start " +
ctx.cache().completeCacheStartFuture(req, false, new IgniteCheckedException("Failed to start " +
"client cache (a cache with the given name is not started): " + req.cacheName()));
}
else {
Expand Down Expand Up @@ -327,7 +327,7 @@ boolean onCacheChangeRequested(DynamicCacheChangeBatch batch, AffinityTopologyVe
}
else {
if (req.failIfExists()) {
ctx.cache().completeCacheStartFuture(req,
ctx.cache().completeCacheStartFuture(req, false,
new CacheExistsException("Failed to start cache " +
"(a cache with the same name is already started): " + req.cacheName()));
}
Expand Down Expand Up @@ -420,19 +420,19 @@ else if (req.close()) {
if (!F.isEmpty(reqsToComplete)) {
ctx.closure().callLocalSafe(new Callable<Void>() {
@Override public Void call() throws Exception {
for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t :reqsToComplete) {
for (T2<DynamicCacheChangeRequest, AffinityTopologyVersion> t : reqsToComplete) {
final DynamicCacheChangeRequest req = t.get1();
AffinityTopologyVersion waitTopVer = t.get2();

IgniteInternalFuture<?> fut = waitTopVer != null ?
ctx.cache().context().exchange().affinityReadyFuture(waitTopVer) : null;

if (fut == null || fut.isDone())
ctx.cache().completeCacheStartFuture(req, null);
ctx.cache().completeCacheStartFuture(req, false, null);
else {
fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
ctx.cache().completeCacheStartFuture(req, null);
ctx.cache().completeCacheStartFuture(req, false, null);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void completeRequestFutures(GridCacheSharedContext ctx) {
private void completeRequestFutures(Map<String, ActionData> map, GridCacheSharedContext ctx) {
if (map != null) {
for (ActionData req : map.values())
ctx.cache().completeCacheStartFuture(req.req, null);
ctx.cache().completeCacheStartFuture(req.req, true, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1965,12 +1965,12 @@ void completeTemplateAddFuture(String cacheName, IgniteUuid deploymentId) {
* @param req Request to complete future for.
* @param err Error if any.
*/
void completeCacheStartFuture(DynamicCacheChangeRequest req, @Nullable Exception err) {
void completeCacheStartFuture(DynamicCacheChangeRequest req, boolean success, @Nullable Exception err) {
if (req.initiatingNodeId().equals(ctx.localNodeId())) {
DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(req.requestId());

if (fut != null)
fut.onDone(null, err);
fut.onDone(success, err);
}
}

Expand Down Expand Up @@ -2063,7 +2063,7 @@ private boolean startAllCachesOnClientStart() {
*/
public IgniteInternalFuture<?> createFromTemplate(String cacheName) {
try {
CacheConfiguration cfg = createConfigFromTemplate(cacheName);
CacheConfiguration cfg = getOrCreateConfigFromTemplate(cacheName);

return dynamicStartCache(cfg, cacheName, null, true, true, true);
}
Expand All @@ -2086,7 +2086,7 @@ public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean
if (publicJCache(cacheName, false, checkThreadTx) != null) // Cache with given name already started.
return new GridFinishedFuture<>();

CacheConfiguration cfg = createConfigFromTemplate(cacheName);
CacheConfiguration cfg = getOrCreateConfigFromTemplate(cacheName);

return dynamicStartCache(cfg, cacheName, null, false, true, checkThreadTx);
}
Expand All @@ -2097,10 +2097,10 @@ public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean

/**
* @param cacheName Cache name.
* @return Cache configuration.
* @return Cache configuration, or {@code null} if no template with matching name found.
* @throws IgniteCheckedException If failed.
*/
private CacheConfiguration createConfigFromTemplate(String cacheName) throws IgniteCheckedException {
public CacheConfiguration getConfigFromTemplate(String cacheName) throws IgniteCheckedException {
CacheConfiguration cfgTemplate = null;

CacheConfiguration dfltCacheCfg = null;
Expand Down Expand Up @@ -2158,7 +2158,10 @@ else if (dfltCacheCfg == null)
if (cfgTemplate == null)
cfgTemplate = dfltCacheCfg;

cfgTemplate = cfgTemplate == null ? new CacheConfiguration() : cloneCheckSerializable(cfgTemplate);
if (cfgTemplate == null)
return null;

cfgTemplate = cloneCheckSerializable(cfgTemplate);

CacheConfiguration cfg = new CacheConfiguration(cfgTemplate);

Expand All @@ -2167,6 +2170,20 @@ else if (dfltCacheCfg == null)
return cfg;
}

/**
* @param cacheName Cache name.
* @return Cache configuration.
* @throws IgniteCheckedException If failed.
*/
private CacheConfiguration getOrCreateConfigFromTemplate(String cacheName) throws IgniteCheckedException {
CacheConfiguration cfg = getConfigFromTemplate(cacheName);

if (cfg != null)
return cfg;
else
return new CacheConfiguration(cacheName);
}

/**
* Dynamically starts cache.
*
Expand All @@ -2179,7 +2196,7 @@ else if (dfltCacheCfg == null)
* @return Future that will be completed when cache is deployed.
*/
@SuppressWarnings("IfMayBeConditional")
public IgniteInternalFuture<?> dynamicStartCache(
public IgniteInternalFuture<Boolean> dynamicStartCache(
@Nullable CacheConfiguration ccfg,
String cacheName,
@Nullable NearCacheConfiguration nearCfg,
Expand Down Expand Up @@ -2209,7 +2226,7 @@ public IgniteInternalFuture<?> dynamicStartCache(
* @return Future that will be completed when cache is deployed.
*/
@SuppressWarnings("IfMayBeConditional")
public IgniteInternalFuture<?> dynamicStartCache(
public IgniteInternalFuture<Boolean> dynamicStartCache(
@Nullable CacheConfiguration ccfg,
String cacheName,
@Nullable NearCacheConfiguration nearCfg,
Expand Down Expand Up @@ -2313,7 +2330,7 @@ private IgniteInternalFuture<?> dynamicStartCaches(
* @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is destroyed.
*/
public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName, boolean checkThreadTx) {
public IgniteInternalFuture<Boolean> dynamicDestroyCache(String cacheName, boolean checkThreadTx) {
assert cacheName != null;

if (checkThreadTx)
Expand Down Expand Up @@ -2515,7 +2532,7 @@ private Collection<DynamicCacheStartFuture> initiateCacheChanges(

if (desc == null)
// No-op.
fut.onDone();
fut.onDone(false);
else {
assert desc.cacheConfiguration() != null : desc;

Expand Down Expand Up @@ -3450,7 +3467,7 @@ public <T> T clone(final T obj) throws IgniteCheckedException {
*
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
private class DynamicCacheStartFuture extends GridFutureAdapter<Object> {
private class DynamicCacheStartFuture extends GridFutureAdapter<Boolean> {
/** Cache name. */
private String cacheName;

Expand All @@ -3475,7 +3492,7 @@ public DynamicCacheChangeRequest request() {
}

/** {@inheritDoc} */
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
@Override public boolean onDone(@Nullable Boolean res, @Nullable Throwable err) {
// Make sure to remove future before completion.
pendingFuts.remove(req.requestId(), this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1265,6 +1265,58 @@ else if (op instanceof SchemaIndexDropOperation) {
}
}

/**
* Create cache and table from given query entity.
*
* @param schemaName Schema name to create table in.
* @param entity Entity to create table from.
* @param templateCacheName Cache name to take settings from.
* @param ifNotExists Quietly ignore this command if table already exists.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
public void dynamicTableCreate(String schemaName, QueryEntity entity, String templateCacheName, boolean ifNotExists)
throws IgniteCheckedException {
CacheConfiguration<?, ?> templateCfg = ctx.cache().getConfigFromTemplate(templateCacheName);

if (templateCfg == null)
throw new SchemaOperationException(SchemaOperationException.CODE_CACHE_NOT_FOUND, templateCacheName);

if (!F.isEmpty(templateCfg.getQueryEntities()))
throw new SchemaOperationException("Template cache already contains query entities which it should not " +
"[cacheName=" + templateCacheName + ']');

CacheConfiguration<?, ?> newCfg = new CacheConfiguration<>(templateCfg);

newCfg.setName(entity.getTableName());

newCfg.setQueryEntities(Collections.singleton(entity));

// We want to preserve user specified names as they are
newCfg.setSqlEscapeAll(true);

boolean res = ctx.grid().getOrCreateCache0(newCfg).get2();

if (!res && !ifNotExists)
throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_EXISTS, entity.getTableName());
}

/**
* Drop table by destroying its cache if it's an 1:1 per cache table.
*
* @param schemaName Schema name.
* @param tblName Table name.
* @param ifExists Quietly ignore this command if table does not exist.
* @throws SchemaOperationException if {@code ifExists} is {@code false} and cache was not found.
*/
@SuppressWarnings("unchecked")
public void dynamicTableDrop(String schemaName, String tblName, boolean ifExists) throws SchemaOperationException {
boolean res = ctx.grid().destroyCache0(tblName);

if (!res && !ifExists)
throw new SchemaOperationException(SchemaOperationException.CODE_TABLE_NOT_FOUND, tblName);
}

/**
* Register cache in indexing SPI.
*
Expand Down