Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-6416 #2774

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testCoordinatorChange() throws Exception {
// Start servers.
Ignite srv1 = ignitionStart(serverConfiguration(1), null);
Ignite srv2 = ignitionStart(serverConfiguration(2), null);
ignitionStart(serverConfiguration(3, true), finishLatch);
Ignite srv3 = ignitionStart(serverConfiguration(3, true), finishLatch);

UUID srv1Id = srv1.cluster().localNode().id();
UUID srv2Id = srv2.cluster().localNode().id();
Expand All @@ -175,7 +175,8 @@ public void testCoordinatorChange() throws Exception {

colFut1.get();

checkNodesState(TBL_NAME, c("age", Integer.class.getName()));
// Port number is for srv2.
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, 10801, c("age", Integer.class.getName()));

// Test migration from normal server to non-affinity server.
idxLatch = blockIndexing(srv2Id);
Expand All @@ -191,7 +192,11 @@ public void testCoordinatorChange() throws Exception {

colFut2.get();

checkNodesState(TBL_NAME, c("city", String.class.getName()));
// Let's actually create cache on non affinity server.
srv3.cache(QueryUtils.createTableCacheName(QueryUtils.DFLT_SCHEMA, "PERSON"));

// Port number is for srv3.
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, 10802, c("city", String.class.getName()));
}

/**
Expand Down Expand Up @@ -239,7 +244,7 @@ public void testOperationChaining() throws Exception {

U.await(finishLatch);

checkNodesState(TBL_NAME, c1, c2);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c1, c2);
}

/**
Expand Down Expand Up @@ -276,7 +281,7 @@ public void testNodeJoinOnPendingOperation() throws Exception {

U.await(finishLatch);

checkNodesState(TBL_NAME, c);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c);
}

/**
Expand Down Expand Up @@ -335,7 +340,7 @@ public void testConcurrentPutRemove() throws Exception {
finishLatch.await();

// Make sure new column is there.
checkNodesState(TBL_NAME, c("v", Integer.class.getName()));
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c("v", Integer.class.getName()));

run(srv1, "update person set \"v\" = case when mod(id, 2) <> 0 then substring(name, 7, length(name) - 6) " +
"else null end");
Expand Down Expand Up @@ -429,7 +434,7 @@ public void testConcurrentRebalance() throws Exception {
CountDownLatch idxLatch1 = blockIndexing(srv1);
CountDownLatch idxLatch2 = blockIndexing(srv2);

QueryField c = c("salary", Float.class.getName());
QueryField c = c("salary", Double.class.getName());

final IgniteInternalFuture<?> idxFut = addCols(srv1, QueryUtils.DFLT_SCHEMA, c);

Expand All @@ -449,7 +454,7 @@ public void testConcurrentRebalance() throws Exception {
// Validate index state.
idxFut.get();

checkNodesState(TBL_NAME, c);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c);
}

/**
Expand Down Expand Up @@ -655,7 +660,7 @@ private void checkClientReconnect(final boolean restartCache, boolean dynamicCac
}
});

checkNodeState((IgniteEx)cli, schemaName, TBL_NAME, cols);
checkTableState(schemaName, TBL_NAME, cols);
}

/**
Expand Down Expand Up @@ -844,7 +849,7 @@ public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Excep

idxQry += ')';

checkNodesState(TBL_NAME, expCols);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, expCols);

put(cli, 0, 500);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,34 @@

package org.apache.ignite.internal.processors.cache.index;

import java.util.Arrays;
import java.util.Collection;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.h2.table.Column;
import org.h2.value.DataType;

/**
Expand All @@ -68,157 +61,60 @@ public abstract class DynamicColumnsAbstractTest extends GridCommonAbstractTest
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

/**
* Check that given columns have been added to all related structures on target node exactly where needed
* (namely, schema in cache descriptor, type descriptor on started cache, and H2 state on started cache).
* @param node Target node.
* Check that given columns are seen by client.
* @param schemaName Schema name to look for the table in.
* @param tblName Table name to check.
* @param cols Columns whose presence must be checked.
*/
static void checkNodeState(IgniteEx node, String schemaName, String tblName, QueryField... cols) {
String cacheName = F.eq(schemaName, QueryUtils.DFLT_SCHEMA) ?
QueryUtils.createTableCacheName(schemaName, tblName) : schemaName;

// Schema state check - should pass regardless of cache state.
{
DynamicCacheDescriptor desc = node.context().cache().cacheDescriptor(cacheName);

assertNotNull("Cache descriptor not found", desc);

assertTrue(desc.sql() == F.eq(schemaName, QueryUtils.DFLT_SCHEMA));

QuerySchema schema = desc.schema();

assertNotNull(schema);

QueryEntity entity = null;

for (QueryEntity e : schema.entities()) {
if (F.eq(tblName, e.getTableName())) {
entity = e;

break;
}
}

assertNotNull("Query entity not found", entity);

Iterator<Map.Entry<String, String>> it = entity.getFields().entrySet().iterator();

for (int i = entity.getFields().size() - cols.length; i > 0 && it.hasNext(); i--)
it.next();

for (QueryField col : cols) {
assertTrue("New column not found in query entity: " + col.name(), it.hasNext());

Map.Entry<String, String> e = it.next();

assertEquals(col.name(), e.getKey());

assertEquals(col.typeName(), e.getValue());

if (!col.isNullable()) {
assertNotNull(entity.getNotNullFields());
static void checkTableState(String schemaName, String tblName, QueryField... cols) throws SQLException {
checkTableState(schemaName, tblName, ClientConnectorConfiguration.DFLT_PORT, cols);
}

assertTrue(entity.getNotNullFields().contains(col.name()));
}
else if (entity.getNotNullFields() != null)
assertFalse(entity.getNotNullFields().contains(col.name()));
}
}
/**
* Check that given columns are seen by client.
* @param schemaName Schema name to look for the table in.
* @param tblName Table name to check.
* @param port Port number.
* @param cols Columns whose presence must be checked.
*/
static void checkTableState(String schemaName, String tblName, int port, QueryField... cols) throws SQLException {
List<QueryField> flds = new ArrayList<>();

// Start cache on this node if we haven't yet.
node.cache(cacheName);
try (Connection c = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1:" + port)) {
try (ResultSet rs = c.getMetaData().getColumns(null, schemaName, tblName, "%")) {
while (rs.next()) {
String name = rs.getString("COLUMN_NAME");

// Type descriptor state check.
{
Collection<GridQueryTypeDescriptor> descs = node.context().query().types(cacheName);
short type = rs.getShort("DATA_TYPE");

GridQueryTypeDescriptor desc = null;
String typeClsName = DataType.getTypeClassName(DataType.convertSQLTypeToValueType(type));

for (GridQueryTypeDescriptor d : descs) {
if (F.eq(tblName, d.tableName())) {
desc = d;
short nullable = rs.getShort("NULLABLE");

break;
flds.add(new QueryField(name, typeClsName, nullable == 1));
}
}

assertNotNull("Type descriptor not found", desc);

Iterator<Map.Entry<String, Class<?>>> it = desc.fields().entrySet().iterator();

for (int i = desc.fields().size() - cols.length; i > 0 && it.hasNext(); i--)
it.next();

for (QueryField col : cols) {
assertTrue("New column not found in type descriptor: " + col.name(), it.hasNext());

Map.Entry<String, Class<?>> e = it.next();

assertEquals(col.name(), e.getKey());

assertEquals(col.typeName(), e.getValue().getName());

assertTrue(col.isNullable() || desc.property(col.name()).notNull());
}
}

// H2 table state check.
{
GridH2Table tbl = ((IgniteH2Indexing)node.context().query().getIndexing()).dataTable(schemaName,
tblName);

assertNotNull("Table not found", tbl);

Iterator<Column> colIt = Arrays.asList(tbl.getColumns()).iterator();

GridH2RowDescriptor rowDesc = tbl.rowDescriptor();

int i = 0;
Iterator<QueryField> it = flds.iterator();

for (; i < tbl.getColumns().length - cols.length && colIt.hasNext(); i++)
colIt.next();
for (int i = flds.size() - cols.length; i > 0 && it.hasNext(); i--)
it.next();

for (QueryField col : cols) {
assertTrue("New column not found in H2 table: " + col.name(), colIt.hasNext());
for (QueryField exp : cols) {
assertTrue("New column not found in metadata: " + exp.name(), it.hasNext());

assertTrue(colIt.hasNext());
QueryField act = it.next();

Column c = colIt.next();
assertEquals(exp.name(), act.name());

assertEquals(col.name(), c.getName());
assertEquals(exp.typeName(), act.typeName());

assertEquals(col.typeName(), DataType.getTypeClassName(c.getType()));

assertFalse(rowDesc.isKeyValueOrVersionColumn(i));

assertEquals(col.isNullable(), c.isNullable());

try {
assertEquals(DataType.getTypeFromClass(Class.forName(col.typeName())),
rowDesc.fieldType(i - GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT));
}
catch (ClassNotFoundException e) {
throw new AssertionError(e);
}

i++;
}
// TODO uncomment after IGNITE-6529 is implemented.
//assertEquals(exp.isNullable(), act.isNullable());
}
}

/**
* Check that given columns have been added to all related structures on all started nodes (namely, schema
* in cache descriptor, type descriptor on started cache, and H2 state on started cache).
* @param tblName Table name to check.
* @param cols Columns whose presence must be checked.
*/
static void checkNodesState(String tblName, QueryField... cols) {
for (Ignite node : Ignition.allGrids())
checkNodeState((IgniteEx)node, QueryUtils.DFLT_SCHEMA, tblName, cols);
}

/**
* @param name New column name.
* @param typeName Class name for this new column's data type.
Expand Down