From 254abc40d0e34f6c223c254abbdc6c470c1ae2d9 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 26 Nov 2025 12:58:07 +0000 Subject: [PATCH] Provided implementation for FaultInjectorFileIoEvents --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 + .../datanode/FaultInjectorFileIoEvents.java | 97 +++++++++- .../hdfs/server/datanode/FileIoProvider.java | 171 ++++++++++++++++-- .../src/main/resources/hdfs-default.xml | 24 +++ .../TestFaultInjectorFileIoEvents.java | 148 +++++++++++++++ .../impl/TestDataNodeFileIOFailures.java | 139 ++++++++++++++ 6 files changed, 554 insertions(+), 29 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFaultInjectorFileIoEvents.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDataNodeFileIOFailures.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index df9e3907bdaf2..b6e3098092d44 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1202,6 +1202,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.enable.fileio.fault.injection"; public static final boolean DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT = false; + public static final String DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY = + "dfs.datanode.fileio.fault.injection.operations"; + public static final String DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY = + "dfs.datanode.fileio.fault.percentage"; public static final String DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY = "dfs.datanode.fileio.profiling.sampling.percentage"; diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java index ead6ed9912f8a..85384dc3f2bcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FaultInjectorFileIoEvents.java @@ -18,13 +18,23 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import javax.annotation.Nullable; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; - -import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Injects faults in the metadata and data related operations on datanode @@ -33,23 +43,90 @@ @InterfaceAudience.Private public class FaultInjectorFileIoEvents { + public static final class InjectedFileIOFaultException extends Exception { + + private static final long serialVersionUID = 1L; + + private InjectedFileIOFaultException() { + super("Fault injected by configuration"); + } + } + + public static final Logger LOG = LoggerFactory.getLogger( + FaultInjectorFileIoEvents.class); + 
private final boolean isEnabled; + private final Set<FileIoProvider.OPERATION> configuredOps; + private final int faultRangeMax; public FaultInjectorFileIoEvents(@Nullable Configuration conf) { if (conf != null) { - isEnabled = conf.getBoolean(DFSConfigKeys - .DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, DFSConfigKeys - .DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT); + isEnabled = conf.getBoolean( + DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, + DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_DEFAULT); } else { isEnabled = false; } + configuredOps = new HashSet<>(); + if (isEnabled) { + String ops = conf.get( + DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY); + if (ops != null) { + String[] parts = ops.split(","); + for (String part : parts) { + String opName = part.trim().toUpperCase(); + try { + configuredOps.add(FileIoProvider.OPERATION.valueOf(opName)); + } catch (IllegalArgumentException e) { + LOG.warn("Value '{}' is not valid FileIoProvider.OPERATION, " + + "ignoring...", opName); + } + } + } + int faultPercentagePropVal = Math.min(conf.getInt( + DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY, 0), 100); + faultRangeMax = (int) ((double) faultPercentagePropVal / 100 * + Integer.MAX_VALUE); + LOG.warn("FaultInjectorFileIoEvents is enabled and will fail the " + + "following operations: {}", configuredOps); + LOG.warn(" *** DO NOT USE IN PRODUCTION!!! 
***"); + } else { + faultRangeMax = 0; + } + } + + @VisibleForTesting + boolean isEnabled() { + return isEnabled; + } + + @VisibleForTesting + Set<FileIoProvider.OPERATION> getOperations() { + return configuredOps; + } + + @VisibleForTesting + int getFaultRangeMax() { + return faultRangeMax; + } + + private void fault(FileIoProvider.OPERATION op) + throws InjectedFileIOFaultException { + if (isEnabled && faultRangeMax > 0 && configuredOps.contains(op) + && ThreadLocalRandom.current().nextInt() < faultRangeMax) { + LOG.error("Throwing fault for operation: " + op); + throw new InjectedFileIOFaultException(); + } + } - public void beforeMetadataOp( - @Nullable FsVolumeSpi volume, FileIoProvider.OPERATION op) { + public void beforeMetadataOp(@Nullable FsVolumeSpi volume, + FileIoProvider.OPERATION op) throws InjectedFileIOFaultException { + fault(op); } - public void beforeFileIo( - @Nullable FsVolumeSpi volume, FileIoProvider.OPERATION op, long len) { + public void beforeFileIo(@Nullable FsVolumeSpi volume, + FileIoProvider.OPERATION op, long len) + throws InjectedFileIOFaultException { + fault(op); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java index 552f5199f1d3a..13cb8c8f15b37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java @@ -18,6 +18,39 @@ package org.apache.hadoop.hdfs.server.datanode; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.DELETE; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.EXISTS; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.FADVISE; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.FLUSH; 
+import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.LIST; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.MKDIRS; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.MOVE; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.NATIVE_COPY; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.OPEN; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.READ; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.SYNC; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.TRANSFER; +import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.WRITE; + +import java.io.File; +import java.io.FileDescriptor; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.Flushable; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.UncheckedIOException; +import java.nio.channels.FileChannel; +import java.nio.file.CopyOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -26,35 +59,18 @@ import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.datanode.FaultInjectorFileIoEvents.InjectedFileIOFaultException; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; +import 
org.apache.hadoop.io.nativeio.Errno; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.net.SocketOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import java.io.File; -import java.io.FileDescriptor; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.Flushable; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.channels.FileChannel; -import java.nio.file.CopyOption; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; - -import static org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION.*; - /** * This class abstracts out various file IO operations performed by the * DataNode and invokes profiling (for collecting stats) and fault injection @@ -132,6 +148,9 @@ public void flush( faultInjectorEventHook.beforeFileIo(volume, FLUSH, 0); f.flush(); profilingEventHook.afterFileIo(volume, FLUSH, begin, 0); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -151,6 +170,9 @@ public void sync( faultInjectorEventHook.beforeFileIo(volume, SYNC, 0); IOUtils.fsync(fos.getChannel(), false); profilingEventHook.afterFileIo(volume, SYNC, begin, 0); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -168,6 +190,9 @@ public void dirSync(@Nullable FsVolumeSpi volume, File dir) faultInjectorEventHook.beforeFileIo(volume, SYNC, 0); IOUtils.fsync(dir); profilingEventHook.afterFileIo(volume, SYNC, begin, 0); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new 
IOException(e.getMessage(), e); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -187,6 +212,9 @@ public void syncFileRange( faultInjectorEventHook.beforeFileIo(volume, SYNC, 0); NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags); profilingEventHook.afterFileIo(volume, SYNC, begin, 0); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new NativeIOException(e.getMessage(), Errno.UNKNOWN); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -207,6 +235,9 @@ public void posixFadvise( NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( identifier, outFd, offset, length, flags); profilingEventHook.afterMetadataOp(volume, FADVISE, begin); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new NativeIOException(e.getMessage(), Errno.UNKNOWN); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -226,6 +257,9 @@ public boolean delete(@Nullable FsVolumeSpi volume, File f) { boolean deleted = f.delete(); profilingEventHook.afterMetadataOp(volume, DELETE, begin); return deleted; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new UncheckedIOException(e.getMessage(), new IOException(e)); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -249,6 +283,9 @@ public boolean deleteWithExistsCheck(@Nullable FsVolumeSpi volume, File f) { LOG.warn("Failed to delete file {}", f); } return deleted; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new UncheckedIOException(e.getMessage(), new IOException(e)); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -278,6 +315,9 @@ public void transferToSocketFully( sockOut.transferToFully(fileCh, position, count, waitTime, transferTime); profilingEventHook.afterFileIo(volume, TRANSFER, begin, count); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch (Exception e) { 
String em = e.getMessage(); if (em != null) { @@ -308,6 +348,9 @@ public boolean createFile( boolean created = f.createNewFile(); profilingEventHook.afterMetadataOp(volume, OPEN, begin); return created; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch (Exception e) { onFailure(volume, begin); throw e; @@ -335,6 +378,9 @@ public FileInputStream getFileInputStream( fis = new WrappedFileInputStream(volume, f); profilingEventHook.afterMetadataOp(volume, OPEN, begin); return fis; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new FileNotFoundException(e.getMessage()); } catch(Exception e) { IOUtils.closeStream(fis); onFailure(volume, begin); @@ -366,6 +412,9 @@ public FileOutputStream getFileOutputStream( fos = new WrappedFileOutputStream(volume, f, append); profilingEventHook.afterMetadataOp(volume, OPEN, begin); return fos; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new FileNotFoundException(e.getMessage()); } catch(Exception e) { IOUtils.closeStream(fos); onFailure(volume, begin); @@ -431,6 +480,9 @@ public FileInputStream getShareDeleteFileInputStream( NativeIO.getShareDeleteFileDescriptor(f, offset)); profilingEventHook.afterMetadataOp(volume, OPEN, begin); return fis; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { IOUtils.closeStream(fis); onFailure(volume, begin); @@ -463,6 +515,9 @@ public FileInputStream openAndSeek( FsDatasetUtil.openAndSeek(f, offset)); profilingEventHook.afterMetadataOp(volume, OPEN, begin); return fis; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { IOUtils.closeStream(fis); onFailure(volume, begin); @@ -494,6 +549,9 @@ public RandomAccessFile getRandomAccessFile( raf = new WrappedRandomAccessFile(volume, f, 
mode); profilingEventHook.afterMetadataOp(volume, OPEN, begin); return raf; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new FileNotFoundException(e.getMessage()); } catch(Exception e) { IOUtils.closeStream(raf); onFailure(volume, begin); @@ -516,6 +574,9 @@ public boolean fullyDelete(@Nullable FsVolumeSpi volume, File dir) { LOG.trace("Deletion of dir {} {}", dir, deleted ? "succeeded" : "failed"); profilingEventHook.afterMetadataOp(volume, DELETE, begin); return deleted; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new UncheckedIOException(e.getMessage(), new IOException(e)); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -538,6 +599,9 @@ public void replaceFile( faultInjectorEventHook.beforeMetadataOp(volume, MOVE); FileUtil.replaceFile(src, target); profilingEventHook.afterMetadataOp(volume, MOVE, begin); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -561,6 +625,9 @@ public void rename( faultInjectorEventHook.beforeMetadataOp(volume, MOVE); Storage.rename(src, target); profilingEventHook.afterMetadataOp(volume, MOVE, begin); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -584,6 +651,9 @@ public void moveFile( faultInjectorEventHook.beforeMetadataOp(volume, MOVE); FileUtils.moveFile(src, target); profilingEventHook.afterMetadataOp(volume, MOVE, begin); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -609,6 +679,9 @@ public void move( faultInjectorEventHook.beforeMetadataOp(volume, MOVE); Files.move(src, target, options); profilingEventHook.afterMetadataOp(volume, MOVE, begin); + } catch 
(InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -635,6 +708,9 @@ public void nativeCopyFileUnbuffered( faultInjectorEventHook.beforeFileIo(volume, NATIVE_COPY, length); Storage.nativeCopyFileUnbuffered(src, target, preserveFileDate); profilingEventHook.afterFileIo(volume, NATIVE_COPY, begin, length); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -661,6 +737,9 @@ public boolean mkdirs( created = dir.mkdirs(); isDirectory = !created && dir.isDirectory(); profilingEventHook.afterMetadataOp(volume, MKDIRS, begin); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -688,6 +767,9 @@ public void mkdirsWithExistsCheck( faultInjectorEventHook.beforeMetadataOp(volume, MKDIRS); succeeded = dir.isDirectory() || dir.mkdirs(); profilingEventHook.afterMetadataOp(volume, MKDIRS, begin); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -715,6 +797,9 @@ public File[] listFiles( File[] children = FileUtil.listFiles(dir); profilingEventHook.afterMetadataOp(volume, LIST, begin); return children; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -738,6 +823,9 @@ public String[] list( String[] children = FileUtil.list(dir); profilingEventHook.afterMetadataOp(volume, LIST, begin); return children; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); 
throw e; @@ -762,6 +850,9 @@ public List listDirectory( List children = IOUtils.listDirectory(dir, filter); profilingEventHook.afterMetadataOp(volume, LIST, begin); return children; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -785,6 +876,9 @@ public int getHardLinkCount( int count = HardLink.getLinkCount(f); profilingEventHook.afterMetadataOp(volume, LIST, begin); return count; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -805,6 +899,9 @@ public boolean exists(@Nullable FsVolumeSpi volume, File f) { boolean exists = f.exists(); profilingEventHook.afterMetadataOp(volume, EXISTS, begin); return exists; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new UncheckedIOException(e.getMessage(), new IOException(e)); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -847,6 +944,9 @@ public int read() throws IOException { int b = super.read(); profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT); return b; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -865,6 +965,9 @@ public int read(@Nonnull byte[] b) throws IOException { int numBytesRead = super.read(b); profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -882,6 +985,9 @@ public int read(@Nonnull byte[] b, int off, int len) throws IOException { int numBytesRead = super.read(b, off, len); profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead); return 
numBytesRead; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -926,6 +1032,9 @@ public void write(int b) throws IOException { faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT); super.write(b); profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -943,6 +1052,9 @@ public void write(@Nonnull byte[] b) throws IOException { faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length); super.write(b); profilingEventHook.afterFileIo(volume, WRITE, begin, b.length); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -959,6 +1071,9 @@ public void write(@Nonnull byte[] b, int off, int len) throws IOException { faultInjectorEventHook.beforeFileIo(volume, WRITE, len); super.write(b, off, len); profilingEventHook.afterFileIo(volume, WRITE, begin, len); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -988,6 +1103,9 @@ public int read() throws IOException { int b = super.read(); profilingEventHook.afterFileIo(volume, READ, begin, LEN_INT); return b; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -1002,6 +1120,9 @@ public int read(byte[] b, int off, int len) throws IOException { int numBytesRead = super.read(b, off, len); profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, 
begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -1017,6 +1138,9 @@ public int read(byte[] b) throws IOException { int numBytesRead = super.read(b); profilingEventHook.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -1031,6 +1155,9 @@ public void write(int b) throws IOException { faultInjectorEventHook.beforeFileIo(volume, WRITE, LEN_INT); super.write(b); profilingEventHook.afterFileIo(volume, WRITE, begin, LEN_INT); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -1045,6 +1172,9 @@ public void write(@Nonnull byte[] b) throws IOException { faultInjectorEventHook.beforeFileIo(volume, WRITE, b.length); super.write(b); profilingEventHook.afterFileIo(volume, WRITE, begin, b.length); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; @@ -1058,6 +1188,9 @@ public void write(byte[] b, int off, int len) throws IOException { faultInjectorEventHook.beforeFileIo(volume, WRITE, len); super.write(b, off, len); profilingEventHook.afterFileIo(volume, WRITE, begin, len); + } catch (InjectedFileIOFaultException e) { + onFailure(volume, begin); + throw new IOException(e.getMessage(), e); } catch(Exception e) { onFailure(volume, begin); throw e; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 2b889dd2adc5c..3fff39091533e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6712,4 +6712,28 @@ Enables observer reads for clients. This should only be enabled when clients are using routers. + +<property> + <name>dfs.datanode.enable.fileio.fault.injection</name> + <value>false</value> + <description> + Enables fault injection in the DataNode for file I/O operations. This is useful for testing + how applications respond to exceptions. This should not be enabled on a production cluster. + </description> +</property> +<property> + <name>dfs.datanode.fileio.fault.injection.operations</name> + <value></value> + <description> + Comma-separated list of FileIOProvider.OPERATION values for which to throw faults. + </description> +</property> +<property> + <name>dfs.datanode.fileio.fault.percentage</name> + <value>0</value> + <description> + This setting controls the percentage of file I/O operations for which + an exception will be thrown. Set to an integer value between 1 and 100 + to enable. A value of zero will disable the fault injector. + </description> +</property> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFaultInjectorFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFaultInjectorFileIoEvents.java new file mode 100644 index 0000000000000..25eaf7bc30b1a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFaultInjectorFileIoEvents.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.FaultInjectorFileIoEvents.InjectedFileIOFaultException; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider.OPERATION; +import org.junit.jupiter.api.Test; + +public class TestFaultInjectorFileIoEvents { + + private FaultInjectorFileIoEvents createFaultInjector(boolean enabled, String ops, + int percentage) { + Configuration conf = new Configuration(); + conf.setBoolean(DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, enabled); + conf.set(DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY, ops); + conf.setInt(DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY, percentage); + return new FaultInjectorFileIoEvents(conf); + } + + private int getFaultMaxRange(int propertyValue) { + return (int) ((double) propertyValue / 100 * Integer.MAX_VALUE); + } + + private Set createExpected(OPERATION... 
ops) { + Set result = new HashSet<>(); + for (OPERATION o : ops) { + result.add(o); + } + return result; + } + + @Test + public void testDisabled() throws InjectedFileIOFaultException { + FaultInjectorFileIoEvents injector = createFaultInjector(false, "", 32); + assertEquals(false, injector.isEnabled()); + assertEquals(createExpected(), injector.getOperations()); + assertEquals(0, injector.getFaultRangeMax()); + injector.beforeMetadataOp(null, OPERATION.DELETE); + injector.beforeFileIo(null, OPERATION.WRITE, 0); + } + + @Test + public void testEnabledZeroPercentage() throws InjectedFileIOFaultException { + FaultInjectorFileIoEvents injector = createFaultInjector(true, OPERATION.DELETE.name(), 0); + assertEquals(true, injector.isEnabled()); + assertEquals(createExpected(OPERATION.DELETE), injector.getOperations()); + assertEquals(getFaultMaxRange(0), injector.getFaultRangeMax()); + injector.beforeMetadataOp(null, OPERATION.DELETE); + injector.beforeFileIo(null, OPERATION.WRITE, 0); + } + + @Test + public void testEnabled() throws InjectedFileIOFaultException { + FaultInjectorFileIoEvents injector = createFaultInjector(true, OPERATION.DELETE.name(), 100); + assertEquals(true, injector.isEnabled()); + assertEquals(createExpected(OPERATION.DELETE), injector.getOperations()); + assertEquals(getFaultMaxRange(100), injector.getFaultRangeMax()); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeMetadataOp(null, + OPERATION.DELETE)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeFileIo(null, + OPERATION.DELETE, 0)); + injector.beforeMetadataOp(null, OPERATION.WRITE); + injector.beforeFileIo(null, OPERATION.WRITE, 0); + } + + @Test + public void testEnabledMulti() throws InjectedFileIOFaultException { + String ops = OPERATION.DELETE.name() + "," + OPERATION.WRITE.name(); + FaultInjectorFileIoEvents injector = createFaultInjector(true, ops, 100); + assertEquals(true, injector.isEnabled()); + 
assertEquals(createExpected(OPERATION.DELETE, OPERATION.WRITE), injector.getOperations()); + assertEquals(getFaultMaxRange(100), injector.getFaultRangeMax()); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeMetadataOp(null, + OPERATION.DELETE)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeFileIo(null, + OPERATION.DELETE, 0)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeMetadataOp(null, + OPERATION.WRITE)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeFileIo(null, + OPERATION.WRITE, 0)); + injector.beforeMetadataOp(null, OPERATION.READ); + injector.beforeFileIo(null, OPERATION.READ, 0); + } + + @Test + public void testEnabledMultiInvalidEntry() throws InjectedFileIOFaultException { + String ops = OPERATION.DELETE.name() + "," + OPERATION.WRITE.name() + ",foo"; + FaultInjectorFileIoEvents injector = createFaultInjector(true, ops, 100); + assertEquals(true, injector.isEnabled()); + assertEquals(createExpected(OPERATION.DELETE, OPERATION.WRITE), injector.getOperations()); + assertEquals(getFaultMaxRange(100), injector.getFaultRangeMax()); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeMetadataOp(null, + OPERATION.DELETE)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeFileIo(null, + OPERATION.DELETE, 0)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeMetadataOp(null, + OPERATION.WRITE)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeFileIo(null, + OPERATION.WRITE, 0)); + injector.beforeMetadataOp(null, OPERATION.READ); + injector.beforeFileIo(null, OPERATION.READ, 0); + } + + @Test + public void testEnabledMultiInvalidEntryLower() throws InjectedFileIOFaultException { + String ops = OPERATION.DELETE.name() + "," + OPERATION.WRITE.name() + ",foo"; + FaultInjectorFileIoEvents injector = createFaultInjector(true, ops.toLowerCase(), 100); + assertEquals(true, 
injector.isEnabled()); + assertEquals(createExpected(OPERATION.DELETE, OPERATION.WRITE), injector.getOperations()); + assertEquals(getFaultMaxRange(100), injector.getFaultRangeMax()); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeMetadataOp(null, + OPERATION.DELETE)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeFileIo(null, + OPERATION.DELETE, 0)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeMetadataOp(null, + OPERATION.WRITE)); + assertThrows(InjectedFileIOFaultException .class, () -> injector.beforeFileIo(null, + OPERATION.WRITE, 0)); + injector.beforeMetadataOp(null, OPERATION.READ); + injector.beforeFileIo(null, OPERATION.READ, 0); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDataNodeFileIOFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDataNodeFileIOFailures.java new file mode 100644 index 0000000000000..be8d7635c243a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestDataNodeFileIOFailures.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; +import org.junit.jupiter.api.Test; + +public class TestDataNodeFileIOFailures { + + @Test + public void testFileHSyncFailure() throws Exception { + + HdfsConfiguration config = new HdfsConfiguration(); + config.setBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, false); + config.setBoolean(DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, true); + config.set(DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY, + FileIoProvider.OPERATION.SYNC.name()); + // Fail 100% of the time + config.set(DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY, "100"); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(config) + .numDataNodes(1).build(); + + try { + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + Path path = new Path("/testFileHSyncFailure"); + FSDataOutputStream os = 
fs.create(path); + assertTrue(os.hasCapability(StreamCapabilities.HSYNC)); + assertTrue(os instanceof Syncable); + os.writeUTF("test"); + IOException ioe = assertThrows(IOException.class, () -> os.hsync()); + assertTrue(ioe.getMessage().startsWith("All datanodes ") && + ioe.getMessage().endsWith(" are bad. Aborting...")); + IOException ioe2 = assertThrows(IOException.class, () -> os.close()); + assertTrue(ioe2.getMessage().startsWith("All datanodes ") && + ioe2.getMessage().endsWith(" are bad. Aborting...")); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testFileSyncCreateFlagFailure() throws Exception { + + HdfsConfiguration config = new HdfsConfiguration(); + config.setBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, false); + config.setBoolean(DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, true); + config.set(DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY, + FileIoProvider.OPERATION.SYNC.name()); + // Fail 100% of the time + config.set(DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY, "100"); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(config) + .numDataNodes(1).build(); + + try { + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + Path path = new Path("/testFileSyncCreateFlagFailure"); + EnumSet flags = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE); + IOException ioe = assertThrows(IOException.class, () -> { + try (FSDataOutputStream os = fs.create(path, FsPermission.getDefault(), flags, + 1024, (short) 1, 8192, null)) { + os.writeUTF("test"); + } + }); + assertTrue(ioe.getMessage().startsWith("All datanodes ") && + ioe.getMessage().endsWith(" are bad. 
Aborting...")); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testFileSyncOnCloseFailure() throws Exception { + + HdfsConfiguration config = new HdfsConfiguration(); + config.setBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, true); + config.setBoolean(DFS_DATANODE_ENABLE_FILEIO_FAULT_INJECTION_KEY, true); + config.set(DFS_DATANODE_ENABLED_OPS_FILEIO_FAULT_INJECTION_KEY, + FileIoProvider.OPERATION.SYNC.name()); + // Fail 100% of the time + config.set(DFS_DATANODE_FILEIO_FAULT_PERCENTAGE_KEY, "100"); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(config) + .numDataNodes(1).build(); + + try { + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + Path path = new Path("/testFileSyncOnCloseFailure"); + IOException ioe = assertThrows(IOException.class, () -> { + try (FSDataOutputStream os = fs.create(path)) { + os.writeUTF("test"); + } + }); + assertTrue(ioe.getMessage().startsWith("All datanodes ") && + ioe.getMessage().endsWith(" are bad. Aborting...")); + } finally { + cluster.shutdown(); + } + } +}