Skip to content

Commit

Permalink
Fix netty tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aaudiber committed Apr 21, 2017
1 parent 288b0d5 commit 981f12e
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 21 deletions.
Expand Up @@ -14,6 +14,7 @@
import alluxio.EmbeddedNoExceptionChannel; import alluxio.EmbeddedNoExceptionChannel;
import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status.PStatus;
import alluxio.util.proto.ProtoMessage; import alluxio.util.proto.ProtoMessage;
import alluxio.worker.block.BlockWorker; import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader; import alluxio.worker.block.io.BlockReader;
Expand Down Expand Up @@ -80,7 +81,7 @@ public void readFailure() throws Exception {
mBlockReader.close(); mBlockReader.close();
mChannelNoException.writeInbound(buildReadRequest(0, fileSize)); mChannelNoException.writeInbound(buildReadRequest(0, fileSize));
Object response = waitForOneResponse(mChannelNoException); Object response = waitForOneResponse(mChannelNoException);
checkReadResponse(response, Protocol.Status.Code.INTERNAL); checkReadResponse(response, PStatus.FAILED_PRECONDITION);
} }


@Override @Override
Expand Down
Expand Up @@ -16,6 +16,7 @@
import alluxio.network.protocol.databuffer.DataBuffer; import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2; import alluxio.network.protocol.databuffer.DataNettyBufferV2;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status.PStatus;
import alluxio.util.io.BufferUtils; import alluxio.util.io.BufferUtils;
import alluxio.util.proto.ProtoMessage; import alluxio.util.proto.ProtoMessage;
import alluxio.worker.block.BlockWorker; import alluxio.worker.block.BlockWorker;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void writeFailure() throws Exception {
mChannelNoException.writeInbound(buildWriteRequest(PACKET_SIZE, PACKET_SIZE)); mChannelNoException.writeInbound(buildWriteRequest(PACKET_SIZE, PACKET_SIZE));
Object writeResponse = waitForResponse(mChannelNoException); Object writeResponse = waitForResponse(mChannelNoException);
Assert.assertTrue(writeResponse instanceof RPCProtoMessage); Assert.assertTrue(writeResponse instanceof RPCProtoMessage);
checkWriteResponse(writeResponse, Protocol.Status.Code.INTERNAL); checkWriteResponse(writeResponse, PStatus.FAILED_PRECONDITION);
} }


@Override @Override
Expand Down
Expand Up @@ -14,12 +14,12 @@
import alluxio.Configuration; import alluxio.Configuration;
import alluxio.Constants; import alluxio.Constants;
import alluxio.PropertyKey; import alluxio.PropertyKey;
import alluxio.exception.status.Status;
import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer; import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataFileChannel; import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.network.protocol.databuffer.DataNettyBufferV2; import alluxio.network.protocol.databuffer.DataNettyBufferV2;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status.PStatus;
import alluxio.util.CommonUtils; import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions; import alluxio.util.WaitForOptions;
import alluxio.util.io.BufferUtils; import alluxio.util.io.BufferUtils;
Expand Down Expand Up @@ -98,7 +98,7 @@ public void readEmptyFile() throws Exception {
populateInputFile(0, 0, 0); populateInputFile(0, 0, 0);
mChannelNoException.writeInbound(buildReadRequest(0, 0)); mChannelNoException.writeInbound(buildReadRequest(0, 0));
Object response = waitForOneResponse(mChannelNoException); Object response = waitForOneResponse(mChannelNoException);
checkReadResponse(response, Protocol.Status.Code.INVALID_ARGUMENT); checkReadResponse(response, PStatus.INVALID_ARGUMENT);
} }


/** /**
Expand All @@ -124,7 +124,7 @@ public void cancelRequest() throws Exception {
// There is small chance that we can still receive an OK response here because it is too // There is small chance that we can still receive an OK response here because it is too
// fast to read all the data. If that ever happens, either increase the file size or allow it // fast to read all the data. If that ever happens, either increase the file size or allow it
// to be OK here. // to be OK here.
DataBuffer buffer = checkReadResponse(response, Protocol.Status.Code.CANCELED); DataBuffer buffer = checkReadResponse(response, PStatus.CANCELED);
if (buffer == null) { if (buffer == null) {
eof = true; eof = true;
break; break;
Expand Down Expand Up @@ -183,7 +183,7 @@ protected void checkAllReadResponses(EmbeddedChannel channel, long checksumExpec
Assert.fail(); Assert.fail();
break; break;
} }
DataBuffer buffer = checkReadResponse(readResponse, Protocol.Status.Code.OK); DataBuffer buffer = checkReadResponse(readResponse, PStatus.OK);
eof = buffer == null; eof = buffer == null;
if (buffer != null) { if (buffer != null) {
if (buffer instanceof DataNettyBufferV2) { if (buffer instanceof DataNettyBufferV2) {
Expand Down Expand Up @@ -214,18 +214,17 @@ protected void checkAllReadResponses(EmbeddedChannel channel, long checksumExpec
* @param statusExpected the expected error code * @param statusExpected the expected error code
* @return the data buffer extracted from the read response * @return the data buffer extracted from the read response
*/ */
protected DataBuffer checkReadResponse(Object readResponse, Status statusExpected) { protected DataBuffer checkReadResponse(Object readResponse, PStatus statusExpected) {
Assert.assertTrue(readResponse instanceof RPCProtoMessage); Assert.assertTrue(readResponse instanceof RPCProtoMessage);


ProtoMessage response = ((RPCProtoMessage) readResponse).getMessage(); ProtoMessage response = ((RPCProtoMessage) readResponse).getMessage();
Assert.assertTrue(response.getType() == ProtoMessage.Type.RESPONSE); Assert.assertTrue(response.getType() == ProtoMessage.Type.RESPONSE);
DataBuffer buffer = ((RPCProtoMessage) readResponse).getPayloadDataBuffer(); DataBuffer buffer = ((RPCProtoMessage) readResponse).getPayloadDataBuffer();
if (buffer != null) { if (buffer != null) {
Assert.assertEquals(Status.OK, Assert.assertEquals(PStatus.OK, response.<Protocol.Response>getMessage().getStatus());
response.<Protocol.Response>getMessage().getStatus().getCode());
} else { } else {
Assert.assertEquals(statusExpected, Assert.assertEquals(statusExpected,
response.<Protocol.Response>getMessage().getStatus().getCode()); response.<Protocol.Response>getMessage().getStatus());
} }
return buffer; return buffer;
} }
Expand Down
Expand Up @@ -13,8 +13,9 @@


import alluxio.EmbeddedNoExceptionChannel; import alluxio.EmbeddedNoExceptionChannel;
import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.util.proto.ProtoMessage;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status.PStatus;
import alluxio.util.proto.ProtoMessage;
import alluxio.worker.file.FileSystemWorker; import alluxio.worker.file.FileSystemWorker;


import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
Expand Down Expand Up @@ -58,7 +59,7 @@ public void readFailure() throws Exception {
mInputStream.close(); mInputStream.close();
mChannelNoException.writeInbound(buildReadRequest(0, fileSize)); mChannelNoException.writeInbound(buildReadRequest(0, fileSize));
Object response = waitForOneResponse(mChannelNoException); Object response = waitForOneResponse(mChannelNoException);
checkReadResponse(response, Protocol.Status.Code.INTERNAL); checkReadResponse(response, PStatus.UNAVAILABLE);
} }


@Override @Override
Expand Down
Expand Up @@ -16,6 +16,7 @@
import alluxio.network.protocol.databuffer.DataBuffer; import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2; import alluxio.network.protocol.databuffer.DataNettyBufferV2;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status.PStatus;
import alluxio.underfs.UnderFileSystem; import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions; import alluxio.underfs.options.CreateOptions;
import alluxio.util.io.BufferUtils; import alluxio.util.io.BufferUtils;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void writeFailure() throws Exception {
mOutputStream.close(); mOutputStream.close();
mChannelNoException.writeInbound(buildWriteRequest(PACKET_SIZE, PACKET_SIZE)); mChannelNoException.writeInbound(buildWriteRequest(PACKET_SIZE, PACKET_SIZE));
Object writeResponse = waitForResponse(mChannelNoException); Object writeResponse = waitForResponse(mChannelNoException);
checkWriteResponse(writeResponse, Protocol.Status.Code.INTERNAL); checkWriteResponse(writeResponse, PStatus.UNAVAILABLE);
} }


@Override @Override
Expand Down
Expand Up @@ -12,9 +12,9 @@
package alluxio.worker.netty; package alluxio.worker.netty;


import alluxio.Constants; import alluxio.Constants;
import alluxio.exception.status.Status;
import alluxio.network.protocol.RPCProtoMessage; import alluxio.network.protocol.RPCProtoMessage;
import alluxio.proto.dataserver.Protocol; import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status.PStatus;
import alluxio.util.CommonUtils; import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions; import alluxio.util.WaitForOptions;
import alluxio.util.io.BufferUtils; import alluxio.util.io.BufferUtils;
Expand Down Expand Up @@ -56,7 +56,7 @@ public void writeEmptyFile() throws Exception {
mChannel.writeInbound(buildWriteRequest(0, 0)); mChannel.writeInbound(buildWriteRequest(0, 0));


Object writeResponse = waitForResponse(mChannel); Object writeResponse = waitForResponse(mChannel);
checkWriteResponse(writeResponse, Status.OK); checkWriteResponse(writeResponse, PStatus.OK);
} }


/** /**
Expand All @@ -73,7 +73,7 @@ public void writeNonEmptyFile() throws Exception {
mChannel.writeInbound(buildWriteRequest(len, 0)); mChannel.writeInbound(buildWriteRequest(len, 0));


Object writeResponse = waitForResponse(mChannel); Object writeResponse = waitForResponse(mChannel);
checkWriteResponse(writeResponse, Status.OK); checkWriteResponse(writeResponse, PStatus.OK);
checkFileContent(len); checkFileContent(len);
} }


Expand All @@ -91,7 +91,7 @@ public void cancel() throws Exception {
mChannel.writeInbound(buildWriteRequest(len, -1)); mChannel.writeInbound(buildWriteRequest(len, -1));


Object writeResponse = waitForResponse(mChannel); Object writeResponse = waitForResponse(mChannel);
checkWriteResponse(writeResponse, Status.CANCELED); checkWriteResponse(writeResponse, PStatus.CANCELED);
// Our current implementation does not really abort the file when the write is cancelled. // Our current implementation does not really abort the file when the write is cancelled.
// The client issues another request to block worker to abort it. // The client issues another request to block worker to abort it.
checkFileContent(len); checkFileContent(len);
Expand All @@ -106,7 +106,7 @@ public void writeInvalidOffset() throws Exception {
mChannelNoException.writeInbound(buildWriteRequest(PACKET_SIZE + 1, PACKET_SIZE)); mChannelNoException.writeInbound(buildWriteRequest(PACKET_SIZE + 1, PACKET_SIZE));
Object writeResponse = waitForResponse(mChannelNoException); Object writeResponse = waitForResponse(mChannelNoException);
Assert.assertTrue(writeResponse instanceof RPCProtoMessage); Assert.assertTrue(writeResponse instanceof RPCProtoMessage);
checkWriteResponse(writeResponse, Status.INVALID_ARGUMENT); checkWriteResponse(writeResponse, PStatus.INVALID_ARGUMENT);
} }


/** /**
Expand All @@ -115,13 +115,12 @@ public void writeInvalidOffset() throws Exception {
* @param writeResponse the write response * @param writeResponse the write response
* @param statusExpected the expected status code * @param statusExpected the expected status code
*/ */
protected void checkWriteResponse(Object writeResponse, Status statusExpected) { protected void checkWriteResponse(Object writeResponse, PStatus statusExpected) {
Assert.assertTrue(writeResponse instanceof RPCProtoMessage); Assert.assertTrue(writeResponse instanceof RPCProtoMessage);


ProtoMessage response = ((RPCProtoMessage) writeResponse).getMessage(); ProtoMessage response = ((RPCProtoMessage) writeResponse).getMessage();
Assert.assertTrue(response.getType() == ProtoMessage.Type.RESPONSE); Assert.assertTrue(response.getType() == ProtoMessage.Type.RESPONSE);
Assert.assertEquals(Status.toProto(statusExpected), Assert.assertEquals(statusExpected, response.<Protocol.Response>getMessage().getStatus());
response.<Protocol.Response>getMessage().getException().getStatus());
} }


/** /**
Expand Down

0 comments on commit 981f12e

Please sign in to comment.