Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
private ECBlockOutputStream[] blockOutputStreams;
private int currentStreamIdx = 0;
private long successfulBlkGrpAckedLen;
private long position;

@SuppressWarnings({"parameternumber", "squid:S00107"})
ECBlockOutputStreamEntry(BlockID blockID, String key,
Expand All @@ -79,6 +80,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
this.replicationConfig =
(ECReplicationConfig) pipeline.getReplicationConfig();
this.length = replicationConfig.getData() * length;
this.position = 0;
}

@Override
Expand Down Expand Up @@ -125,11 +127,25 @@ public long getLength() {
return length;
}

@Override
public void write(int b) throws IOException {
write(new byte[]{(byte) b}, 0, 1);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
super.write(b, off, len);
position += len;
if (position % replicationConfig.getEcChunkSize() == 0) {
useNextBlockStream();
}
}

public int getCurrentStreamIdx() {
return currentStreamIdx;
}

public void useNextBlockStream() {
private void useNextBlockStream() {
currentStreamIdx =
(currentStreamIdx + 1) % replicationConfig.getRequiredNodes();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,11 @@ private StripeWriteStatus rewriteStripeToNewBlockGroup(
if (currentLen > 0) {
handleOutputStreamWrite(i, currentLen, false);
}
currentStreamEntry.useNextBlockStream();
totalLenToWrite -= currentLen;
}
for (int i = 0; i < (numParityBlks); i++) {
handleOutputStreamWrite(i + numDataBlks, failedParityStripeChunkLens[i],
true);
currentStreamEntry.useNextBlockStream();
}

if (hasWriteFailure()) {
Expand Down Expand Up @@ -379,14 +377,12 @@ private int handleDataWrite(int currIdx, byte[] b, int off, long len,
"When full cell passed, the pos: " + pos
+ " should match to ec chunk size.");
handleOutputStreamWrite(currIdx, pos, false);
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}
return pos;
}

private void handleParityWrite(int currIdx, int len) {
handleOutputStreamWrite(currIdx, len, true);
blockOutputStreamEntryPool.getCurrentStreamEntry().useNextBlockStream();
}

private void handleOutputStreamWrite(int currIdx, int len, boolean isParity) {
Expand Down Expand Up @@ -424,8 +420,7 @@ private long writeToOutputStream(ECBlockOutputStreamEntry current,
LOG.debug(
"Exception while writing the cell buffers. The writeLen: " + writeLen
+ ". The block internal index is: "
+ current
.getCurrentStreamIdx(), ioe);
+ current.getCurrentStreamIdx(), ioe);
handleException(current, ioe);
}
return writeLen;
Expand Down