PHOENIX-5190 Implement TaskRegionObserver for Index rebuild #457

Closed · wants to merge 1 commit into from
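
This patch generalizes SYSTEM.TASK handling: TaskRegionObserver's single-purpose DropChildViewsTask becomes a SelfHealingTask that dispatches SYSTEM.TASK rows by task type, the addTask helper moves from TaskRegionObserver to org.apache.phoenix.schema.task.Task, and a new INDEX_REBUILD task type rebuilds an index from its data table. IndexRebuildTaskIT covers the new path end to end; DropTableWithViewsIT is updated for the renamed task and gains a reusable assertTaskColumns helper.

Based on the call sites in this diff, enqueuing an index rebuild looks roughly like the sketch below. This is inferred from IndexRebuildTaskIT rather than from the Task source (which is not part of this diff); the JDBC URL, tenant id, and view/index names are placeholders, and the parameter-name comments are guesses from argument positions.

```java
import java.sql.DriverManager;
import java.sql.Timestamp;

import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.util.EnvironmentEdgeManager;

public class EnqueueIndexRebuildSketch {
    public static void main(String[] args) throws Exception {
        try (PhoenixConnection conn = DriverManager
                .getConnection("jdbc:phoenix:localhost") // placeholder cluster URL
                .unwrap(PhoenixConnection.class)) {
            Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
            // Write a row to SYSTEM.TASK; SelfHealingTask picks it up on a later
            // run and dispatches it to the INDEX_REBUILD handler.
            Task.addTask(conn, PTable.TaskType.INDEX_REBUILD,
                    "tenant1",            // tenant id, or null for a global view
                    null,                 // schema name
                    "MY_VIEW",            // table/view the index belongs to
                    PTable.TaskStatus.CREATED.toString(),
                    "{IndexName:MY_IDX}", // task data naming the index to rebuild
                    null,                 // priority
                    startTs,              // start time
                    null,                 // end time
                    true);                // access check enabled
        }
    }
}
```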
@@ -18,12 +18,16 @@
package org.apache.phoenix.end2end;

import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;

@@ -103,6 +107,9 @@ public void testDropTableWithChildViews() throws Exception {
try (Connection conn = DriverManager.getConnection(getUrl());
Connection viewConn =
isMultiTenant ? DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn) {
// Empty the task table first.
conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);

String ddlFormat =
"CREATE TABLE IF NOT EXISTS " + baseTable + " ("
+ " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR "
@@ -126,16 +133,14 @@ public void testDropTableWithChildViews() throws Exception {
// Run SelfHealingTask to complete the tasks for dropping child views. The depth of the view tree is 2,
// so we expect that this will be done in two task handling runs, as each non-root level is processed
// in one run.
-TaskRegionObserver.DropChildViewsTask task =
-new TaskRegionObserver.DropChildViewsTask(
+TaskRegionObserver.SelfHealingTask task =
+new TaskRegionObserver.SelfHealingTask(
TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
task.run();
task.run();
-ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
-" FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
-" WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
-PTable.TaskType.DROP_CHILD_VIEWS.getSerializedValue());
-assertFalse(rs.next());
+assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS, null);

// Views should be dropped by now
TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES);
TableViewFinderResult childViewsResult = new TableViewFinderResult();
@@ -147,9 +152,25 @@ public void testDropTableWithChildViews() throws Exception {
childViewsResult);
assertTrue(childViewsResult.getLinks().size() == 0);
// There should not be any orphan views
-rs = conn.createStatement().executeQuery("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
+ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME +
" WHERE " + PhoenixDatabaseMetaData.TABLE_SCHEM + " = '" + SCHEMA2 + "'");
assertFalse(rs.next());
}
}

public static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType, String expectedData)
throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
" FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
" WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
taskType.getSerializedValue());
assertTrue(rs.next());
String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
assertEquals(expectedStatus, taskStatus);

if (expectedData != null) {
String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA);
assertEquals(expectedData, data);
}
}
}
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
protected static String TENANT1 = "tenant1";
private static RegionCoprocessorEnvironment TaskRegionEnvironment;

@BeforeClass
public static void doSetup() throws Exception {
HashMap<String, String> props = new HashMap<>();
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
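// Grab the coprocessor environment of the SYSTEM.TASK region so the test
// can construct and run TaskRegionObserver's tasks directly.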
TaskRegionEnvironment =
getUtility()
.getRSForFirstRegionInTable(
PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
.getRegions(PhoenixDatabaseMetaData.SYSTEM_TASK_HBASE_TABLE_NAME)
.get(0).getCoprocessorHost()
.findCoprocessorEnvironment(TaskRegionObserver.class.getName());
}

private String generateDDL(String format) {
// Only MULTI_TENANT is set, so no option separator is needed.
StringBuilder optionsBuilder = new StringBuilder();
optionsBuilder.append("MULTI_TENANT=true");

return String.format(format, "TENANT_ID VARCHAR NOT NULL, ", "TENANT_ID, ", optionsBuilder.toString());
}

@Test
public void testIndexRebuildTask() throws Throwable {
String baseTable = generateUniqueName();
Connection conn = null;
Connection viewConn = null;
try {
conn = DriverManager.getConnection(getUrl());
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT1);

viewConn = DriverManager.getConnection(getUrl(), props);
String ddlFormat =
"CREATE TABLE IF NOT EXISTS " + baseTable + " ("
+ " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR "
+ " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" + " ) %s";
conn.createStatement().execute(generateDDL(ddlFormat));
conn.commit();
// Create a view
String viewName = generateUniqueName();
String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
viewConn.createStatement().execute(viewDDL);

// Create an index on the view
String indexName = generateUniqueName();
String indexDDL = String.format("CREATE INDEX %s ON %s (V1)", indexName, viewName);
viewConn.createStatement().execute(indexDDL);

// Insert rows
int numOfValues = 1;
for (int i = 0; i < numOfValues; i++) {
viewConn.createStatement().execute(String.format(
"UPSERT INTO %s VALUES('%s', '%s', '%s')", viewName, String.valueOf(i), "y", "z"));
}
viewConn.commit();

String data = "{IndexName:" + indexName + "}";
// Run IndexRebuildTask
TaskRegionObserver.SelfHealingTask task =
new TaskRegionObserver.SelfHealingTask(
TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);

Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
// Add a task to System.Task to build indexes
Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
TENANT1, null, viewName,
PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);

task.run();

String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
int count = getUtility().countRows(queryServices.getTable(Bytes.toBytes(viewIndexTableName)));
assertEquals(numOfValues, count);

// Remove index contents and try again
Admin admin = queryServices.getAdmin();
TableName tableName = TableName.valueOf(viewIndexTableName);
admin.disableTable(tableName);
admin.truncateTable(tableName, false);

data = "{IndexName:" + indexName + ", DisableBefore:true}";

// Add a new task (update status to created) to System.Task to rebuild indexes
Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
TENANT1, null, viewName,
PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
task.run();

// Give the task time to finish before checking SYSTEM.TASK and the rebuilt index.
Thread.sleep(15000);

Table systemHTable = queryServices.getTable(Bytes.toBytes("SYSTEM." + PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE));
count = getUtility().countRows(systemHTable);
assertEquals(1, count);

// Check task status and other column values.
DropTableWithViewsIT.assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.INDEX_REBUILD,
null);

// See that index is rebuilt and confirm index has rows
Table htable = queryServices.getTable(Bytes.toBytes(viewIndexTableName));
count = getUtility().countRows(htable);
assertEquals(numOfValues, count);
} finally {
if (conn != null) {
// Clean up SYSTEM.TASK so later tests start from an empty task table.
conn.createStatement().execute("DELETE FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
conn.commit();
conn.close();
}
if (viewConn != null) {
viewConn.close();
}
}
}
}
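
Note on the test flow above: judging from its use in both tests (its source is not part of this diff), SelfHealingTask is the TaskRegionObserver runnable that scans SYSTEM.TASK and hands each row to a handler for its task type. The tests construct it directly with DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS and drive it synchronously via task.run() instead of waiting for the scheduled scan.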
@@ -227,6 +227,7 @@
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
@@ -2837,7 +2838,7 @@ private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] s
}
try {
PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class);
-TaskRegionObserver.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId),
+Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS, Bytes.toString(tenantId),
Bytes.toString(schemaName), Bytes.toString(tableName), this.accessCheckEnabled);
} catch (Throwable t) {
logger.error("Adding a task to drop child views failed!", t);