diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
new file mode 100644
index 00000000000..94b8b566f37
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DefaultPhoenixMultiViewListProviderIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.PhoenixTTLTool.DELETE_ALL_VIEWS;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE;
+import static org.junit.Assert.assertEquals;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class DefaultPhoenixMultiViewListProviderIT extends ParallelStatsDisabledIT {
+    private final String BASE_TABLE_DDL = "CREATE TABLE %s (TENANT_ID CHAR(10) NOT NULL, " +
+            "ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+            "PK PRIMARY KEY (TENANT_ID,ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+    private final String VIEW_DDL = "CREATE VIEW %s (" +
+            "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+            " AS SELECT * FROM %s ";
+    private final String VIEW_DDL_WITH_TTL =
+            VIEW_DDL + " PHOENIX_TTL = 1000";
+    private final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
+    private final String TENANT_VIEW_DDL =
+            "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+    private final String TENANT_VIEW_DDL_WITH_TTL = TENANT_VIEW_DDL + " PHOENIX_TTL = 1000";
+
+    @Test
+    public void testGetPhoenixMultiViewList() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String globalViewName1 = schema + "." + generateUniqueName();
+        String globalViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName1 = schema + "." + generateUniqueName();
+        String tenantViewName2 = schema + "." + generateUniqueName();
+        String tenantViewName3 = schema + "." + generateUniqueName();
+        String tenantViewName4 = schema + "."
+                + generateUniqueName();
+        String indexTable1 = generateUniqueName() + "_IDX";
+        String indexTable2 = generateUniqueName() + "_IDX";
+        String indexTable3 = generateUniqueName() + "_IDX";
+        String indexTable4 = generateUniqueName() + "_IDX";
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+        DefaultPhoenixMultiViewListProvider defaultPhoenixMultiViewListProvider =
+                new DefaultPhoenixMultiViewListProvider();
+        Configuration cloneConfig = PropertiesUtil.cloneConfig(config);
+
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS,
+                DELETE_ALL_VIEWS);
+        cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE, "2");
+        List<ViewInfoWritable> result =
+                defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+
+        /*
+            Case 1: no view.
+         */
+        assertEquals(0, result.size());
+
+        /*
+            Case 2:
+                            BaseMultiTenantTable
+                      GlobalView1 with TTL
+                        Index1    Index2
+                    TenantView1, TenantView2
+         */
+        try (Connection globalConn = DriverManager.getConnection(url);
+             Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant2)) {
+
+            globalConn.createStatement().execute(String.format(BASE_TABLE_DDL, baseTableFullName));
+            globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_TTL,
+                    globalViewName1, baseTableFullName));
+
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable1, globalViewName1, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable2, globalViewName1, "C,D"));
+
+            tenant1Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL, tenantViewName1, globalViewName1));
+            tenant2Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL, tenantViewName2, globalViewName1));
+        }
+
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        // a view with 2 view indexes issues 3 deletion jobs:
+        // 1 for the data table and 2 for the index tables.
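+        // For illustration, the three expected entries (the ordering here is an
+        // assumption, not something the provider guarantees) are:
+        //     globalViewName1 -> deletion job against the data table
+        //     indexTable1     -> deletion job against the first view index
+        //     indexTable2     -> deletion job against the second view index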
+        assertEquals(3, result.size());
+
+        /*
+            Case 3: globalView2 without TTL.
+                            BaseMultiTenantTable
+                      GlobalView1 with TTL        GlobalView2 without TTL
+                        Index1    Index2            Index3    Index4
+                    TenantView1, TenantView2
+         */
+        try (Connection globalConn = DriverManager.getConnection(url)) {
+            globalConn.createStatement().execute(String.format(VIEW_DDL,
+                    globalViewName2, baseTableFullName));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable3, globalViewName2, "A,B"));
+            globalConn.createStatement().execute(
+                    String.format(VIEW_INDEX_DDL, indexTable4, globalViewName2, "C,D"));
+        }
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(3, result.size());
+
+        /*
+            Case 4: adding tenant3 and tenant4 views with TTL.
+                            BaseMultiTenantTable
+                      GlobalView1 with TTL        GlobalView2 without TTL
+                        Index1    Index2            Index3    Index4
+                    TenantView1, TenantView2      TenantView3 with TTL, TenantView4 without TTL
+         */
+        try (Connection tenant1Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant1);
+             Connection tenant2Connection =
+                     PhoenixMultiInputUtil.buildTenantConnection(url, tenant2)) {
+            tenant1Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL_WITH_TTL, tenantViewName3, globalViewName2));
+            tenant2Connection.createStatement().execute(
+                    String.format(TENANT_VIEW_DDL, tenantViewName4, globalViewName2));
+        }
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(6, result.size());
+
+        /*
+            Testing the tenant-specific case. Even though tenant1 created 2 leaf views,
+            one of them was created under a global view with TTL, so that one is not
+            added to the deletion list.
+         */
+        cloneConfig = PropertiesUtil.cloneConfig(config);
+        cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE, "2");
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(3, result.size());
+
+        /*
+            Deleting tenant1 with tenantViewName1 will NOT add any deletion job to the
+            list because the parent global view owns the TTL value.
+         */
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName1);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(0, result.size());
+
+        /*
+            Without a tenant id, no job is added to the list even if the tenant view
+            name is provided.
+         */
+        cloneConfig = PropertiesUtil.cloneConfig(config);
+        cloneConfig.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE, "2");
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName3);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(0, result.size());
+
+        /*
+            tenant id + tenant view name will add 3 jobs to the list:
+            1 for the data table and 2 for the index tables.
+         */
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant1);
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName3);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(3, result.size());
+
+        /*
+            tenant id + tenant view name will NOT add to the list because
+            tenantViewName4 does NOT have a TTL value.
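+
+            More generally, across the cases above the provider only emits deletion jobs
+            for relations that own an explicit PHOENIX_TTL value: the TTL-owning view
+            itself, plus one job per view index hanging off that view.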
+         */
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, tenant2);
+        cloneConfig.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW,
+                tenantViewName4);
+        result = defaultPhoenixMultiViewListProvider.getPhoenixMultiViewList(cloneConfig);
+        assertEquals(0, result.size());
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
new file mode 100644
index 00000000000..77d923fb2c2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixTTLToolIT.java
@@ -0,0 +1,816 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.RowFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.RegexStringComparator;
+import org.apache.phoenix.mapreduce.PhoenixTTLTool;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiInputUtil;
+import org.apache.phoenix.query.HBaseFactoryProvider;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PhoenixTTLToolIT extends ParallelStatsDisabledIT {
+
+    private final long PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND = 1;
+    private final long PHOENIX_TTL_EXPIRE_IN_A_DAY = 1000 * 60 * 60 * 24;
+
+    private final String VIEW_PREFIX1 = "V01";
+    private final String VIEW_PREFIX2 = "V02";
+    private final String UPSERT_TO_GLOBAL_VIEW_QUERY =
+            "UPSERT INTO %s (PK1,A,B,C,D) VALUES(1,1,1,1,1)";
+    private final String UPSERT_TO_LEAF_VIEW_QUERY =
+            "UPSERT INTO %s (PK1,A,B,C,D,E,F) VALUES(1,1,1,1,1,1,1)";
+    private final String VIEW_DDL_WITH_ID_PREFIX_AND_TTL = "CREATE VIEW %s (" +
+            "PK1 BIGINT PRIMARY KEY,A BIGINT, B BIGINT, C BIGINT, D BIGINT)" +
+            " AS SELECT * FROM %s WHERE ID = '%s' PHOENIX_TTL = %d";
+    private final String VIEW_INDEX_DDL = "CREATE INDEX %s ON %s(%s)";
+    private final String TENANT_VIEW_DDL =
+            "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s";
+
+    private void verifyNumberOfRowsFromHBaseLevel(String tableName, String regex, int expectedRows)
+            throws Exception {
+        try (Table table = HBaseFactoryProvider.getHConnectionFactory().
+                createConnection(config).getTable(tableName)) {
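+            // The tenant id / view row-key prefix is embedded in the HBase row key,
+            // so a regex-based RowFilter over the raw row keys lets these tests count
+            // rows per tenant or per view prefix without going through SQL.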
+            Filter filter =
+                    new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex));
+            Scan scan = new Scan();
+            scan.setFilter(filter);
+            assertEquals(expectedRows, getRowCount(table, scan));
+        }
+    }
+
+    private void verifyNumberOfRows(String tableName, String tenantId, int expectedRows,
+            Connection conn) throws Exception {
+        String query = "SELECT COUNT(*) FROM " + tableName;
+        if (tenantId != null) {
+            query = query + " WHERE TENANT_ID = '" + tenantId + "'";
+        }
+        try (Statement stm = conn.createStatement()) {
+            ResultSet rs = stm.executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals(expectedRows, rs.getInt(1));
+        }
+    }
+
+    private long getRowCount(Table table, Scan scan) throws Exception {
+        ResultScanner scanner = table.getScanner(scan);
+        long count = 0;
+        for (Result dummy : scanner) {
+            count++;
+        }
+        scanner.close();
+        return count;
+    }
+
+    private void createMultiTenantTable(Connection conn, String tableName) throws Exception {
+        String ddl = "CREATE TABLE " + tableName +
+                " (TENANT_ID CHAR(10) NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " +
+                "PK PRIMARY KEY (TENANT_ID,ID)) MULTI_TENANT=true, COLUMN_ENCODED_BYTES = 0";
+
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+    }
+
+    /*
+                        BaseMultiTenantTable
+                  GlobalView1 with TTL(1 ms)
+                     Index1    Index2
+
+        Create 2 tenant views and upsert data.
+        After the MR job runs, all data should be deleted.
+     */
+    @Test
+    public void testTenantViewOnGlobalViewWithMoreThanOneIndex() throws Exception {
+        String schema = generateUniqueName();
+        String baseTableFullName = schema + "." + generateUniqueName();
+        String indexTable1 = generateUniqueName() + "_IDX";
+        String indexTable2 = generateUniqueName() + "_IDX";
+        String globalViewName = schema + "." + generateUniqueName();
+        String tenant1 = generateUniqueName();
+        String tenant2 = generateUniqueName();
+        String tenantView1 = schema + "." + generateUniqueName();
+        String tenantView2 = schema + "."
+ generateUniqueName(); + String indexTable = "_IDX_" + baseTableFullName; + + try (Connection globalConn = DriverManager.getConnection(getUrl()); + Connection tenant1Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1); + Connection tenant2Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) { + + createMultiTenantTable(globalConn, baseTableFullName); + globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL, + globalViewName, baseTableFullName, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + + globalConn.createStatement().execute( + String.format(VIEW_INDEX_DDL, indexTable1, globalViewName, "A,B")); + globalConn.createStatement().execute( + String.format(VIEW_INDEX_DDL, indexTable2, globalViewName, "C,D")); + + tenant1Connection.createStatement().execute( + String.format(TENANT_VIEW_DDL,tenantView1, globalViewName)); + tenant2Connection.createStatement().execute( + String.format(TENANT_VIEW_DDL,tenantView2, globalViewName)); + + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView1)); + tenant1Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn); + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantView2)); + tenant2Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn); + + // the view has 2 view indexes, so upsert 1 row(base table) will result + // 2 rows(index table) + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + + verifyNumberOfRows(baseTableFullName, tenant1, 0, globalConn); + verifyNumberOfRows(baseTableFullName, tenant2, 0, globalConn); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 0); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 0); + } + } + + /* + BaseMultiTenantTable + GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY) + Index1 Index2 Index3 Index4 + + Upserting data to both global views and run the MR job. + It should only delete GlobalView1 data not remove GlobalView2 data. + */ + @Test + public void testGlobalViewWithMoreThanOneIndex() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." + generateUniqueName(); + String globalViewName2 = schema + "." 
+ generateUniqueName(); + String indexTable1 = generateUniqueName() + "_IDX"; + String indexTable2 = generateUniqueName() + "_IDX"; + String indexTable3 = generateUniqueName() + "_IDX"; + String indexTable4 = generateUniqueName() + "_IDX"; + String indexTable = "_IDX_" + baseTableFullName; + String tenant1 = generateUniqueName(); + String tenant2 = generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl()); + Connection tenant1Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1); + Connection tenant2Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) { + + createMultiTenantTable(globalConn, baseTableFullName); + + globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL, + globalViewName1, baseTableFullName, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + globalConn.createStatement().execute(String.format(VIEW_DDL_WITH_ID_PREFIX_AND_TTL, + globalViewName2, baseTableFullName, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY)); + + globalConn.createStatement().execute( + String.format(VIEW_INDEX_DDL, indexTable1, globalViewName1, "A,B")); + globalConn.createStatement().execute( + String.format(VIEW_INDEX_DDL, indexTable2, globalViewName1, "C,D")); + + globalConn.createStatement().execute( + String.format(VIEW_INDEX_DDL, indexTable3, globalViewName2, "A,B")); + globalConn.createStatement().execute( + String.format(VIEW_INDEX_DDL, indexTable4, globalViewName2, "C,D")); + + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1)); + tenant1Connection.commit(); + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2)); + tenant1Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn); + + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1)); + tenant2Connection.commit(); + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2)); + tenant2Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn); + + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + + verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn); + verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2); + } + } + + /* + BaseMultiTenantTable + GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY) + Index1 Index2 Index3 Index4 + TenantView1 TenantView2 + + Upserting data to both global views, and run the MR job. + It should only delete GlobalView1 data not remove GlobalView2 data. + */ + @Test + public void testTenantViewCase() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." + generateUniqueName(); + String globalViewName2 = schema + "." + generateUniqueName(); + String tenantViewName1 = schema + "." 
+ generateUniqueName(); + String tenantViewName2 = schema + "." + generateUniqueName(); + String indexTable1 = generateUniqueName() + "_IDX"; + String indexTable2 = generateUniqueName() + "_IDX"; + String indexTable3 = generateUniqueName() + "_IDX"; + String indexTable4 = generateUniqueName() + "_IDX"; + String indexTable = "_IDX_" + baseTableFullName; + String tenant1 = generateUniqueName(); + String tenant2 = generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl()); + Connection tenant1Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1); + Connection tenant2Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) { + + createMultiTenantTable(globalConn, baseTableFullName); + String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" + + " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d"; + + globalConn.createStatement().execute( + String.format(ddl, globalViewName1, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + globalConn.createStatement().execute( + String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY)); + + ddl = "CREATE INDEX %s ON %s(%s)"; + + globalConn.createStatement().execute( + String.format(ddl, indexTable1, globalViewName1, "A,B")); + globalConn.createStatement().execute( + String.format(ddl, indexTable2, globalViewName1, "C,D")); + + globalConn.createStatement().execute( + String.format(ddl, indexTable3, globalViewName2, "A,B")); + globalConn.createStatement().execute( + String.format(ddl, indexTable4, globalViewName2, "C,D")); + + ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s"; + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName1, globalViewName1)); + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName2, globalViewName2)); + + tenant2Connection.createStatement().execute( + String.format(ddl, tenantViewName1, globalViewName1)); + tenant2Connection.createStatement().execute( + String.format(ddl, tenantViewName2, globalViewName2)); + + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1)); + tenant1Connection.commit(); + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2)); + tenant1Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn); + + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1)); + tenant2Connection.commit(); + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2)); + tenant2Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn); + + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 4); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 4); + + // running MR job to delete expired rows. 
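+            // Flag semantics, as used across these tests (inferred from PhoenixTTLTool's
+            // options): -runfg runs the job in the foreground, -a targets every view
+            // that has a PHOENIX_TTL value, -v <view> limits the run to a single view,
+            // and -i <tenant> limits it to a single tenant.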
+ PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + + verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn); + verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant2 + ".*", 2); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + tenant1 + ".*", 2); + } + } + + /* + BaseMultiTenantTable + GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY) + Upserting data to both global views, and run the MR job. + It should only delete GlobalView1 data not remove GlobalView2 data. + */ + @Test + public void testGlobalViewWithNoIndex() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." + generateUniqueName(); + String globalViewName2 = schema + "." + generateUniqueName(); + String tenant1 = generateUniqueName(); + String tenant2 = generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl()); + Connection tenant1Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1); + Connection tenant2Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) { + + createMultiTenantTable(globalConn, baseTableFullName); + String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" + + " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d"; + + globalConn.createStatement().execute( + String.format(ddl, globalViewName1, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + globalConn.createStatement().execute( + String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY)); + + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1)); + tenant1Connection.commit(); + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2)); + tenant1Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn); + + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1)); + tenant2Connection.commit(); + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2)); + tenant2Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant2, 2, globalConn); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + + verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn); + verifyNumberOfRows(baseTableFullName, tenant2, 1, globalConn); + } + } + + /* + BaseTable + GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY) + Upserting data to both global views, and run the MR job. + It should only delete GlobalView1 data not remove GlobalView2 data. + */ + @Test + public void testGlobalViewOnNonMultiTenantTable() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." 
+ generateUniqueName(); + String globalViewName2 = schema + "." + generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl())) { + String ddl = "CREATE TABLE " + baseTableFullName + + " (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)"; + globalConn.createStatement().execute(ddl); + + ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" + + " AS SELECT * FROM " + baseTableFullName + " WHERE ID ='%s' PHOENIX_TTL = %d"; + + globalConn.createStatement().execute( + String.format(ddl, globalViewName1, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + globalConn.createStatement().execute( + String.format(ddl, globalViewName2, VIEW_PREFIX2, PHOENIX_TTL_EXPIRE_IN_A_DAY)); + + globalConn.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName1)); + globalConn.commit(); + globalConn.createStatement().execute( + String.format(UPSERT_TO_GLOBAL_VIEW_QUERY, globalViewName2)); + globalConn.commit(); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1); + } + } + + /* + BaseTable + GlobalView1 with TTL(1 ms) GlobalView2 with TTL(1 DAY) + Index1 Index2 Index3 Index4 + + Upserting data to both global views, and run the MR job. + It should only delete GlobalView1 data not remove GlobalView2 data. + */ + @Test + public void testGlobalViewOnNonMultiTenantTableWithIndex() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." + generateUniqueName(); + String globalViewName2 = schema + "." 
+ generateUniqueName(); + String indexTable1 = generateUniqueName() + "_IDX"; + String indexTable2 = generateUniqueName() + "_IDX"; + String indexTable3 = generateUniqueName() + "_IDX"; + String indexTable4 = generateUniqueName() + "_IDX"; + String indexTable = "_IDX_" + baseTableFullName; + + try (Connection globalConn = DriverManager.getConnection(getUrl())) { + String ddl = "CREATE TABLE " + baseTableFullName + + " (PK1 BIGINT NOT NULL, ID CHAR(10) NOT NULL, NUM BIGINT CONSTRAINT " + + "PK PRIMARY KEY (PK1,ID))"; + globalConn.createStatement().execute(ddl); + + ddl = "CREATE VIEW %s (PK2 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" + + " AS SELECT * FROM " + baseTableFullName + " WHERE PK1=%d PHOENIX_TTL = %d"; + + globalConn.createStatement().execute( + String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + globalConn.createStatement().execute( + String.format(ddl, globalViewName2, 2, PHOENIX_TTL_EXPIRE_IN_A_DAY)); + + ddl = "CREATE INDEX %s ON %s(%s)"; + globalConn.createStatement().execute( + String.format(ddl, indexTable1, globalViewName1, "A,ID,B")); + globalConn.createStatement().execute( + String.format(ddl, indexTable2, globalViewName1, "C,ID,D")); + globalConn.createStatement().execute( + String.format(ddl, indexTable3, globalViewName2, "A,ID,B")); + globalConn.createStatement().execute( + String.format(ddl, indexTable4, globalViewName2, "C,ID,D")); + + String query = "UPSERT INTO %s (PK2,A,B,C,D,ID) VALUES(1,1,1,1,1,'%s')"; + globalConn.createStatement().execute( + String.format(query, globalViewName1, VIEW_PREFIX1)); + globalConn.commit(); + globalConn.createStatement().execute( + String.format(query, globalViewName2, VIEW_PREFIX2)); + globalConn.commit(); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX1 + ".*", 2); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX2 + ".*", 2); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX1 + ".*", 0); + verifyNumberOfRowsFromHBaseLevel(indexTable, ".*" + VIEW_PREFIX2 + ".*", 2); + } + } + + /* + BaseMultiTenantTable + GlobalView1 + TenantView1 TenantView2 + */ + @Test + public void testDeleteByViewAndTenant() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." + generateUniqueName(); + String tenantViewName1 = schema + "." + generateUniqueName(); + String tenantViewName2 = schema + "." 
+ generateUniqueName(); + String tenant1 = generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl()); + Connection tenant1Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1)) { + + createMultiTenantTable(globalConn, baseTableFullName); + String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" + + " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = 1"; + + globalConn.createStatement().execute(String.format(ddl, globalViewName1)); + + ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " + + "WHERE ID = '%s' PHOENIX_TTL = %d"; + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName2, globalViewName1, VIEW_PREFIX2, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1)); + tenant1Connection.commit(); + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2)); + tenant1Connection.commit(); + verifyNumberOfRows(baseTableFullName, tenant1, 2, globalConn); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1); + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-v", tenantViewName2, "-i", tenant1}); + assertEquals(0, status); + + verifyNumberOfRows(baseTableFullName, tenant1, 1, globalConn); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 0); + } + } + + /* + BaseMultiTenantTable + GlobalView1 GlobalView1 + TenantView1 TenantView2 TenantView1 TenantView2 + */ + @Test + public void testDeleteByTenant() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." + generateUniqueName(); + String globalViewName2 = schema + "." + generateUniqueName(); + String tenantViewName1 = schema + "." + generateUniqueName(); + String tenantViewName2 = schema + "." + generateUniqueName(); + String tenantViewName3 = schema + "." + generateUniqueName(); + String tenantViewName4 = schema + "." 
+ generateUniqueName(); + String tenant1 = generateUniqueName(); + String tenant2 = generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl()); + Connection tenant1Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1); + Connection tenant2Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) { + + createMultiTenantTable(globalConn, baseTableFullName); + String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" + + " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d"; + + globalConn.createStatement().execute(String.format(ddl, globalViewName1, 1)); + globalConn.createStatement().execute(String.format(ddl, globalViewName2, 2)); + + ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s " + + "WHERE ID = '%s' PHOENIX_TTL = %d"; + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName2, globalViewName2, VIEW_PREFIX2, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + + tenant2Connection.createStatement().execute( + String.format(ddl, tenantViewName3, globalViewName1, VIEW_PREFIX1, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + tenant2Connection.createStatement().execute( + String.format(ddl, tenantViewName4, globalViewName2, VIEW_PREFIX2, + PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1)); + tenant1Connection.commit(); + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2)); + tenant1Connection.commit(); + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName3)); + tenant2Connection.commit(); + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName4)); + tenant2Connection.commit(); + + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant1 + ".*", 2); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant2 + ".*", 2); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-i", tenant1}); + assertEquals(0, status); + + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant1 + ".*", 0); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + tenant2 + ".*", 2); + } + } + + /* + BaseMultiTenantTable + GlobalView1 GlobalView1 + TenantView1 TenantView2 TenantView1 TenantView2 + */ + @Test + public void testDeleteByViewName() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName1 = schema + "." + generateUniqueName(); + String globalViewName2 = schema + "." + generateUniqueName(); + String tenantViewName1 = schema + "." + generateUniqueName(); + String tenantViewName2 = schema + "." + generateUniqueName(); + String tenantViewName3 = schema + "." + generateUniqueName(); + String tenantViewName4 = schema + "." 
+ generateUniqueName(); + String tenant1 = generateUniqueName(); + String tenant2 = generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl()); + Connection tenant1Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant1); + Connection tenant2Connection = + PhoenixMultiInputUtil.buildTenantConnection(getUrl(), tenant2)) { + + createMultiTenantTable(globalConn, baseTableFullName); + String ddl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT, C BIGINT, D BIGINT)" + + " AS SELECT * FROM " + baseTableFullName + " WHERE NUM = %d PHOENIX_TTL = %d"; + + globalConn.createStatement().execute( + String.format(ddl, globalViewName1, 1, PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + globalConn.createStatement().execute( + String.format(ddl, globalViewName2, 2, PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + + ddl = "CREATE VIEW %s (E BIGINT, F BIGINT) AS SELECT * FROM %s WHERE ID = '%s'"; + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName1, globalViewName1, VIEW_PREFIX1)); + tenant1Connection.createStatement().execute( + String.format(ddl, tenantViewName2, globalViewName2, VIEW_PREFIX2)); + + tenant2Connection.createStatement().execute( + String.format(ddl, tenantViewName3, globalViewName1, VIEW_PREFIX1)); + tenant2Connection.createStatement().execute( + String.format(ddl, tenantViewName4, globalViewName2, VIEW_PREFIX2)); + + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName1)); + tenant1Connection.commit(); + tenant1Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName2)); + tenant1Connection.commit(); + + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName3)); + tenant2Connection.commit(); + tenant2Connection.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, tenantViewName4)); + tenant2Connection.commit(); + + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 2); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 2); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-v", globalViewName1}); + assertEquals(0, status); + + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 2); + } + } + + + /* + BaseTable + GlobalView1 with TTL + MiddleLevelView1 with TTL(1 ms) MiddleLevelView2 with TTL(1 DAY) + LeafView1 LeafView2 + Upserting data to both leafView, and run the MR job. + It should only delete MiddleLevelView1 data not remove MiddleLevelView2 data. + */ + @Test + public void testCleanMoreThanThreeLevelViewCase() throws Exception { + String schema = generateUniqueName(); + String baseTableFullName = schema + "." + generateUniqueName(); + String globalViewName = schema + "." + generateUniqueName(); + String middleLevelViewName1 = schema + "." + generateUniqueName(); + String middleLevelViewName2 = schema + "." + generateUniqueName(); + String leafViewName1 = schema + "." + generateUniqueName(); + String leafViewName2 = schema + "." 
+ generateUniqueName(); + + try (Connection globalConn = DriverManager.getConnection(getUrl())) { + String baseTableDdl = "CREATE TABLE " + baseTableFullName + + " (ID CHAR(10) NOT NULL PRIMARY KEY, NUM BIGINT)"; + globalConn.createStatement().execute(baseTableDdl); + + String globalViewDdl = "CREATE VIEW %s (PK1 BIGINT PRIMARY KEY, " + + "A BIGINT, B BIGINT)" + " AS SELECT * FROM " + baseTableFullName; + + globalConn.createStatement().execute(String.format(globalViewDdl, globalViewName)); + + String middleLevelViewDdl = "CREATE VIEW %s (C BIGINT, D BIGINT)" + + " AS SELECT * FROM %s WHERE ID ='%s' PHOENIX_TTL = %d"; + + globalConn.createStatement().execute(String.format(middleLevelViewDdl, + middleLevelViewName1, globalViewName, + VIEW_PREFIX1, PHOENIX_TTL_EXPIRE_IN_A_MILLISECOND)); + globalConn.createStatement().execute(String.format(middleLevelViewDdl, + middleLevelViewName2, globalViewName, VIEW_PREFIX2, + PHOENIX_TTL_EXPIRE_IN_A_DAY)); + + String leafViewDdl = "CREATE VIEW %s (E BIGINT, F BIGINT)" + + " AS SELECT * FROM %s"; + + globalConn.createStatement().execute(String.format(leafViewDdl, + leafViewName1, middleLevelViewName1)); + globalConn.createStatement().execute(String.format(leafViewDdl, + leafViewName2, middleLevelViewName2)); + + globalConn.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, leafViewName1)); + globalConn.commit(); + globalConn.createStatement().execute( + String.format(UPSERT_TO_LEAF_VIEW_QUERY, leafViewName2)); + globalConn.commit(); + + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 1); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1); + + // running MR job to delete expired rows. + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX1 + ".*", 0); + verifyNumberOfRowsFromHBaseLevel(baseTableFullName, ".*" + VIEW_PREFIX2 + ".*", 1); + } + } + + + @Test + public void testNoViewCase() throws Exception { + PhoenixTTLTool phoenixTtlTool = new PhoenixTTLTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + phoenixTtlTool.setConf(conf); + int status = phoenixTtlTool.run(new String[]{"-runfg", "-a"}); + assertEquals(0, status); + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java new file mode 100644 index 00000000000..9d7da2e2987 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormat.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.DefaultPhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMultiViewListProvider;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.mapreduce.util.MultiViewSplitStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+    This is a generic MultiViewInputFormat class used by MR jobs. You can provide your
+    own view list provider and split strategy classes to fit your business needs by
+    setting the following configuration keys to your own implementations:
+        MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ
+        MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ
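+
+    For example, a driver could be wired up as below (MyViewListProvider and
+    MySplitStrategy are hypothetical names used for illustration only; they must
+    implement PhoenixMultiViewListProvider and MultiViewSplitStrategy respectively):
+
+        Configuration conf = job.getConfiguration();
+        conf.set(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ,
+                MyViewListProvider.class.getName());
+        conf.set(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ,
+                MySplitStrategy.class.getName());
+        job.setInputFormatClass(PhoenixMultiViewInputFormat.class);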
+ */
+public class PhoenixMultiViewInputFormat<T extends Writable>
+        extends InputFormat<NullWritable, T> {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(PhoenixMultiViewInputFormat.class);
+
+    @Override public List<InputSplit> getSplits(JobContext context) throws IOException {
+        List<InputSplit> listOfInputSplit;
+        try {
+            final Configuration configuration = context.getConfiguration();
+            Class<?> defaultMultiInputStrategyClazz = DefaultPhoenixMultiViewListProvider.class;
+            if (configuration.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ) != null) {
+                defaultMultiInputStrategyClazz = Class.forName(configuration.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ));
+            }
+            PhoenixMultiViewListProvider phoenixMultiViewListProvider =
+                    (PhoenixMultiViewListProvider) defaultMultiInputStrategyClazz.newInstance();
+            List<ViewInfoWritable> views =
+                    phoenixMultiViewListProvider.getPhoenixMultiViewList(configuration);
+
+            Class<?> defaultDeletionMultiInputSplitStrategyClazz =
+                    DefaultMultiViewSplitStrategy.class;
+            if (configuration.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ) != null) {
+                defaultDeletionMultiInputSplitStrategyClazz = Class.forName(configuration.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ));
+            }
+            MultiViewSplitStrategy multiViewSplitStrategy = (MultiViewSplitStrategy)
+                    defaultDeletionMultiInputSplitStrategyClazz.newInstance();
+            listOfInputSplit = multiViewSplitStrategy.generateSplits(views, configuration);
+        } catch (ClassNotFoundException e) {
+            LOGGER.debug("PhoenixMultiViewInputFormat got a ClassNotFoundException : " +
+                    e.getMessage());
+            throw new IOException(
+                    "PhoenixMultiViewInputFormat got a ClassNotFoundException : " +
+                            e.getMessage(), e);
+        } catch (InstantiationException e) {
+            LOGGER.debug("PhoenixMultiViewInputFormat got an InstantiationException : " +
+                    e.getMessage());
+            throw new IOException(
+                    "PhoenixMultiViewInputFormat got an InstantiationException : " +
+                            e.getMessage(), e);
+        } catch (IllegalAccessException e) {
+            LOGGER.debug("PhoenixMultiViewInputFormat got an IllegalAccessException : " +
+                    e.getMessage());
+            throw new IOException(
+                    "PhoenixMultiViewInputFormat got an IllegalAccessException : " +
+                            e.getMessage(), e);
+        }
+
+        return listOfInputSplit == null ? new ArrayList<InputSplit>() : listOfInputSplit;
+    }
+
+    @Override
+    public RecordReader<NullWritable, T> createRecordReader(InputSplit split,
+            TaskAttemptContext context) {
+        final Configuration configuration = context.getConfiguration();
+
+        final Class<T> inputClass =
+                (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
+        return getPhoenixRecordReader(inputClass, configuration);
+    }
+
+    private RecordReader<NullWritable, T> getPhoenixRecordReader(Class<T> inputClass,
+            Configuration configuration) {
+        return new PhoenixMultiViewReader<>(inputClass, configuration);
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
new file mode 100644
index 00000000000..37f42184422
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputSplit.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+    Generic InputSplit class that carries the list of views assigned to one mapper of
+    the MR job. You can provide your own logic to filter or add views.
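+
+    Note that getLength() and getLocations() below return 0 and an empty array: the
+    unit of work is view metadata rather than a region of HBase data, so there is no
+    meaningful byte length or locality to report.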
+ */ +public class PhoenixMultiViewInputSplit extends InputSplit implements Writable { + + List viewInfoTrackerList; + + public PhoenixMultiViewInputSplit() { + this.viewInfoTrackerList = new ArrayList<>(); + } + + public PhoenixMultiViewInputSplit(List viewInfoTracker) { + this.viewInfoTrackerList = viewInfoTracker; + } + + @Override public void write(DataOutput output) throws IOException { + WritableUtils.writeVInt(output, this.viewInfoTrackerList.size()); + for (ViewInfoWritable viewInfoWritable : this.viewInfoTrackerList) { + if (viewInfoWritable instanceof ViewInfoTracker) { + viewInfoWritable.write(output); + } + } + } + + @Override public void readFields(DataInput input) throws IOException { + int count = WritableUtils.readVInt(input); + for (int i = 0; i < count; i++) { + ViewInfoTracker viewInfoTracker = new ViewInfoTracker(); + viewInfoTracker.readFields(input); + this.viewInfoTrackerList.add(viewInfoTracker); + } + } + + @Override public long getLength() { + return 0; + } + + @Override public String[] getLocations() { + return new String[0]; + } + + public List getViewInfoTrackerList() { + return this.viewInfoTrackerList; + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java new file mode 100644 index 00000000000..f1d76625c10 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReader.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.phoenix.mapreduce.util.ViewInfoWritable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class PhoenixMultiViewReader extends RecordReader { + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMultiViewReader.class); + + private Configuration configuration; + private Class inputClass; + Iterator it; + + public PhoenixMultiViewReader() { + + } + + public PhoenixMultiViewReader(final Class inputClass, final Configuration configuration) { + this.configuration = configuration; + this.inputClass = inputClass; + } + + @Override public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException { + if (split instanceof PhoenixMultiViewInputSplit) { + final PhoenixMultiViewInputSplit pSplit = (PhoenixMultiViewInputSplit)split; + final List viewInfoTracker = pSplit.getViewInfoTrackerList(); + it = viewInfoTracker.iterator(); + } else { + LOGGER.error("InputSplit class cannot cast to PhoenixMultiViewInputSplit."); + throw new IOException("InputSplit class cannot cast to PhoenixMultiViewInputSplit"); + } + } + + @Override public boolean nextKeyValue() throws IOException, InterruptedException { + return it.hasNext(); + } + + @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override public T getCurrentValue() throws IOException, InterruptedException { + ViewInfoWritable currentValue = null; + if (it.hasNext()) { + currentValue = it.next(); + } + return (T)currentValue; + } + + @Override public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override public void close() throws IOException { + + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java new file mode 100644 index 00000000000..3e104fd0b89 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewJobStatusTracker;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class PhoenixTTLDeleteJobMapper
+        extends Mapper<NullWritable, ViewInfoTracker, NullWritable, NullWritable> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixTTLDeleteJobMapper.class);
+    private MultiViewJobStatusTracker multiViewJobStatusTracker;
+    private static final int DEFAULT_MAX_RETRIES = 3;
+    private static final int DEFAULT_RETRY_SLEEP_TIME_IN_MS = 10000;
+
+    private void initMultiViewJobStatusTracker(Configuration config) throws Exception {
+        try {
+            Class<?> defaultViewDeletionTrackerClass = DefaultMultiViewJobStatusTracker.class;
+            if (config.get(
+                    PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ) != null) {
+                LOGGER.info("Using customized tracker class: " + config.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+                defaultViewDeletionTrackerClass = Class.forName(config.get(
+                        PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ));
+            } else {
+                LOGGER.info("Using default tracker class.");
+            }
+            this.multiViewJobStatusTracker = (MultiViewJobStatusTracker)
+                    defaultViewDeletionTrackerClass.newInstance();
+        } catch (Exception e) {
+            LOGGER.error("Exception while initializing the MultiViewJobStatusTracker: "
+                    + e.getMessage());
+            throw e;
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, ViewInfoTracker value, Context context)
+            throws IOException {
+        try {
+            final Configuration config = context.getConfiguration();
+
+            if (this.multiViewJobStatusTracker == null) {
+                initMultiViewJobStatusTracker(config);
+            }
+
+            LOGGER.debug(String.format("Deleting from view %s, TenantID %s, and TTL value: %d",
+                    value.getViewName(), value.getTenantId(), value.getPhoenixTtl()));
+
+            deleteExpiredRows(value, config, context);
+
+        } catch (SQLException e) {
+            LOGGER.error("Mapper got an exception while deleting expired rows: "
+                    + e.getMessage());
+            throw new IOException(e.getMessage(), e);
+        } catch (Exception e) {
+            LOGGER.error("Exception while running the view TTL deletion job mapper: "
+                    + e.getMessage());
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    private void deleteExpiredRows(ViewInfoTracker value, Configuration config, Context context)
+            throws Exception {
+        try (PhoenixConnection connection =
+                (PhoenixConnection) ConnectionUtil.getInputConnection(config)) {
+            if (value.getTenantId() != null && !value.getTenantId().equals("NULL")) {
+                Properties props = new Properties();
+                props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, value.getTenantId());
+
+                try (PhoenixConnection tenantConnection = (PhoenixConnection)
+                        DriverManager.getConnection(connection.getURL(), props)) {
+                    deleteExpiredRows(tenantConnection, value, config, context);
+                }
+            } else {
+                deleteExpiredRows(connection, value, config, context);
+            }
+        }
+    }
+
+    /*
+     * Each mapper that receives a PhoenixMultiViewInputSplit executes one delete mutation/scan
+     * (with the DELETE_PHOENIX_TTL_EXPIRED attribute) per view, for all the views and view
+     * indexes in the split. Each delete mutation is bounded by the view's start and stop keys
+     * for the region and carries the TTL attributes and the delete hint.
+     */
+    private void deleteExpiredRows(PhoenixConnection connection, ViewInfoTracker viewInfoTracker,
+                                   Configuration config, Context context) throws Exception {
+        try (PhoenixStatement pstmt =
+                new PhoenixStatement(connection).unwrap(PhoenixStatement.class)) {
+            PTable pTable = PhoenixRuntime.getTable(connection, viewInfoTracker.getViewName());
+            String deleteIfExpiredStatement = "SELECT /*+ NO_INDEX */ count(*) FROM " +
+                    viewInfoTracker.getViewName();
+
+            if (viewInfoTracker.isIndexRelation()) {
+                pTable = PhoenixRuntime.getTable(connection, viewInfoTracker.getRelationName());
+                deleteIfExpiredStatement = "SELECT count(*) FROM " +
+                        viewInfoTracker.getRelationName();
+            }
+
+            String sourceTableName = pTable.getTableName().getString();
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.INITIALIZED.getValue(), config, 0, context.getJobName());
+            final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+            byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(pTable);
+            byte[] emptyColumnName = pTable.getEncodingScheme() ==
+                    PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                    QueryConstants.EMPTY_COLUMN_BYTES :
+                    pTable.getEncodingScheme().encode(
+                            QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+
+            scan.setAttribute(
+                    BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, PDataType.TRUE_BYTES);
+            scan.setAttribute(
+                    BaseScannerRegionObserver.PHOENIX_TTL,
+                    Bytes.toBytes(viewInfoTracker.getPhoenixTtl()));
+            scan.setAttribute(
+                    BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME,
+                    Bytes.toBytes(sourceTableName));
+
+            this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                    ViewInfoJobState.RUNNING.getValue(), config, 0, context.getJobName());
+
+            addingDeletionMarkWithRetries(pstmt, viewInfoTracker, config, context,
+                    queryPlan);
+        } catch (Exception e) {
+            if (e instanceof SQLException && ((SQLException) e).getErrorCode() ==
+                    SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()) {
+                this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                        ViewInfoJobState.DELETED.getValue(), config, 0, context.getJobName());
+            }
+            LOGGER.error(String.format("Failed to process view %s: %s",
+                    viewInfoTracker.toString(), e.getMessage()));
+        }
+    }
+
+    private boolean addingDeletionMarkWithRetries(PhoenixStatement stmt,
+                                                  ViewInfoTracker viewInfoTracker,
+                                                  Configuration config, Context context,
+                                                  QueryPlan queryPlan)
+            throws Exception {
+        int retry = 0;
+        long startTime = System.currentTimeMillis();
+        String viewInfo = viewInfoTracker.getTenantId() == null ?
+                viewInfoTracker.getViewName() : viewInfoTracker.getTenantId()
+                + "." + viewInfoTracker.getViewName();
+
+        while (retry < DEFAULT_MAX_RETRIES) {
+            try {
+                PhoenixResultSet rs = stmt.newResultSet(
+                        queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext());
+
+                long numberOfDeletedRows = 0;
+                if (rs.next()) {
+                    numberOfDeletedRows = rs.getLong(1);
+                }
+                this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker,
+                        numberOfDeletedRows, ViewInfoJobState.SUCCEEDED.getValue(), config,
+                        System.currentTimeMillis() - startTime, context.getJobName());
+                PhoenixTTLTool.MR_COUNTER_METRICS metricsStatus =
+                        viewInfoTracker.isIndexRelation() ?
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_INDEX_SUCCEED :
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_SUCCEED;
+                context.getCounter(metricsStatus).increment(1);
+                return true;
+            } catch (Exception e) {
+                PhoenixTTLTool.MR_COUNTER_METRICS metricsStatus =
+                        viewInfoTracker.isIndexRelation() ?
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_INDEX_FAILED :
+                                PhoenixTTLTool.MR_COUNTER_METRICS.VIEW_FAILED;
+                if (e instanceof SQLException && ((SQLException) e).getErrorCode() ==
+                        SQLExceptionCode.TABLE_UNDEFINED.getErrorCode()) {
+                    LOGGER.info(viewInfo + " has been deleted: " + e.getMessage());
+                    this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                            ViewInfoJobState.DELETED.getValue(), config, 0, context.getJobName());
+                    context.getCounter(metricsStatus).increment(1);
+                    return false;
+                }
+                retry++;
+
+                if (retry == DEFAULT_MAX_RETRIES) {
+                    LOGGER.error("Deleting expired rows from " + viewInfo + " failed after "
+                            + DEFAULT_MAX_RETRIES + " retries: " + e.getMessage());
+                    this.multiViewJobStatusTracker.updateJobStatus(viewInfoTracker, 0,
+                            ViewInfoJobState.FAILED.getValue(), config, 0, context.getJobName());
+                    context.getCounter(metricsStatus).increment(1);
+                    throw e;
+                } else {
+                    Thread.sleep(DEFAULT_RETRY_SLEEP_TIME_IN_MS);
+                }
+            }
+        }
+        return false;
+    }
+}
\ No newline at end of file
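The mapper above resolves its status tracker reflectively from MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ, so job progress can be exported without patching the mapper. A minimal sketch of a custom tracker (illustrative; the class name is hypothetical, and a public no-arg constructor is required because the mapper instantiates it via `newInstance()`):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.mapreduce.util.MultiViewJobStatusTracker;
import org.apache.phoenix.mapreduce.util.ViewInfoTracker;

public class StdoutJobStatusTracker implements MultiViewJobStatusTracker {
    @Override
    public void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
                                Configuration config, long duration, String mrJobName) {
        // Replace with your metrics system, an audit table, etc.
        System.out.println(mrJobName + ": " + view.getViewName() + " state=" + state
                + " deletedRows=" + numberOfDeletedRows + " durationMs=" + duration);
    }
}
```

It would be selected with `conf.set(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ, StdoutJobStatusTracker.class.getName())`.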
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java
new file mode 100644
index 00000000000..153e7be680e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLTool.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.Properties;
+
+public class PhoenixTTLTool extends Configured implements Tool {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixTTLTool.class);
+
+    public enum MR_COUNTER_METRICS {
+        VIEW_FAILED,
+        VIEW_SUCCEED,
+        VIEW_INDEX_FAILED,
+        VIEW_INDEX_SUCCEED
+    }
+
+    public static final String DELETE_ALL_VIEWS = "DELETE_ALL_VIEWS";
+    public static final int DEFAULT_MAPPER_SPLIT_SIZE = 10;
+    public static final int DEFAULT_QUERY_BATCH_SIZE = 100;
+
+    private static final Option DELETE_ALL_VIEWS_OPTION = new Option("a", "all", false,
+            "Delete all views from all tables.");
+    private static final Option VIEW_NAME_OPTION = new Option("v", "view", true,
+            "The name of the Phoenix view to clean up.");
+    private static final Option TENANT_ID_OPTION = new Option("i", "id", true,
+            "Delete views belonging to the given tenant id.");
+    private static final Option JOB_PRIORITY_OPTION = new Option("p", "job-priority", true,
+            "Define job priority from 0 (highest) to 4 (lowest).");
+    private static final Option SPLIT_SIZE_OPTION = new Option("s", "split-size-per-mapper", true,
+            "Define split size for each mapper.");
+    private static final Option BATCH_SIZE_OPTION = new Option("b", "batch-size-for-query-more", true,
+            "Define batch size for fetching views metadata from syscat.");
+    private static final Option RUN_FOREGROUND_OPTION = new Option("runfg",
+            "run-foreground", false, "If specified, runs PhoenixTTLTool " +
+            "in the foreground. Default - runs the job in the background.");
+
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+    private Configuration configuration;
+    private Connection connection;
+    private String viewName;
+    private String tenantId;
+    private String jobName;
+    private boolean isDeletingAllViews;
+    private JobPriority jobPriority;
+    private boolean isForeground;
+    private int splitSize;
+    private int batchSize;
+    private Job job;
+
+    public void parseArgs(String[] args) {
+        CommandLine cmdLine;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            throw e;
+        }
+
+        if (getConf() == null) {
+            setConf(HBaseConfiguration.create());
+        }
+
+        if (cmdLine.hasOption(DELETE_ALL_VIEWS_OPTION.getOpt())) {
+            this.isDeletingAllViews = true;
+        } else if (cmdLine.hasOption(VIEW_NAME_OPTION.getOpt())) {
+            viewName = cmdLine.getOptionValue(VIEW_NAME_OPTION.getOpt());
+            this.isDeletingAllViews = false;
+        }
+
+        if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+        }
+
+        if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) {
+            splitSize = Integer.parseInt(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
+        } else {
+            splitSize = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        if (cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt())) {
+            batchSize = Integer.parseInt(cmdLine.getOptionValue(BATCH_SIZE_OPTION.getOpt()));
+        } else {
+            batchSize = DEFAULT_QUERY_BATCH_SIZE;
+        }
+
+        isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    }
+
+    public String getJobPriority() {
+        return this.jobPriority.toString();
+    }
+
+    private JobPriority getJobPriority(CommandLine cmdLine) {
+        String jobPriorityOption = cmdLine.getOptionValue(JOB_PRIORITY_OPTION.getOpt());
+        if (jobPriorityOption == null) {
+            return JobPriority.NORMAL;
+        }
+
+        switch (jobPriorityOption) {
+            case "0" : return JobPriority.VERY_HIGH;
+            case "1" : return JobPriority.HIGH;
+            case "2" : return JobPriority.NORMAL;
+            case "3" : return JobPriority.LOW;
+            case "4" : return JobPriority.VERY_LOW;
+            default:
+                return JobPriority.NORMAL;
+        }
+    }
+
+    public Job getJob() {
+        return this.job;
+    }
+
+    public boolean isDeletingAllViews() {
+        return this.isDeletingAllViews;
+    }
+
+    public String getTenantId() {
+        return this.tenantId;
+    }
+
+    public String getViewName() {
+        return this.viewName;
+    }
+
+    public int getSplitSize() {
+        return this.splitSize;
+    }
+
+    public int getBatchSize() {
+        return this.batchSize;
+    }
+
+    public CommandLine parseOptions(String[] args) {
+        final Options options = getOptions();
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(),
+                    options);
+        }
+
+        if (!cmdLine.hasOption(DELETE_ALL_VIEWS_OPTION.getOpt()) &&
+                !cmdLine.hasOption(VIEW_NAME_OPTION.getOpt()) &&
+                !cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+            throw new IllegalStateException("No deletion job is specified; " +
+                    "please specify a deletion job at the ALL/VIEW/TENANT level");
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        this.jobPriority = getJobPriority(cmdLine);
+
+        return cmdLine;
+    }
+
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(DELETE_ALL_VIEWS_OPTION);
+        options.addOption(VIEW_NAME_OPTION);
+        options.addOption(TENANT_ID_OPTION);
options.addOption(HELP_OPTION); + options.addOption(JOB_PRIORITY_OPTION); + options.addOption(RUN_FOREGROUND_OPTION); + options.addOption(SPLIT_SIZE_OPTION); + options.addOption(BATCH_SIZE_OPTION); + + return options; + } + + private void printHelpAndExit(String errorMessage, Options options) { + System.err.println(errorMessage); + LOGGER.error(errorMessage); + printHelpAndExit(options, 1); + } + + private void printHelpAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("help", options); + System.exit(exitCode); + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getJobName() { + if (this.jobName == null) { + String jobName; + if (this.isDeletingAllViews) { + jobName = DELETE_ALL_VIEWS; + } else if (this.getViewName() != null) { + jobName = this.getViewName(); + } else { + jobName = this.tenantId; + } + this.jobName = "PhoenixTTLTool-" + jobName + "-"; + } + + return this.jobName; + } + + public void setPhoenixTTLJobInputConfig(Configuration configuration) { + if (this.isDeletingAllViews) { + configuration.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS, + DELETE_ALL_VIEWS); + } else if (this.getViewName() != null) { + configuration.set(PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW, + this.viewName); + } + + if (this.tenantId != null) { + configuration.set(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID, this.tenantId); + } + } + + public void configureJob() throws Exception { + this.job = Job.getInstance(getConf(),getJobName()); + PhoenixMapReduceUtil.setInput(job, this); + + job.setJarByClass(PhoenixTTLTool.class); + job.setMapperClass(PhoenixTTLDeleteJobMapper.class); + job.setMapOutputKeyClass(NullWritable.class); + job.setMapOutputValueClass(NullWritable.class); + job.setOutputFormatClass(NullOutputFormat.class); + job.setNumReduceTasks(0); + job.setPriority(this.jobPriority); + + TableMapReduceUtil.addDependencyJars(job); + LOGGER.info("PhoenixTTLTool is running for " + job.getJobName()); + } + + public int runJob() { + try { + if (isForeground) { + LOGGER.info("Running PhoenixTTLTool in foreground. " + + "Runs full table scans. This may take a long time!"); + return (job.waitForCompletion(true)) ? 
0 : 1;
+            } else {
+                LOGGER.info("Running PhoenixTTLTool in Background - Submit async and exit");
+                job.submit();
+                return 0;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Caught exception " + e + " trying to run PhoenixTTLTool.");
+            return 1;
+        }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        connection = null;
+        int ret;
+        try {
+            parseArgs(args);
+            configuration = HBaseConfiguration.addHbaseResources(getConf());
+            connection = ConnectionUtil.getInputConnection(configuration, new Properties());
+            configureJob();
+            TableMapReduceUtil.initCredentials(job);
+            ret = runJob();
+        } catch (Exception e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+            return -1;
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        return ret;
+    }
+
+    public static void main(final String[] args) throws Exception {
+        int result = ToolRunner.run(new PhoenixTTLTool(), args);
+        System.exit(result);
+    }
+}
\ No newline at end of file
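Since the class is a Hadoop `Tool`, it can also be driven programmatically; the sketch below mirrors a CLI invocation (illustrative only; the view and tenant names are placeholders):

```java
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.mapreduce.PhoenixTTLTool;

public class RunPhoenixTtlToolSketch {
    public static void main(String[] args) throws Exception {
        // Clean up one view for one tenant, at LOW priority (-p 3),
        // 10 views per mapper (-s 10), waiting for completion (-runfg).
        int rc = ToolRunner.run(new PhoenixTTLTool(), new String[] {
                "-v", "MY_SCHEMA.MY_VIEW",
                "-i", "tenant1",
                "-p", "3",
                "-s", "10",
                "-runfg"
        });
        System.exit(rc);
    }
}
```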
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java
new file mode 100644
index 00000000000..6e2f4f1d0de
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewJobStatusTracker.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable.ViewInfoJobState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultMultiViewJobStatusTracker implements MultiViewJobStatusTracker {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultMultiViewJobStatusTracker.class);
+
+    public void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
+                                Configuration config, long duration, String mrJobName) {
+        if (state == ViewInfoJobState.SUCCEEDED.getValue()) {
+            LOGGER.debug(String.format("View %s (TenantID %s, Source Table Name %s): " +
+                            "deleted %d rows in %d ms; mr job name: %s.",
+                    view.getViewName(), view.getTenantId(), view.getRelationName(),
+                    numberOfDeletedRows, duration, mrJobName));
+        } else if (state == ViewInfoJobState.DELETED.getValue()) {
+            LOGGER.debug(String.format("View has been deleted; view info: view %s, " +
+                            "TenantID %s, Source Table Name %s; mr job name: %s.",
+                    view.getViewName(), view.getTenantId(),
+                    view.getRelationName(), mrJobName));
+        } else {
+            LOGGER.debug(String.format("Job is in state %d for view %s, TenantID %s, " +
+                            "Source Table Name %s, duration: %d ms, " +
+                            "mr job name: %s.", state, view.getViewName(), view.getTenantId(),
+                    view.getRelationName(), duration, mrJobName));
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
new file mode 100644
index 00000000000..00eee0908e9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultMultiViewSplitStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
+
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.PhoenixTTLTool.DEFAULT_MAPPER_SPLIT_SIZE;
+
+public class DefaultMultiViewSplitStrategy implements MultiViewSplitStrategy {
+
+    public List<InputSplit> generateSplits(List<ViewInfoWritable> views,
+                                           Configuration configuration) {
+        int numViewsInSplit = PhoenixConfigurationUtil.getMultiViewSplitSize(configuration);
+
+        if (numViewsInSplit < 1) {
+            numViewsInSplit = DEFAULT_MAPPER_SPLIT_SIZE;
+        }
+
+        int numberOfMappers = getNumberOfMappers(views.size(), numViewsInSplit);
+
+        final List<InputSplit> pSplits = Lists.newArrayListWithExpectedSize(numberOfMappers);
+        // Split the views into splits
+        for (int i = 0; i < numberOfMappers; i++) {
+            pSplits.add(new PhoenixMultiViewInputSplit(views.subList(
+                    i * numViewsInSplit, getUpperBound(numViewsInSplit, i, views.size()))));
+        }
+
+        return pSplits;
+    }
+
+    /*
+     Calculate how many mappers are needed, based on the split policy and the
+     number of views on the cluster.
+     */
+    public int getNumberOfMappers(int viewSize, int numViewsInSplit) {
+        int numberOfMappers = viewSize / numViewsInSplit;
+        if (viewSize % numViewsInSplit > 0) {
+            numberOfMappers++;
+        }
+        return numberOfMappers;
+    }
+
+    /*
+     Calculate the upper bound for each mapper. For example, given a split policy of
+     10 cleanup jobs per mapper and a total of 12 views on the cluster:
+     the first mapper takes [0, 10), so this method returns 10 as the upper bound;
+     the second mapper takes [10, 12), so this method returns 12 as the upper bound.
+     */
+    public int getUpperBound(int numViewsInSplit, int i, int viewSize) {
+        int upper = (i + 1) * numViewsInSplit;
+        if (viewSize < upper) {
+            upper = viewSize;
+        }
+
+        return upper;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
new file mode 100644
index 00000000000..a9bd79e0048
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DefaultPhoenixMultiViewListProvider implements PhoenixMultiViewListProvider {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(DefaultPhoenixMultiViewListProvider.class);
+
+    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
+        boolean isFetchAll = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null;
+
+        if (!isFetchAll) {
+            return getTenantOrViewMultiViewList(configuration);
+        }
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+        boolean isQueryMore = true;
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+
+        int limit = PhoenixConfigurationUtil.getMultiViewQueryMoreSplitSize(configuration);
+
+        String schema = null;
+        String tableName = configuration.get(
+                PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW);
+        if (tableName != null) {
+            schema = SchemaUtil.getSchemaNameFromFullName(tableName);
+        }
+        String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)) {
+            try (PreparedStatement stmt = connection.prepareStatement(query)) {
+                do {
+                    stmt.setString(1, tenantId);
+                    stmt.setString(2, schema);
+                    stmt.setString(3, tableName);
+                    stmt.setInt(4, limit);
+
+                    ResultSet viewRs = stmt.executeQuery();
+                    String fullTableName = null;
+
+                    while (viewRs.next()) {
+                        tenantId = viewRs.getString(1);
+                        schema = viewRs.getString(2);
+                        tableName = viewRs.getString(3);
+                        fullTableName = tableName;
+                        long viewTtlValue = viewRs.getLong(4);
+
+                        if (schema != null && schema.length() > 0) {
+                            fullTableName = SchemaUtil.getTableName(schema, tableName);
+                        }
+
+                        if (!isParentHasTTL(connection, tenantId, fullTableName)) {
+                            addingViewIndexToTheFinalList(connection, tenantId, fullTableName,
+                                    viewTtlValue, viewInfoWritables);
+                        }
+                    }
+                    if (fullTableName == null) {
+                        // the last batch returned no rows, so paging is done
+                        isQueryMore = false;
+                    }
+                } while (isQueryMore);
+            }
+
+        } catch (Exception e) {
+            LOGGER.error("Fetching view metadata failed: " + e.getMessage());
+        }
+        return viewInfoWritables;
+    }
+
+    public List<ViewInfoWritable> getTenantOrViewMultiViewList(Configuration configuration) {
+        List<ViewInfoWritable> viewInfoWritables = new ArrayList<>();
+        String query = PhoenixMultiInputUtil.getFetchViewQuery(configuration);
+
+        try (PhoenixConnection connection = (PhoenixConnection)
+                ConnectionUtil.getInputConnection(configuration)) {
+            try (Statement stmt = connection.createStatement()) {
+                ResultSet viewRs = stmt.executeQuery(query);
+                while (viewRs.next()) {
+                    String tenantId = viewRs.getString(1);
+                    String schema = viewRs.getString(2);
+                    String tableName = viewRs.getString(3);
+                    long viewTtlValue = viewRs.getLong(4);
+                    String fullTableName = tableName;
+
+                    if (schema != null && schema.length() > 0) {
+                        fullTableName = SchemaUtil.getTableName(schema, tableName);
+                    }
+
+                    if (!isParentHasTTL(connection, tenantId, fullTableName)) {
+                        addingViewIndexToTheFinalList(connection, tenantId, fullTableName,
+                                viewTtlValue, viewInfoWritables);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("Fetching view metadata failed: " + e.getMessage());
+        }
+        return viewInfoWritables;
+    }
+
+    private boolean isParentHasTTL(PhoenixConnection connection,
+                                   String tenantId, String fullTableName) {
+        boolean skip = false;
+        try {
+            PTable pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+            PTable parentTable = PhoenixRuntime.getTable(connection, null,
+                    pTable.getParentName().toString());
+            if (parentTable.getType() == PTableType.VIEW &&
+                    parentTable.getPhoenixTTL() > 0) {
+                /* If the current view's parent already has a TTL value, we skip the
+                   current view's cleanup job, because we want to run one cleanup job
+                   at the GlobalView level instead of multiple jobs at the LeafView
+                   level, for better performance.
+
+                        BaseTable
+                    GlobalView(has TTL)
+                 LeafView1, LeafView2, LeafView3....
+                */
+                skip = true;
+            }
+        } catch (Exception e) {
+            skip = true;
+            LOGGER.error(String.format("Failed to process view %s (tenantId %s): %s",
+                    fullTableName, tenantId, e.getMessage()));
+        }
+        return skip;
+    }
+
+    private void addingViewIndexToTheFinalList(PhoenixConnection connection, String tenantId,
+                                               String fullTableName, long viewTtlValue,
+                                               List<ViewInfoWritable> viewInfoWritables)
+            throws Exception {
+        PTable pTable = PhoenixRuntime.getTable(connection, tenantId, fullTableName);
+        ViewInfoWritable viewInfoTracker = new ViewInfoTracker(
+                tenantId,
+                fullTableName,
+                viewTtlValue,
+                pTable.getPhysicalName().getString(),
+                false
+        );
+        viewInfoWritables.add(viewInfoTracker);
+
+        List<PTable> allIndexesOnView = pTable.getIndexes();
+        for (PTable viewIndexTable : allIndexesOnView) {
+            String indexName = viewIndexTable.getTableName().getString();
+            String indexSchema = viewIndexTable.getSchemaName().getString();
+            if (indexName.contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) {
+                indexName = SchemaUtil.getTableNameFromFullName(indexName,
+                        QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR);
+            }
+            indexName = SchemaUtil.getTableNameFromFullName(indexName);
+            indexName = SchemaUtil.getTableName(indexSchema, indexName);
+            ViewInfoWritable viewInfoTrackerForIndexEntry = new ViewInfoTracker(
+                    tenantId,
+                    fullTableName,
+                    viewTtlValue,
+                    indexName,
+                    true
+            );
+            viewInfoWritables.add(viewInfoTrackerForIndexEntry);
+        }
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java
new file mode 100644
index 00000000000..7520dbe608a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewJobStatusTracker.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface MultiViewJobStatusTracker {
+    void updateJobStatus(ViewInfoTracker view, long numberOfDeletedRows, int state,
+                         Configuration config, long duration, String mrJobName);
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java
new file mode 100644
index 00000000000..d438005d860
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/MultiViewSplitStrategy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.util.List;
+
+public interface MultiViewSplitStrategy {
+    List<InputSplit> generateSplits(List<ViewInfoWritable> views, Configuration configuration);
+}
\ No newline at end of file
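How views are grouped into splits is pluggable through this interface: the input format consults MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ (added to PhoenixConfigurationUtil in the next hunk) and otherwise appears to fall back to DefaultMultiViewSplitStrategy. A sketch of a custom strategy (hypothetical class name; one mapper per view):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.phoenix.mapreduce.PhoenixMultiViewInputSplit;
import org.apache.phoenix.mapreduce.util.MultiViewSplitStrategy;
import org.apache.phoenix.mapreduce.util.ViewInfoWritable;

// Selected with:
// conf.set(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ,
//          OneViewPerMapperSplitStrategy.class.getName());
public class OneViewPerMapperSplitStrategy implements MultiViewSplitStrategy {
    @Override
    public List<InputSplit> generateSplits(List<ViewInfoWritable> views,
                                           Configuration configuration) {
        List<InputSplit> splits = new ArrayList<>(views.size());
        for (ViewInfoWritable view : views) {
            // Smallest possible split: a single view (or view index) per mapper.
            splits.add(new PhoenixMultiViewInputSplit(Collections.singletonList(view)));
        }
        return splits;
    }
}
```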
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 82c1f2f8597..792b621422c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -166,6 +166,27 @@ public final class PhoenixConfigurationUtil {
 
     public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
 
+    // group number of views per mapper to run the deletion job
+    public static final String MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE = "phoenix.mapreduce.multi.input.split.size";
+
+    public static final String MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE = "phoenix.mapreduce.multi.input.batch.size";
+
+    // phoenix ttl data deletion job for a specific view
+    public static final String MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW = "phoenix.mapreduce.phoenix_ttl.per_view";
+
+    // phoenix ttl data deletion job for all views
+    public static final String MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS = "phoenix.mapreduce.phoenix_ttl.all";
+
+    // provide the fully qualified class name to inject your multi input logic
+    public static final String MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ = "phoenix.mapreduce.multi.input.strategy.path";
+
+    // provide the fully qualified class name to inject your multi split logic
+    public static final String MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ = "phoenix.mapreduce.multi.split.strategy.path";
+
+    // provide the fully qualified class name to inject your multi input mapper tracker logic
+    public static final String MAPREDUCE_MULTI_INPUT_MAPPER_TRACKER_CLAZZ = "phoenix.mapreduce.multi.mapper.tracker.path";
+
     /**
      * Determines type of Phoenix Map Reduce job.
      * 1. QUERY allows running arbitrary queries without aggregates
@@ -408,12 +429,28 @@ public static String getUpsertStatement(final Configuration configuration) throw
 
     }
 
-    public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException {
-        Preconditions.checkNotNull(configuration);
-        Preconditions.checkNotNull(upsertStmt);
-        configuration.set(UPSERT_STATEMENT, upsertStmt);
-    }
-
+    public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(upsertStmt);
+        configuration.set(UPSERT_STATEMENT, upsertStmt);
+    }
+
+    public static void setMultiInputMapperSplitSize(Configuration configuration, final int splitSize) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, String.valueOf(splitSize));
+    }
+
+    public static void setMultiViewQueryMoreSplitSize(Configuration configuration, final int batchSize) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE, String.valueOf(batchSize));
+    }
+
+    public static int getMultiViewQueryMoreSplitSize(final Configuration configuration) {
+        final String batchSize = configuration.get(MAPREDUCE_MULTI_INPUT_QUERY_BATCH_SIZE);
+        Preconditions.checkNotNull(batchSize);
+        return Integer.parseInt(batchSize);
+    }
+
     public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
         Preconditions.checkNotNull(configuration);
         List<ColumnInfo> columnMetadataList = null;
@@ -438,6 +475,12 @@ public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration c
         return columnMetadataList;
     }
 
+    public static int getMultiViewSplitSize(final Configuration configuration) {
+        final String splitSize = configuration.get(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE);
+        Preconditions.checkNotNull(splitSize);
+        return Integer.parseInt(splitSize);
+    }
+
     private static List<String> getSelectColumnList(
         final Configuration configuration) {
         List<String> selectColumnList = PhoenixConfigurationUtil.getSelectColumnNames(configuration);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index 6c23fd9b493..cab23614cd0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -23,7 +23,9 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixMultiViewInputFormat;
 import
org.apache.phoenix.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.mapreduce.PhoenixTTLTool; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import java.io.IOException; @@ -125,6 +127,21 @@ public static void setInput(final Job job, final Class inp PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames); } + /** + * + * @param job MR job instance + * @param tool PhoenixTtlTool for Phoenix TTL deletion MR job + */ + public static void setInput(final Job job, PhoenixTTLTool tool) { + Configuration configuration = job.getConfiguration(); + job.setInputFormatClass(PhoenixMultiViewInputFormat.class); + tool.setPhoenixTTLJobInputConfig(configuration); + PhoenixConfigurationUtil.setSchemaType(configuration, + PhoenixConfigurationUtil.SchemaType.QUERY); + PhoenixConfigurationUtil.setMultiInputMapperSplitSize(configuration, tool.getSplitSize()); + PhoenixConfigurationUtil.setMultiViewQueryMoreSplitSize(configuration, tool.getBatchSize()); + } + /** * * @param job diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java new file mode 100644 index 00000000000..d25e0e3156b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.mapreduce.PhoenixTTLTool; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.StringUtil; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE; + +public class PhoenixMultiInputUtil { + public static final String SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY = + "SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHOENIX_TTL FROM " + + SYSTEM_CATALOG_NAME + " WHERE " + + TABLE_TYPE + " = '" + PTableType.VIEW.getSerializedValue() + "' AND " + + PHOENIX_TTL + " IS NOT NULL AND " + + PHOENIX_TTL + " > " + PHOENIX_TTL_NOT_DEFINED + " AND " + + VIEW_TYPE + " <> " + PTable.ViewType.MAPPED.getSerializedValue(); + + public static Connection buildTenantConnection(String url, String tenantId) + throws SQLException { + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + return DriverManager.getConnection(url, props); + } + + public static String getSelectAllPageQuery() { + return SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY + " AND " + + "(TENANT_ID,TABLE_SCHEM,TABLE_NAME) > (?,?,?) LIMIT ?"; + } + + public static String constructViewMetadataQueryBasedOnView(String fullName, String tenantId) { + String query = SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY; + + + if (fullName != null) { + if (fullName.equals(PhoenixTTLTool.DELETE_ALL_VIEWS)) { + return query; + } + + String schema = SchemaUtil.getSchemaNameFromFullName(fullName); + String viewName = SchemaUtil.getTableNameFromFullName(fullName); + + if (!schema.equals(StringUtil.EMPTY_STRING)) { + query = query + " AND TABLE_SCHEM = '" + schema + "'"; + } else { + query = query + " AND TABLE_SCHEM IS NULL"; + } + + query = query + " AND TABLE_NAME = '" + viewName + "'"; + } + + if (tenantId != null && tenantId.length() > 0) { + query = query + " AND TENANT_ID = '" + tenantId + "'"; + } else { + query = query + " AND TENANT_ID IS NULL"; + } + + return query; + } + + + public static String constructViewMetadataQueryBasedOnTenant(String tenant) { + return constructViewMetadataQueryBasedOnView(null, tenant); + } + + public static String getFetchViewQuery(Configuration configuration) { + String query; + if (configuration.get( + PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_ALL_VIEWS) != null) { + query = PhoenixMultiInputUtil.getSelectAllPageQuery(); + } else if (configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID) != null && + configuration.get(PhoenixConfigurationUtil. 
MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW) == null) {
+            query = PhoenixMultiInputUtil.constructViewMetadataQueryBasedOnTenant(
+                    configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID));
+        } else {
+            query = PhoenixMultiInputUtil.constructViewMetadataQueryBasedOnView(
+                    configuration.get(
+                            PhoenixConfigurationUtil.MAPREDUCE_PHOENIX_TTL_DELETE_JOB_PER_VIEW),
+                    configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID));
+        }
+        return query;
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java
new file mode 100644
index 00000000000..154f47f4554
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiViewListProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import java.util.List;
+
+public interface PhoenixMultiViewListProvider {
+    List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration);
+}
\ No newline at end of file
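The list provider is the other pluggable seam: judging by the MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ key and the input-format tests later in this patch, the input format resolves a custom provider from that key and otherwise uses DefaultPhoenixMultiViewListProvider. A canned provider, mainly useful in tests (hypothetical class and view names):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.mapreduce.util.PhoenixMultiViewListProvider;
import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
import org.apache.phoenix.mapreduce.util.ViewInfoWritable;

// Selected with:
// conf.set(PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ,
//          FixedListViewProvider.class.getName());
public class FixedListViewProvider implements PhoenixMultiViewListProvider {
    @Override
    public List<ViewInfoWritable> getPhoenixMultiViewList(Configuration configuration) {
        // Skip the SYSTEM.CATALOG scan entirely and return a fixed view list.
        List<ViewInfoWritable> views = new ArrayList<>();
        views.add(new ViewInfoTracker("tenant1", "TEST.MY_VIEW", 1000L,
                "TEST.BASE_TABLE", false));
        return views;
    }
}
```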
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
new file mode 100644
index 00000000000..a4f3226b61d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ViewInfoTracker implements ViewInfoWritable {
+
+    String tenantId;
+    String viewName;
+    String relationName;
+    long phoenixTtl;
+    boolean isIndexRelation;
+
+    public ViewInfoTracker() {
+
+    }
+
+    public ViewInfoTracker(String tenantId, String viewName, long phoenixTtl,
+                           String relationName, boolean isIndexRelation) {
+        setTenantId(tenantId);
+        this.viewName = viewName;
+        this.phoenixTtl = phoenixTtl;
+        this.relationName = relationName;
+        this.isIndexRelation = isIndexRelation;
+    }
+
+    private void setTenantId(String tenantId) {
+        if (tenantId != null) {
+            this.tenantId = tenantId;
+        }
+    }
+
+    @Override
+    public String getTenantId() {
+        return tenantId;
+    }
+
+    @Override
+    public String getViewName() {
+        return viewName;
+    }
+
+    @Override
+    public String getRelationName() {
+        return relationName;
+    }
+
+    @Override
+    public boolean isIndexRelation() {
+        return this.isIndexRelation;
+    }
+
+    public long getPhoenixTtl() {
+        return phoenixTtl;
+    }
+
+    @Override public void write(DataOutput output) throws IOException {
+        WritableUtils.writeString(output, tenantId);
+        WritableUtils.writeString(output, viewName);
+        WritableUtils.writeVLong(output, phoenixTtl);
+        WritableUtils.writeString(output, relationName);
+        WritableUtils.writeString(output, isIndexRelation ? "true" : "false");
+    }
+
+    @Override public void readFields(DataInput input) throws IOException {
+        setTenantId(WritableUtils.readString(input));
+        viewName = WritableUtils.readString(input);
+        phoenixTtl = WritableUtils.readVLong(input);
+        relationName = WritableUtils.readString(input);
+        isIndexRelation = WritableUtils.readString(input).equals("true");
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ViewName: ").append(this.viewName);
+        if (this.tenantId != null) {
+            sb.append(", Tenant: ").append(this.tenantId);
+        }
+        if (this.isIndexRelation) {
+            sb.append(", IndexName: ").append(this.relationName);
+        } else {
+            sb.append(", BaseTableName: ").append(this.relationName);
+        }
+
+        return sb.toString();
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java
new file mode 100644
index 00000000000..48a08e24554
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoWritable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.phoenix.mapreduce.util; + +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public interface ViewInfoWritable extends Writable { + public enum ViewInfoJobState { + INITIALIZED(1), + RUNNING(2), + SUCCEEDED(3), + FAILED(4), + KILLED(5), + DELETED(6); + + int value; + + ViewInfoJobState(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + } + + void write(DataOutput output) throws IOException; + void readFields(DataInput input) throws IOException; + String getTenantId(); + String getViewName(); + String getRelationName(); // from index or data table + boolean isIndexRelation(); +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 748c4230cb9..27edf848b42 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -335,7 +335,6 @@ public interface QueryServices extends SQLCloseable { // Flag indicating that server side masking of ttl expired rows is enabled. public static final String PHOENIX_TTL_SERVER_SIDE_MASKING_ENABLED = "phoenix.ttl.server_side.masking.enabled"; - // Before 4.15 when we created a view we included the parent table column metadata in the view // metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent // table column metadata along with the child view metadata. When we resolve a child view, we diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java new file mode 100644 index 00000000000..3db85deb28d --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/DefaultMultiViewSplitStrategyTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.mapreduce.util.DefaultMultiViewSplitStrategy;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.apache.phoenix.query.BaseTest;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE;
+import static org.junit.Assert.assertEquals;
+
+public class DefaultMultiViewSplitStrategyTest extends BaseTest {
+    DefaultMultiViewSplitStrategy defaultMultiViewSplitStrategy =
+            new DefaultMultiViewSplitStrategy();
+
+    @Test
+    public void testGetUpperBound() {
+        // given a split policy of 10 with view size 12,
+        // we expect 2 mappers with ranges [0,10) and [10,12)
+        assertEquals(10,
+                defaultMultiViewSplitStrategy.getUpperBound(10, 0, 12));
+        assertEquals(12,
+                defaultMultiViewSplitStrategy.getUpperBound(10, 1, 12));
+
+        // given a split policy of 8 with view size 12,
+        // we expect 2 mappers with ranges [0,8) and [8,12)
+        assertEquals(8,
+                defaultMultiViewSplitStrategy.getUpperBound(8, 0, 12));
+        assertEquals(12,
+                defaultMultiViewSplitStrategy.getUpperBound(8, 1, 12));
+
+        // given a split policy of 5 with view size 1,
+        // we expect 1 mapper with range [0,1)
+        assertEquals(1,
+                defaultMultiViewSplitStrategy.getUpperBound(5, 0, 1));
+    }
+
+    @Test
+    public void testGetNumberOfMappers() {
+        int viewSize = 0;
+        int numViewsInSplit = 10;
+
+        // test empty cluster, i.e. view size is 0
+        assertEquals(0,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize, numViewsInSplit));
+
+        viewSize = 9;
+        // test viewSize is less than numViewsInSplit
+        assertEquals(1,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize, numViewsInSplit));
+
+        // test viewSize is equal to numViewsInSplit
+        viewSize = 10;
+        assertEquals(1,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize, numViewsInSplit));
+
+        // test viewSize is greater than numViewsInSplit
+        viewSize = 11;
+        assertEquals(2,
+                defaultMultiViewSplitStrategy.getNumberOfMappers(viewSize, numViewsInSplit));
+    }
+
+    @Test
+    public void testGenerateSplits() {
+        // test number of views greater than split policy
+        testGenerateSplits(11, 10, 2);
+
+        // test number of views equal to split policy
+        testGenerateSplits(10, 10, 1);
+
+        // test number of views less than split policy
+        testGenerateSplits(8, 10, 1);
+
+        // test number of views is 0
+        testGenerateSplits(0, 10, 0);
+
+        // test split policy is 0 (falls back to the default split size)
+        testGenerateSplits(8, 0, 1);
+    }
+
+    private void testGenerateSplits(int numberOfViews, int splitPolicy, int expectedResultSize) {
+        List<ViewInfoWritable> views = new ArrayList<>();
+        for (int i = 0; i < numberOfViews; i++) {
+            views.add(new ViewInfoTracker());
+        }
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, String.valueOf(splitPolicy));
+        List<InputSplit> result = defaultMultiViewSplitStrategy.generateSplits(views, config);
+        assertEquals(expectedResultSize, result.size());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
new file mode 100644
index 00000000000..89a5c5a0a37
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewInputFormatTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.fail;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ;
+import static org.mockito.Mockito.when;
+
+public class PhoenixMultiViewInputFormatTest {
+
+    @Test
+    public void testDefaultConfig() throws Exception {
+        PhoenixMultiViewInputFormat multiViewInputFormat = new PhoenixMultiViewInputFormat();
+
+        Configuration config = new Configuration();
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        JobContext mockContext = Mockito.mock(JobContext.class);
+        when(mockContext.getConfiguration()).thenReturn(config);
+
+        // a run with the default strategy classes should not raise an error
+        multiViewInputFormat.getSplits(mockContext);
+    }
+
+    @Test
+    public void testCustomizedInputStrategyClassDoesNotExist() {
+        PhoenixMultiViewInputFormat multiViewInputFormat = new PhoenixMultiViewInputFormat();
+
+        Configuration config = new Configuration();
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        config.set(MAPREDUCE_MULTI_INPUT_STRATEGY_CLAZZ, "dummy.path");
+        JobContext mockContext = Mockito.mock(JobContext.class);
+        when(mockContext.getConfiguration()).thenReturn(config);
+
+        try {
+            multiViewInputFormat.getSplits(mockContext);
+            fail("Expected getSplits to fail with a ClassNotFoundException");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("ClassNotFoundException"));
+        }
+    }
+
+    @Test
+    public void testCustomizedInputSplitClassDoesNotExist() {
+        PhoenixMultiViewInputFormat multiViewInputFormat = new PhoenixMultiViewInputFormat();
+
+        Configuration config = new Configuration();
+        config.set(MAPREDUCE_MULTI_INPUT_MAPPER_SPLIT_SIZE, "10");
+        config.set(MAPREDUCE_MULTI_INPUT_SPLIT_STRATEGY_CLAZZ, "dummy.path");
+        JobContext mockContext = Mockito.mock(JobContext.class);
+        when(mockContext.getConfiguration()).thenReturn(config);
+
+        try {
+            multiViewInputFormat.getSplits(mockContext);
+            fail("Expected getSplits to fail with a ClassNotFoundException");
+        } catch (Exception e) {
+            assertTrue(e.getMessage().contains("ClassNotFoundException"));
+        }
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
new file mode 100644
index 00000000000..0e439ee13a7
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ViewInfoTracker;
+import org.apache.phoenix.mapreduce.util.ViewInfoWritable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+public class PhoenixMultiViewReaderTest {
+
+    @Test
+    public void testTraverseViewInfoList() throws Exception {
+        String tenantId = "Tenant1";
+        String viewName = "viewName1";
+        long ttl = 1;
+        String indexTable = "indexTable";
+        String globalView = "globalView";
+
+        PhoenixMultiViewInputSplit mockInput = Mockito.mock(PhoenixMultiViewInputSplit.class);
+        TaskAttemptContext mockContext = Mockito.mock(TaskAttemptContext.class);
+        List<ViewInfoWritable> viewInfoTracker = new ArrayList<>();
+        viewInfoTracker.add(new ViewInfoTracker(tenantId, viewName, ttl, globalView, false));
+        viewInfoTracker.add(new ViewInfoTracker(tenantId, viewName, ttl, indexTable, true));
+        when(mockInput.getViewInfoTrackerList()).thenReturn(viewInfoTracker);
+        PhoenixMultiViewReader phoenixMultiViewReader = new PhoenixMultiViewReader();
+        phoenixMultiViewReader.initialize(mockInput, mockContext);
+
+        // first entry: the data table relation
+        assertTrue(phoenixMultiViewReader.nextKeyValue());
+        ViewInfoTracker viewInfoWritable =
+                (ViewInfoTracker) phoenixMultiViewReader.getCurrentValue();
+        assertEquals(tenantId, viewInfoWritable.getTenantId());
+        assertEquals(viewName, viewInfoWritable.getViewName());
+        assertEquals(ttl, viewInfoWritable.getPhoenixTtl());
+        assertFalse(viewInfoWritable.isIndexRelation());
+
+        // second entry: the index relation
+        assertTrue(phoenixMultiViewReader.nextKeyValue());
+        viewInfoWritable = (ViewInfoTracker) phoenixMultiViewReader.getCurrentValue();
+        assertEquals(tenantId, viewInfoWritable.getTenantId());
+        assertEquals(viewName, viewInfoWritable.getViewName());
+        assertEquals(ttl, viewInfoWritable.getPhoenixTtl());
+        assertTrue(viewInfoWritable.isIndexRelation());
+
+        // the list is exhausted: no next key, and the current value is cleared
+        assertFalse(phoenixMultiViewReader.nextKeyValue());
+        assertNull(phoenixMultiViewReader.getCurrentValue());
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java
new file mode 100644
index 00000000000..a4532833cbf
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTTLToolTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.phoenix.query.BaseTest;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class PhoenixTTLToolTest extends BaseTest {
+    String viewName = generateUniqueName();
+    String tenantId = generateUniqueName();
+
+    @Test
+    public void testParseInput() {
+        // "-a" deletes all views; no view name or tenant id is set
+        PhoenixTTLTool tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-a"});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertTrue(tool.isDeletingAllViews());
+        assertNull(tool.getViewName());
+        assertNull(tool.getTenantId());
+
+        // "-v" and "-i" scope the job to one view of one tenant
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-i", tenantId});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertFalse(tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertEquals(tenantId, tool.getTenantId());
+
+        // "-p 0" maps to the highest job priority
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-p", "0"});
+        assertEquals("VERY_HIGH", tool.getJobPriority());
+        assertFalse(tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertNull(tool.getTenantId());
+
+        // a negative priority falls back to NORMAL
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-p", "-1"});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertFalse(tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertNull(tool.getTenantId());
+
+        // a non-numeric priority also falls back to NORMAL
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-v", viewName, "-p", "DSAFDAS"});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertFalse(tool.isDeletingAllViews());
+        assertEquals(viewName, tool.getViewName());
+        assertNull(tool.getTenantId());
+
+        // "-i" alone scopes the job to every view of one tenant
+        tool = new PhoenixTTLTool();
+        tool.parseArgs(new String[] {"-i", tenantId});
+        assertEquals("NORMAL", tool.getJobPriority());
+        assertFalse(tool.isDeletingAllViews());
+        assertNull(tool.getViewName());
+        assertEquals(tenantId, tool.getTenantId());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNoInputParam() {
+        PhoenixTTLTool tool = new PhoenixTTLTool();
+        tool.parseOptions(new String[] {});
+    }
+}
\ No newline at end of file
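
A few illustrative sketches for reviewers follow; none of them are part of the patch. First, the assertions in DefaultMultiViewSplitStrategyTest pin down the split arithmetic completely. A minimal implementation consistent with those assertions looks like the following; SplitMathSketch is a hypothetical name, and the DefaultMultiViewSplitStrategy shipped in this patch may be organized differently.

// Sketch only: split arithmetic consistent with DefaultMultiViewSplitStrategyTest.
// Not the patch's implementation; SplitMathSketch is a hypothetical name.
public final class SplitMathSketch {

    // Ceiling division: how many mappers cover viewSize views in chunks of
    // numViewsInSplit. An empty view list needs no mappers.
    static int getNumberOfMappers(int viewSize, int numViewsInSplit) {
        if (viewSize == 0) {
            return 0;
        }
        return (viewSize + numViewsInSplit - 1) / numViewsInSplit;
    }

    // Exclusive upper bound of the mapper at mapperIndex; the final chunk is
    // clamped to viewSize, e.g. ranges [0,10) and [10,12) for 12 views and a
    // split size of 10.
    static int getUpperBound(int numViewsInSplit, int mapperIndex, int viewSize) {
        return Math.min((mapperIndex + 1) * numViewsInSplit, viewSize);
    }
}

Under this reading, a split size of 0 cannot feed the formula directly, which matches testGenerateSplits expecting a fallback (one split for 8 views) rather than a division-by-zero failure.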
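Second, PhoenixMultiViewReaderTest exercises the usual RecordReader contract over an in-memory list. Reduced to a plain cursor, the behavior the test depends on is the following; ListCursorSketch and its fields are hypothetical, not the reader's actual internals.

// Sketch only: the iteration contract PhoenixMultiViewReaderTest relies on,
// reduced to a plain list cursor. Names here are hypothetical.
final class ListCursorSketch<T> {
    private final java.util.List<T> items;
    private int index = 0;
    private T current;

    ListCursorSketch(java.util.List<T> items) {
        this.items = items;
    }

    // Advance to the next item; once the list is exhausted the current value
    // becomes null, which is why the test expects getCurrentValue() to return
    // null after nextKeyValue() has returned false.
    boolean nextKeyValue() {
        current = (index < items.size()) ? items.get(index++) : null;
        return current != null;
    }

    T getCurrentValue() {
        return current;
    }
}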
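Finally, PhoenixTTLToolTest pins only three priority behaviors: "0" maps to VERY_HIGH, a negative value falls back to NORMAL, and unparsable input falls back to NORMAL. One mapping consistent with those cases is sketched below; the "1" -> HIGH branch is an assumption not covered by the tests, and the real PhoenixTTLTool may map values differently.

// Sketch only: one job-priority mapping consistent with PhoenixTTLToolTest.
// JobPrioritySketch is a hypothetical holder, not the tool's real structure.
final class JobPrioritySketch {
    static String fromArg(String priorityArg) {
        try {
            switch (Integer.parseInt(priorityArg)) {
                case 0:  return "VERY_HIGH";
                case 1:  return "HIGH";      // assumption: not pinned by the tests
                default: return "NORMAL";    // negative or out-of-range values
            }
        } catch (NumberFormatException e) {
            return "NORMAL";                 // non-numeric "-p" value
        }
    }
}

Note that Integer.parseInt(null) also throws NumberFormatException, so a missing "-p" argument lands in the same NORMAL fallback.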