Skip to content

Commit

Permalink
[CARBONDATA-3582] support table status file backup
Browse files Browse the repository at this point in the history
When overwriting table status file, if process crashed, table status
file will be in corrupted state. This can happen in an unstable
environment, like in the cloud. To prevent the table corruption, user
can enable a newly added CarbonProperty to enable backup of the table
status before overwriting it.

New CarbonProperty: ENABLE_TABLE_STATUS_BACKUP (default is false)
When enabling this property, "tablestatus.backup" file will be created
in the same folder of "tablestatus" file

This closes apache#3459
  • Loading branch information
jackylk authored and MarvinLitt committed Jan 3, 2020
1 parent c14cc58 commit e6f7d4e
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,17 @@ private CarbonCommonConstants() {

public static final String ENABLE_VECTOR_READER_DEFAULT = "true";

/**
* In cloud object store scenario, overwriting table status file is not an atomic
* operation since it uses rename API. Thus, it is possible that table status is corrupted
* if process crashed when overwriting the table status file.
* To protect from file corruption, user can enable this property.
*/
@CarbonProperty(dynamicConfigurable = true)
public static final String ENABLE_TABLE_STATUS_BACKUP = "carbon.enable.tablestatus.backup";

public static final String ENABLE_TABLE_STATUS_BACKUP_DEFAULT = "false";

/**
* property to set is IS_DRIVER_INSTANCE
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
Expand Down Expand Up @@ -254,18 +255,23 @@ public static LoadMetadataDetails[] readLoadHistoryMetadata(String metadataFolde
}
}

public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
throws IOException {
Gson gsonObjectToRead = new Gson();
/**
* Read file and return its content as string
*
* @param tableStatusPath path of the table status to read
* @return file content, null if the file does not exist
* @throws IOException if IO errors
*/
private static String readFileAsString(String tableStatusPath) throws IOException {
DataInputStream dataInputStream = null;
BufferedReader buffReader = null;
InputStreamReader inStream = null;
LoadMetadataDetails[] loadFolderDetails = null;

AtomicFileOperations fileOperation =
AtomicFileOperationFactory.getAtomicFileOperations(tableStatusPath);

if (!FileFactory.isFileExist(tableStatusPath)) {
return new LoadMetadataDetails[0];
return null;
}

// When storing table status file in object store, reading of table status file may
Expand All @@ -277,8 +283,7 @@ public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
dataInputStream = fileOperation.openForRead();
inStream = new InputStreamReader(dataInputStream, Charset.forName(DEFAULT_CHARSET));
buffReader = new BufferedReader(inStream);
loadFolderDetails = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
retry = 0;
return buffReader.readLine();
} catch (EOFException ex) {
retry--;
if (retry == 0) {
Expand All @@ -299,13 +304,23 @@ public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
closeStreams(buffReader, inStream, dataInputStream);
}
}
return null;
}

// if listOfLoadFolderDetailsArray is null, return empty array
if (null == loadFolderDetails) {
/**
* Read table status file and decoded to segment meta arrays
*
* @param tableStatusPath table status file path
* @return segment metadata
* @throws IOException if IO errors
*/
public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
throws IOException {
String content = readFileAsString(tableStatusPath);
if (content == null) {
return new LoadMetadataDetails[0];
}

return loadFolderDetails;
return new Gson().fromJson(content, LoadMetadataDetails[].class);
}

/**
Expand All @@ -314,7 +329,7 @@ public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath)
* @param loadMetadataDetails
* @return
*/
public static int getMaxSegmentId(LoadMetadataDetails[] loadMetadataDetails) {
private static int getMaxSegmentId(LoadMetadataDetails[] loadMetadataDetails) {
int newSegmentId = -1;
for (int i = 0; i < loadMetadataDetails.length; i++) {
try {
Expand Down Expand Up @@ -525,45 +540,78 @@ public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifi
}

/**
* writes load details into a given file at @param dataLoadLocation
* Backup the table status file as 'tablestatus.backup' in the same path
*
* @param dataLoadLocation
* @param listOfLoadFolderDetailsArray
* @throws IOException
* @param tableStatusPath table status file path
*/
public static void writeLoadDetailsIntoFile(String dataLoadLocation,
private static void backupTableStatus(String tableStatusPath) throws IOException {
CarbonFile file = FileFactory.getCarbonFile(tableStatusPath);
if (file.exists()) {
String backupPath = tableStatusPath + ".backup";
String currentContent = readFileAsString(tableStatusPath);
if (currentContent != null) {
writeStringIntoFile(backupPath, currentContent);
}
}
}

/**
 * Writes load details to the specified table status path, optionally backing
 * up the existing table status file first.
 *
 * @param tableStatusPath path of the table status file to overwrite
 * @param listOfLoadFolderDetailsArray segment metadata to write
 * @throws IOException if IO errors occur while writing
 */
public static void writeLoadDetailsIntoFile(
    String tableStatusPath,
    LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
  // When overwriting the table status file, a process crash can leave it in a
  // corrupted state (overwrite is not atomic on cloud object stores, which use
  // a rename API). To protect against this, the user can enable the backup
  // property so the current content is saved as "tablestatus.backup" first.
  if (tableStatusPath.endsWith(CarbonTablePath.TABLE_STATUS_FILE) &&
      CarbonProperties.isEnableTableStatusBackup()) {
    backupTableStatus(tableStatusPath);
  }
  mockForTest();
  // Make the table status file smaller by removing fields that hold default
  // values. This must run BEFORE serialization, otherwise the stripped fields
  // would still appear in the written JSON.
  for (LoadMetadataDetails loadMetadataDetails : listOfLoadFolderDetailsArray) {
    loadMetadataDetails.removeUnnecessaryField();
  }
  String content = new Gson().toJson(listOfLoadFolderDetailsArray);
  // If the process crashes during the following write, the table status file
  // needs to be recovered manually (from the backup, if it was enabled).
  writeStringIntoFile(tableStatusPath, content);
}

// No-op hook for testcases: mocked (e.g. via JMockit) to simulate an
// IOException during table status writing; intentionally empty in production.
private static void mockForTest() throws IOException {
}

/**
 * Writes string content to the specified path, overwriting any existing file.
 * On write failure the atomic file operation is marked failed so a partial
 * file is not promoted, and the exception is rethrown.
 *
 * @param filePath path of the file to write
 * @param content content to write
 * @throws IOException if IO errors occur while writing
 */
private static void writeStringIntoFile(String filePath, String content) throws IOException {
  AtomicFileOperations fileWrite = AtomicFileOperationFactory.getAtomicFileOperations(filePath);
  BufferedWriter brWriter = null;
  DataOutputStream dataOutputStream = null;
  try {
    dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
    brWriter = new BufferedWriter(new OutputStreamWriter(
        dataOutputStream, Charset.forName(DEFAULT_CHARSET)));
    brWriter.write(content);
  } catch (IOException ioe) {
    LOG.error("Write file failed: " + ioe.getLocalizedMessage());
    // Mark the atomic operation failed so the temp file is not renamed over
    // the real file, leaving the previous content intact.
    fileWrite.setFailed();
    throw ioe;
  } finally {
    if (null != brWriter) {
      brWriter.flush();
    }
    CarbonUtil.closeStreams(brWriter);
    fileWrite.close();
  }
}

/**
Expand Down Expand Up @@ -637,7 +685,7 @@ && isLoadInProgress(absoluteTableIdentifier, loadId)) {
* @param invalidLoadTimestamps
* @return invalidLoadTimestamps
*/
public static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
private static List<String> updateDeletionStatus(AbsoluteTableIdentifier absoluteTableIdentifier,
String loadDate, LoadMetadataDetails[] listOfLoadFolderDetailsArray,
List<String> invalidLoadTimestamps, Long loadStartTime) {
// For each load timestamp loop through data and if the
Expand Down Expand Up @@ -708,8 +756,7 @@ private static void closeStreams(Closeable... streams) {
* @param newMetadata
* @return
*/

public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
private static List<LoadMetadataDetails> updateLatestTableStatusDetails(
LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {

List<LoadMetadataDetails> newListMetadata =
Expand All @@ -727,7 +774,7 @@ public static List<LoadMetadataDetails> updateLatestTableStatusDetails(
*
* @param loadMetadata
*/
public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
private static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
// update status only if the segment is not marked for delete
if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
Expand Down Expand Up @@ -893,7 +940,7 @@ private static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) {
* @param newList
* @return
*/
public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
private static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {

List<LoadMetadataDetails> newListMetadata =
Expand Down Expand Up @@ -1024,7 +1071,9 @@ public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean
// update the metadata details from old to new status.
List<LoadMetadataDetails> latestStatus =
updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata);
writeLoadMetadata(identifier, latestStatus);
writeLoadDetailsIntoFile(
CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()),
latestStatus.toArray(new LoadMetadataDetails[0]));
}
updationCompletionStatus = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,11 @@ public boolean isEnableVectorReader() {
CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT).equalsIgnoreCase("true");
}

/**
 * Returns whether backup of the table status file before overwrite is
 * enabled via the carbon.enable.tablestatus.backup property.
 */
public static boolean isEnableTableStatusBackup() {
  String enabled = getInstance().getProperty(
      CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP,
      CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP_DEFAULT);
  return enabled.equalsIgnoreCase("true");
}

/**
* Validate the restrictions
*
Expand Down
1 change: 1 addition & 0 deletions docs/configuration-parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ This section provides the details of all the configurations required for the Car
| carbon.lock.retry.timeout.sec | 5 | Specifies the interval between the retries to obtain the lock for any operation other than load. **NOTE:** Refer to ***carbon.lock.retries*** for understanding why CarbonData uses locks for operations. |
| carbon.fs.custom.file.provider | None | To support FileTypeInterface for configuring custom CarbonFile implementation to work with custom FileSystem. |
| carbon.timeseries.first.day.of.week | SUNDAY | This parameter configures which day of the week to be considered as first day of the week. Because first day of the week will be different in different parts of the world. |
| carbon.enable.tablestatus.backup | false | In cloud object store scenario, overwriting table status file is not an atomic operation since it uses rename API. Thus, it is possible that table status is corrupted if process crashed when overwriting the table status file. To protect from file corruption, user can enable this property. |

## Data Loading Configuration

Expand Down
5 changes: 5 additions & 0 deletions integration/spark2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.carbondata

import java.io.IOException

import mockit.{Mock, MockUp}
import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath

// Verifies that when carbon.enable.tablestatus.backup is enabled, a failed
// overwrite of the table status file leaves a "tablestatus.backup" file whose
// content equals the pre-failure table status.
class TableStatusBackupTest extends QueryTest with BeforeAndAfterAll {
override protected def beforeAll(): Unit = {
// Enable backup so every table status overwrite first copies the current
// content to "tablestatus.backup".
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP, "true")
sql("drop table if exists source")
sql("create table source(a string) stored as carbondata")
}

override protected def afterAll(): Unit = {
sql("drop table if exists source")
// Restore the property so other test suites are unaffected.
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.ENABLE_TABLE_STATUS_BACKUP, "false")
}

test("backup table status file") {
// First insert succeeds; remember the table status content it produced.
sql("insert into source values ('A'), ('B')")
val tablePath = CarbonEnv.getCarbonTable(None, "source")(sqlContext.sparkSession).getTablePath
val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(tablePath)
val oldTableStatus = SegmentStatusManager.readTableStatusFile(tableStatusFilePath)

// Mock the test hook inside SegmentStatusManager so the next table status
// write fails after the backup has been taken.
var mock = new MockUp[SegmentStatusManager]() {
@Mock
@throws[IOException]
def mockForTest(): Unit = {
throw new IOException("thrown in mock")
}
}

val exception = intercept[IOException] {
sql("insert into source values ('A'), ('B')")
}
assert(exception.getMessage.contains("thrown in mock"))
// The backup must exist and hold the table status as it was before the
// failed overwrite.
val backupPath = tableStatusFilePath + ".backup"
assert(FileFactory.isFileExist(backupPath))
val backupTableStatus = SegmentStatusManager.readTableStatusFile(backupPath)
assertResult(oldTableStatus)(backupTableStatus)

// Restore the hook to a no-op so subsequent inserts in other tests succeed.
mock = new MockUp[SegmentStatusManager]() {
@Mock
def mockForTest(): Unit = {
}
}
}
}

0 comments on commit e6f7d4e

Please sign in to comment.