Skip to content
Permalink
Browse files
[NO ISSUE][STO] Delete invalid indexes during cluster global recovery
- user model changes: yes
- storage format changes: no
- interface changes: no

Details:

- Before starting cluster global recovery, send to all NCs valid
  dataset ids from the metadata node.
- Delete any invalid indexes on NCs based on the metadata received
  from the CC.
- Add storage options to enable/disable global storage recovery.
  This allows tests that create storage objects without using the
  metadata node to bypass global cleanup.
- Add storage option to specify the timeout for nodes to perform
  global storage cleanup.
- Add test case for global storage recovery.
- Adapt existing test cases that require bypassing global cleanup.

Change-Id: Idee73e57fa5879c3b9aab5f881bf848e225f874b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10784
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Reviewed-by: Michael Blow <mblow@apache.org>
  • Loading branch information
mhubail committed Apr 1, 2021
1 parent d382766 commit 17670aab184fe12fa30dc79376e819e07fac43c4
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 5 deletions.
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.message;

import static org.apache.hyracks.util.ExitUtil.EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED;

import java.util.Map;
import java.util.Set;

import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.messaging.CcIdentifiedMessage;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.LocalResource;
import org.apache.hyracks.util.ExitUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * CC-originated message handled on an NC before cluster global recovery starts.
 * Carries the set of dataset ids known to the metadata node; every non-metadata
 * index found in this NC's local resource repository whose dataset id is not in
 * that set is treated as invalid and deleted. When the sweep finishes, the NC
 * acknowledges the CC with a {@link VoidResponse} correlated by {@code reqId};
 * if the acknowledgment cannot be delivered, the NC halts.
 */
public class StorageCleanupRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage {

    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LogManager.getLogger();
    // dataset ids considered valid by the metadata node; any other non-metadata
    // dataset id found locally marks its indexes for deletion
    private final Set<Integer> validDatasetIds;
    // correlation id used by the CC message broker to match this request to its response
    private final long reqId;

    /**
     * @param reqId           correlation id of the CC's synchronous request
     * @param validDatasetIds dataset ids whose indexes must be kept
     */
    public StorageCleanupRequestMessage(long reqId, Set<Integer> validDatasetIds) {
        this.validDatasetIds = validDatasetIds;
        this.reqId = reqId;
    }

    @Override
    public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
        INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
        PersistentLocalResourceRepository localResourceRepository =
                (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
        // scan every locally known resource for indexes of unknown datasets
        Map<Long, LocalResource> localResources = localResourceRepository.loadAndGetAllResources();
        for (LocalResource resource : localResources.values()) {
            DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
            if (MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())) {
                // skip metadata indexes
                continue;
            }
            if (!validDatasetIds.contains(lr.getDatasetId())) {
                LOGGER.warn("found invalid index {} with dataset id {}", resource.getPath(), lr.getDatasetId());
                deleteInvalidIndex(appContext, localResourceRepository, resource);
            }
        }
        try {
            // acknowledge completion to the CC; null failure = success
            broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null));
        } catch (Exception e) {
            // the CC waits synchronously for this response; if it cannot be delivered,
            // halt rather than leave the cluster recovery hanging.
            // NOTE(review): reuses EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED rather than a
            // cleanup-specific exit code — confirm this is intentional
            LOGGER.error("failed to notify CC of storage clean up; halting...", e);
            ExitUtil.halt(EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED);
        }
    }

    /**
     * Removes a single invalid index: unregisters it from the dataset lifecycle
     * manager (or instantiates it if it was never registered), deletes its local
     * resource entry, then destroys the index's on-disk state.
     *
     * @param appContext              the NC application context
     * @param localResourceRepository repository holding the index's local resource
     * @param resource                the invalid index's local resource
     * @throws HyracksDataException if unregistering, deleting, or destroying fails
     */
    private void deleteInvalidIndex(INcApplicationContext appContext,
            PersistentLocalResourceRepository localResourceRepository, LocalResource resource)
            throws HyracksDataException {
        IDatasetLifecycleManager lcManager = appContext.getDatasetLifecycleManager();
        String resourceRelPath = resource.getPath();
        // lock the lifecycle manager so the index cannot be concurrently
        // opened/registered while it is being removed
        synchronized (lcManager) {
            IIndex index;
            index = lcManager.get(resourceRelPath);
            if (index != null) {
                LOGGER.warn("unregistering invalid index {}", resourceRelPath);
                lcManager.unregister(resourceRelPath);
            } else {
                // index object not in memory: instantiate it so destroy() below can
                // remove its on-disk structures
                LOGGER.warn("initializing unregistered invalid index {}", resourceRelPath);
                try {
                    index = resource.getResource().createInstance(appContext.getServiceContext());
                } catch (Exception e) {
                    // best effort: the resource entry is still deleted below even if
                    // the index object cannot be created
                    LOGGER.warn("failed to initialize invalid index {}", resourceRelPath, e);
                }
            }
            // remove the resource metadata first, then the index files themselves
            localResourceRepository.delete(resourceRelPath);
            if (index != null) {
                index.destroy();
            }
        }
    }

    @Override
    public String toString() {
        return StorageCleanupRequestMessage.class.getSimpleName();
    }
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.app.message;

import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.messaging.api.INcResponse;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hyracks.api.exceptions.HyracksDataException;

/**
* A response to a request only indicating success or failure
*/
/**
 * A response to a request only indicating success or failure. Delivered to the
 * CC, where it resolves the pending synchronous request identified by its
 * request id; a non-null failure marks the request as failed.
 */
public class VoidResponse implements ICcAddressedMessage, INcResponse {

    private static final long serialVersionUID = 1L;
    private final Long reqId;
    private final Throwable failure;

    /**
     * @param reqId   id of the request this response answers
     * @param failure the cause of failure, or {@code null} on success
     */
    public VoidResponse(Long reqId, Throwable failure) {
        this.reqId = reqId;
        this.failure = failure;
    }

    @Override
    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
        // hand this response back to the broker so it can complete the pending request
        final ICCMessageBroker ccBroker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
        ccBroker.respond(reqId, this);
    }

    @Override
    public void setResult(MutablePair<ResponseState, Object> result) {
        if (failure == null) {
            result.setLeft(ResponseState.SUCCESS);
            return;
        }
        result.setLeft(ResponseState.FAILURE);
        result.setRight(failure);
    }

    @Override
    public String toString() {
        final String outcome = failure == null ? "success" : failure.getClass().getSimpleName();
        return "{ \"response\" : \"" + outcome + "\"}";
    }
}
@@ -18,11 +18,15 @@
*/
package org.apache.asterix.hyracks.bootstrap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.asterix.app.message.StorageCleanupRequestMessage;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
@@ -32,6 +36,7 @@
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -111,6 +116,10 @@ protected void recover(ICcApplicationContext appCtx) throws HyracksDataException
LOGGER.info("Starting Global Recovery");
MetadataManager.INSTANCE.init();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
}
mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
recoveryCompleted = true;
@@ -122,6 +131,27 @@ protected void recover(ICcApplicationContext appCtx) throws HyracksDataException
}
}

/**
 * Collects the ids of all datasets known to the metadata node and synchronously
 * asks every participant NC to delete any local index that does not belong to
 * one of them.
 *
 * @param mdTxnCtx                        active metadata transaction used to read dataverses and datasets
 * @param storageGlobalCleanupTimeoutSecs maximum seconds to wait for all NCs to respond
 * @throws Exception if reading the metadata fails or the NCs do not all respond in time
 */
protected void performGlobalStorageCleanup(MetadataTransactionContext mdTxnCtx, int storageGlobalCleanupTimeoutSecs)
        throws Exception {
    // gather every valid dataset id across all dataverses
    final Set<Integer> validDatasetIds = new HashSet<>();
    for (Dataverse dataverse : MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
        for (Dataset dataset : MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
                dataverse.getDataverseName())) {
            validDatasetIds.add(dataset.getDatasetId());
        }
    }
    final ICcApplicationContext ccAppCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
    final CCMessageBroker messageBroker = (CCMessageBroker) ccAppCtx.getServiceContext().getMessageBroker();
    final List<String> participantNcs = new ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes());
    final long reqId = messageBroker.newRequestId();
    // one cleanup request per participant NC, all sharing the same request id
    final List<StorageCleanupRequestMessage> requests = new ArrayList<>();
    for (String nc : participantNcs) {
        requests.add(new StorageCleanupRequestMessage(reqId, validDatasetIds));
    }
    messageBroker.sendSyncRequestToNCs(reqId, participantNcs, requests,
            TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs));
}

protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx)
throws Exception {
// Loop over datasets
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.test.dataflow;

import java.io.File;

import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.test.common.TestHelper;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Verifies global storage cleanup: an index created without going through the
 * metadata node must be deleted when the node is re-initialized with global
 * storage cleanup enabled, making subsequent reads of that index fail with
 * {@code INDEX_DOES_NOT_EXIST}.
 */
public class GlobalStorageCleanupTest {

    public static final Logger LOGGER = LogManager.getLogger();
    private static TestNodeController nc;

    @BeforeClass
    public static void setUp() throws Exception {
        // use the declared logger instead of System.out for test progress output
        LOGGER.info("SetUp: ");
        TestHelper.deleteExistingInstanceFiles();
        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
                + File.separator + "resources" + File.separator + "cc.conf";
        nc = new TestNodeController(configPath, false);
    }

    @Test
    public void globalStorageCleanup() throws Exception {
        nc.init(true);
        // reuse LSMFlushRecoveryTest's helpers to create and read an index that
        // is unknown to the metadata node
        LSMFlushRecoveryTest.nc = nc;
        LSMFlushRecoveryTest lsmFlushRecoveryTest = new LSMFlushRecoveryTest();
        lsmFlushRecoveryTest.initializeTestCtx();
        lsmFlushRecoveryTest.createIndex();
        lsmFlushRecoveryTest.readIndex();
        nc.deInit(false);
        nc.init(false);
        // the index should be deleted after the node initialization
        lsmFlushRecoveryTest.initializeTestCtx();
        boolean failedToReadIndex = false;
        try {
            lsmFlushRecoveryTest.readIndex();
        } catch (Exception e) {
            failedToReadIndex = true;
            Assert.assertTrue(e.getMessage().contains(ErrorCode.INDEX_DOES_NOT_EXIST.errorCode()));
        }
        Assert.assertTrue(failedToReadIndex);
        nc.deInit(false);
    }
}
@@ -77,7 +77,7 @@

public class LSMFlushRecoveryTest {
public static final Logger LOGGER = LogManager.getLogger();
private static TestNodeController nc;
public static TestNodeController nc;
private static Dataset dataset;
private static PrimaryIndexInfo[] primaryIndexInfos;
private static SecondaryIndexInfo[] secondaryIndexInfo;
@@ -156,6 +156,10 @@ public void testRecovery() throws Exception {
}

private void initializeNc(boolean cleanUpOnStart) throws Exception {
// disable global clean up for this test to allow internal index creation
List<Pair<IOption, Object>> opts = new ArrayList<>();
opts.add(Pair.of(Option.STORAGE_GLOBAL_CLEANUP, false));
nc.setOpts(opts);
nc.init(cleanUpOnStart);
ncAppCtx = nc.getAppRuntimeContext();
// Override the LSMIOScheduler to avoid halting on failure and enable
@@ -177,7 +181,7 @@ public void operationFailed(ILSMIOOperation operation, Throwable t) {
dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
}

private void createIndex() throws Exception {
public void createIndex() throws Exception {
dataset = StorageTestUtils.DATASET;
secondaryIndexEntity = new Index(dataset.getDataverseName(), dataset.getDatasetName(), SECONDARY_INDEX_NAME,
SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, SECONDARY_INDEX_FIELD_INDICATORS,
@@ -193,7 +197,7 @@ private void createIndex() throws Exception {

}

private void initializeTestCtx() throws Exception {
public void initializeTestCtx() throws Exception {
JobId jobId = nc.newJobId();
testCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
for (int i = 0; i < NUM_PARTITIONS; i++) {
@@ -203,7 +207,7 @@ private void initializeTestCtx() throws Exception {
new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
}

private void readIndex() throws HyracksDataException {
public void readIndex() throws HyracksDataException {
primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
primaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
for (int i = 0; i < NUM_PARTITIONS; i++) {
@@ -44,6 +44,7 @@
"replication\.timeout" : 30,
"ssl\.enabled" : false,
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
@@ -44,6 +44,7 @@
"replication\.timeout" : 30,
"ssl\.enabled" : false,
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,
@@ -44,6 +44,7 @@
"replication\.timeout" : 30,
"ssl\.enabled" : false,
"storage.compression.block" : "snappy",
"storage.global.cleanup.timeout" : 600,
"storage.lsm.bloomfilter.falsepositiverate" : 0.01,
"txn\.commitprofiler\.enabled" : false,
"txn\.commitprofiler\.reportinterval" : 5,

0 comments on commit 17670aa

Please sign in to comment.