Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.messages;

import com.google.common.base.Preconditions;
import java.util.UUID;
import javax.annotation.Nonnull;
import org.apache.helix.model.Message;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.config.table.TableType;

/**
 * Helix user-defined message (sub type {@link #DELETE_TABLE_MSG_SUB_TYPE}) that requests the deletion of a table.
 *
 * <p>The table name, and optionally the table type and the segment retention period, are stored as simple fields
 * of the underlying {@link ZNRecord}. Optional values that are {@code null} are not written to the record.
 *
 * <p>NOTE(review): an earlier version of this comment said the message is sent to the servers to remove the
 * TableDataManager; the class name suggests it is addressed to controllers instead - confirm against the sender.
 */
public class TableDeletionControllerMessage extends Message {
  public static final String DELETE_TABLE_MSG_SUB_TYPE = "DELETE_TABLE";
  public static final String TABLE_NAME_WITH_TYPE_KEY = "tableNameWithType";
  public static final String TABLE_TYPE_KEY = "tableType";
  public static final String RETENTION_PERIOD_KEY = "retentionPeriod";

  /**
   * Creates a new deletion message with a random message id.
   *
   * @param tableNameWithType name of the table to delete, stored under {@link #TABLE_NAME_WITH_TYPE_KEY}
   * @param tableType optional table type; only stored when non-null
   * @param retentionPeriod optional retention period for the deleted segments; only stored when non-null
   */
  public TableDeletionControllerMessage(@Nonnull String tableNameWithType, TableType tableType,
      String retentionPeriod) {
    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
    setMsgSubType(DELETE_TABLE_MSG_SUB_TYPE);
    ZNRecord znRecord = getRecord();
    znRecord.setSimpleField(TABLE_NAME_WITH_TYPE_KEY, tableNameWithType);
    if (tableType != null) {
      znRecord.setSimpleField(TABLE_TYPE_KEY, tableType.toString());
    }
    if (retentionPeriod != null) {
      znRecord.setSimpleField(RETENTION_PERIOD_KEY, retentionPeriod);
    }

    // Give it infinite time to process the message, as long as session is alive
    setExecutionTimeout(-1);
  }

  /**
   * Wraps a received Helix message as a table deletion message.
   *
   * @param message the received message; its sub type must be {@link #DELETE_TABLE_MSG_SUB_TYPE}
   * @throws IllegalArgumentException if the sub type is missing or is not {@link #DELETE_TABLE_MSG_SUB_TYPE}
   */
  public TableDeletionControllerMessage(Message message) {
    super(message.getRecord());
    String msgSubType = message.getMsgSubType();
    // Compare from the constant side so a message without a sub type fails the precondition instead of raising a
    // NullPointerException.
    Preconditions.checkArgument(DELETE_TABLE_MSG_SUB_TYPE.equals(msgSubType),
        "Invalid message sub type: %s for TableDeletionControllerMessage", msgSubType);
  }

  /**
   * @return the value stored under {@link #TABLE_NAME_WITH_TYPE_KEY}
   */
  public String getTableNameWithType() {
    return getRecord().getSimpleField(TABLE_NAME_WITH_TYPE_KEY);
  }

  /**
   * @return the table type stored under {@link #TABLE_TYPE_KEY}, or {@code null} when no type was provided
   * @throws IllegalArgumentException if the stored value is not a valid {@link TableType} name
   */
  public TableType getTableType() {
    String tableType = getRecord().getSimpleField(TABLE_TYPE_KEY);
    return tableType != null ? TableType.valueOf(tableType) : null;
  }

  /**
   * @return the value stored under {@link #RETENTION_PERIOD_KEY}, or {@code null} when none was provided
   */
  public String getRetentionPeriod() {
    return getRecord().getSimpleField(RETENTION_PERIOD_KEY);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
package org.apache.pinot.common.metadata.controllerjob;

/**
 * Kinds of controller jobs. A value of this enum is passed together with a job id when looking up a job's
 * ZooKeeper metadata (e.g. {@code TABLE_DELETE} for the delete-table status lookup).
 */
public enum ControllerJobType {
RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, TABLE_DELETE
}
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ private void setUpPinotController() {
// Register message handler for incoming user-defined helix messages.
_helixParticipantManager.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
new ControllerUserDefinedMessageHandlerFactory(_periodicTaskScheduler));
new ControllerUserDefinedMessageHandlerFactory(_periodicTaskScheduler, _helixResourceManager));

String accessControlFactoryClass = _config.getAccessControlFactoryClass();
LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,24 @@
*/
package org.apache.pinot.controller;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.ws.rs.core.Response;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.handling.HelixTaskResult;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.Message;
import org.apache.pinot.common.messages.RunPeriodicTaskMessage;
import org.apache.pinot.common.messages.TableDeletionControllerMessage;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.PeriodicTask;
import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,9 +46,12 @@ public class ControllerUserDefinedMessageHandlerFactory implements MessageHandle
private static final String USER_DEFINED_MSG_STRING = Message.MessageType.USER_DEFINE_MSG.toString();

private final PeriodicTaskScheduler _periodicTaskScheduler;
private final PinotHelixResourceManager _pinotHelixResourceManager;

public ControllerUserDefinedMessageHandlerFactory(PeriodicTaskScheduler periodicTaskScheduler) {
public ControllerUserDefinedMessageHandlerFactory(PeriodicTaskScheduler periodicTaskScheduler,
PinotHelixResourceManager pinotHelixResourceManager) {
_periodicTaskScheduler = periodicTaskScheduler;
_pinotHelixResourceManager = pinotHelixResourceManager;
}

@Override
Expand All @@ -51,6 +62,11 @@ public MessageHandler createHandler(Message message, NotificationContext notific
_periodicTaskScheduler);
}

if (messageType.equals(TableDeletionControllerMessage.DELETE_TABLE_MSG_SUB_TYPE)) {
return new TableDeletionControllerMessageHandler(new TableDeletionControllerMessage(message), notificationContext,
_pinotHelixResourceManager);
}

// Log a warning and return no-op message handler for unsupported message sub-types. This can happen when
// a new message sub-type is added, and the sender gets deployed first while receiver is still running the
// old version.
Expand Down Expand Up @@ -140,4 +156,75 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
LOGGER.error("Got error for no-op message handling (error code: {}, error type: {})", code, type, e);
}
}

private static class TableDeletionControllerMessageHandler extends MessageHandler {
private final String _tableNameWithType;
private final TableType _tableType;
private final String _retentionPeriod;
private final PinotHelixResourceManager _helixResourceManager;

TableDeletionControllerMessageHandler(TableDeletionControllerMessage message, NotificationContext context,
PinotHelixResourceManager helixResourceManager) {
super(message, context);
_tableNameWithType = message.getTableNameWithType();
_tableType = message.getTableType();
_retentionPeriod = message.getRetentionPeriod();
_helixResourceManager = helixResourceManager;
}

@Override
public HelixTaskResult handleMessage() {
LOGGER.info("Handling deletion for table {}: Start", _tableNameWithType);

//TODO: some of this validation logic is duplicate and can be removed
List<String> tablesDeleted = new LinkedList<>();
HelixTaskResult result = new HelixTaskResult();
try {
boolean tableExist = false;
if (verifyTableType(_tableNameWithType, _tableType, TableType.OFFLINE)) {
tableExist = _helixResourceManager.hasOfflineTable(_tableNameWithType);
// Even the table name does not exist, still go on to delete remaining table metadata
// in case a previous delete did not complete.
_helixResourceManager.deleteOfflineTableBlocking(_tableNameWithType, _retentionPeriod);
if (tableExist) {
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(_tableNameWithType));
}
}
if (verifyTableType(_tableNameWithType, _tableType, TableType.REALTIME)) {
tableExist = _helixResourceManager.hasRealtimeTable(_tableNameWithType);
// Even the table name does not exist, still go on to delete remaining table metadata
// in case a previous delete did not complete.
_helixResourceManager.deleteRealtimeTableTableBlocking(_tableNameWithType, _retentionPeriod);
if (tableExist) {
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(_tableNameWithType));
}
}
if (!tablesDeleted.isEmpty()) {
result.setSuccess(true);
result.setMessage("Following tables deleted: " + tablesDeleted);
return result;
}
} catch (Exception e) {
result.setException(e);
return result;
}

result.setException(new ControllerApplicationException(LOGGER, "Table " + _tableNameWithType + " does not exist",
Response.Status.NOT_FOUND));
return result;
}

private boolean verifyTableType(String tableName, TableType tableType, TableType expectedType) {
if (tableType != null && tableType != expectedType) {
return false;
}
TableType typeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
return typeFromTableName == null || typeFromTableName == expectedType;
}

@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
LOGGER.error("Got error for table deletion message handling (error code: {}, error type: {})", code, type, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -430,38 +431,125 @@ public SuccessResponse deleteTable(
@ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr,
@ApiParam(value = "Retention period for the table segments (e.g. 12h, 3d); If not set, the retention period "
+ "will default to the first config that's not null: the cluster setting, then '7d'. Using 0d or -1d will "
+ "instantly delete segments without retention") @QueryParam("retention") String retentionPeriod) {
+ "instantly delete segments without retention") @QueryParam("retention") String retentionPeriod,
@ApiParam(value = "Delete table in background. Should be used for large tables.")
@QueryParam("async") @DefaultValue("false") boolean async) {
TableType tableType = Constants.validateTableType(tableTypeStr);

List<String> tablesDeleted = new LinkedList<>();
try {
boolean tableExist = false;
if (verifyTableType(tableName, tableType, TableType.OFFLINE)) {
tableExist = _pinotHelixResourceManager.hasOfflineTable(tableName);
// Even the table name does not exist, still go on to delete remaining table metadata in case a previous delete
// did not complete.
_pinotHelixResourceManager.deleteOfflineTable(tableName, retentionPeriod);
if (tableExist) {
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
boolean isOfflineTable = verifyTableType(tableName, tableType, TableType.OFFLINE);
boolean isRealtimeTable = verifyTableType(tableName, tableType, TableType.REALTIME);

boolean offlineTableExists =
isOfflineTable && _pinotHelixResourceManager.hasOfflineTable(
tableName);

boolean realtimeTableExists =
isRealtimeTable && _pinotHelixResourceManager.hasRealtimeTable(
tableName);

if (!offlineTableExists && !realtimeTableExists) {
throw new ControllerApplicationException(LOGGER,
"Table '" + tableName + "' with type " + tableType + " does not exist", Response.Status.NOT_FOUND);
}

if (async) {
Pair<Integer, String> msgInfo =
_pinotHelixResourceManager.deleteTableAsync(tableName, tableType, retentionPeriod);

//TODO: change this error message for correct exception handling
if (msgInfo.getLeft() == 0) {
throw new ControllerApplicationException(LOGGER, "Table '" + tableName + "' does not exist",
Response.Status.BAD_REQUEST);
}
if (verifyTableType(tableName, tableType, TableType.REALTIME)) {
tableExist = _pinotHelixResourceManager.hasRealtimeTable(tableName);
// Even the table name does not exist, still go on to delete remaining table metadata in case a previous delete
// did not complete.
_pinotHelixResourceManager.deleteRealtimeTable(tableName, retentionPeriod);
if (tableExist) {
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));

Map<String, String> tableDeleteMeta = new HashMap<>();
tableDeleteMeta.put("numMessagesSent", String.valueOf(msgInfo.getLeft()));
tableDeleteMeta.put("deleteTableJobId", msgInfo.getRight());
// Store in ZK
try {
if (_pinotHelixResourceManager.addTableDeleteJob(tableName, msgInfo.getRight(), msgInfo.getLeft())) {
tableDeleteMeta.put("tableDeleteJobMetaZKStorageStatus", "SUCCESS");
} else {
tableDeleteMeta.put("tableDeleteMetaZKStorageStatus", "FAILED");
LOGGER.error("Failed to add delete table job metadata into zookeeper for table: {}", tableName);
}
} catch (Exception e) {
tableDeleteMeta.put("tableDeleteMetaZKStorageStatus", "FAILED");
LOGGER.error("Failed to add delete table job metadata into zookeeper for table: {}", tableName, e);
}
if (!tablesDeleted.isEmpty()) {
try {
tableDeleteMeta.put("message", "Table deletion triggered");
tableDeleteMeta.put("table", tableName);
return new SuccessResponse(JsonUtils.objectToString(tableDeleteMeta));
} catch (JsonProcessingException e) {
throw new ControllerApplicationException(LOGGER, "Failed to serialize response",
Response.Status.INTERNAL_SERVER_ERROR, e);
}
} else {
List<String> tablesDeleted = new LinkedList<>();
try {
if (isOfflineTable) {
// Even the table name does not exist, still go on to delete remaining table metadata
// in case a previous delete did not complete.
_pinotHelixResourceManager.deleteOfflineTable(tableName, retentionPeriod);

if (offlineTableExists) {
tablesDeleted.add(TableNameBuilder.OFFLINE.tableNameWithType(tableName));
}
}
if (isRealtimeTable) {
// Even the table name does not exist, still go on to delete remaining table metadata
// in case a previous delete did not complete.
_pinotHelixResourceManager.deleteRealtimeTable(tableName, retentionPeriod);

if (realtimeTableExists) {
tablesDeleted.add(TableNameBuilder.REALTIME.tableNameWithType(tableName));
}
}

return new SuccessResponse("Tables: " + tablesDeleted + " deleted");
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
throw new ControllerApplicationException(LOGGER,
"Table '" + tableName + "' with type " + tableType + " does not exist", Response.Status.NOT_FOUND);
}

/**
 * Reports the progress of a table deletion job.
 *
 * <p>The response contains the job's ZooKeeper metadata plus: the partitions still present in the table's external
 * view ({@code segmentsPendingDeletion} and its count; empty when there is no external view) and whether the table
 * config is gone ({@code tableConfigDeleted}).
 *
 * @param deleteTableJobId id of the delete-table job
 * @return the job metadata enriched with the deletion progress fields
 * @throws ControllerApplicationException with NOT_FOUND when no job metadata exists for the id
 */
@GET
@Path("/tables/deleteTableStatus/{jobId}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get status for a submitted table deletion job",
    notes = "Get status for a submitted table deletion job")
public JsonNode getDeleteTableJobStatus(
    @ApiParam(value = "Delete table job id", required = true) @PathParam("jobId") String deleteTableJobId)
    throws Exception {
  Map<String, String> controllerJobZKMetadata =
      _pinotHelixResourceManager.getControllerJobZKMetadata(deleteTableJobId,
          ControllerJobType.TABLE_DELETE);
  if (controllerJobZKMetadata == null) {
    throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + deleteTableJobId,
        Response.Status.NOT_FOUND);
  }
  String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);

  ExternalView externalView =
      _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
  Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);

  // Partitions still listed in the external view are reported as pending; no external view means none are left.
  if (externalView != null) {
    result.put("segmentsPendingDeletion", externalView.getPartitionSet());
    result.put("segmentsPendingDeletionCount", externalView.getPartitionSet().size());
  } else {
    result.put("segmentsPendingDeletion", Collections.emptyList());
    result.put("segmentsPendingDeletionCount", 0);
  }

  result.put("tableConfigDeleted", _pinotHelixResourceManager.getTableConfig(tableNameWithType) == null);
  return JsonUtils.objectToJsonNode(result);
}

// Return true iff the table is of the expectedType based on the given tableName and tableType. The truth table:
Expand Down
Loading