Skip to content

Commit

Permalink
Frame encode reqresp error (#4466)
Browse files Browse the repository at this point in the history
* v1.0.0

* Frame encode ReqResp errors

* Fix e2e tests
  • Loading branch information
dapplion committed Aug 22, 2022
1 parent 709cf39 commit 1495a7f
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 34 deletions.
22 changes: 11 additions & 11 deletions packages/beacon-node/src/network/reqresp/encoders/responseDecode.ts
Expand Up @@ -90,23 +90,23 @@ export async function readResultHeader(bufferedSource: BufferedSource): Promise<
* ```
*/
export async function readErrorMessage(bufferedSource: BufferedSource): Promise<string> {
// Read at least 256 or wait for the stream to end
for await (const buffer of bufferedSource) {
// Wait for next chunk with bytes or for the stream to end
// Note: The entire <error_message> is expected to be in the same chunk
if (buffer.length === 0) {
continue;
}

const bytes = buffer.slice();
try {
return decodeErrorMessage(bytes);
} catch {
return bytes.toString("hex");
if (buffer.length >= 256) {
break;
}
}

// Error message is optional and may not be included in the response stream
return "";
const bytes = bufferedSource["buffer"].slice(0, 256);

try {
return decodeErrorMessage(bytes);
} catch {
// Error message is optional and may not be included in the response stream
return bytes.toString("hex");
}
}

/**
Expand Down
Expand Up @@ -57,6 +57,7 @@ export function responseEncodeSuccess(
* fn yields exactly one `<error_response>` and afterwards the stream must be terminated
*/
export async function* responseEncodeError(
protocol: Protocol,
status: RpcResponseStatusError,
errorMessage: string
): AsyncGenerator<Buffer> {
Expand All @@ -65,7 +66,7 @@ export async function* responseEncodeError(

// <error_message>? is optional
if (errorMessage) {
yield encodeErrorMessage(errorMessage);
yield* encodeErrorMessage(errorMessage, protocol.encoding);
}
}

Expand Down
Expand Up @@ -7,6 +7,8 @@ import {SnappyFramesUncompress} from "./snappyFrames/uncompress.js";
import {maxEncodedLen} from "./utils.js";
import {SszSnappyError, SszSnappyErrorCode} from "./errors.js";

export type RequestOrResponseTypeRead = Pick<RequestOrResponseType, "minSize" | "maxSize" | "deserialize">;

/**
* ssz_snappy encoding strategy reader.
* Consumes a stream source to read encoded header and payload as defined in the spec:
Expand All @@ -16,7 +18,7 @@ import {SszSnappyError, SszSnappyErrorCode} from "./errors.js";
*/
export async function readSszSnappyPayload<T extends RequestOrIncomingResponseBody>(
bufferedSource: BufferedSource,
type: RequestOrResponseType
type: RequestOrResponseTypeRead
): Promise<T> {
const sszDataLength = await readSszSnappyHeader(bufferedSource, type);

Expand All @@ -28,7 +30,10 @@ export async function readSszSnappyPayload<T extends RequestOrIncomingResponseBo
* Reads `<encoding-dependent-header>` for ssz-snappy.
* encoding-header ::= the length of the raw SSZ bytes, encoded as an unsigned protobuf varint
*/
async function readSszSnappyHeader(bufferedSource: BufferedSource, type: RequestOrResponseType): Promise<number> {
export async function readSszSnappyHeader(
bufferedSource: BufferedSource,
type: Pick<RequestOrResponseType, "minSize" | "maxSize">
): Promise<number> {
for await (const buffer of bufferedSource) {
// Get next bytes if empty
if (buffer.length === 0) {
Expand Down Expand Up @@ -74,7 +79,7 @@ async function readSszSnappyHeader(bufferedSource: BufferedSource, type: Request
* Reads `<encoded-payload>` for ssz-snappy and decompress.
* The returned bytes can be SSZ deseralized
*/
async function readSszSnappyBody(bufferedSource: BufferedSource, sszDataLength: number): Promise<Buffer> {
export async function readSszSnappyBody(bufferedSource: BufferedSource, sszDataLength: number): Promise<Buffer> {
const decompressor = new SnappyFramesUncompress();
const uncompressedData = new BufferList();
let readBytes = 0;
Expand Down Expand Up @@ -124,7 +129,10 @@ async function readSszSnappyBody(bufferedSource: BufferedSource, sszDataLength:
* Deseralizes SSZ body.
* `isSszTree` option allows the SignedBeaconBlock type to be deserialized as a tree
*/
function deserializeSszBody<T extends RequestOrIncomingResponseBody>(bytes: Buffer, type: RequestOrResponseType): T {
function deserializeSszBody<T extends RequestOrIncomingResponseBody>(
bytes: Buffer,
type: RequestOrResponseTypeRead
): T {
try {
return type.deserialize(bytes) as T;
} catch (e) {
Expand Down
Expand Up @@ -17,18 +17,19 @@ export async function* writeSszSnappyPayload<T extends RequestOrOutgoingResponse
): AsyncGenerator<Buffer> {
const serializedBody = serializeSszBody(body, serializer);

// MUST encode the length of the raw SSZ bytes, encoded as an unsigned protobuf varint
yield Buffer.from(varint.encode(serializedBody.length));

// By first computing and writing the SSZ byte length, the SSZ encoder can then directly
// write the chunk contents to the stream. Snappy writer compresses frame by frame
yield* encodeSszSnappy(serializedBody);
}

/**
* Buffered Snappy writer
*/
function encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
export async function* encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
// MUST encode the length of the raw SSZ bytes, encoded as an unsigned protobuf varint
yield Buffer.from(varint.encode(bytes.length));

// By first computing and writing the SSZ byte length, the SSZ encoder can then directly
// write the chunk contents to the stream. Snappy writer compresses frame by frame

/**
* Use sync version (default) for compress as it is almost 2x faster than async
* one and most payloads are "1 chunk" and 100kb payloads (which would mostly be
Expand All @@ -40,7 +41,7 @@ function encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
const stream = snappy.createCompressStream();
stream.write(bytes);
stream.end();
return source<Buffer>(stream);
yield* source<Buffer>(stream);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/reqresp/response/index.ts
Expand Up @@ -78,7 +78,7 @@ export async function handleRequest(
);
} catch (e) {
const status = e instanceof ResponseError ? e.status : RespStatus.SERVER_ERROR;
yield* responseEncodeError(status, (e as Error).message);
yield* responseEncodeError(protocol, status, (e as Error).message);

// Should not throw an error here or libp2p-mplex throws with 'AbortError: stream reset'
// throw e;
Expand Down
12 changes: 10 additions & 2 deletions packages/beacon-node/src/network/reqresp/utils/errorMessage.ts
@@ -1,3 +1,6 @@
import {encodeSszSnappy} from "../encodingStrategies/sszSnappy/encode.js";
import {Encoding} from "../types.js";

// ErrorMessage schema:
//
// (
Expand All @@ -12,9 +15,14 @@
/**
* Encodes a UTF-8 string to 256 bytes max
*/
export function encodeErrorMessage(errorMessage: string): Buffer {
export async function* encodeErrorMessage(errorMessage: string, encoding: Encoding): AsyncGenerator<Buffer> {
const encoder = new TextEncoder();
return Buffer.from(encoder.encode(errorMessage).slice(0, 256));
const bytes = Buffer.from(encoder.encode(errorMessage).slice(0, 256));

switch (encoding) {
case Encoding.SSZ_SNAPPY:
yield* encodeSszSnappy(bytes);
}
}

/**
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/test/e2e/network/reqresp.test.ts
Expand Up @@ -209,7 +209,7 @@ describe("network / ReqResp", function () {
await expectRejectedWithLodestarError(
netA.reqResp.beaconBlocksByRange(netB.peerId, {startSlot: 0, step: 1, count: 3}),
new RequestError(
{code: RequestErrorCode.SERVER_ERROR, errorMessage: testErrorMessage},
{code: RequestErrorCode.SERVER_ERROR, errorMessage: "sNaPpYa" + testErrorMessage},
formatMetadata(Method.BeaconBlocksByRange, Encoding.SSZ_SNAPPY, netB.peerId)
)
);
Expand All @@ -228,7 +228,7 @@ describe("network / ReqResp", function () {
await expectRejectedWithLodestarError(
netA.reqResp.beaconBlocksByRange(netB.peerId, {startSlot: 0, step: 1, count: 3}),
new RequestError(
{code: RequestErrorCode.SERVER_ERROR, errorMessage: testErrorMessage},
{code: RequestErrorCode.SERVER_ERROR, errorMessage: "sNaPpYa" + testErrorMessage},
formatMetadata(Method.BeaconBlocksByRange, Encoding.SSZ_SNAPPY, netB.peerId)
)
);
Expand Down
Expand Up @@ -5,7 +5,7 @@ import all from "it-all";
import {ForkName, SLOTS_PER_EPOCH} from "@lodestar/params";
import {chainConfig} from "@lodestar/config/default";
import {createIBeaconConfig} from "@lodestar/config";
import {LodestarError} from "@lodestar/utils";
import {fromHex, LodestarError} from "@lodestar/utils";
import {allForks} from "@lodestar/types";
import {
Method,
Expand Down Expand Up @@ -236,8 +236,12 @@ describe("network / reqresp / encoders / response - Success and error cases", ()
},
{
id: "SERVER_ERROR - with error message",
decodeError: new ResponseError(RespStatus.SERVER_ERROR, "TEST_ERROR"),
chunks: [Buffer.from([RespStatus.SERVER_ERROR]), Buffer.from("TEST_ERROR")],
decodeError: new ResponseError(RespStatus.SERVER_ERROR, "sNaPpYIzTEST_ERROR"),
chunks: [
Buffer.from([RespStatus.SERVER_ERROR]),
fromHexBuf("0x0a"),
fromHexBuf("0xff060000734e61507059010e000049b97aaf544553545f4552524f52"),
],
responseChunks: [{status: RespStatus.SERVER_ERROR, errorMessage: "TEST_ERROR"}],
},
// This last two error cases are not possible to encode since are invalid. Test decoding only
Expand Down Expand Up @@ -265,7 +269,7 @@ describe("network / reqresp / encoders / response - Success and error cases", ()
responseEncodeSuccess(config, protocol)
);
} else {
yield* responseEncodeError(chunk.status, chunk.errorMessage);
yield* responseEncodeError(protocol, chunk.status, chunk.errorMessage);
}
}
}
Expand Down Expand Up @@ -327,3 +331,7 @@ function onlySuccessChunks(responseChunks: ResponseChunk[]): IncomingResponseBod
}
return bodyArr;
}

function fromHexBuf(hex: string): Buffer {
return Buffer.from(fromHex(hex));
}
@@ -1,6 +1,6 @@
import chai, {expect} from "chai";
import chaiAsPromised from "chai-as-promised";
import {LodestarError} from "@lodestar/utils";
import {LodestarError, fromHex} from "@lodestar/utils";
import {RespStatus} from "../../../../../src/constants/index.js";
import {Method, Encoding, Version} from "../../../../../src/network/reqresp/types.js";
import {handleRequest, PerformRequestHandler} from "../../../../../src/network/reqresp/response/index.js";
Expand Down Expand Up @@ -52,7 +52,8 @@ describe("network / reqresp / response / handleRequest", async () => {
...sszSnappyPing.chunks,
// Chunk 2 - error, with errorMessage
Buffer.from([RespStatus.SERVER_ERROR]),
Buffer.from("TEST_ERROR"),
Buffer.from(fromHex("0x0a")),
Buffer.from(fromHex("0xff060000734e61507059010e000049b97aaf544553545f4552524f52")),
],
},
];
Expand Down

0 comments on commit 1495a7f

Please sign in to comment.