Skip to content

Commit

Permalink
IGNITE-8748 All FileIO#write methods should return number of written …
Browse files Browse the repository at this point in the history
…bytes - Fixes #4170.

Signed-off-by: Ivan Rakov <irakov@apache.org>
  • Loading branch information
Stelmak Alexey authored and glukos committed Jun 15, 2018
1 parent 312acd0 commit b4b965b
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 19 deletions.
Expand Up @@ -154,14 +154,14 @@ public AsyncFileIO(File file, ThreadLocal<ChannelOpFuture> holder, OpenOption...
}

/** {@inheritDoc} */
@Override public void write(byte[] buf, int off, int len) throws IOException {
@Override public int write(byte[] buf, int off, int len) throws IOException {
ChannelOpFuture fut = holder.get();
fut.reset();

ch.write(ByteBuffer.wrap(buf, off, len), position, this, fut);

try {
fut.getUninterruptibly();
return fut.getUninterruptibly();
}
catch (IgniteCheckedException e) {
throw new IOException(e);
Expand Down
Expand Up @@ -120,9 +120,11 @@ public interface FileIO extends AutoCloseable {
* @param off Start offset in the {@code buffer}.
* @param len Number of bytes to write.
*
* @return Number of written bytes.
*
* @throws IOException If some I/O error occurs.
*/
public void write(byte[] buf, int off, int len) throws IOException;
public int write(byte[] buf, int off, int len) throws IOException;

/**
* Allocates memory mapped buffer for this file with given size.
Expand Down
Expand Up @@ -72,8 +72,8 @@ public FileIODecorator(FileIO delegate) {
}

/** {@inheritDoc} */
@Override public void write(byte[] buf, int off, int len) throws IOException {
delegate.write(buf, off, len);
@Override public int write(byte[] buf, int off, int len) throws IOException {
return delegate.write(buf, off, len);
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -79,8 +79,8 @@ public RandomAccessFileIO(File file, OpenOption... modes) throws IOException {
}

/** {@inheritDoc} */
@Override public void write(byte[] buf, int off, int len) throws IOException {
ch.write(ByteBuffer.wrap(buf, off, len));
@Override public int write(byte[] buf, int off, int len) throws IOException {
return ch.write(ByteBuffer.wrap(buf, off, len));
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -110,7 +110,7 @@ public UnzipFileIO(File zip) throws IOException {
}

/** {@inheritDoc} */
@Override public void write(byte[] buf, int off, int len) throws IOException {
@Override public int write(byte[] buf, int off, int len) throws IOException {
throw new UnsupportedOperationException();
}

Expand Down
Expand Up @@ -71,6 +71,7 @@
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
Expand Down Expand Up @@ -98,6 +99,7 @@
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
Expand Down Expand Up @@ -254,6 +256,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Events service */
private final GridEventStorageManager evt;

/** Failure processor */
private final FailureProcessor failureProcessor;

/** */
private IgniteConfiguration igCfg;

Expand Down Expand Up @@ -360,6 +365,7 @@ public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
ioFactory = new RandomAccessFileIOFactory();
walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
evt = ctx.event();
failureProcessor = ctx.failure();
}

/**
Expand Down Expand Up @@ -1382,11 +1388,19 @@ private void formatFile(File file, int bytesCntToFormat) throws IgniteCheckedExc
try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
int left = bytesCntToFormat;

if (mode == WALMode.FSYNC) {
if (mode == WALMode.FSYNC || mmap) {
while (left > 0) {
int toWrite = Math.min(FILL_BUF.length, left);

fileIO.write(FILL_BUF, 0, toWrite);
if (fileIO.write(FILL_BUF, 0, toWrite) < toWrite) {
final IgniteCheckedException ex = new IgniteCheckedException("Failed to extend WAL segment file: " +
file.getName() + ". Probably disk is too busy, please check your device.");

if (failureProcessor != null)
failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, ex));

throw ex;
}

left -= toWrite;
}
Expand Down Expand Up @@ -2133,7 +2147,15 @@ private class FileDecompressor extends Thread {

int bytesRead;
while ((bytesRead = zis.read(arr)) > 0)
io.write(arr, 0, bytesRead);
if (io.write(arr, 0, bytesRead) < bytesRead) {
final IgniteCheckedException ex = new IgniteCheckedException("Failed to extend file: " +
unzipTmp.getName() + ". Probably disk is too busy, please check your device.");

if (failureProcessor != null)
failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, ex));

throw ex;
}
}

try {
Expand Down
Expand Up @@ -604,7 +604,7 @@ private static class TestFileIO implements FileIO {
}

/** {@inheritDoc} */
@Override public void write(byte[] buf, int off, int len) throws IOException {
@Override public int write(byte[] buf, int off, int len) throws IOException {
CountDownLatch latch = fileIOLatch.get();

if (latch != null && Thread.currentThread().getName().contains("checkpoint"))
Expand All @@ -615,7 +615,7 @@ private static class TestFileIO implements FileIO {
throw new IgniteException(ex);
}

delegate.write(buf, off, len);
return delegate.write(buf, off, len);
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -429,11 +429,12 @@ public LimitedSizeFileIO(FileIO delegate, AtomicLong availableSpaceBytes) {
}

/** {@inheritDoc} */
@Override public void write(byte[] buf, int off, int len) throws IOException {
super.write(buf, off, len);
@Override public int write(byte[] buf, int off, int len) throws IOException {
final int num = super.write(buf, off, len);
availableSpaceBytes.addAndGet(-len);
if (availableSpaceBytes.get() < 0)
throw new IOException("Not enough space!");
return num;
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -311,11 +311,11 @@ private static class SlowCheckpointFileIOFactory implements FileIOFactory {
return delegate.write(srcBuf, position);
}

@Override public void write(byte[] buf, int off, int len) throws IOException {
@Override public int write(byte[] buf, int off, int len) throws IOException {
if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint"))
LockSupport.parkNanos(5_000_000);

delegate.write(buf, off, len);
return delegate.write(buf, off, len);
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -455,8 +455,8 @@ private static String getLastError() {
}

/** {@inheritDoc} */
@Override public void write(byte[] buf, int off, int len) throws IOException {
write(ByteBuffer.wrap(buf, off, len));
@Override public int write(byte[] buf, int off, int len) throws IOException {
return write(ByteBuffer.wrap(buf, off, len));
}

/** {@inheritDoc} */
Expand Down

0 comments on commit b4b965b

Please sign in to comment.