Skip to content

Commit

Permalink
feat(cli):support input formatted messages to publish
Browse files Browse the repository at this point in the history
  • Loading branch information
ni00 committed Jun 28, 2023
1 parent 99e4ce8 commit 3aa1758
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 23 deletions.
6 changes: 1 addition & 5 deletions cli/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,7 @@ export class Commander {
)
.option('-Pp, --protobuf-path <PATH>', 'the .proto file that defines the message format of protobuf')
.option('-Pmn, --protobuf-message-name <NAME>', 'the name of the protobuf message type')
.option(
'-Pft, --protobuf-format-type <TYPE>',
'the format type of message body, support base64, json, hex',
parseFormat,
)
.option('-f, --format <TYPE>', 'the input message format type, support base64, json, hex', parseFormat)
.allowUnknownOption(false)
.action(pub)

Expand Down
12 changes: 6 additions & 6 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ const send = (
message: string | Buffer
protobufPath: string | undefined
protobufMessageName: string | undefined
protobufFormatType: FormatType | undefined
format: FormatType | undefined
opts: IClientPublishOptions
},
) => {
const client = mqtt.connect(connOpts)
basicLog.connecting(config, connOpts.hostname!, connOpts.port, pubOpts.topic, pubOpts.message.toString())
client.on('connect', () => {
basicLog.connected()
const { topic, message, protobufPath, protobufMessageName, protobufFormatType } = pubOpts
const { topic, message, protobufPath, protobufMessageName, format } = pubOpts
basicLog.publishing()
let bufferMessage = serializeProtobufToBuffer(message, protobufPath, protobufMessageName, protobufFormatType)
let bufferMessage = serializeProtobufToBuffer(message, protobufPath, protobufMessageName, format)
client.publish(topic, bufferMessage, pubOpts.opts, (err) => {
if (err) {
signale.warn(err)
Expand Down Expand Up @@ -59,7 +59,7 @@ const multisend = (
message: string | Buffer
protobufPath: string | undefined
protobufMessageName: string | undefined
protobufFormatType: FormatType | undefined
format: FormatType | undefined
opts: IClientPublishOptions
},
maximumReconnectTimes: number,
Expand All @@ -72,9 +72,9 @@ const multisend = (
objectMode: true,
})
sender._write = (line, _enc, cb) => {
const { topic, opts, protobufPath, protobufMessageName, protobufFormatType } = pubOpts
const { topic, opts, protobufPath, protobufMessageName, format } = pubOpts

let bufferMessage = serializeProtobufToBuffer(line.trim(), protobufPath, protobufMessageName, protobufFormatType)
let bufferMessage = serializeProtobufToBuffer(line.trim(), protobufPath, protobufMessageName, format)
client.publish(topic, bufferMessage, opts, cb)
}

Expand Down
4 changes: 2 additions & 2 deletions cli/src/types/global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ declare global {
connUserProperties?: Record<string, string | string[]>
protobufPath?: string
protobufMessageName?: string
protobufFormatType?: FormatType
format?: FormatType
}

interface SubscribeOptions extends ConnectOptions {
Expand All @@ -100,7 +100,7 @@ declare global {

type OmitPublishOptions = Omit<
PublishOptions,
'stdin' | 'multiline' | 'protobufPath' | 'protobufMessageName' | 'protobufFormatType'
'stdin' | 'multiline' | 'protobufPath' | 'protobufMessageName' | 'format'
>

interface BenchPublishOptions extends OmitPublishOptions {
Expand Down
4 changes: 2 additions & 2 deletions cli/src/utils/parse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ const parsePublishOptions = (options: PublishOptions) => {
contentType,
protobufPath,
protobufMessageName,
protobufFormatType,
format,
} = options

const publishOptions: IClientPublishOptions = {
Expand All @@ -317,7 +317,7 @@ const parsePublishOptions = (options: PublishOptions) => {
)
}

return { topic, message, protobufPath, protobufMessageName, protobufFormatType, opts: publishOptions }
return { topic, message, protobufPath, protobufMessageName, format, opts: publishOptions }
}

const parseSubscribeOptions = (options: SubscribeOptions) => {
Expand Down
25 changes: 17 additions & 8 deletions cli/src/utils/protobuf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import { transformPBJSError } from './protobufErrors'
const convertObject = (raw: string | Buffer, format?: FormatType | undefined) => {
switch (format) {
case 'base64':
return JSON.parse(Buffer.from(raw.toString('utf-8'), 'base64').toString('utf-8'))
return Buffer.from(raw.toString('utf-8'), 'base64').toString('utf-8')
case 'hex':
return JSON.parse(Buffer.from(raw.toString('utf-8').replaceAll(' ', ''), 'hex').toString('utf-8'))
return Buffer.from(raw.toString('utf-8').replaceAll(' ', ''), 'hex').toString('utf-8')
case 'json':
return JSON.stringify(JSON.parse(raw.toString('utf-8')), null, 2)
default:
return JSON.parse(raw.toString('utf-8'))
return raw.toString('utf-8')
}
}

Expand All @@ -19,22 +21,29 @@ export const serializeProtobufToBuffer = (
protobufMessageName: string | undefined,
format?: FormatType | undefined,
): Buffer => {
let bufferMessage = Buffer.from(raw)
let rawData
try {
rawData = convertObject(raw, format)
} catch (error: unknown) {
signale.error(`Message format type error : ${(error as Error).message.split('\n')[0]}`)
process.exit(1)
}

let bufferMessage = Buffer.from(rawData)
if (protobufPath && protobufMessageName) {
try {
const root = protobuf.loadSync(protobufPath)
const Message = root.lookupType(protobufMessageName)
const rawData = convertObject(raw, format)
const err = Message.verify(rawData)
const err = Message.verify(JSON.parse(rawData))
if (err) {
signale.error(`Message serialization error: ${err}`)
process.exit(1)
}
const data = Message.create(rawData)
const data = Message.create(JSON.parse(rawData))
const serializedMessage = Message.encode(data).finish()
bufferMessage = Buffer.from(serializedMessage)
} catch (error: unknown) {
signale.error(`Message format type error : ${(error as Error).message.split('\n')[0]}`)
signale.error(`Message serialization error: ${(error as Error).message.split('\n')[0]}`)
process.exit(1)
}
}
Expand Down
3 changes: 3 additions & 0 deletions cli/src/utils/protobufErrors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
/**
* @reference https://github.com/spluxx/Protoman/blob/master/src/core/protobuf/pbjsErrors.ts
*/
function prepend(msg: string): string {
return `Message deserialization error: ${msg}`
}
Expand Down

0 comments on commit 3aa1758

Please sign in to comment.