@@ -519,20 +519,7 @@ public void flush() throws IOException {
&& (!config.isStreamBufferFlushDelay() ||
writtenDataLength - totalDataFlushedLength
>= config.getStreamBufferSize())) {
try {
handleFlush(false);
} catch (ExecutionException e) {
// just set the exception here as well in order to maintain sanctity of
// ioException field
handleExecutionException(e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, true);
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
throw e;
}
handleFlush(false);
}
}

@@ -553,7 +540,26 @@ private void writeChunk(ChunkBuffer buffer)
/**
* @param close whether the flush is happening as part of closing the stream
*/
private void handleFlush(boolean close)
protected void handleFlush(boolean close) throws IOException {
try {
handleFlushInternal(close);
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, true);
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
throw e;
} finally {
if (close) {
cleanup(false);
}
}
}

private void handleFlushInternal(boolean close)
throws IOException, InterruptedException, ExecutionException {
checkOpen();
// flush the last chunk data residing on the currentBuffer
@@ -587,20 +593,7 @@ private void handleFlush(boolean close)
public void close() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0) {
try {
handleFlush(true);
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, true);
} catch (Throwable e) {
String msg = "Failed to flush. error: " + e.getMessage();
LOG.error(msg, e);
throw e;
} finally {
cleanup(false);
}
handleFlush(true);
// TODO: Turn the below buffer empty check on when Standalone pipeline
// is removed in the write path in tests
// Preconditions.checkArgument(buffer.position() == 0);
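
Note on the refactor above: the try/catch that used to live in both flush() and close() now lives once inside handleFlush(), and cleanup on the close path happens in its finally block, so every close-path caller gets cleanup even when the flush fails. A condensed sketch of the resulting shape (simplified; the Throwable re-throw branch is omitted, and the enclosing class is not named in this excerpt):

public void flush() throws IOException {
  // ... buffer-threshold check as above ...
  handleFlush(false);                      // exception translation happens inside
}

protected void handleFlush(boolean close) throws IOException {
  try {
    handleFlushInternal(close);            // the former handleFlush() body
  } catch (ExecutionException e) {
    handleExecutionException(e);           // keeps the ioException field consistent
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
    handleInterruptedException(ex, true);
  } finally {
    if (close) {
      cleanup(false);                      // previously done in close()'s own finally
    }
  }
}

public void close() throws IOException {
  handleFlush(true);                       // flush, translate exceptions, then cleanup
}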
@@ -137,6 +137,9 @@ public void hflush() throws IOException {
@Override
public void hsync() throws IOException {
if (!isClosed()) {
if (getBufferPool() != null && getBufferPool().getSize() > 0) {
handleFlush(false);
}
waitForFlushAndCommit(false);
}
}
@@ -328,7 +328,7 @@ void commitKey(long offset) throws IOException {
// in test, this could be null
long length = getKeyLength();
Preconditions.checkArgument(offset == length,
"Epected offset: " + offset + " expected len: " + length);
"Expected offset: " + offset + " expected len: " + length);
keyArgs.setDataSize(length);
keyArgs.setLocationInfoList(getLocationInfoList());
// When the key is multipart upload part file upload, we should not
@@ -345,6 +345,27 @@ void commitKey(long offset) throws IOException {
}
}

void hsyncKey(long offset) throws IOException {
if (keyArgs != null) {
// in test, this could be null
long length = getKeyLength();
Preconditions.checkArgument(offset == length,
"Expected offset: " + offset + " expected len: " + length);
keyArgs.setDataSize(length);
keyArgs.setLocationInfoList(getLocationInfoList());
// When the key is multipart upload part file upload, we should not
// commit the key, as this is not an actual key, this is a just a
// partial key of a large file.
if (keyArgs.getIsMultipartKey()) {
throw new IOException("Hsync is unsupported for multipart keys.");
} else {
omClient.hsyncKey(keyArgs, openID);
}
} else {
LOG.warn("Closing KeyOutputStream, but key args is null");
}
}

BlockOutputStreamEntry getCurrentStreamEntry() {
if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
return null;
@@ -91,6 +91,8 @@ enum StreamAction {

private long clientID;

private OzoneManagerProtocol omClient;

public KeyOutputStream(ContainerClientMetrics clientMetrics) {
closed = false;
this.retryPolicyMap = HddsClientUtils.getExceptionList()
@@ -156,6 +158,7 @@ public KeyOutputStream(
this.isException = false;
this.writeOffset = 0;
this.clientID = handler.getId();
this.omClient = omClient;
}

/**
@@ -450,11 +453,7 @@ public void hflush() throws IOException {
public void hsync() throws IOException {
checkNotClosed();
handleFlushOrClose(StreamAction.HSYNC);
//TODO HDDS-7593: send hsyncKey to update length;
// where the hsyncKey op is similar to
// blockOutputStreamEntryPool.commitKey(offset)
// except that hsyncKey only updates the key length
// instead of committing it.
blockOutputStreamEntryPool.hsyncKey(offset);
}

/**
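
With the entry-pool change above, hsync() now flushes the open block streams and then reports the current offset to the OM via hsyncKey(), so readers can see the synced length before the key is committed. A hedged, application-level sketch of the intended effect (method and variable names here are illustrative, not from this diff):

// Assumed usage through the Hadoop FileSystem API (org.apache.hadoop.fs).
void writeAndSync(FileSystem fs, Path file, byte[] data) throws IOException {
  try (FSDataOutputStream out = fs.create(file, true)) {
    out.write(data);
    out.hsync();   // KeyOutputStream: flush block data, then hsyncKey(offset) updates the key length in OM
    // further writes may follow; close() still commits the key as before
  }
}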
@@ -26,6 +26,7 @@ public enum OMAction implements AuditAction {
ALLOCATE_BLOCK,
ALLOCATE_KEY,
COMMIT_KEY,
HSYNC,
CREATE_VOLUME,
CREATE_BUCKET,
DELETE_VOLUME,
@@ -249,6 +249,20 @@ default void commitKey(OmKeyArgs args, long clientID)
"this to be implemented, as write requests use a new approach.");
}

/**
* Synchronize the key length. This will make the change from the client
* visible. The client is identified by the clientID.
*
* @param args the key to commit
* @param clientID the client identification
* @throws IOException
*/
default void hsyncKey(OmKeyArgs args, long clientID)
throws IOException {
throw new UnsupportedOperationException("OzoneManager does not require " +
"this to be implemented, as write requests use a new approach.");
}


/**
* Allocate a new block, it is assumed that the client is having an open key
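
The difference from commitKey() is that hsyncKey() only publishes the key's current length and block locations while the key stays open for further writes; commitKey() remains the final step at close. A rough caller-side sketch (omClient, keyArgs and openID as they appear elsewhere in this diff):

// hedged sketch: make already-written bytes visible, keep writing, commit at the end
omClient.hsyncKey(keyArgs, openID);    // update length/locations; key remains open
// ... more writes and hsyncs ...
omClient.commitKey(keyArgs, openID);   // unchanged final commit on close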
@@ -742,8 +742,20 @@ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, long clientId,
.getAllocateBlockResponse();
return OmKeyLocationInfo.getFromProtobuf(resp.getKeyLocation());
}

@Override
public void hsyncKey(OmKeyArgs args, long clientId)
throws IOException {
updateKey(args, clientId, true);
}

@Override
public void commitKey(OmKeyArgs args, long clientId)
throws IOException {
updateKey(args, clientId, false);
}

private void updateKey(OmKeyArgs args, long clientId, boolean hsync)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
@@ -771,6 +783,7 @@ public void commitKey(OmKeyArgs args, long clientId)

req.setKeyArgs(keyArgsBuilder.build());
req.setClientID(clientId);
req.setHsync(hsync);


OMRequest omRequest = createOMRequest(Type.CommitKey)
@@ -24,9 +24,15 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.BucketArgs;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.junit.AfterClass;
@@ -35,6 +41,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.rules.Timeout;

import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
@@ -44,6 +51,8 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;


/**
@@ -157,4 +166,56 @@ static void runTestHSync(FileSystem fs, Path file) throws Exception {
}
Assertions.assertEquals(data.length, offset);
}

@Test
public void testStreamCapability() throws Exception {
final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);

final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
final Path file = new Path(dir, "file");

FileSystem fs = FileSystem.get(conf);
final FSDataOutputStream os = fs.create(file, true);
// Verify output stream supports hsync() and hflush().
assertTrue("KeyOutputStream should support hflush()!",
os.hasCapability(StreamCapabilities.HFLUSH));
assertTrue("KeyOutputStream should support hsync()!",
os.hasCapability(StreamCapabilities.HSYNC));
os.close();
}

@Test
public void testECStreamCapability() throws Exception {
// create EC bucket to be used by OzoneFileSystem
BucketArgs.Builder builder = BucketArgs.newBuilder();
builder.setStorageType(StorageType.DISK);
builder.setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED);
builder.setDefaultReplicationConfig(
new DefaultReplicationConfig(
new ECReplicationConfig(
3, 2, ECReplicationConfig.EcCodec.RS, 1024)));
BucketArgs omBucketArgs = builder.build();
String ecBucket = UUID.randomUUID().toString();
TestDataUtil.createBucket(cluster, bucket.getVolumeName(), omBucketArgs,
ecBucket);
String ecUri = String.format("%s://%s.%s/",
OzoneConsts.OZONE_URI_SCHEME, ecBucket, bucket.getVolumeName());
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, ecUri);

final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
final Path file = new Path(dir, "file");

FileSystem fs = FileSystem.get(conf);
final FSDataOutputStream os = fs.create(file, true);
// Verify output stream supports hsync() and hflush().
assertFalse("ECKeyOutputStream should not support hflush()!",
os.hasCapability(StreamCapabilities.HFLUSH));
assertFalse("ECKeyOutputStream should not support hsync()!",
os.hasCapability(StreamCapabilities.HSYNC));
os.close();
}
}
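
These tests only probe StreamCapabilities; the capability answer itself has to come from the underlying output stream. The KeyOutputStream side is not shown in the hunks above, but presumably it advertises the capabilities along these lines (an assumed sketch, not the actual implementation):

// assumed shape; FSDataOutputStream.hasCapability() delegates to the wrapped stream
@Override
public boolean hasCapability(String capability) {
  return StreamCapabilities.HFLUSH.equalsIgnoreCase(capability)
      || StreamCapabilities.HSYNC.equalsIgnoreCase(capability);
}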
@@ -123,13 +123,11 @@

<property>
<name>fs.contract.supports-hsync</name>
<value>false</value>
<!-- TODO: switch to true after HDDS-7688 -->
<value>true</value>
</property>

<property>
<name>fs.contract.supports-hflush</name>
<value>false</value>
<!-- TODO: switch to true after HDDS-7688 -->
<value>true</value>
</property>
</configuration>
@@ -1283,6 +1283,7 @@ message ListKeysResponse {
message CommitKeyRequest {
required KeyArgs keyArgs = 1;
required uint64 clientID = 2;
optional bool hsync = 3;
}

message CommitKeyResponse {
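
Because the new hsync field is optional, the wire format stays compatible: requests from older clients simply omit it and are treated as normal commits. On the receiving side (not part of this excerpt), the proto2-generated accessors would be consulted roughly like this:

// hedged sketch using the generated CommitKeyRequest accessors
boolean hsync = commitKeyRequest.hasHsync() && commitKeyRequest.getHsync();
if (hsync) {
  // update the key's length/locations only; leave the key open
} else {
  // existing commit path
}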