Skip to content

Commit

Permalink
IGNITE-6416: SQL: dynamic columns tests refactoring. This closes apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
devozerov authored and zstan committed Dec 20, 2017
1 parent 550f6b0 commit 979d57f
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testCoordinatorChange() throws Exception {
// Start servers.
Ignite srv1 = ignitionStart(serverConfiguration(1), null);
Ignite srv2 = ignitionStart(serverConfiguration(2), null);
ignitionStart(serverConfiguration(3, true), finishLatch);
Ignite srv3 = ignitionStart(serverConfiguration(3, true), finishLatch);

UUID srv1Id = srv1.cluster().localNode().id();
UUID srv2Id = srv2.cluster().localNode().id();
Expand All @@ -175,7 +175,8 @@ public void testCoordinatorChange() throws Exception {

colFut1.get();

checkNodesState(TBL_NAME, c("age", Integer.class.getName()));
// Port number is for srv2.
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, 10801, c("age", Integer.class.getName()));

// Test migration from normal server to non-affinity server.
idxLatch = blockIndexing(srv2Id);
Expand All @@ -191,7 +192,11 @@ public void testCoordinatorChange() throws Exception {

colFut2.get();

checkNodesState(TBL_NAME, c("city", String.class.getName()));
// Let's actually create cache on non affinity server.
srv3.cache(QueryUtils.createTableCacheName(QueryUtils.DFLT_SCHEMA, "PERSON"));

// Port number is for srv3.
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, 10802, c("city", String.class.getName()));
}

/**
Expand Down Expand Up @@ -239,7 +244,7 @@ public void testOperationChaining() throws Exception {

U.await(finishLatch);

checkNodesState(TBL_NAME, c1, c2);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c1, c2);
}

/**
Expand Down Expand Up @@ -276,7 +281,7 @@ public void testNodeJoinOnPendingOperation() throws Exception {

U.await(finishLatch);

checkNodesState(TBL_NAME, c);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c);
}

/**
Expand Down Expand Up @@ -335,7 +340,7 @@ public void testConcurrentPutRemove() throws Exception {
finishLatch.await();

// Make sure new column is there.
checkNodesState(TBL_NAME, c("v", Integer.class.getName()));
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c("v", Integer.class.getName()));

run(srv1, "update person set \"v\" = case when mod(id, 2) <> 0 then substring(name, 7, length(name) - 6) " +
"else null end");
Expand Down Expand Up @@ -429,7 +434,7 @@ public void testConcurrentRebalance() throws Exception {
CountDownLatch idxLatch1 = blockIndexing(srv1);
CountDownLatch idxLatch2 = blockIndexing(srv2);

QueryField c = c("salary", Float.class.getName());
QueryField c = c("salary", Double.class.getName());

final IgniteInternalFuture<?> idxFut = addCols(srv1, QueryUtils.DFLT_SCHEMA, c);

Expand All @@ -449,7 +454,7 @@ public void testConcurrentRebalance() throws Exception {
// Validate index state.
idxFut.get();

checkNodesState(TBL_NAME, c);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, c);
}

/**
Expand Down Expand Up @@ -655,7 +660,7 @@ private void checkClientReconnect(final boolean restartCache, boolean dynamicCac
}
});

checkNodeState((IgniteEx)cli, schemaName, TBL_NAME, cols);
checkTableState(schemaName, TBL_NAME, cols);
}

/**
Expand Down Expand Up @@ -844,7 +849,7 @@ public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Excep

idxQry += ')';

checkNodesState(TBL_NAME, expCols);
checkTableState(QueryUtils.DFLT_SCHEMA, TBL_NAME, expCols);

put(cli, 0, 500);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,34 @@

package org.apache.ignite.internal.processors.cache.index;

import java.util.Arrays;
import java.util.Collection;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QuerySchema;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.h2.table.Column;
import org.h2.value.DataType;

/**
Expand All @@ -68,157 +61,60 @@ public abstract class DynamicColumnsAbstractTest extends GridCommonAbstractTest
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);

/**
* Check that given columns have been added to all related structures on target node exactly where needed
* (namely, schema in cache descriptor, type descriptor on started cache, and H2 state on started cache).
* @param node Target node.
* Check that given columns are seen by client.
* @param schemaName Schema name to look for the table in.
* @param tblName Table name to check.
* @param cols Columns whose presence must be checked.
*/
static void checkNodeState(IgniteEx node, String schemaName, String tblName, QueryField... cols) {
String cacheName = F.eq(schemaName, QueryUtils.DFLT_SCHEMA) ?
QueryUtils.createTableCacheName(schemaName, tblName) : schemaName;

// Schema state check - should pass regardless of cache state.
{
DynamicCacheDescriptor desc = node.context().cache().cacheDescriptor(cacheName);

assertNotNull("Cache descriptor not found", desc);

assertTrue(desc.sql() == F.eq(schemaName, QueryUtils.DFLT_SCHEMA));

QuerySchema schema = desc.schema();

assertNotNull(schema);

QueryEntity entity = null;

for (QueryEntity e : schema.entities()) {
if (F.eq(tblName, e.getTableName())) {
entity = e;

break;
}
}

assertNotNull("Query entity not found", entity);

Iterator<Map.Entry<String, String>> it = entity.getFields().entrySet().iterator();

for (int i = entity.getFields().size() - cols.length; i > 0 && it.hasNext(); i--)
it.next();

for (QueryField col : cols) {
assertTrue("New column not found in query entity: " + col.name(), it.hasNext());

Map.Entry<String, String> e = it.next();

assertEquals(col.name(), e.getKey());

assertEquals(col.typeName(), e.getValue());

if (!col.isNullable()) {
assertNotNull(entity.getNotNullFields());
static void checkTableState(String schemaName, String tblName, QueryField... cols) throws SQLException {
checkTableState(schemaName, tblName, ClientConnectorConfiguration.DFLT_PORT, cols);
}

assertTrue(entity.getNotNullFields().contains(col.name()));
}
else if (entity.getNotNullFields() != null)
assertFalse(entity.getNotNullFields().contains(col.name()));
}
}
/**
* Check that given columns are seen by client.
* @param schemaName Schema name to look for the table in.
* @param tblName Table name to check.
* @param port Port number.
* @param cols Columns whose presence must be checked.
*/
static void checkTableState(String schemaName, String tblName, int port, QueryField... cols) throws SQLException {
List<QueryField> flds = new ArrayList<>();

// Start cache on this node if we haven't yet.
node.cache(cacheName);
try (Connection c = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1:" + port)) {
try (ResultSet rs = c.getMetaData().getColumns(null, schemaName, tblName, "%")) {
while (rs.next()) {
String name = rs.getString("COLUMN_NAME");

// Type descriptor state check.
{
Collection<GridQueryTypeDescriptor> descs = node.context().query().types(cacheName);
short type = rs.getShort("DATA_TYPE");

GridQueryTypeDescriptor desc = null;
String typeClsName = DataType.getTypeClassName(DataType.convertSQLTypeToValueType(type));

for (GridQueryTypeDescriptor d : descs) {
if (F.eq(tblName, d.tableName())) {
desc = d;
short nullable = rs.getShort("NULLABLE");

break;
flds.add(new QueryField(name, typeClsName, nullable == 1));
}
}

assertNotNull("Type descriptor not found", desc);

Iterator<Map.Entry<String, Class<?>>> it = desc.fields().entrySet().iterator();

for (int i = desc.fields().size() - cols.length; i > 0 && it.hasNext(); i--)
it.next();

for (QueryField col : cols) {
assertTrue("New column not found in type descriptor: " + col.name(), it.hasNext());

Map.Entry<String, Class<?>> e = it.next();

assertEquals(col.name(), e.getKey());

assertEquals(col.typeName(), e.getValue().getName());

assertTrue(col.isNullable() || desc.property(col.name()).notNull());
}
}

// H2 table state check.
{
GridH2Table tbl = ((IgniteH2Indexing)node.context().query().getIndexing()).dataTable(schemaName,
tblName);

assertNotNull("Table not found", tbl);

Iterator<Column> colIt = Arrays.asList(tbl.getColumns()).iterator();

GridH2RowDescriptor rowDesc = tbl.rowDescriptor();

int i = 0;
Iterator<QueryField> it = flds.iterator();

for (; i < tbl.getColumns().length - cols.length && colIt.hasNext(); i++)
colIt.next();
for (int i = flds.size() - cols.length; i > 0 && it.hasNext(); i--)
it.next();

for (QueryField col : cols) {
assertTrue("New column not found in H2 table: " + col.name(), colIt.hasNext());
for (QueryField exp : cols) {
assertTrue("New column not found in metadata: " + exp.name(), it.hasNext());

assertTrue(colIt.hasNext());
QueryField act = it.next();

Column c = colIt.next();
assertEquals(exp.name(), act.name());

assertEquals(col.name(), c.getName());
assertEquals(exp.typeName(), act.typeName());

assertEquals(col.typeName(), DataType.getTypeClassName(c.getType()));

assertFalse(rowDesc.isKeyValueOrVersionColumn(i));

assertEquals(col.isNullable(), c.isNullable());

try {
assertEquals(DataType.getTypeFromClass(Class.forName(col.typeName())),
rowDesc.fieldType(i - GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT));
}
catch (ClassNotFoundException e) {
throw new AssertionError(e);
}

i++;
}
// TODO uncomment after IGNITE-6529 is implemented.
//assertEquals(exp.isNullable(), act.isNullable());
}
}

/**
* Check that given columns have been added to all related structures on all started nodes (namely, schema
* in cache descriptor, type descriptor on started cache, and H2 state on started cache).
* @param tblName Table name to check.
* @param cols Columns whose presence must be checked.
*/
static void checkNodesState(String tblName, QueryField... cols) {
for (Ignite node : Ignition.allGrids())
checkNodeState((IgniteEx)node, QueryUtils.DFLT_SCHEMA, tblName, cols);
}

/**
* @param name New column name.
* @param typeName Class name for this new column's data type.
Expand Down

0 comments on commit 979d57f

Please sign in to comment.