From 955fde90e3d2173679ec7486958fe51b29edb72d Mon Sep 17 00:00:00 2001 From: Gokcen Iskender Date: Thu, 29 Aug 2019 14:55:28 -0700 Subject: [PATCH] PHOENIX-5456 IndexScrutinyTool slow for indexes on multitenant tables and IndexScrutinyIT doesn't run --- .../end2end/IndexScrutinyToolBaseIT.java | 137 ++ .../end2end/IndexScrutinyToolForTenantIT.java | 258 ++++ .../phoenix/end2end/IndexScrutinyToolIT.java | 1322 +++++++---------- .../mapreduce/util/IndexColumnNames.java | 3 +- 4 files changed, 911 insertions(+), 809 deletions(-) create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java create mode 100644 phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java new file mode 100644 index 00000000000..9536a12f9a7 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolBaseIT.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.end2end; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; +import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link IndexScrutinyTool} + */ +public class IndexScrutinyToolBaseIT extends BaseTest { + protected String outputDir; + + @BeforeClass public static void doSetup() throws Exception { + Map serverProps = Maps.newHashMap(); + //disable major compactions + serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0"); + Map clientProps = Maps.newHashMap(); + + clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString()); + + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + protected List runScrutiny(String[] cmdArgs) throws Exception { + IndexScrutinyTool scrutiny = new IndexScrutinyTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + scrutiny.setConf(conf); + int status = scrutiny.run(cmdArgs); + assertEquals(0, status); + for (Job job : scrutiny.getJobs()) { + assertTrue(job.waitForCompletion(true)); + } + return scrutiny.getJobs(); + } + + protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, + SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) { + final List args = Lists.newArrayList(); + if (schemaName != null) { + args.add("-s"); + args.add(schemaName); + } + args.add("-dt"); + args.add(dataTable); + args.add("-it"); + args.add(indxTable); + + // TODO test snapshot reads + // if(useSnapshot) { + // args.add("-snap"); + // } + + if (OutputFormat.FILE.equals(outputFormat)) { + args.add("-op"); + outputDir = "/tmp/" + UUID.randomUUID().toString(); + args.add(outputDir); + } + + args.add("-t"); + args.add(String.valueOf(scrutinyTs)); + args.add("-run-foreground"); + if (batchSize != null) { + args.add("-b"); + args.add(String.valueOf(batchSize)); + } + + // default to using data table as the source table + args.add("-src"); + if (sourceTable == null) { + args.add(SourceTable.DATA_TABLE_SOURCE.name()); + } else { + args.add(sourceTable.name()); + } + if (outputInvalidRows) { + args.add("-o"); + } + if (outputFormat != null) { + args.add("-of"); + args.add(outputFormat.name()); + } + if (maxOutputRows != null) { + args.add("-om"); + args.add(maxOutputRows.toString()); + } + if (tenantId != null) { + args.add("-tenant"); + args.add(tenantId); + } + return args.toArray(new String[0]); + } + + protected long getCounterValue(Counters counters, Enum counter) { + return counters.findCounter(counter).getValue(); + } + + protected int countRows(Connection conn, String tableFullName) throws SQLException { + ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName); + count.next(); + int numRows = count.getInt(1); + return numRows; + } + +} + + diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java new file mode 100644 index 00000000000..206e793b133 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolForTenantIT.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.end2end; + +import com.google.common.base.Joiner; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; +import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames; +import org.apache.phoenix.parse.HintNode; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; + +import static org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput.OUTPUT_TABLE_NAME; +import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link IndexScrutinyTool} + */ +@Category(NeedsOwnMiniClusterTest.class) +public class IndexScrutinyToolForTenantIT extends IndexScrutinyToolBaseIT { + private Connection connGlobal = null; + private Connection connTenant = null; + + private String tenantId; + private String tenantViewName; + private String indexNameTenant; + private String multiTenantTable; + private String viewIndexTableName; + + private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s"; + private final String upsertQueryStr = "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')"; + private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) "; + + /** + * Create the test data + */ + @Before public void setup() throws SQLException { + tenantId = generateUniqueName(); + tenantViewName = generateUniqueName(); + indexNameTenant = generateUniqueName(); + multiTenantTable = generateUniqueName(); + viewIndexTableName = "_IDX_" + multiTenantTable; + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + connGlobal = DriverManager.getConnection(getUrl(), props); + + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + connTenant = DriverManager.getConnection(getUrl(), props); + String createTblStr = "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) MULTI_TENANT=true"; + + createTestTable(getUrl(), String.format(createTblStr, multiTenantTable)); + + connTenant.createStatement() + .execute(String.format(createViewStr, tenantViewName, multiTenantTable)); + + String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName); + connTenant.createStatement().execute(idxStmtTenant); + } + + @After public void teardown() throws SQLException { + if (connGlobal != null) { + connGlobal.close(); + } + if (connTenant != null) { + connTenant.close(); + } + } + + /** + * Tests that the config for max number of output rows is observed + */ + @Test public void testTenantViewAndIndexEqual() throws Exception { + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); + connTenant.commit(); + + String[] + argValues = + getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId, + EnvironmentEdgeManager.currentTimeMillis()); + + List completedJobs = runScrutiny(argValues); + // Sunny case, both index and view are equal. 1 row + assertEquals(1, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + + /** + * Tests global view on multi-tenant table should work too + **/ + @Test public void testGlobalViewOnMultiTenantTable() throws Exception { + String globalViewName = generateUniqueName(); + String indexNameGlobal = generateUniqueName(); + + connGlobal.createStatement() + .execute(String.format(createViewStr, globalViewName, multiTenantTable)); + + String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName); + connGlobal.createStatement().execute(idxStmtGlobal); + connGlobal.createStatement() + .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x")); + connGlobal.commit(); + String[] + argValues = + getArgValues("", globalViewName, indexNameGlobal, 10L, SourceTable.INDEX_TABLE_SOURCE, false, null, null, null, + EnvironmentEdgeManager.currentTimeMillis()); + List completedJobs = runScrutiny(argValues); + // Sunny case, both index and view are equal. 1 row + assertEquals(1, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + + @Test public void testColumnsForSelectQueryOnMultiTenantTable() throws Exception { + String indexNameGlobal = generateUniqueName(); + connGlobal.createStatement() + .execute(String.format(createIndexStr, indexNameGlobal, multiTenantTable)); + + PhoenixConnection pconn = connGlobal.unwrap(PhoenixConnection.class); + PTable pDataTable = pconn.getTable(new PTableKey(null, multiTenantTable)); + PTable pIndexTable = pconn.getTable(new PTableKey(null, indexNameGlobal)); + + SourceTargetColumnNames + columnNames = + new SourceTargetColumnNames.IndexSourceColNames(pDataTable, pIndexTable); + String targetPksCsv = Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(columnNames.getTargetPkColNames())); + String + selectQuery = + QueryUtil.constructSelectStatement(indexNameGlobal, columnNames.getCastedTargetColNames(), targetPksCsv, + HintNode.Hint.NO_INDEX, false); + assertEquals(selectQuery, + "SELECT /*+ NO_INDEX */ CAST(\"COL1\" AS VARCHAR(15)) , CAST(\"ID\" AS INTEGER) , CAST(\"0\".\"NAME\" AS VARCHAR) FROM " + + indexNameGlobal + " WHERE (\"COL1\",\"ID\")"); + } + + /** + * Use Both as source. Add 1 row to tenant view and disable index. + * Add 1 more to view and add a wrong row to index. + * Both have 1 invalid row, 1 valid row. + **/ + @Test public void testOneValidOneInvalidUsingBothAsSource() throws Exception { + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); + connTenant.commit(); + connTenant.createStatement().execute( + String.format("ALTER INDEX %s ON %S disable", indexNameTenant, tenantViewName)); + + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2")); + + connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')", + indexNameTenant, 5555, "wrongName")); + connTenant.commit(); + + String[] + argValues = + getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false, null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis()); + List completedJobs = runScrutiny(argValues); + + assertEquals(2, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + + /** + * Add 3 rows to Tenant view. + * Empty index table and observe they are not equal. + * Use data table as source and output to file. + **/ + @Test public void testWithEmptyIndexTableOutputToFile() throws Exception { + testWithOutput(OutputFormat.FILE); + } + + @Test public void testWithEmptyIndexTableOutputToTable() throws Exception { + testWithOutput(OutputFormat.TABLE); + assertEquals(3, countRows(connGlobal, OUTPUT_TABLE_NAME)); + } + + private void testWithOutput(OutputFormat outputFormat) throws Exception { + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2")); + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3")); + connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')", + indexNameTenant, 5555, "wrongName")); + connTenant.commit(); + + ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = queryServices.getAdmin(); + TableName tableName = TableName.valueOf(viewIndexTableName); + admin.disableTable(tableName); + admin.truncateTable(tableName, false); + + String[] + argValues = + getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, outputFormat, null, + tenantId, EnvironmentEdgeManager.currentTimeMillis()); + List completedJobs = runScrutiny(argValues); + + assertEquals(1, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } +} + diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java index 72857e70e09..1f9de1570a4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java @@ -10,7 +10,6 @@ */ package org.apache.phoenix.end2end; -import static org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput.OUTPUT_TABLE_NAME; import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT; @@ -32,914 +31,621 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.TreeSet; -import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; -import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.CsvBulkImportUtil; import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput; import org.apache.phoenix.mapreduce.index.IndexScrutinyTool; import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat; import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; -import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters; import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.query.BaseTest; -import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** * Tests for the {@link IndexScrutinyTool} */ @Category(NeedsOwnMiniClusterTest.class) -@RunWith(Enclosed.class) -public class IndexScrutinyToolIT { - - abstract public static class SharedIndexToolIT extends BaseTest { - protected String outputDir; - - @BeforeClass public static void doSetup() throws Exception { - Map serverProps = Maps.newHashMap(); - //disable major compactions - serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0"); - Map clientProps = Maps.newHashMap(); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), - new ReadOnlyProps(clientProps.entrySet().iterator())); - } +@RunWith(Parameterized.class) +public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT { + private String dataTableDdl; + private String indexTableDdl; - protected List runScrutiny(String[] cmdArgs) throws Exception { - IndexScrutinyTool scrutiny = new IndexScrutinyTool(); - Configuration conf = new Configuration(getUtility().getConfiguration()); - scrutiny.setConf(conf); - int status = scrutiny.run(cmdArgs); - assertEquals(0, status); - for (Job job : scrutiny.getJobs()) { - assertTrue(job.waitForCompletion(true)); - } - return scrutiny.getJobs(); - } - - protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, - SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) { - final List args = Lists.newArrayList(); - if (schemaName != null) { - args.add("-s"); - args.add(schemaName); - } - args.add("-dt"); - args.add(dataTable); - args.add("-it"); - args.add(indxTable); - - // TODO test snapshot reads - // if(useSnapshot) { - // args.add("-snap"); - // } - - if (OutputFormat.FILE.equals(outputFormat)) { - args.add("-op"); - outputDir = "/tmp/" + UUID.randomUUID().toString(); - args.add(outputDir); - } + private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)"; - args.add("-t"); - args.add(String.valueOf(scrutinyTs)); - args.add("-run-foreground"); - if (batchSize != null) { - args.add("-b"); - args.add(String.valueOf(batchSize)); - } + private static final String + INDEX_UPSERT_SQL = + "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)"; - // default to using data table as the source table - args.add("-src"); - if (sourceTable == null) { - args.add(SourceTable.DATA_TABLE_SOURCE.name()); - } else { - args.add(sourceTable.name()); - } - if (outputInvalidRows) { - args.add("-o"); - } - if (outputFormat != null) { - args.add("-of"); - args.add(outputFormat.name()); - } - if (maxOutputRows != null) { - args.add("-om"); - args.add(maxOutputRows.toString()); - } - if (tenantId != null) { - args.add("-tenant"); - args.add(tenantId); - } - return args.toArray(new String[0]); - } - - protected long getCounterValue(Counters counters, Enum counter) { - return counters.findCounter(counter).getValue(); - } - - protected int countRows(Connection conn, String tableFullName) throws SQLException { - ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName); - count.next(); - int numRows = count.getInt(1); - return numRows; - } - - } + private static final String DELETE_SQL = "DELETE FROM %s "; - @RunWith(Parameterized.class) public static class IndexScrutinyToolNonTenantIT extends SharedIndexToolIT { + private String schemaName; + private String dataTableName; + private String dataTableFullName; + private String indexTableName; + private String indexTableFullName; - private String dataTableDdl; - private String indexTableDdl; + private Connection conn; - private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)"; + private PreparedStatement dataTableUpsertStmt; - private static final String - INDEX_UPSERT_SQL = - "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)"; + private PreparedStatement indexTableUpsertStmt; - private static final String DELETE_SQL = "DELETE FROM %s "; + private long testTime; + private Properties props; - private String schemaName; - private String dataTableName; - private String dataTableFullName; - private String indexTableName; - private String indexTableFullName; - - private Connection conn; - - private PreparedStatement dataTableUpsertStmt; - - private PreparedStatement indexTableUpsertStmt; - - private long testTime; - private Properties props; - - @Parameterized.Parameters public static Collection data() { - return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", - "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", - "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", - "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } }); - } + @Parameterized.Parameters public static Collection data() { + return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", + "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", + "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", + "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } }); + } - public IndexScrutinyToolNonTenantIT(String dataTableDdl, String indexTableDdl) { - this.dataTableDdl = dataTableDdl; - this.indexTableDdl = indexTableDdl; - } + public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) { + this.dataTableDdl = dataTableDdl; + this.indexTableDdl = indexTableDdl; + } - /** - * Create the test data and index tables - */ - @Before public void setup() throws SQLException { - generateUniqueTableNames(); - createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName)); - createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName)); - props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - conn = DriverManager.getConnection(getUrl(), props); - String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName); - dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert); - String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName); - indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert); - conn.setAutoCommit(false); - testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000; + /** + * Create the test data and index tables + */ + @Before public void setup() throws SQLException { + generateUniqueTableNames(); + createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName)); + createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName)); + props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName); + dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert); + String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName); + indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert); + conn.setAutoCommit(false); + testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000; - } + } - @After public void teardown() throws SQLException { - if (conn != null) { - conn.close(); - } + @After public void teardown() throws SQLException { + if (conn != null) { + conn.close(); } + } - /** - * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid. - */ - @Test public void testValidIndex() throws Exception { - // insert two rows - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - int numDataRows = countRows(conn, dataTableFullName); - int numIndexRows = countRows(conn, indexTableFullName); - - // scrutiny should report everything as ok - List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); - - // make sure row counts weren't modified by scrutiny - assertEquals(numDataRows, countRows(conn, dataTableFullName)); - assertEquals(numIndexRows, countRows(conn, indexTableFullName)); - } + /** + * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid. + */ + @Test public void testValidIndex() throws Exception { + // insert two rows + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + int numDataRows = countRows(conn, dataTableFullName); + int numIndexRows = countRows(conn, indexTableFullName); + + // scrutiny should report everything as ok + List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + + // make sure row counts weren't modified by scrutiny + assertEquals(numDataRows, countRows(conn, dataTableFullName)); + assertEquals(numIndexRows, countRows(conn, indexTableFullName)); + } - /** - * Tests running a scrutiny while updates and deletes are happening. - * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue. - */ - @Test @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner") - public void testScrutinyWhileTakingWrites() throws Exception { - int id = 0; - while (id < 1000) { - int index = 1; - dataTableUpsertStmt.setInt(index++, id); - dataTableUpsertStmt.setString(index++, "name-" + id); - dataTableUpsertStmt.setInt(index++, id); - dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime)); - dataTableUpsertStmt.executeUpdate(); - id++; - } - conn.commit(); - - //CURRENT_SCN for scrutiny - long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis(); - - // launch background upserts and deletes - final Random random = new Random(0); - Runnable backgroundUpserts = new Runnable() { - @Override public void run() { - int idToUpsert = random.nextInt(1000); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - PreparedStatement dataPS = - conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName)); - upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000); - conn.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } + /** + * Tests running a scrutiny while updates and deletes are happening. + * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue. + */ + @Test @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner") public void testScrutinyWhileTakingWrites() throws Exception { + int id = 0; + while (id < 1000) { + int index = 1; + dataTableUpsertStmt.setInt(index++, id); + dataTableUpsertStmt.setString(index++, "name-" + id); + dataTableUpsertStmt.setInt(index++, id); + dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime)); + dataTableUpsertStmt.executeUpdate(); + id++; + } + conn.commit(); + + //CURRENT_SCN for scrutiny + long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis(); + + // launch background upserts and deletes + final Random random = new Random(0); + Runnable backgroundUpserts = new Runnable() { + @Override public void run() { + int idToUpsert = random.nextInt(1000); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + PreparedStatement + dataPS = + conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName)); + upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000); + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); } - }; - Runnable backgroundDeletes = new Runnable() { - @Override public void run() { - int idToDelete = random.nextInt(1000); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - String deleteSql = - String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" - + idToDelete; - conn.createStatement().executeUpdate(deleteSql); - conn.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } + } + }; + Runnable backgroundDeletes = new Runnable() { + @Override public void run() { + int idToDelete = random.nextInt(1000); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String + deleteSql = + String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + idToDelete; + conn.createStatement().executeUpdate(deleteSql); + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); } - }; - ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); - scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS); - scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS); - - // scrutiny should report everything as ok - List completedJobs = - runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); - scheduledThreadPool.shutdown(); - scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS); - } - - /** - * Tests an index with the same # of rows as the data table, but one of the index rows is - * incorrect Scrutiny should report the invalid rows. - */ - @Test public void testEqualRowCountIndexIncorrect() throws Exception { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable the index and insert another row which is not indexed - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - // insert a bad row into the index - upsertIndexRow("badName", 2, 9999); - conn.commit(); - - // scrutiny should report the bad row - List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); - } - - /** - * Tests an index where the index pk is correct (indexed col values are indexed correctly), but - * a covered index value is incorrect. Scrutiny should report the invalid row - */ - @Test public void testCoveredValueIncorrect() throws Exception { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable index and insert another data row - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - // insert a bad index row for the above data row - upsertIndexRow("name-2", 2, 9999); - conn.commit(); - - // scrutiny should report the bad row - List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT)); - } - - /** - * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs - * scrutiny with batchsize of 10, - */ - @Test public void testBatching() throws Exception { - // insert 1001 data and index rows - int numTestRows = 1001; - for (int i = 0; i < numTestRows; i++) { - upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000); } - conn.commit(); + }; + ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); + scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS); + scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS); + + // scrutiny should report everything as ok + List + completedJobs = + runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + scheduledThreadPool.shutdown(); + scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS); + } - disableIndex(); + /** + * Tests an index with the same # of rows as the data table, but one of the index rows is + * incorrect Scrutiny should report the invalid rows. + */ + @Test public void testEqualRowCountIndexIncorrect() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad row into the index + upsertIndexRow("badName", 2, 9999); + conn.commit(); + + // scrutiny should report the bad row + List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + } - // randomly delete some rows from the index - Random random = new Random(); - for (int i = 0; i < 100; i++) { - int idToDelete = random.nextInt(numTestRows); - deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete); - } - conn.commit(); - int numRows = countRows(conn, indexTableFullName); - int numDeleted = numTestRows - numRows; + /** + * Tests an index where the index pk is correct (indexed col values are indexed correctly), but + * a covered index value is incorrect. Scrutiny should report the invalid row + */ + @Test public void testCoveredValueIncorrect() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable index and insert another data row + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad index row for the above data row + upsertIndexRow("name-2", 2, 9999); + conn.commit(); + + // scrutiny should report the bad row + List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT)); + } - // run scrutiny with batch size of 10 - List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT)); - assertEquals(numTestRows / 10 + numTestRows % 10, - getCounterValue(counters, BATCHES_PROCESSED_COUNT)); - } + /** + * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs + * scrutiny with batchsize of 10, + */ + @Test public void testBatching() throws Exception { + // insert 1001 data and index rows + int numTestRows = 1001; + for (int i = 0; i < numTestRows; i++) { + upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000); + } + conn.commit(); + + disableIndex(); + + // randomly delete some rows from the index + Random random = new Random(); + for (int i = 0; i < 100; i++) { + int idToDelete = random.nextInt(numTestRows); + deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete); + } + conn.commit(); + int numRows = countRows(conn, indexTableFullName); + int numDeleted = numTestRows - numRows; + + // run scrutiny with batch size of 10 + List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT)); + assertEquals(numTestRows / 10 + numTestRows % 10, + getCounterValue(counters, BATCHES_PROCESSED_COUNT)); + } - /** - * Tests when there are more data table rows than index table rows Scrutiny should report the - * number of incorrect rows - */ - @Test public void testMoreDataRows() throws Exception { - upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); - conn.commit(); - disableIndex(); - // these rows won't have a corresponding index row - upsertRow(dataTableUpsertStmt, 2, "name-2", 95124); - upsertRow(dataTableUpsertStmt, 3, "name-3", 95125); - conn.commit(); - - List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); - } + /** + * Tests when there are more data table rows than index table rows Scrutiny should report the + * number of incorrect rows + */ + @Test public void testMoreDataRows() throws Exception { + upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); + conn.commit(); + disableIndex(); + // these rows won't have a corresponding index row + upsertRow(dataTableUpsertStmt, 2, "name-2", 95124); + upsertRow(dataTableUpsertStmt, 3, "name-3", 95125); + conn.commit(); + + List completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); + } - /** - * Tests when there are more index table rows than data table rows Scrutiny should report the - * number of incorrect rows when run with the index as the source table - */ - @Test public void testMoreIndexRows() throws Exception { - upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); - conn.commit(); - disableIndex(); - // these index rows won't have a corresponding data row - upsertIndexRow("name-2", 2, 95124); - upsertIndexRow("name-3", 3, 95125); - conn.commit(); - - List completedJobs = - runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE); - Job job = completedJobs.get(0); + /** + * Tests when there are more index table rows than data table rows Scrutiny should report the + * number of incorrect rows when run with the index as the source table + */ + @Test public void testMoreIndexRows() throws Exception { + upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); + conn.commit(); + disableIndex(); + // these index rows won't have a corresponding data row + upsertIndexRow("name-2", 2, 95124); + upsertIndexRow("name-3", 3, 95125); + conn.commit(); + + List + completedJobs = + runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); + } + + /** + * Tests running with both the index and data tables as the source table If we have an + * incorrectly indexed row, it should be reported in each direction + */ + @Test public void testBothDataAndIndexAsSource() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad row into the index + upsertIndexRow("badName", 2, 9999); + conn.commit(); + + List + completedJobs = + runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH); + assertEquals(2, completedJobs.size()); + for (Job job : completedJobs) { assertTrue(job.isSuccessful()); Counters counters = job.getCounters(); assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); - } - - /** - * Tests running with both the index and data tables as the source table If we have an - * incorrectly indexed row, it should be reported in each direction - */ - @Test public void testBothDataAndIndexAsSource() throws Exception { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable the index and insert another row which is not indexed - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - // insert a bad row into the index - upsertIndexRow("badName", 2, 9999); - conn.commit(); - - List completedJobs = - runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH); - assertEquals(2, completedJobs.size()); - for (Job job : completedJobs) { - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); - } + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); } + } - /** - * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file - */ - @Test public void testOutputInvalidRowsToFile() throws Exception { - insertOneValid_OneBadVal_OneMissingTarget(); - - String[] argValues = - getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null); - runScrutiny(argValues); - - // check the output files - Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName); - DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem(); - List paths = Lists.newArrayList(); - Path firstPart = null; - for (FileStatus outputFile : fs.listStatus(outputPath)) { - if (outputFile.getPath().getName().startsWith("part")) { - if (firstPart == null) { - firstPart = outputFile.getPath(); - } else { - paths.add(outputFile.getPath()); - } + /** + * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file + */ + @Test public void testOutputInvalidRowsToFile() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + + String[] + argValues = + getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null); + runScrutiny(argValues); + + // check the output files + Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName); + DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem(); + List paths = Lists.newArrayList(); + Path firstPart = null; + for (FileStatus outputFile : fs.listStatus(outputPath)) { + if (outputFile.getPath().getName().startsWith("part")) { + if (firstPart == null) { + firstPart = outputFile.getPath(); + } else { + paths.add(outputFile.getPath()); } } - if (dataTableDdl.contains("SALT_BUCKETS")) { - fs.concat(firstPart, paths.toArray(new Path[0])); - } - Path outputFilePath = firstPart; - assertTrue(fs.exists(outputFilePath)); - FSDataInputStream fsDataInputStream = fs.open(outputFilePath); - BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream)); - TreeSet lines = Sets.newTreeSet(); - try { - String line = null; - while ((line = reader.readLine()) != null) { - lines.add(line); - } - } finally { - IOUtils.closeQuietly(reader); - IOUtils.closeQuietly(fsDataInputStream); + } + if (dataTableDdl.contains("SALT_BUCKETS")) { + fs.concat(firstPart, paths.toArray(new Path[0])); + } + Path outputFilePath = firstPart; + assertTrue(fs.exists(outputFilePath)); + FSDataInputStream fsDataInputStream = fs.open(outputFilePath); + BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream)); + TreeSet lines = Sets.newTreeSet(); + try { + String line = null; + while ((line = reader.readLine()) != null) { + lines.add(line); } - Iterator lineIterator = lines.iterator(); - assertEquals( - "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", lineIterator.next()); - assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", - lineIterator.next()); - + } finally { + IOUtils.closeQuietly(reader); + IOUtils.closeQuietly(fsDataInputStream); } + Iterator lineIterator = lines.iterator(); + assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + + new Timestamp(testTime).toString() + ", 9999]", lineIterator.next()); + assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", + lineIterator.next()); - /** - * Tests writing of results to the output table - */ - @Test public void testOutputInvalidRowsToTable() throws Exception { - insertOneValid_OneBadVal_OneMissingTarget(); - String[] argValues = - getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null); - List completedJobs = runScrutiny(argValues); - - // check that the output table contains the invalid rows - long scrutinyTimeMillis = - PhoenixConfigurationUtil - .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); - String invalidRowsQuery = - IndexScrutinyTableOutput - .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis); - ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc"); - assertTrue(rs.next()); - assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); - assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); - assertTrue(rs.getBoolean("HAS_TARGET_ROW")); - assertEquals(2, rs.getInt("ID")); - assertEquals(2, rs.getInt(":ID")); - assertEquals(95123, rs.getInt("ZIP")); - assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect - assertTrue(rs.next()); - assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); - assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); - assertFalse(rs.getBoolean("HAS_TARGET_ROW")); - assertEquals(3, rs.getInt("ID")); - assertEquals(null, rs.getObject(":ID")); // null for missing target row - assertFalse(rs.next()); + } - // check that the job results were written correctly to the metadata table - assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery); - } + /** + * Tests writing of results to the output table + */ + @Test public void testOutputInvalidRowsToTable() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + String[] + argValues = + getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null); + List completedJobs = runScrutiny(argValues); + + // check that the output table contains the invalid rows + long + scrutinyTimeMillis = + PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); + String + invalidRowsQuery = + IndexScrutinyTableOutput + .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis); + ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc"); + assertTrue(rs.next()); + assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); + assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); + assertTrue(rs.getBoolean("HAS_TARGET_ROW")); + assertEquals(2, rs.getInt("ID")); + assertEquals(2, rs.getInt(":ID")); + assertEquals(95123, rs.getInt("ZIP")); + assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect + assertTrue(rs.next()); + assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); + assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); + assertFalse(rs.getBoolean("HAS_TARGET_ROW")); + assertEquals(3, rs.getInt("ID")); + assertEquals(null, rs.getObject(":ID")); // null for missing target row + assertFalse(rs.next()); + + // check that the job results were written correctly to the metadata table + assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery); + } - /** - * Tests that the config for max number of output rows is observed - */ - @Test public void testMaxOutputRows() throws Exception { - insertOneValid_OneBadVal_OneMissingTarget(); - // set max to 1. There are two bad rows, but only 1 should get written to output table - String[] argValues = - getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1)); - List completedJobs = runScrutiny(argValues); - long scrutinyTimeMillis = - PhoenixConfigurationUtil - .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); - String invalidRowsQuery = - IndexScrutinyTableOutput - .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis); - ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery); + /** + * Tests that the config for max number of output rows is observed + */ + @Test public void testMaxOutputRows() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + // set max to 1. There are two bad rows, but only 1 should get written to output table + String[] + argValues = + getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1)); + List completedJobs = runScrutiny(argValues); + long + scrutinyTimeMillis = + PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); + String + invalidRowsQuery = + IndexScrutinyTableOutput + .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis); + ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery); + assertTrue(rs.next()); + if (dataTableDdl.contains("SALT_BUCKETS")) { assertTrue(rs.next()); - if (dataTableDdl.contains("SALT_BUCKETS")) { - assertTrue(rs.next()); - assertFalse(rs.next()); - } else { - assertFalse(rs.next()); - } + assertFalse(rs.next()); + } else { + assertFalse(rs.next()); } + } - private SourceTargetColumnNames getColNames() throws SQLException { - PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName); - PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName); - SourceTargetColumnNames columnNames = - new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable); - return columnNames; - } + private SourceTargetColumnNames getColNames() throws SQLException { + PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName); + PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName); + SourceTargetColumnNames + columnNames = + new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable); + return columnNames; + } - // inserts one valid data/index row, one data row with a missing index row, - // and one data row with an index row that has a bad covered col val - private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable the index and insert another row which is not indexed - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - upsertRow(dataTableUpsertStmt, 3, "name-3", 95123); - conn.commit(); - - // insert a bad index row for one of the above data rows - upsertIndexRow("name-2", 2, 9999); - conn.commit(); - } + // inserts one valid data/index row, one data row with a missing index row, + // and one data row with an index row that has a bad covered col val + private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + upsertRow(dataTableUpsertStmt, 3, "name-3", 95123); + conn.commit(); + + // insert a bad index row for one of the above data rows + upsertIndexRow("name-2", 2, 9999); + conn.commit(); + } - private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, - String invalidRowsQuery) throws SQLException { - ResultSet rs; - ResultSet metadataRs = - IndexScrutinyTableOutput - .queryAllMetadata(conn, dataTableFullName, indexTableFullName, - scrutinyTimeMillis); - assertTrue(metadataRs.next()); - List - expected = + private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, String invalidRowsQuery) throws SQLException { + ResultSet rs; + ResultSet + metadataRs = + IndexScrutinyTableOutput + .queryAllMetadata(conn, dataTableFullName, indexTableFullName, + scrutinyTimeMillis); + assertTrue(metadataRs.next()); + List + expected = + Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, + 1L, 2L, 1L, 1L, + "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", + "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", + invalidRowsQuery); + if (dataTableDdl.contains("SALT_BUCKETS")) { + expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, - 2L, 1L, 1L, + 2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery); - if (dataTableDdl.contains("SALT_BUCKETS")) { - expected = - Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, - SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, - 2L, 1L, 2L, - "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", - "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", - invalidRowsQuery); - } - - assertRsValues(metadataRs, expected); - String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET"); - rs = conn.createStatement().executeQuery(missingTargetQuery); - assertTrue(rs.next()); - assertEquals(3, rs.getInt("ID")); - assertFalse(rs.next()); - String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL"); - rs = conn.createStatement().executeQuery(badCoveredColQuery); - assertTrue(rs.next()); - assertEquals(2, rs.getInt("ID")); - assertFalse(rs.next()); - } - - // assert the result set contains the expected values in the given order - private void assertRsValues(ResultSet rs, List expected) - throws SQLException { - for (int i = 0; i < expected.size(); i++) { - assertEquals(expected.get(i), rs.getObject(i + 1)); - } - } - - private void generateUniqueTableNames() { - schemaName = generateUniqueName(); - dataTableName = generateUniqueName(); - dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - indexTableName = generateUniqueName(); - indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - } - - private void upsertIndexRow(String name, int id, int zip) throws SQLException { - indexTableUpsertStmt.setString(1, name); - indexTableUpsertStmt.setInt(2, id); // id - indexTableUpsertStmt.setInt(3, zip); // bad zip - indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime)); - indexTableUpsertStmt.executeUpdate(); - } - - private void disableIndex() throws SQLException { - conn.createStatement().execute( - String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName)); - conn.commit(); } - private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, - SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) { - return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable, - outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE); - } - - private List runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception { - return runScrutiny( - getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, - false, null, null, null, scrutinyTS)); - } - - private List runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception { - return runScrutiny(schemaName, dataTableName, indexTableName, null, null); - } - - private List runScrutiny(String schemaName, String dataTableName, String indexTableName, - Long batchSize) throws Exception { - return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null); - } - - private List runScrutiny(String schemaName, String dataTableName, String indexTableName, - Long batchSize, SourceTable sourceTable) throws Exception { - final String[] cmdArgs = - getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, - false, null, null, null, Long.MAX_VALUE); - return runScrutiny(cmdArgs); - } + assertRsValues(metadataRs, expected); + String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET"); + rs = conn.createStatement().executeQuery(missingTargetQuery); + assertTrue(rs.next()); + assertEquals(3, rs.getInt("ID")); + assertFalse(rs.next()); + String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL"); + rs = conn.createStatement().executeQuery(badCoveredColQuery); + assertTrue(rs.next()); + assertEquals(2, rs.getInt("ID")); + assertFalse(rs.next()); + } - private void upsertRow(PreparedStatement stmt, int id, String name, int zip) - throws SQLException { - int index = 1; - // insert row - stmt.setInt(index++, id); - stmt.setString(index++, name); - stmt.setInt(index++, zip); - stmt.setTimestamp(index++, new Timestamp(testTime)); - stmt.executeUpdate(); + // assert the result set contains the expected values in the given order + private void assertRsValues(ResultSet rs, List expected) throws SQLException { + for (int i = 0; i < expected.size(); i++) { + assertEquals(expected.get(i), rs.getObject(i + 1)); } + } - private int deleteRow(String fullTableName, String whereCondition) throws SQLException { - String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition; - PreparedStatement deleteStmt = conn.prepareStatement(deleteSql); - return deleteStmt.executeUpdate(); - } + private void generateUniqueTableNames() { + schemaName = generateUniqueName(); + dataTableName = generateUniqueName(); + dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + indexTableName = generateUniqueName(); + indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + } + private void upsertIndexRow(String name, int id, int zip) throws SQLException { + indexTableUpsertStmt.setString(1, name); + indexTableUpsertStmt.setInt(2, id); // id + indexTableUpsertStmt.setInt(3, zip); // bad zip + indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime)); + indexTableUpsertStmt.executeUpdate(); } - public static class IndexScrutinyToolTenantIT extends SharedIndexToolIT { - private Connection connGlobal = null; - private Connection connTenant = null; - - private String tenantId; - private String tenantViewName; - private String indexNameTenant; - private String multiTenantTable; - private String viewIndexTableName; - - private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s"; - private final String - upsertQueryStr = - "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')"; - private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) "; - - /** - * Create the test data - */ - @Before public void setup() throws SQLException { - tenantId = generateUniqueName(); - tenantViewName = generateUniqueName(); - indexNameTenant = generateUniqueName(); - multiTenantTable = generateUniqueName(); - viewIndexTableName = "_IDX_" + multiTenantTable; - - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - connGlobal = DriverManager.getConnection(getUrl(), props); - - props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); - connTenant = DriverManager.getConnection(getUrl(), props); - String - createTblStr = - "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) MULTI_TENANT=true"; - - createTestTable(getUrl(), String.format(createTblStr, multiTenantTable)); - - connTenant.createStatement().execute( - String.format(createViewStr, tenantViewName, multiTenantTable)); - - String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName); - connTenant.createStatement().execute(idxStmtTenant); - } + private void disableIndex() throws SQLException { + conn.createStatement().execute( + String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName)); + conn.commit(); + } - @After public void teardown() throws SQLException { - if (connGlobal != null) { - connGlobal.close(); - } - if (connTenant != null) { - connTenant.close(); - } - } + private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, + SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) { + return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable, + outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE); + } - /** - * Tests that the config for max number of output rows is observed - */ - @Test public void testTenantViewAndIndexEqual() throws Exception { - connTenant.createStatement() - .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); - connTenant.commit(); - - String[] argValues = - getArgValues("", tenantViewName, indexNameTenant, 10L, - SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId, - EnvironmentEdgeManager.currentTimeMillis()); - - List completedJobs = runScrutiny(argValues); - // Sunny case, both index and view are equal. 1 row - assertEquals(1, completedJobs.size()); - for (Job job : completedJobs) { - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); - } - } + private List runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception { + return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, + false, null, null, null, scrutinyTS)); + } - /** - * Tests global view on multi-tenant table should work too - **/ - @Test public void testGlobalViewOnMultiTenantTable() throws Exception { - String globalViewName = generateUniqueName(); - String indexNameGlobal = generateUniqueName(); - - connGlobal.createStatement().execute( - String.format(createViewStr, globalViewName, multiTenantTable)); - - String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName); - connGlobal.createStatement().execute(idxStmtGlobal); - connGlobal.createStatement() - .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x")); - connGlobal.commit(); - String[] argValues = - getArgValues("", globalViewName, indexNameGlobal, 10L, - SourceTable.INDEX_TABLE_SOURCE, false, null, null, null, - EnvironmentEdgeManager.currentTimeMillis()); - List completedJobs = runScrutiny(argValues); - // Sunny case, both index and view are equal. 1 row - assertEquals(1, completedJobs.size()); - for (Job job : completedJobs) { - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); - } - } + private List runScrutiny(String schemaName, String dataTableName, String indexTableName) + throws Exception { + return runScrutiny(schemaName, dataTableName, indexTableName, null, null); + } - /** - * Use Both as source. Add 1 row to tenant view and disable index. - * Add 1 more to view and add a wrong row to index. - * Both have 1 invalid row, 1 valid row. - **/ - @Test - public void testOneValidOneInvalidUsingBothAsSource() throws Exception { - connTenant.createStatement() - .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); - connTenant.commit(); - connTenant.createStatement().execute( - String.format("ALTER INDEX %s ON %S disable", indexNameTenant, tenantViewName)); - - connTenant.createStatement() - .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2")); - - connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')", - indexNameTenant, 5555, "wrongName")); - connTenant.commit(); - - String[] - argValues = - getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false, - null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis()); - List completedJobs = runScrutiny(argValues); - - assertEquals(2, completedJobs.size()); - for (Job job : completedJobs) { - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); - } - } + private List runScrutiny(String schemaName, String dataTableName, String indexTableName, + Long batchSize) throws Exception { + return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null); + } - /** - * Add 3 rows to Tenant view. - * Empty index table and observe they are not equal. - * Use data table as source and output to file. - **/ - @Test public void testWithEmptyIndexTableOutputToFile() throws Exception{ - testWithOutput(OutputFormat.FILE); - } + private List runScrutiny(String schemaName, String dataTableName, String indexTableName, + Long batchSize, SourceTable sourceTable) throws Exception { + final String[] + cmdArgs = + getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, + false, null, null, null, Long.MAX_VALUE); + return runScrutiny(cmdArgs); + } - @Test public void testWithEmptyIndexTableOutputToTable() throws Exception{ - testWithOutput(OutputFormat.TABLE); - assertEquals(3, countRows(connGlobal, OUTPUT_TABLE_NAME)); - } + private void upsertRow(PreparedStatement stmt, int id, String name, int zip) throws SQLException { + int index = 1; + // insert row + stmt.setInt(index++, id); + stmt.setString(index++, name); + stmt.setInt(index++, zip); + stmt.setTimestamp(index++, new Timestamp(testTime)); + stmt.executeUpdate(); + } - private void testWithOutput(OutputFormat outputFormat) throws Exception { - connTenant.createStatement() - .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); - connTenant.createStatement() - .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2")); - connTenant.createStatement() - .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3")); - connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')", - indexNameTenant, 5555, "wrongName")); - connTenant.commit(); - - ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices(); - Admin admin = queryServices.getAdmin(); - TableName tableName = TableName.valueOf(viewIndexTableName); - admin.disableTable(tableName); - admin.truncateTable(tableName, false); - - String[] - argValues = - getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, outputFormat, null, - tenantId, EnvironmentEdgeManager.currentTimeMillis()); - List completedJobs = runScrutiny(argValues); - - assertEquals(1, completedJobs.size()); - for (Job job : completedJobs) { - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT)); - } - } + private int deleteRow(String fullTableName, String whereCondition) throws SQLException { + String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition; + PreparedStatement deleteStmt = conn.prepareStatement(deleteSql); + return deleteStmt.executeUpdate(); } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java index df8c7abf08d..ae11b3d2b81 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java @@ -61,9 +61,10 @@ public IndexColumnNames(final PTable pdataTable, final PTable pindexTable) { if (pindexTable.getViewIndexId() != null) { offset++; } - if (pindexTable.isMultiTenant()) { + if (pindexTable.isMultiTenant() && pindexTable.getViewIndexId() != null) { offset++; } + if (offset > 0) { pindexCols = pindexCols.subList(offset, pindexCols.size()); pkColumns = pkColumns.subList(offset, pkColumns.size());