Adding a fix/workaround when fs.append() is unable to return a valid output stream #388

Merged 1 commit on May 9, 2018
File: HoodieLogFormatWriter.java
@@ -46,6 +46,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
private final Integer bufferSize;
private final Short replication;
private FSDataOutputStream output;
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";

/**
* @param fs
@@ -69,6 +70,18 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
try {
this.output = fs.append(path, bufferSize);
} catch (RemoteException e) {
if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
// This issue happens when all replicas for a file are down and/or being decommissioned.
// The fs.append() API appends to the last block of a file; if the last block is full, a new block is
// appended to. When a lot of DataNodes are decommissioned, the DataNodes holding all
// replicas for a block/file can end up being decommissioned together. During this process, these blocks
// start to get re-replicated to other active DataNodes, but that might take time (on the order of a few
// hours). During this window, if fs.append() is invoked for a file whose last block is eligible to be
// appended to, the NameNode throws an exception saying that it couldn't find any active replica of the
// last block. Find more information here: https://issues.apache.org/jira/browse/HDFS-6325
log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
createNewFile();
}
// this happens when either another task executor writing to this file died or
// data node is going down
if (e.getClassName().equals(AlreadyBeingCreatedException.class.getName())
@@ -86,9 +99,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
} catch (IOException ioe) {
if (ioe.getMessage().equalsIgnoreCase("Not supported")) {
log.info("Append not supported. Opening a new log file..");
this.logFile = logFile.rollOver(fs);
this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication,
WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
createNewFile();
} else {
throw ioe;
}
@@ -192,6 +203,12 @@ private Writer rolloverIfNeeded() throws IOException, InterruptedException {
return this;
}

private void createNewFile() throws IOException {
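// roll the log file over to the next version and open a fresh output stream for it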
this.logFile = logFile.rollOver(fs);
this.output = fs.create(this.logFile.getPath(), false, bufferSize, replication,
WriterBuilder.DEFAULT_SIZE_THRESHOLD, null);
}

@Override
public void close() throws IOException {
flush();
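For readers skimming the diff above, here is a minimal, self-contained sketch of the fallback pattern this change introduces. The class name AppendOrRollover, the open() method, and its parameter layout are illustrative only; they are not part of this patch or the Hoodie API.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;

// Sketch only: try to append to an existing HDFS file, and fall back to creating a
// rolled-over file when the NameNode reports that the last block is not sufficiently
// replicated (HDFS-6325).
public class AppendOrRollover {
  private static final String APPEND_UNAVAILABLE = "not sufficiently replicated yet";

  public static FSDataOutputStream open(FileSystem fs, Path current, Path rolledOver,
      int bufferSize, short replication, long blockSize) throws IOException {
    try {
      return fs.append(current, bufferSize);
    } catch (RemoteException e) {
      if (e.getMessage() != null && e.getMessage().contains(APPEND_UNAVAILABLE)) {
        // All replicas of the last block are down or being decommissioned; start a new file.
        return fs.create(rolledOver, false, bufferSize, replication, blockSize);
      }
      throw e;
    }
  }
}

In the patch itself, the same message check is added inside the existing RemoteException handler, and the new createNewFile() helper is shared by this case and the existing "Not supported" IOException path.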
New file: HoodieLogFormatAppendFailureTest.java
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.uber.hoodie.common.table.log;

import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;

import com.google.common.collect.Maps;
import com.uber.hoodie.common.minicluster.MiniClusterUtil;
import com.uber.hoodie.common.model.HoodieArchivedLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.util.SchemaTestUtil;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
* This class intentionally uses a different way of setting up the MiniDFSCluster, not relying on
* {@link MiniClusterUtil}, in order to reproduce the append() issue: https://issues.apache.org/jira/browse/HDFS-6325
* Reference: https://issues.apache.org/jira/secure/attachment/12645053/HDFS-6325.patch
*/
public class HoodieLogFormatAppendFailureTest {

private static File baseDir;
private static MiniDFSCluster cluster;

@BeforeClass
public static void setUpClass() throws IOException {
// NOTE: The MiniDFSCluster leaves behind the directory under which the cluster was created
baseDir = new File("/tmp/" + UUID.randomUUID().toString());
FileUtil.fullyDelete(baseDir);
// Append is not supported in LocalFileSystem; HDFS needs to be set up.
Configuration conf = new Configuration();
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
// lower heartbeat intervals for fast detection of dead DataNodes
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
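// lower the client socket timeout so calls to the stopped DataNodes fail fast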
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 3000);
cluster = new MiniDFSCluster.Builder(conf).checkExitOnShutdown(true).numDataNodes(4).build();
}

@AfterClass
public static void tearDownClass() {
cluster.shutdown(true);
// Force clean up the directory under which the cluster was created
FileUtil.fullyDelete(baseDir);
}

@Test(timeout = 60000)
public void testFailedToGetAppendStreamFromHDFSNameNode() throws IOException, URISyntaxException,
InterruptedException, TimeoutException {

// Use the file system from the MiniDFSCluster; LocalFileSystem and the like do not support appends
String uuid = UUID.randomUUID().toString();
Path localPartitionPath = new Path("/tmp/");
FileSystem fs = cluster.getFileSystem();
Path testPath = new Path(localPartitionPath, uuid);
fs.mkdirs(testPath);

// Some data & append.
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 10);
Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);

Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits"
+ ".archive").overBaseCommit("")
.withFs(fs).build();

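// appendBlock() writes the data block to the log and returns the writer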
writer = writer.appendBlock(dataBlock);
// get the current log file version to compare later
int logFileVersion = writer.getLogFile().getLogVersion();
Path logFilePath = writer.getLogFile().getPath();
writer.close();

// Wait until the file is replicated 3 times
DFSTestUtil.waitReplication(fs, logFilePath, (short) 3);
// Shut down all DNs that have the last block location for the file
LocatedBlocks lbs = cluster.getFileSystem().getClient().getNamenode()
.getBlockLocations("/tmp/" + uuid + "/" + logFilePath.getName(), 0, Long.MAX_VALUE);
List<DataNode> dnsOfCluster = cluster.getDataNodes();
DatanodeInfo[] dnsWithLocations = lbs.getLastLocatedBlock().getLocations();
for (DataNode dn : dnsOfCluster) {
for (DatanodeInfo loc : dnsWithLocations) {
if (dn.getDatanodeId().equals(loc)) {
dn.shutdown();
cluster.stopDataNode(dn.getDisplayName());
DFSTestUtil.waitForDatanodeDeath(dn);
}
}
}
// Wait for the replication of this file to go down to 0
DFSTestUtil.waitReplication(fs, logFilePath, (short) 0);

// Opening a new Writer right now will throw an IOException. The code should handle this, roll over the log file and
// return a new writer with a bumped-up logVersion
writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits" + ".archive")
.overBaseCommit("")
.withFs(fs).build();
// The log version should be different for this new writer
Assert.assertFalse(writer.getLogFile().getLogVersion() == logFileVersion);
}

}
@@ -30,7 +30,6 @@
import com.uber.hoodie.common.util.SpillableMapTestUtils;
import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter;
import com.uber.hoodie.common.util.collection.converter.StringConverter;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
@@ -182,12 +181,6 @@ public void simpleTestWithException() throws IOException, URISyntaxException {
}
}

@Test
public void simpleTestWithExceptionValidateFileIsRemoved() throws Exception {
File file = new File(FAILURE_OUTPUT_PATH);
assertFalse(file.exists());
}

@Test
public void testDataCorrectnessWithUpsertsToDataInMapAndOnDisk() throws IOException, URISyntaxException {
