Skip to content
This repository has been archived by the owner on Jul 18, 2024. It is now read-only.

Commit

Permalink
Add mcap package with Message Capture parsing support (#1885)
Browse files Browse the repository at this point in the history
**User-Facing Changes**
None

**Description**
Adds a workspace package `@foxglove/mcap` with parsing support for the nascent mcap file format. This PR contains no Studio integration, just adding the package since it's begun to stabilize.

The package includes a validate script which can be used to test with local .mcap files:

```sh
$ yarn workspace @foxglove/mcap validate /path/to/file.mcap [--deserialize [--dump]]
Reading gps.mcap
Read 886.10kiB in 89.54ms (9.66MiB/sec)
Record counts:
       1 Chunk
      22 ChannelInfo
   30445 Message
       1 Footer
```

Because the file format and especially the parser is still evolving, we'll keep it in the studio repo for now and later move it into a separate repo.
  • Loading branch information
jtbandes committed Sep 24, 2021
1 parent 71423b8 commit f976104
Show file tree
Hide file tree
Showing 16 changed files with 1,075 additions and 9 deletions.
2 changes: 1 addition & 1 deletion packages/@types/wasm-lz4/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/

declare module "wasm-lz4" {
function decompress(...args: unknown[]): Buffer;
function decompress(buffer: Uint8Array, size: number): Buffer;
namespace decompress {
const isLoaded: Promise<boolean>;
}
Expand Down
8 changes: 8 additions & 0 deletions packages/mcap/jest.config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"testMatch": ["<rootDir>/src/**/*.test.ts(x)?"],
"transform": {
"\\.[jt]sx?$": ["babel-jest", { "rootMode": "upward" }]
},
"//": "Native find is slow because it does not exclude files: https://github.com/facebook/jest/pull/11264#issuecomment-825377579",
"haste": { "forceNodeFilesystemAPI": true }
}
38 changes: 38 additions & 0 deletions packages/mcap/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"name": "@foxglove/mcap",
"description": "Message Capture file reading support in TypeScript",
"license": "MPL-2.0",
"private": true,
"repository": {
"type": "git",
"url": "https://github.com/foxglove/studio.git"
},
"author": {
"name": "Foxglove Technologies",
"email": "support@foxglove.dev"
},
"homepage": "https://foxglove.dev/",
"main": "./src/index.ts",
"files": [
"dist",
"src"
],
"scripts": {
"prepack": "tsc -b",
"validate": "ts-node --project tsconfig.cjs.json scripts/validate.ts"
},
"devDependencies": {
"@foxglove/crc": "0.0.1",
"@foxglove/rosmsg": "2.0.0",
"@foxglove/rosmsg-serialization": "1.2.0",
"@foxglove/rosmsg2-serialization": "1.0.2",
"@types/lodash": "^4",
"commander": "8.2.0",
"lodash": "4.17.21",
"ts-node": "10.2.1",
"typescript": "4.4.3"
},
"dependencies": {
"eventemitter3": "4.0.7"
}
}
175 changes: 175 additions & 0 deletions packages/mcap/scripts/validate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/

import { program } from "commander";
import fs from "fs";
import { isEqual } from "lodash";
import { performance } from "perf_hooks";
import decompressLZ4 from "wasm-lz4";

import { parse as parseMessageDefinition, RosMsgDefinition } from "@foxglove/rosmsg";
import { LazyMessageReader as ROS1LazyMessageReader } from "@foxglove/rosmsg-serialization";
import { MessageReader as ROS2MessageReader } from "@foxglove/rosmsg2-serialization";

import { McapReader, McapRecord, ChannelInfo } from "../src";

function log(...data: unknown[]) {
// eslint-disable-next-line no-restricted-syntax
console.log(...data);
}

function formatBytes(totalBytes: number) {
const units = ["B", "kiB", "MiB", "GiB", "TiB"];
let bytes = totalBytes;
let unit = 0;
while (unit + 1 < units.length && bytes >= 1024) {
bytes /= 1024;
unit++;
}
return `${bytes.toFixed(2)}${units[unit]!}`;
}

async function validate(
filePath: string,
{ deserialize, dump }: { deserialize: boolean; dump: boolean },
) {
await decompressLZ4.isLoaded;

const recordCounts = new Map<McapRecord["type"], number>();
const channelInfoById = new Map<
number,
{
info: ChannelInfo;
messageDeserializer: ROS2MessageReader | ROS1LazyMessageReader;
parsedDefinitions: RosMsgDefinition[];
}
>();

function processRecord(record: McapRecord) {
recordCounts.set(record.type, (recordCounts.get(record.type) ?? 0) + 1);

switch (record.type) {
default:
break;

case "ChannelInfo": {
const existingInfo = channelInfoById.get(record.id);
if (existingInfo) {
if (!isEqual(existingInfo.info, record)) {
throw new Error(`differing channel infos for for ${record.id}`);
}
break;
}
let parsedDefinitions;
let messageDeserializer;
if (record.schemaFormat === "ros1") {
parsedDefinitions = parseMessageDefinition(
new TextDecoder().decode(record.schemaDefinition),
);
messageDeserializer = new ROS1LazyMessageReader(parsedDefinitions);
} else if (record.schemaFormat === "ros2") {
parsedDefinitions = parseMessageDefinition(
new TextDecoder().decode(record.schemaDefinition),
{
ros2: true,
},
);
messageDeserializer = new ROS2MessageReader(parsedDefinitions);
} else {
throw new Error(`unsupported schema format ${record.schemaFormat}`);
}
channelInfoById.set(record.id, { info: record, messageDeserializer, parsedDefinitions });
break;
}

case "Message": {
const channelInfo = channelInfoById.get(record.channelId);
if (!channelInfo) {
throw new Error(`message for channel ${record.channelId} with no prior channel info`);
}
if (deserialize) {
let message: unknown;
if (channelInfo.messageDeserializer instanceof ROS1LazyMessageReader) {
const size = channelInfo.messageDeserializer.size(new Uint8Array(record.data));
if (size !== record.data.byteLength) {
throw new Error(
`Message size ${size} should match buffer length ${record.data.byteLength}`,
);
}
message = channelInfo.messageDeserializer
.readMessage(new Uint8Array(record.data))
.toJSON();
} else {
message = channelInfo.messageDeserializer.readMessage(new Uint8Array(record.data));
}
if (dump) {
log(message);
}
}
break;
}
}
}

log("Reading", filePath);
const startTime = performance.now();
let readBytes = 0n;
const reader = new McapReader({
includeChunks: true,
decompressHandlers: {
lz4: (buffer, decompressedSize) => {
const result = decompressLZ4(
new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength),
Number(decompressedSize),
);
return new DataView(result.buffer, result.byteOffset, result.byteLength);
},
},
});

await new Promise<void>((resolve, reject) => {
const stream = fs.createReadStream(filePath);
stream.on("data", (data) => {
try {
if (typeof data === "string") {
throw new Error("expected buffer");
}
readBytes += BigInt(data.byteLength);
reader.append(data);
for (let record; (record = reader.nextRecord()); ) {
processRecord(record);
}
} catch (error) {
reject(error);
stream.close();
}
});
stream.on("error", (error) => reject(error));
stream.on("close", () => resolve());
});

if (!reader.done()) {
throw new Error(`File read incomplete; ${reader.bytesRemaining()} bytes remain after parsing`);
}

const durationMs = performance.now() - startTime;
log(
`Read ${formatBytes(Number(readBytes))} in ${durationMs.toFixed(2)}ms (${formatBytes(
Number(readBytes) / (durationMs / 1000),
)}/sec)`,
);
log("Record counts:");
for (const [type, count] of recordCounts) {
log(` ${count.toFixed().padStart(6, " ")} ${type}`);
}
}

program
.argument("<file>", "path to mcap file")
.option("--deserialize", "deserialize message contents", false)
.option("--dump", "dump message contents to stdout", false)
.action((file: string, options: { deserialize: boolean; dump: boolean }) => {
validate(file, options).catch(console.error);
})
.parse();
Loading

0 comments on commit f976104

Please sign in to comment.