Skip to content

Commit

Permalink
feat(cli):support protobuf format
Browse files Browse the repository at this point in the history
  • Loading branch information
ni00 committed Jun 27, 2023
1 parent 7a1fa7f commit f12c3b3
Show file tree
Hide file tree
Showing 8 changed files with 794 additions and 655 deletions.
1 change: 1 addition & 0 deletions cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"concat-stream": "^2.0.0",
"core-js": "^3.26.0",
"mqtt": "^4.3.7",
"protobufjs": "^7.2.3",
"pump": "^3.0.0",
"readable-stream": "^3.6.0",
"signale": "^1.4.0",
Expand Down
4 changes: 4 additions & 0 deletions cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ export class Commander {
'--config [PATH]',
'load the parameters from the local configuration file, which supports json and yaml format, default path is ./mqttx-cli-config.json',
)
.option('-Pp, --protobuf-path <PATH>', 'the .proto file that defines the message format of protobuf')
.option('-Pmn, --protobuf-message-name <NAME>', 'the name of the protobuf message type')
.action(pub)

this.program
Expand Down Expand Up @@ -280,6 +282,8 @@ export class Commander {
'--config [PATH]',
'load the parameters from the local configuration file, which supports json and yaml format, default path is ./mqttx-cli-config.json',
)
.option('-Pp, --protobuf-path <PATH>', 'the .proto file that defines the message format of protobuf')
.option('-Pmn, --protobuf-message-name <NAME>', 'the name of the protobuf message type')
.action(sub)

const benchCmd = this.program.command('bench').description('MQTT Benchmark in performance testing.')
Expand Down
30 changes: 23 additions & 7 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,27 @@ import { parseConnectOptions, parsePublishOptions, checkTopicExists, checkScenar
import delay from '../utils/delay'
import { saveConfig, loadConfig } from '../utils/config'
import { loadSimulator } from '../utils/simulate'

import { serializeProtobufToBuffer } from '../utils/protobuf'
const send = (
config: boolean | string | undefined,
connOpts: IClientOptions,
pubOpts: { topic: string; message: string | Buffer; opts: IClientPublishOptions },
pubOpts: {
topic: string
message: string | Buffer
protobufPath: string | undefined
protobufMessageName: string | undefined
opts: IClientPublishOptions
},
) => {
const client = mqtt.connect(connOpts)
basicLog.connecting(config, connOpts.hostname!, connOpts.port, pubOpts.topic, pubOpts.message.toString())
client.on('connect', () => {
basicLog.connected()
const { topic, message } = pubOpts
const { topic, message, protobufPath, protobufMessageName } = pubOpts
basicLog.publishing()
client.publish(topic, message, pubOpts.opts, (err) => {

let bufferMessage = serializeProtobufToBuffer(message, protobufPath, protobufMessageName)
client.publish(topic, bufferMessage, pubOpts.opts, (err) => {
if (err) {
signale.warn(err)
} else {
Expand All @@ -46,7 +54,13 @@ const send = (
const multisend = (
config: boolean | string | undefined,
connOpts: IClientOptions,
pubOpts: { topic: string; message: string | Buffer; opts: IClientPublishOptions },
pubOpts: {
topic: string
message: string | Buffer
protobufPath: string | undefined
protobufMessageName: string | undefined
opts: IClientPublishOptions
},
maximumReconnectTimes: number,
) => {
let isNewConnection = true
Expand All @@ -57,8 +71,10 @@ const multisend = (
objectMode: true,
})
sender._write = (line, _enc, cb) => {
const { topic, opts } = pubOpts
client.publish(topic, line.trim(), opts, cb)
const { topic, opts, protobufPath, protobufMessageName } = pubOpts

let bufferMessage = serializeProtobufToBuffer(line.trim(), protobufPath, protobufMessageName)
client.publish(topic, bufferMessage, opts, cb)
}

client.on('connect', () => {
Expand Down
6 changes: 4 additions & 2 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { parseConnectOptions, parseSubscribeOptions, checkTopicExists } from '..
import delay from '../utils/delay'
import convertPayload from '../utils/convertPayload'
import { saveConfig, loadConfig } from '../utils/config'
import { deserializeBufferToProtobuf } from '../utils/protobuf'

const sub = (options: SubscribeOptions) => {
const { save, config } = options
Expand Down Expand Up @@ -59,13 +60,14 @@ const sub = (options: SubscribeOptions) => {
})

client.on('message', (topic, payload, packet) => {
const { format } = options
const { format, protobufPath, protobufMessageName } = options

const msgData: Record<string, unknown>[] = []

options.verbose && msgData.push({ label: 'topic', value: topic })

msgData.push({ label: 'payload', value: convertPayload(payload, format) })
let payloadMessage = deserializeBufferToProtobuf(payload, protobufPath, protobufMessageName)
msgData.push({ label: 'payload', value: payloadMessage ? payloadMessage : convertPayload(payload, format) })

packet.retain && msgData.push({ label: 'retain', value: packet.retain })

Expand Down
8 changes: 6 additions & 2 deletions cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ declare global {
subscriptionIdentifier?: number
contentType?: string
connUserProperties?: Record<string, string | string[]>
protobufPath?: string
protobufMessageName?: string
}

interface SubscribeOptions extends ConnectOptions {
Expand All @@ -86,14 +88,16 @@ declare global {
outputMode?: OutputMode
verbose: boolean
connUserProperties?: Record<string, string | string[]>
protobufPath?: string
protobufMessageName?: string
}

interface BenchConnectOptions extends ConnectOptions {
count: number
interval: number
}

type OmitPublishOptions = Omit<PublishOptions, 'stdin' | 'multiline'>
type OmitPublishOptions = Omit<PublishOptions, 'stdin' | 'multiline' | 'protobufPath' | 'protobufMessageName'>

interface BenchPublishOptions extends OmitPublishOptions {
count: number
Expand All @@ -102,7 +106,7 @@ declare global {
verbose: boolean
}

type OmitSubscribeOptions = Omit<SubscribeOptions, 'format' | 'outputMode'>
type OmitSubscribeOptions = Omit<SubscribeOptions, 'format' | 'outputMode' | 'protobufPath' | 'protobufMessageName'>

interface BenchSubscribeOptions extends OmitSubscribeOptions {
count: number
Expand Down
4 changes: 3 additions & 1 deletion cli/src/utils/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ const parsePublishOptions = (options: PublishOptions) => {
userProperties,
subscriptionIdentifier,
contentType,
protobufPath,
protobufMessageName,
} = options

const publishOptions: IClientPublishOptions = {
Expand All @@ -314,7 +316,7 @@ const parsePublishOptions = (options: PublishOptions) => {
)
}

return { topic, message, opts: publishOptions }
return { topic, message, protobufPath, protobufMessageName, opts: publishOptions }
}

const parseSubscribeOptions = (options: SubscribeOptions) => {
Expand Down
29 changes: 29 additions & 0 deletions cli/src/utils/protobuf.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import protobuf from 'protobufjs'

export const serializeProtobufToBuffer = (
raw: string | Buffer,
protobufPath: string | undefined,
protobufMessageName: string | undefined,
): Buffer => {
let bufferMessage = Buffer.from(raw)
if (protobufPath && protobufMessageName) {
const root = protobuf.loadSync(protobufPath)
const MyMessage = root.lookupType(protobufMessageName)
const data = MyMessage.create(JSON.parse(raw.toString()))
const serializedMessage = MyMessage.encode(data).finish()
bufferMessage = Buffer.from(serializedMessage)
}
return bufferMessage
}

export const deserializeBufferToProtobuf = (
payload: Buffer,
protobufPath: string | undefined,
protobufMessageName: string | undefined,
): any => {
if (protobufPath && protobufMessageName) {
const root = protobuf.loadSync(protobufPath)
const MyMessage = root.lookupType(protobufMessageName)
return MyMessage.decode(payload)
}
}

0 comments on commit f12c3b3

Please sign in to comment.