diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java index a4cd3548284..6e1f8aa57e3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java @@ -18,12 +18,16 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Arrays; import java.util.Collection; @@ -103,6 +107,9 @@ public void testDropTableWithChildViews() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl()); Connection viewConn = isMultiTenant ? DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn) { + // Empty the task table first. + conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME); + String ddlFormat = "CREATE TABLE IF NOT EXISTS " + baseTable + " (" + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR " @@ -126,16 +133,14 @@ public void testDropTableWithChildViews() throws Exception { // Run DropChildViewsTask to complete the tasks for dropping child views. The depth of the view tree is 2, // so we expect that this will be done in two task handling runs as each non-root level will be processed // in one run - TaskRegionObserver.DropChildViewsTask task = - new TaskRegionObserver.DropChildViewsTask( + TaskRegionObserver.SelfHealingTask task = + new TaskRegionObserver.SelfHealingTask( TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS); task.run(); task.run(); - ResultSet rs = conn.createStatement().executeQuery("SELECT * " + - " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + - " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " + - PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue()); - assertFalse(rs.next()); + + assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS, null); + // Views should be dropped by now TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES); TableViewFinderResult childViewsResult = new TableViewFinderResult(); @@ -147,9 +152,25 @@ public void testDropTableWithChildViews() throws Exception { childViewsResult); assertTrue(childViewsResult.getLinks().size() == 0); // There should not be any orphan views - rs = conn.createStatement().executeQuery("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + + ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE " + PhoenixDatabaseMetaData.TABLE_SCHEM + " = '" + SCHEMA2 +"'"); assertFalse(rs.next()); } } + + public static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType, String expectedData) + throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT * " + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " + + taskType.getSerializedValue()); + assertTrue(rs.next()); + String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS); + assertEquals(expectedStatus, taskStatus); + + if (expectedData != null) { + String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA); + assertEquals(expectedData, data); + } + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java new file mode 100644 index 00000000000..b1a6ba3c356 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT { + protected static String TENANT1 = "tenant1"; + private static RegionCoprocessorEnvironment TaskRegionEnvironment; + + @BeforeClass + public static void doSetup() throws Exception { + HashMap props = new HashMap<>(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + TaskRegionEnvironment = + getUtility() + .getRSForFirstRegionInTable( + PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME) + .getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME) + .get(0).getCoprocessorHost() + .findCoprocessorEnvironment(TaskRegionObserver.class.getName()); + } + + private String generateDDL(String format) { + StringBuilder optionsBuilder = new StringBuilder(); + + if (optionsBuilder.length() != 0) optionsBuilder.append(","); + optionsBuilder.append("MULTI_TENANT=true"); + + return String.format(format, "TENANT_ID VARCHAR NOT NULL, ", "TENANT_ID, ", optionsBuilder.toString()); + } + + @Test + public void testIndexRebuildTask() throws Throwable { + String baseTable = generateUniqueName(); + Connection conn = null; + Connection viewConn = null; + try { + conn = DriverManager.getConnection(getUrl()); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT1); + + viewConn =DriverManager.getConnection(getUrl(), props); + String ddlFormat = + "CREATE TABLE IF NOT EXISTS " + baseTable + " (" + + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR " + + " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" + " ) %s"; + conn.createStatement().execute(generateDDL(ddlFormat)); + conn.commit(); + // Create a view + String viewName = generateUniqueName(); + String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable; + viewConn.createStatement().execute(viewDDL); + + // Create index + String indexName = generateUniqueName(); + String idxSDDL = String.format("CREATE INDEX %s ON %s (V1)", indexName, viewName); + + viewConn.createStatement().execute(idxSDDL); + + // Insert rows + int numOfValues = 1; + for (int i=0; i < numOfValues; i++){ + viewConn.createStatement().execute( + String.format("UPSERT INTO %s VALUES('%s', '%s', '%s')", viewName, String.valueOf(i), "y", + "z")); + } + viewConn.commit(); + + String data = "{IndexName:" + indexName + "}"; + // Run IndexRebuildTask + TaskRegionObserver.SelfHealingTask task = + new TaskRegionObserver.SelfHealingTask( + TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS); + + Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); + // Add a task to System.Task to build indexes + Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD, + TENANT1, null, viewName, + PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true); + + + task.run(); + + String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable); + ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices(); + int count = getUtility().countRows(queryServices.getTable(Bytes.toBytes(viewIndexTableName))); + assertTrue(count == numOfValues); + + + // Remove index contents and try again + Admin admin = queryServices.getAdmin(); + TableName tableName = TableName.valueOf(viewIndexTableName); + admin.disableTable(tableName); + admin.truncateTable(tableName, false); + + data = "{IndexName:" + indexName + ", DisableBefore:true}"; + + // Add a new task (update status to created) to System.Task to rebuild indexes + Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD, + TENANT1, null, viewName, + PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true); + task.run(); + + Thread.sleep(15000); + + Table systemHTable= queryServices.getTable(Bytes.toBytes("SYSTEM."+PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE)); + count = getUtility().countRows(systemHTable); + assertEquals(1, count); + + // Check task status and other column values. + DropTableWithViewsIT.assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.INDEX_REBUILD, + null); + + // See that index is rebuilt and confirm index has rows + Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName)); + count = getUtility().countRows(htable); + assertEquals(numOfValues, count); + } finally { + conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME); + conn.commit(); + if (conn != null) { + conn.close(); + } + if (viewConn != null) { + viewConn.close(); + } + } + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 192d004e928..45712a4476c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -227,6 +227,7 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.task.Task; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PBoolean; import org.apache.phoenix.schema.types.PDataType; @@ -2837,7 +2838,7 @@ private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] s } try { PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); - TaskRegionObserver.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId), + Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId), Bytes.toString(schemaName), Bytes.toString(tableName), this.accessCheckEnabled); } catch (Throwable t) { logger.error("Adding a task to drop child views failed!", t); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java index 2d94aa6bd0b..a6c53288193 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java @@ -18,21 +18,24 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; -import java.security.PrivilegedExceptionAction; -import java.sql.PreparedStatement; -import java.sql.ResultSet; +import java.lang.reflect.Method; import java.sql.SQLException; import java.sql.Timestamp; -import java.sql.Types; +import java.util.Arrays; +import java.util.List; +import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,20 +44,16 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.ipc.RpcCall; -import org.apache.hadoop.hbase.ipc.RpcUtil; -import org.apache.hadoop.hbase.security.User; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.TaskType; -import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; @@ -65,12 +64,51 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor { public static final Log LOG = LogFactory.getLog(TaskRegionObserver.class); + protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length); private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS; private long timeMaxInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS; @GuardedBy("TaskRegionObserver.class") private long initialDelay = QueryServicesOptions.DEFAULT_TASK_HANDLING_INITIAL_DELAY_MS; + private static Map classMap = ImmutableMap.builder() + .put(TaskType.DROP_CHILD_VIEWS, "org.apache.phoenix.coprocessor.tasks.DropChildViewsTask") + .put(TaskType.INDEX_REBUILD, "org.apache.phoenix.coprocessor.tasks.IndexRebuildTask") + .build(); + + public enum TaskResultCode { + SUCCESS, + FAIL, + SKIPPED, + } + + public static class TaskResult { + private TaskResultCode resultCode; + private String details; + + public TaskResult(TaskResultCode resultCode, String details) { + this.resultCode = resultCode; + this.details = details; + } + + public TaskResultCode getResultCode() { + return resultCode; + } + + public String getDetails() { + return details; + } + + @Override + public String toString() { + String result = resultCode.name(); + if (!Strings.isNullOrEmpty(details)) { + result = result + " - " + details; + } + return result; + } + } + @Override public void preClose(final ObserverContext c, boolean abortRequested) { @@ -109,102 +147,16 @@ public void postOpen(ObserverContext e) { deprecationLogger.setLevel(Level.WARN); } - DropChildViewsTask task = new DropChildViewsTask(e.getEnvironment(), timeMaxInterval); + SelfHealingTask task = new SelfHealingTask(e.getEnvironment(), timeMaxInterval); executor.scheduleWithFixedDelay(task, initialDelay, timeInterval, TimeUnit.MILLISECONDS); } - private static void mutateSystemTaskTable(PhoenixConnection conn, PreparedStatement stmt, boolean accessCheckEnabled) - throws IOException { - // we need to mutate SYSTEM.TASK with HBase/login user if access is enabled. - if (accessCheckEnabled) { - User.runAsLoginUser(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - final RpcCall rpcContext = RpcUtil.getRpcContext(); - // setting RPC context as null so that user can be reset - try { - RpcUtil.setRpcContext(null); - stmt.execute(); - conn.commit(); - } catch (SQLException e) { - throw new IOException(e); - } finally { - // setting RPC context back to original context of the RPC - RpcUtil.setRpcContext(rpcContext); - } - return null; - } - }); - } - else { - try { - stmt.execute(); - conn.commit(); - } catch (SQLException e) { - throw new IOException(e); - } - } - } - - public static void addTask(PhoenixConnection conn, TaskType taskType, String tenantId, String schemaName, - String tableName, boolean accessCheckEnabled) - throws IOException { - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement("UPSERT INTO " + - PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " + - PhoenixDatabaseMetaData.TASK_TYPE + ", " + - PhoenixDatabaseMetaData.TENANT_ID + ", " + - PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + - PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)"); - stmt.setByte(1, taskType.getSerializedValue()); - if (tenantId != null) { - stmt.setString(2, tenantId); - } else { - stmt.setNull(2, Types.VARCHAR); - } - if (schemaName != null) { - stmt.setString(3, schemaName); - } else { - stmt.setNull(3, Types.VARCHAR); - } - stmt.setString(4, tableName); - } catch (SQLException e) { - throw new IOException(e); - } - mutateSystemTaskTable(conn, stmt, accessCheckEnabled); - } - - public static void deleteTask(PhoenixConnection conn, TaskType taskType, Timestamp ts, String tenantId, - String schemaName, String tableName, boolean accessCheckEnabled) throws IOException { - PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement("DELETE FROM " + - PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + - " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " + - PhoenixDatabaseMetaData.TASK_TS + " = ? AND " + - PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL " : " = '" + tenantId + "'") + " AND " + - PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null ? " IS NULL " : " = '" + schemaName + "'") + " AND " + - PhoenixDatabaseMetaData.TABLE_NAME + " = ?"); - stmt.setByte(1, taskType.getSerializedValue()); - stmt.setTimestamp(2, ts); - stmt.setString(3, tableName); - } catch (SQLException e) { - throw new IOException(e); - } - mutateSystemTaskTable(conn, stmt, accessCheckEnabled); - } - - /** - * Task runs periodically to clean up task of child views whose parent is dropped - * - */ - public static class DropChildViewsTask extends TimerTask { - private RegionCoprocessorEnvironment env; - private long timeMaxInterval; - private boolean accessCheckEnabled; + public static class SelfHealingTask extends TimerTask { + protected RegionCoprocessorEnvironment env; + protected long timeMaxInterval; + protected boolean accessCheckEnabled; - public DropChildViewsTask(RegionCoprocessorEnvironment env, long timeMaxInterval) { + public SelfHealingTask(RegionCoprocessorEnvironment env, long timeMaxInterval) { this.env = env; this.accessCheckEnabled = env.getConfiguration().getBoolean(QueryServices.PHOENIX_ACLS_ENABLED, QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED); @@ -214,88 +166,105 @@ public DropChildViewsTask(RegionCoprocessorEnvironment env, long timeMaxInterval @Override public void run() { PhoenixConnection connForTask = null; - Timestamp timestamp = null; - String tenantId = null; - byte[] tenantIdBytes; - String schemaName= null; - byte[] schemaNameBytes; - String tableName = null; - byte[] tableNameBytes; - PhoenixConnection pconn; try { - String taskQuery = "SELECT " + - PhoenixDatabaseMetaData.TASK_TS + ", " + - PhoenixDatabaseMetaData.TENANT_ID + ", " + - PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + - PhoenixDatabaseMetaData.TABLE_NAME + - " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + - " WHERE "+ PhoenixDatabaseMetaData.TASK_TYPE + " = " + PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue(); - connForTask = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); - PreparedStatement taskStatement = connForTask.prepareStatement(taskQuery); - ResultSet rs = taskStatement.executeQuery(); - while (rs.next()) { + String[] excludeStates = new String[] { PTable.TaskStatus.FAILED.toString(), + PTable.TaskStatus.COMPLETED.toString() }; + List taskRecords = Task.queryTaskTable(connForTask, excludeStates); + for (Task.TaskRecord taskRecord : taskRecords){ try { - // delete child views only if the parent table is deleted from the system catalog - timestamp = rs.getTimestamp(1); - tenantId = rs.getString(2); - tenantIdBytes= rs.getBytes(2); - schemaName= rs.getString(3); - schemaNameBytes = rs.getBytes(3); - tableName= rs.getString(4); - tableNameBytes = rs.getBytes(4); - - if (tenantId != null) { - Properties tenantProps = new Properties(); - tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); - pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class); - + TaskType taskType = taskRecord.getTaskType(); + if (!classMap.containsKey(taskType)) { + LOG.warn("Don't know how to execute task type: " + taskType.name()); + continue; } - else { - pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + + String className = classMap.get(taskType); + + Class concreteClass = Class.forName(className); + + Object obj = concreteClass.newInstance(); + Method runMethod = concreteClass.getDeclaredMethod("run", + Task.TaskRecord.class); + Method checkCurretResult = concreteClass.getDeclaredMethod("checkCurrentResult", Task.TaskRecord.class); + Method initMethod = concreteClass.getSuperclass().getDeclaredMethod("init", + RegionCoprocessorEnvironment.class, Long.class); + initMethod.invoke(obj, env, timeMaxInterval); + + // if current status is already Started, check if we need to re-run. + // Task can be async and already Started before. + TaskResult result = null; + if (taskRecord.getStatus() != null && taskRecord.getStatus().equals(PTable.TaskStatus.STARTED.toString())) { + result = (TaskResult) checkCurretResult.invoke(obj, taskRecord); } - MetaDataProtocol.MetaDataMutationResult result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(), - schemaName, tableName, true); - if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) { - MetaDataEndpointImpl.dropChildViews(env, tenantIdBytes, schemaNameBytes, tableNameBytes); - } else if (System.currentTimeMillis() < timeMaxInterval + timestamp.getTime()) { - // skip this task as it has not been expired and its parent table has not been dropped yet - LOG.info("Skipping a child view drop task. The parent table has not been dropped yet : " + - schemaName + "." + tableName + - " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) + - " and timestamp " + timestamp.toString()); - continue; + if (result == null) { + // reread task record. There might be async setting of task status + taskRecord = Task.queryTaskTable(connForTask, taskRecord.getSchemaName(), taskRecord.getTableName(), + taskType, taskRecord.getTenantId(), null).get(0); + if (taskRecord.getStatus() != null && Arrays.stream(excludeStates).anyMatch(taskRecord.getStatus()::equals)) { + continue; + } + // Change task status to STARTED + Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(), + taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(), + taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null, true); + + // invokes the method at runtime + result = (TaskResult) runMethod.invoke(obj, taskRecord); } - else { - LOG.warn(" A drop child view task has expired and will be removed from the system task table : " + - schemaName + "." + tableName + - " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) + - " and timestamp " + timestamp.toString()); + + if (result != null) { + String taskStatus = PTable.TaskStatus.FAILED.toString(); + if (result.getResultCode() == TaskResultCode.SUCCESS) { + taskStatus = PTable.TaskStatus.COMPLETED.toString(); + } else if (result.getResultCode() == TaskResultCode.SKIPPED) { + // We will pickup this task again + continue; + } + + setEndTaskStatus(connForTask, taskRecord, taskStatus); } - deleteTask(connForTask, PTable.TaskType.DROP_CHILD_VIEWS, timestamp, tenantId, schemaName, - tableName, this.accessCheckEnabled); } catch (Throwable t) { - LOG.warn("Exception while dropping a child view task. " + + LOG.warn("Exception while running self healingtask. " + "It will be retried in the next system task table scan : " + - schemaName + "." + tableName + - " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) + - " and timestamp " + timestamp.toString(), t); + " taskType : " + taskRecord.getTaskType().name() + + taskRecord.getSchemaName() + "." + taskRecord.getTableName() + + " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) + + " and timestamp " + taskRecord.getTimeStamp().toString(), t); } } } catch (Throwable t) { - LOG.error("DropChildViewsTask failed!", t); + LOG.error("SelfHealingTask failed!", t); } finally { if (connForTask != null) { try { connForTask.close(); } catch (SQLException ignored) { - LOG.debug("DropChildViewsTask can't close connection", ignored); + LOG.debug("SelfHealingTask can't close connection", ignored); } } } } + + public static void setEndTaskStatus(PhoenixConnection connForTask, Task.TaskRecord taskRecord, String taskStatus) + throws IOException { + // update data with details. + String data = taskRecord.getData(); + if (Strings.isNullOrEmpty(data)) { + data = "{}"; + } + JsonParser jsonParser = new JsonParser(); + JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject(); + jsonObject.addProperty("TaskDetails", taskStatus); + data = jsonObject.toString(); + + Timestamp endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); + Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(), + taskRecord.getTableName(), taskStatus, data, taskRecord.getPriority(), + taskRecord.getTimeStamp(), endTs, true); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java new file mode 100644 index 00000000000..5c9a5c4b5e4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/BaseTask.java @@ -0,0 +1,17 @@ +package org.apache.phoenix.coprocessor.tasks; + +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.schema.task.Task; + +public abstract class BaseTask { + protected long timeMaxInterval; + protected RegionCoprocessorEnvironment env; + public void init(RegionCoprocessorEnvironment env, Long interval) { + this.env = env; + this.timeMaxInterval = interval; + } + public abstract TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord); + + public abstract TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord) throws Exception; +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java new file mode 100644 index 00000000000..f00e1f622dd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/DropChildViewsTask.java @@ -0,0 +1,81 @@ +package org.apache.phoenix.coprocessor.tasks; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.phoenix.coprocessor.MetaDataEndpointImpl; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; + +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Properties; + +/** + * Task runs periodically to clean up task of child views whose parent is dropped + * + */ +public class DropChildViewsTask extends BaseTask { + public static final Log LOG = LogFactory.getLog(DropChildViewsTask.class); + + public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) { + PhoenixConnection pconn = null; + Timestamp timestamp = taskRecord.getTimeStamp(); + try { + String tenantId = taskRecord.getTenantId(); + if (tenantId != null) { + Properties tenantProps = new Properties(); + tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + pconn = QueryUtil.getConnectionOnServer(tenantProps, env.getConfiguration()).unwrap(PhoenixConnection.class); + } + else { + pconn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + } + + MetaDataProtocol.MetaDataMutationResult result = new MetaDataClient(pconn).updateCache(pconn.getTenantId(), + taskRecord.getSchemaName(), taskRecord.getTableName(), true); + if (result.getMutationCode() != MetaDataProtocol.MutationCode.TABLE_ALREADY_EXISTS) { + MetaDataEndpointImpl + .dropChildViews(env, taskRecord.getTenantIdBytes(), taskRecord.getSchemaNameBytes(), taskRecord.getTableNameBytes()); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, ""); + } else if (System.currentTimeMillis() < timeMaxInterval + timestamp.getTime()) { + // skip this task as it has not been expired and its parent table has not been dropped yet + LOG.info("Skipping a child view drop task. The parent table has not been dropped yet : " + + taskRecord.getSchemaName() + "." + taskRecord.getTableName() + + " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) + + " and timestamp " + timestamp.toString()); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, ""); + } + else { + LOG.warn(" A drop child view task has expired and will be marked as failed : " + + taskRecord.getSchemaName() + "." + taskRecord.getTableName() + + " with tenant id " + (tenantId == null ? " IS NULL" : tenantId) + + " and timestamp " + timestamp.toString()); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "Expired"); + } + } + catch (Throwable t) { + LOG.warn("Exception while dropping a child view task. " + + taskRecord.getSchemaName() + "." + taskRecord.getTableName() + + " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) + + " and timestamp " + timestamp.toString(), t); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString()); + } finally { + if (pconn != null) { + try { + pconn.close(); + } catch (SQLException ignored) { + LOG.debug("DropChildViewsTask can't close connection", ignored); + } + } + } + } + + public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord) throws Exception { + return null; + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java new file mode 100644 index 00000000000..c2bdf5115de --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java @@ -0,0 +1,151 @@ +package org.apache.phoenix.coprocessor.tasks; + +import com.google.common.base.Strings; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.IndexTool; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.util.QueryUtil; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; + +/** + * Task runs periodically to rebuild indexes for System.Task entries. + * + */ +public class IndexRebuildTask extends BaseTask { + public static final String IndexName = "IndexName"; + public static final String JobID = "JobID"; + public static final Log LOG = LogFactory.getLog(IndexRebuildTask.class); + + @Override + public TaskRegionObserver.TaskResult run(Task.TaskRecord taskRecord) { + Connection conn = null; + + try { + // We have to clone the configuration because env.getConfiguration is readonly. + Configuration conf = HBaseConfiguration.create(env.getConfiguration()); + conn = QueryUtil.getConnectionOnServer(env.getConfiguration()); + + conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + + String data = taskRecord.getData(); + if (Strings.isNullOrEmpty(taskRecord.getData())) { + data = "{}"; + } + JsonParser jsonParser = new JsonParser(); + JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject(); + String indexName = getIndexName(jsonObject); + if (Strings.isNullOrEmpty(indexName)) { + String str = "Index name is not found. Index rebuild cannot continue " + + "Data : " + data; + LOG.warn(str); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, str); + } + + boolean shouldDisable = false; + if (jsonObject.has("DisableBefore")) { + String disableBefore = jsonObject.get("DisableBefore").toString(); + if (!Strings.isNullOrEmpty(disableBefore)) { + shouldDisable = Boolean.valueOf(disableBefore); + } + } + + // Run index tool async. + boolean runForeground = false; + Map.Entry indexToolRes = IndexTool + .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName, true, + false, taskRecord.getTenantId(), shouldDisable, runForeground); + int status = indexToolRes.getKey(); + if (status != 0) { + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "Index tool returned : " + status); + } + + Job job = indexToolRes.getValue(); + + jsonObject.addProperty(JobID, job.getJobID().toString()); + Task.addTask(conn.unwrap(PhoenixConnection.class ), taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(), + taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(), jsonObject.toString(), taskRecord.getPriority(), + taskRecord.getTimeStamp(), null, true); + // It will take some time to finish, so we will check the status in a separate task. + return null; + } + catch (Throwable t) { + LOG.warn("Exception while running index rebuild task. " + + "It will be retried in the next system task table scan : " + + taskRecord.getSchemaName() + "." + taskRecord.getTableName() + + " with tenant id " + (taskRecord.getTenantId() == null ? " IS NULL" : taskRecord.getTenantId()) + + " and data " + taskRecord.getData(), t); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, t.toString()); + } finally { + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + LOG.debug("IndexRebuildTask can't close connection"); + } + } + } + + } + + private String getIndexName(JsonObject jsonObject) { + String indexName = null; + // Get index name from data column. + if (jsonObject.has(IndexName)) { + indexName = jsonObject.get(IndexName).toString().replaceAll("\"", ""); + } + return indexName; + } + + private String getJobID(String data) { + if (Strings.isNullOrEmpty(data)) { + data = "{}"; + } + JsonParser jsonParser = new JsonParser(); + JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject(); + String jobId = null; + if (jsonObject.has(JobID)) { + jobId = jsonObject.get(JobID).toString().replaceAll("\"", ""); + } + return jobId; + } + + @Override + public TaskRegionObserver.TaskResult checkCurrentResult(Task.TaskRecord taskRecord) + throws Exception { + + String jobID = getJobID(taskRecord.getData()); + if (jobID != null) { + Configuration conf = HBaseConfiguration.create(env.getConfiguration()); + Configuration configuration = HBaseConfiguration.addHbaseResources(conf); + Cluster cluster = new Cluster(configuration); + + Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobID)); + + if (job != null && job.isComplete()) { + if (job.isSuccessful()) { + LOG.warn("IndexRebuildTask checkCurrentResult job is successful " + taskRecord.getTableName()); + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SUCCESS, ""); + } else { + return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, + "Index is DISABLED"); + } + } + + } + return null; + } +} \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 5427b5fd4d0..d56eaa4a72c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -219,6 +219,11 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final byte[] TASK_TYPE_BYTES = Bytes.toBytes(TASK_TYPE); public static final String TASK_TS = "TASK_TS"; public static final byte[] TASK_TS_BYTES = Bytes.toBytes(TASK_TS); + public static final String TASK_STATUS = "TASK_STATUS"; + public static final String TASK_END_TS = "TASK_END_TS"; + public static final String TASK_PRIORITY = "TASK_PRIORITY"; + public static final String TASK_DATA = "TASK_DATA"; + public static final String TASK_TABLE_TTL = "864000"; public static final String ARRAY_SIZE = "ARRAY_SIZE"; public static final byte[] ARRAY_SIZE_BYTES = Bytes.toBytes(ARRAY_SIZE); public static final String VIEW_CONSTANT = "VIEW_CONSTANT"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index bd1a310baba..6fefe4ed386 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -28,10 +28,12 @@ import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import com.google.common.base.Strings; import org.apache.commons.cli.CommandLine; @@ -493,14 +495,17 @@ private Job configureJobForServerBuildIndex() PhoenixConfigurationUtil.setTenantId(configuration, tenantId); } - fs = outputPath.getFileSystem(configuration); - fs.delete(outputPath, true); - + if (outputPath != null) { + fs = outputPath.getFileSystem(configuration); + fs.delete(outputPath, true); + } final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); - FileOutputFormat.setOutputPath(job, outputPath); + if (outputPath != null) { + FileOutputFormat.setOutputPath(job, outputPath); + } PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, PhoenixServerBuildIndexInputFormat.class, qDataTable, ""); @@ -590,6 +595,7 @@ public int run(String[] args) throws Exception { tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } + schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); @@ -608,7 +614,7 @@ public int run(String[] args) throws Exception { pIndexTable = null; connection = ConnectionUtil.getInputConnection(configuration); - + if (indexTable != null) { if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) { throw new IllegalArgumentException(String.format( @@ -875,6 +881,48 @@ public static boolean isValidIndexTable(final Connection connection, final Strin return false; } + public static Map.Entry run(Configuration conf, String schemaName, String dataTable, String indexTable, + boolean directApi, boolean useSnapshot, String tenantId, boolean disableBefore, boolean runForeground) throws Exception { + final List args = Lists.newArrayList(); + if (schemaName != null) { + args.add("-s"); + args.add(schemaName); + } + args.add("-dt"); + args.add(dataTable); + args.add("-it"); + args.add(indexTable); + if (directApi) { + args.add("-direct"); + } + + if (runForeground) { + args.add("-runfg"); + } + + if (useSnapshot) { + args.add("-snap"); + } + + if (tenantId != null) { + args.add("-tenant"); + args.add(tenantId); + } + + args.add("-op"); + args.add("/tmp/" + UUID.randomUUID().toString()); + + if (disableBefore) { + PhoenixConfigurationUtil.setDisableIndexes(conf, indexTable); + } + + IndexTool indexingTool = new IndexTool(); + indexingTool.setConf(conf); + int status = indexingTool.run(args.toArray(new String[0])); + Job job = indexingTool.getJob(); + return new AbstractMap.SimpleEntry(status, job); + } + public static void main(final String[] args) throws Exception { int result = ToolRunner.run(new IndexTool(), args); System.exit(result); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java index 0786b9b3632..57688fd2f0e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java @@ -18,17 +18,31 @@ package org.apache.phoenix.mapreduce.index; import java.io.IOException; +import java.sql.Connection; import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.List; +import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.phoenix.coprocessor.TaskRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.task.Task; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID; + /** * Reducer class that does only one task and that is to update the index state of the table. */ @@ -41,9 +55,38 @@ public class PhoenixIndexImportDirectReducer extends protected void cleanup(Context context) throws IOException, InterruptedException{ try { IndexToolUtil.updateIndexState(context.getConfiguration(), PIndexState.ACTIVE); + + updateTasksTable(context); } catch (SQLException e) { LOG.error(" Failed to update the status to Active"); throw new RuntimeException(e.getMessage()); } } + + private void updateTasksTable(Context context) throws SQLException, IOException { + final Properties overrideProps = new Properties(); + final Connection + connection = ConnectionUtil + .getOutputConnection(context.getConfiguration(), overrideProps); + try { + String fullTableName = PhoenixConfigurationUtil.getInputTableName(context.getConfiguration()); + String tenantId = context.getConfiguration().get(MAPREDUCE_TENANT_ID, null); + String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName); + String tableName = SchemaUtil.getTableNameFromFullName(fullTableName); + String indexName = PhoenixConfigurationUtil.getDisableIndexes(context.getConfiguration()); + List taskRecords = Task.queryTaskTable(connection, schemaName, tableName, + PTable.TaskType.INDEX_REBUILD, tenantId, indexName); + if (taskRecords != null && taskRecords.size() > 0) { + for (Task.TaskRecord taskRecord : taskRecords) { + TaskRegionObserver.SelfHealingTask.setEndTaskStatus( + connection.unwrap(PhoenixConnection.class), taskRecords.get(0), + PTable.TaskStatus.COMPLETED.toString()); + } + } + } finally { + if (connection != null) { + connection.close(); + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 1f5cd48e3e2..9dcdd0f338b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -47,8 +47,10 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TABLE_TTL; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HCONNECTIONS_COUNTER; @@ -241,6 +243,7 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PUnsignedTinyint; import org.apache.phoenix.schema.types.PVarbinary; @@ -3521,6 +3524,30 @@ public void upgradeSystemTables(final String url, final Properties props) throws clearCache(); } } + + try { + metaConnection.createStatement().executeUpdate(getTaskDDL()); + } catch (NewerTableAlreadyExistsException e) { + + } catch (TableAlreadyExistsException e) { + long currentServerSideTableTimeStamp = e.getTable().getTimeStamp(); + if (currentServerSideTableTimeStamp <= MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0) { + String + columnsToAdd = + PhoenixDatabaseMetaData.TASK_STATUS + " " + PVarchar.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.TASK_END_TS + " " + PTimestamp.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.TASK_PRIORITY + " " + PUnsignedTinyint.INSTANCE.getSqlTypeName() + ", " + + PhoenixDatabaseMetaData.TASK_DATA + " " + PVarchar.INSTANCE.getSqlTypeName(); + String taskTableFullName = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TASK_TABLE); + metaConnection = + addColumnsIfNotExists(metaConnection, taskTableFullName, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP, columnsToAdd); + metaConnection.createStatement().executeUpdate( + "ALTER TABLE " + taskTableFullName + " SET " + TTL + "=" + TASK_TABLE_TTL); + clearCache(); + } + } + try { metaConnection.createStatement().executeUpdate(getFunctionTableDDL()); } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} @@ -3533,9 +3560,6 @@ public void upgradeSystemTables(final String url, final Properties props) throws try { metaConnection.createStatement().executeUpdate(getMutexDDL()); } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} - try { - metaConnection.createStatement().executeUpdate(getTaskDDL()); - } catch (NewerTableAlreadyExistsException e) {} catch (TableAlreadyExistsException e) {} // In case namespace mapping is enabled and system table to system namespace mapping is also enabled, // create an entry for the SYSTEM namespace in the SYSCAT table, so that GRANT/REVOKE commands can work diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 028adfdaef7..a8f332cf5a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -384,9 +384,15 @@ public enum JoinType {INNER, LEFT_OUTER} TENANT_ID + " VARCHAR NULL," + TABLE_SCHEM + " VARCHAR NULL," + TABLE_NAME + " VARCHAR NOT NULL,\n" + + // Non-PK columns + TASK_STATUS + " VARCHAR NULL," + + TASK_END_TS + " TIMESTAMP NULL," + + TASK_PRIORITY + " UNSIGNED_TINYINT NULL," + + TASK_DATA + " VARCHAR NULL,\n" + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TASK_TYPE + "," + TASK_TS + " ROW_TIMESTAMP," + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "))\n" + HConstants.VERSIONS + "=%s,\n" + ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" + + ColumnFamilyDescriptorBuilder.TTL + "=" + TASK_TABLE_TTL + ",\n" + // 10 days PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 3e22225c94e..6c4d3a361a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -194,7 +194,8 @@ public static LinkType fromSerializedValue(byte serializedValue) { } public enum TaskType { - DROP_CHILD_VIEWS((byte)1); + DROP_CHILD_VIEWS((byte)1), + INDEX_REBUILD((byte)2); private final byte[] byteValue; private final byte serializedValue; @@ -222,6 +223,29 @@ public static TaskType fromSerializedValue(byte serializedValue) { } } + public enum TaskStatus { + CREATED { + public String toString() { + return "CREATED"; + } + }, + STARTED { + public String toString() { + return "STARTED"; + } + }, + COMPLETED { + public String toString() { + return "COMPLETED"; + } + }, + FAILED { + public String toString() { + return "FAILED"; + } + }, + } + public enum ImmutableStorageScheme implements ColumnValueEncoderDecoderSupplier { ONE_CELL_PER_COLUMN((byte)1) { @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java new file mode 100644 index 00000000000..0f79634f8c8 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java @@ -0,0 +1,369 @@ +package org.apache.phoenix.schema.task; + +import com.google.common.base.Strings; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +public class Task { + private static void mutateSystemTaskTable(PhoenixConnection conn, PreparedStatement stmt, boolean accessCheckEnabled) + throws IOException { + // we need to mutate SYSTEM.TASK with HBase/login user if access is enabled. + if (accessCheckEnabled) { + User.runAsLoginUser(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + final RpcCall rpcContext = RpcUtil.getRpcContext(); + // setting RPC context as null so that user can be reset + try { + RpcUtil.setRpcContext(null); + stmt.execute(); + conn.commit(); + } catch (SQLException e) { + throw new IOException(e); + } finally { + // setting RPC context back to original context of the RPC + RpcUtil.setRpcContext(rpcContext); + } + return null; + } + }); + } + else { + try { + stmt.execute(); + conn.commit(); + } catch (SQLException e) { + throw new IOException(e); + } + } + } + + private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType, + String tenantId, String schemaName, String tableName) throws SQLException { + stmt.setByte(1, taskType.getSerializedValue()); + if (tenantId != null) { + stmt.setString(2, tenantId); + } else { + stmt.setNull(2, Types.VARCHAR); + } + if (schemaName != null) { + stmt.setString(3, schemaName); + } else { + stmt.setNull(3, Types.VARCHAR); + } + stmt.setString(4, tableName); + return stmt; + } + + private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType, + String tenantId, String schemaName, String tableName, String taskStatus, String data, + Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException { + stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName); + if (taskStatus != null) { + stmt.setString(5, taskStatus); + } else { + stmt.setString(5, PTable.TaskStatus.CREATED.toString()); + } + if (priority != null) { + stmt.setInt(6, priority); + } else { + byte defaultPri = 4; + stmt.setInt(6, defaultPri); + } + if (startTs == null) { + startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); + } + stmt.setTimestamp(7, startTs); + if (endTs != null) { + stmt.setTimestamp(8, endTs); + } else { + if (taskStatus != null && taskStatus.equals(PTable.TaskStatus.COMPLETED.toString())) { + endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); + stmt.setTimestamp(8, endTs); + } else { + stmt.setNull(8, Types.TIMESTAMP); + } + } + if (data != null) { + stmt.setString(9, data); + } else { + stmt.setNull(9, Types.VARCHAR); + } + return stmt; + } + + public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName, + String tableName, boolean accessCheckEnabled) + throws IOException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("UPSERT INTO " + + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " + + PhoenixDatabaseMetaData.TASK_TYPE + ", " + + PhoenixDatabaseMetaData.TENANT_ID + ", " + + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + + PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)"); + stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName); + } catch (SQLException e) { + throw new IOException(e); + } + mutateSystemTaskTable(conn, stmt, accessCheckEnabled); + } + + public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName, + String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs, + boolean accessCheckEnabled) + throws IOException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("UPSERT INTO " + + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " + + PhoenixDatabaseMetaData.TASK_TYPE + ", " + + PhoenixDatabaseMetaData.TENANT_ID + ", " + + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + + PhoenixDatabaseMetaData.TABLE_NAME + ", " + + PhoenixDatabaseMetaData.TASK_STATUS + ", " + + PhoenixDatabaseMetaData.TASK_PRIORITY + ", " + + PhoenixDatabaseMetaData.TASK_TS + ", " + + PhoenixDatabaseMetaData.TASK_END_TS + ", " + + PhoenixDatabaseMetaData.TASK_DATA + + " ) VALUES(?,?,?,?,?,?,?,?,?)"); + stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName, taskStatus, data, priority, startTs, endTs); + } catch (SQLException e) { + throw new IOException(e); + } + mutateSystemTaskTable(conn, stmt, accessCheckEnabled); + } + + public static void deleteTask(PhoenixConnection conn, PTable.TaskType taskType, Timestamp ts, String tenantId, + String schemaName, String tableName, boolean accessCheckEnabled) throws IOException { + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement("DELETE FROM " + + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + + " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = ? AND " + + PhoenixDatabaseMetaData.TASK_TS + " = ? AND " + + PhoenixDatabaseMetaData.TENANT_ID + (tenantId == null ? " IS NULL " : " = '" + tenantId + "'") + " AND " + + PhoenixDatabaseMetaData.TABLE_SCHEM + (schemaName == null ? " IS NULL " : " = '" + schemaName + "'") + " AND " + + PhoenixDatabaseMetaData.TABLE_NAME + " = ?"); + stmt.setByte(1, taskType.getSerializedValue()); + stmt.setTimestamp(2, ts); + stmt.setString(3, tableName); + } catch (SQLException e) { + throw new IOException(e); + } + mutateSystemTaskTable(conn, stmt, accessCheckEnabled); + } + + private static List populateTasks(Connection connection, String taskQuery) + throws SQLException { + PreparedStatement taskStatement = connection.prepareStatement(taskQuery); + ResultSet rs = taskStatement.executeQuery(); + + List result = new ArrayList<>(); + while (rs.next()) { + // delete child views only if the parent table is deleted from the system catalog + TaskRecord taskRecord = parseResult(rs); + result.add(taskRecord); + } + return result; + } + + public static List queryTaskTable(Connection connection, String schema, String tableName, + PTable.TaskType taskType, String tenantId, String indexName) + throws SQLException { + String taskQuery = "SELECT " + + PhoenixDatabaseMetaData.TASK_TS + ", " + + PhoenixDatabaseMetaData.TENANT_ID + ", " + + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + + PhoenixDatabaseMetaData.TABLE_NAME + ", " + + PhoenixDatabaseMetaData.TASK_STATUS + ", " + + PhoenixDatabaseMetaData.TASK_TYPE + ", " + + PhoenixDatabaseMetaData.TASK_PRIORITY + ", " + + PhoenixDatabaseMetaData.TASK_DATA + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME; + taskQuery += " WHERE " + + PhoenixDatabaseMetaData.TABLE_NAME + " ='" + tableName + "' AND " + + PhoenixDatabaseMetaData.TASK_TYPE + "=" + taskType.getSerializedValue(); + if (!Strings.isNullOrEmpty(tenantId)) { + taskQuery += " AND " + PhoenixDatabaseMetaData.TENANT_ID + "='" + tenantId + "' "; + } + + if (!Strings.isNullOrEmpty(schema)) { + taskQuery += " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schema + "' "; + } + + if (!Strings.isNullOrEmpty(indexName)) { + taskQuery += " AND " + PhoenixDatabaseMetaData.TASK_DATA + " LIKE '%" + indexName + "%'"; + } + + return populateTasks(connection, taskQuery); + } + + public static List queryTaskTable(Connection connection, String[] excludedTaskStatus) + throws SQLException { + String taskQuery = "SELECT " + + PhoenixDatabaseMetaData.TASK_TS + ", " + + PhoenixDatabaseMetaData.TENANT_ID + ", " + + PhoenixDatabaseMetaData.TABLE_SCHEM + ", " + + PhoenixDatabaseMetaData.TABLE_NAME + ", " + + PhoenixDatabaseMetaData.TASK_STATUS + ", " + + PhoenixDatabaseMetaData.TASK_TYPE + ", " + + PhoenixDatabaseMetaData.TASK_PRIORITY + ", " + + PhoenixDatabaseMetaData.TASK_DATA + + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME; + if (excludedTaskStatus != null && excludedTaskStatus.length > 0) { + taskQuery += " WHERE " + PhoenixDatabaseMetaData.TASK_STATUS + " IS NULL OR " + + PhoenixDatabaseMetaData.TASK_STATUS + " NOT IN ("; + String[] values = new String[excludedTaskStatus.length]; + for (int i=0; i < excludedTaskStatus.length; i++) { + values[i] = String.format("'%s'", excludedTaskStatus[i].trim()); + } + + //Delimit with comma + taskQuery += String.join(",", values); + taskQuery += ")"; + } + + return populateTasks(connection, taskQuery); + } + + public static TaskRecord parseResult(ResultSet rs) throws SQLException { + TaskRecord taskRecord = new TaskRecord(); + taskRecord.setTimeStamp(rs.getTimestamp(PhoenixDatabaseMetaData.TASK_TS)); + taskRecord.setTenantId(rs.getString(PhoenixDatabaseMetaData.TENANT_ID)); + taskRecord.setTenantIdBytes(rs.getBytes(PhoenixDatabaseMetaData.TENANT_ID)); + taskRecord.setSchemaName(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM)); + taskRecord.setSchemaNameBytes(rs.getBytes(PhoenixDatabaseMetaData.TABLE_SCHEM)); + taskRecord.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME)); + taskRecord.setTableNameBytes(rs.getBytes(PhoenixDatabaseMetaData.TABLE_NAME)); + taskRecord.setStatus(rs.getString(PhoenixDatabaseMetaData.TASK_STATUS)); + taskRecord.setTaskType(PTable.TaskType.fromSerializedValue(rs.getByte(PhoenixDatabaseMetaData.TASK_TYPE ))); + taskRecord.setPriority(rs.getInt(PhoenixDatabaseMetaData.TASK_PRIORITY)); + taskRecord.setData(rs.getString(PhoenixDatabaseMetaData.TASK_DATA)); + return taskRecord; + } + + public static class TaskRecord { + private String tenantId; + private Timestamp timeStamp; + private byte[] tenantIdBytes; + private String schemaName= null; + private byte[] schemaNameBytes; + private String tableName = null; + private byte[] tableNameBytes; + + private PTable.TaskType taskType; + private String status; + private int priority; + private String data; + + public String getTenantId() { + return tenantId; + } + + public void setTenantId(String tenantId) { + this.tenantId = tenantId; + } + + public Timestamp getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(Timestamp timeStamp) { + this.timeStamp = timeStamp; + } + + public byte[] getTenantIdBytes() { + return tenantIdBytes; + } + + public void setTenantIdBytes(byte[] tenantIdBytes) { + this.tenantIdBytes = tenantIdBytes; + } + + public String getSchemaName() { + return schemaName; + } + + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + public byte[] getSchemaNameBytes() { + return schemaNameBytes; + } + + public void setSchemaNameBytes(byte[] schemaNameBytes) { + this.schemaNameBytes = schemaNameBytes; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public byte[] getTableNameBytes() { + return tableNameBytes; + } + + public void setTableNameBytes(byte[] tableNameBytes) { + this.tableNameBytes = tableNameBytes; + } + + public String getData() { + if (data == null) { + return ""; + } + return data; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public void setData(String data) { + this.data = data; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public PTable.TaskType getTaskType() { + return taskType; + } + + public void setTaskType(PTable.TaskType taskType) { + this.taskType = taskType; + } + + } +}