pip-290.md

Background knowledge

1. Web Socket Proxy Server

Web Socket Proxy Server provides a simple way to interact with Pulsar over the WSS protocol.

  • When a WSS producer is registered, Web Socket Proxy Server creates a one-to-one internal producer that actually sends the messages to the Broker.
  • When a WSS consumer is registered, Web Socket Proxy Server creates a one-to-one internal consumer that actually receives messages from the Broker and forwards them to the WSS consumer.

2. When a user wants to encrypt the message payload, there are two solutions:

  • Solution 1: Encrypt the message payload before the WSS producer sends a message, and decrypt it after the WSS consumer receives it. If the user wants to use different encryption keys for different messages, they can set a message property that indicates which key encrypted the message. The benefit of this solution is that the user does not need to expose the private key to Web Socket Proxy Server. The shortcoming is that if the user also has consumers using Java clients, those consumers cannot decrypt the messages automatically (normally, Java clients decrypt messages automatically).
  • Solution 2: Release 2.11 added a feature for setting encryption keys on the internal producers and consumers of Web Socket Proxy Server. The benefit is that the WSS producer and WSS consumer do not need to care about encryption and decryption. The shortcoming is that the user must upload both the public key and the private key to Web Socket Proxy Server (in other words, the user must expose the keys to Web Socket Proxy Server). There is a workaround for this shortcoming, but it is not recommended[1].

3. How the message payload is processed during message sending

  • If batching is enabled, the Producer combines several message payloads into one batched message payload;
  • If compression is enabled, the Producer compresses the batched message payload into a compressed payload;
  • After the previous two steps, the Producer encrypts the compressed payload into an encrypted payload.
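
The ordering above (batch, then compress, then encrypt) can be sketched in plain Java. This is only an illustration under stated assumptions, not Pulsar's implementation: the class and method names are hypothetical, the batch framing is a made-up length-prefixed layout rather than Pulsar's real batch format, zlib (Deflater) stands in for the compression codec, and AES-GCM with a 12-byte IV is assumed for the encryption step.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.List;
import java.util.zip.Deflater;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

class PayloadPipelineSketch {

    // Step 1: join several payloads into one buffer.
    // Hypothetical length-prefixed framing, NOT Pulsar's real batch format.
    static byte[] batch(List<byte[]> payloads) {
        int size = 0;
        for (byte[] p : payloads) {
            size += Integer.BYTES + p.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (byte[] p : payloads) {
            buf.putInt(p.length).put(p);
        }
        return buf.array();
    }

    // Step 2: compress the batched buffer (zlib is an assumption made for this sketch).
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Step 3: encrypt the compressed buffer. The IV plays the role of "param"
    // in the Encrypt Context.
    static byte[] encrypt(byte[] input, SecretKey key, byte[] iv) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        return cipher.doFinal(input);
    }

    // Generates a fresh random 12-byte IV for one message.
    static byte[] newIv() {
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);
        return iv;
    }

    // The full producer-side order: batch -> compress -> encrypt.
    static byte[] process(List<byte[]> payloads, SecretKey key, byte[] iv) throws Exception {
        return encrypt(compress(batch(payloads)), key, iv);
    }
}
```

Because encryption is the last step, a client that encrypts by itself must also have finished any batching and compression before it hands the payload over.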

4. Encrypt context

The structure of the Encrypt Context:

{
  "batchSize": 2, // How many single messages are in the batch. If null, it means it is not a batched message.
  "compressionType": "NONE", // the compression type.
  "uncompressedMessageSize": 0, // the size of the uncompressed payload.
  "keys": {
    "client-rsa.pem": {  // key name.
      "keyValue": "asdvfdw==", // key value.
      "metadata": {} // extra props of the key.
    }
  },
  "param": "Tfu1PxVm6S9D3+Hk" // the IV of current encryption for this message. 
}

All the fields of the Encrypt Context are used to parse the encrypted message payload.

  • keys and param are used to decrypt the encrypted message payload.
  • compressionType and uncompressedMessageSize are used to decompress the compressed message payload.
  • batchSize is used to extract the individual messages from the batched message payload.

There is another attribute, encryptionAlgo, which identifies the encryption algorithm in use. It is optional, so the Encrypt Context has no such property.

When the internal consumer of Web Socket Proxy Server receives a message whose metadata indicates that it is encrypted, the consumer adds the Encrypt Context to the response sent to the WSS consumer.
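
As a rough illustration of how a WSS consumer might read these fields, the sketch below models the Encrypt Context as a small Java class and derives which steps the client has to undo, in order. The class name, method names, and step labels are hypothetical, and the keys map is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

class EncryptContextSketch {
    final Integer batchSize;            // null means "not a batched message"
    final String compressionType;       // for example "NONE" or "LZ4"
    final int uncompressedMessageSize;  // size of the payload before compression
    final String param;                 // Base64-encoded IV of this message

    EncryptContextSketch(Integer batchSize, String compressionType,
                         int uncompressedMessageSize, String param) {
        this.batchSize = batchSize;
        this.compressionType = compressionType;
        this.uncompressedMessageSize = uncompressedMessageSize;
        this.param = param;
    }

    // Decodes the IV that the decryption step needs.
    byte[] iv() {
        return Base64.getDecoder().decode(param);
    }

    // Lists the consumer-side steps in the order they must run:
    // decryption, then decompression, then batch extraction.
    List<String> stepsToUndo() {
        List<String> steps = new ArrayList<>();
        steps.add("decrypt");
        if (compressionType != null && !"NONE".equals(compressionType)) {
            steps.add("decompress");
        }
        if (batchSize != null) {
            steps.add("extract-batch");
        }
        return steps;
    }
}
```

For the sample context shown earlier (batchSize 2, compressionType NONE), only decryption and batch extraction would apply.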

5. Quick explanation of the components used in the Design sections:

  • CryptoKeyReader: an interface that users implement to read the public key and the private key.
  • MessageCrypto: a utility interface that encrypts and decrypts the message payload, and that adds encryption information to the message metadata and extracts it again.

Motivation

Given the two solutions above, there is currently no way to enable encryption under the WSS protocol that meets both of the following conditions:

  • The WSS producer and WSS consumer encrypt and decrypt by themselves and do not share private keys with Web Socket Proxy Server.
  • Other clients (such as Java and CPP) can automatically decrypt the messages that the WSS producer sent.

Goals

Provide a way for Web Socket Proxy Server to just pass the encryption information through to the client, so that the WSS producer and WSS consumer encrypt and decrypt by themselves.

Since the producer processes message payloads in the order compression --> encryption, users need to handle compression themselves if they need it.

If other clients (such as Java and CPP) send messages to the topic that the WSS consumer subscribes to, the topic may contain batched messages, and the WSS consumer will inevitably receive them. Since the consumer processes message payloads in the order decryption --> decompression --> batch extraction, users need to handle decompression and batch extraction themselves.

Out of Scope

This proposal does not intend to support the following three features:

  • Support publishing "Null value messages" for WSS producers.
  • Support publishing "Chunked messages" for WSS producers.
  • Support publishing "Batched messages" for WSS producers.

High-Level Design

For WSS producers: Modify the definition of the parameter encryptionKeys so that it can be set in two ways:

  • The original mode: If the producer registers with a string parameter encryptionKeys, Web Socket Proxy Server still works in the original way, which is defined in the PIP Support encryption in Web Socket Proxy Server.
  • The new mode: If the producer registers with a JSON parameter encryptionKeys and encryptionKeys[{key_name}].keyValue is not empty, Web Socket Proxy Server marks this producer as a Client-Side Encryption Producer and disables server-side batching, server-side compression, and server-side encryption. The structure of encryptionKeys is as follows:
{
  "client-ecdsa.pem": {
    "keyValue": "BDJfN+Iw==",
    "metadata": {
      "k1": "v1"
    }
  }
}

For WSS consumers: Users can set the parameter cryptoFailureAction to CONSUME to directly receive the undecrypted message payload (this was already supported).

Detailed Design

For the producers marked as Client-Side Encryption Producers:

  • Forcibly set the component CryptoKeyReader to DummyCryptoKeyReaderImpl.
    • DummyCryptoKeyReaderImpl: does not provide any public key or private key, and just returns null.
  • Forcibly set the component MessageCrypto to WSSDummyMessageCryptoImpl to skip server-side message encryption.
    • WSSDummyMessageCryptoImpl: only sets the encryption info in the message metadata and skips payload encryption.
  • Forcibly set enableBatching to false to skip server-side batch building, and print a log if any of the discarded parameters enableBatching, batchingMaxMessages, maxPendingMessages, or batchingMaxPublishDelay was set.
  • Forcibly set the CompressionType to None to skip server-side compression, and print a log if the discarded parameter compressionType was set.
  • Forcibly set the parameter enableChunking to false (the default value is already false) to prevent unexpected problems if the default setting changes in the future.
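
The override rules above could look roughly like the following sketch. It uses hypothetical holder classes rather than Pulsar's real producer configuration types, models only enableBatching and compressionType among the discarded parameters, and prints to standard output in place of a real logger.

```java
import java.util.ArrayList;
import java.util.List;

class ClientSideEncryptionOverridesSketch {

    // Hypothetical holder for what the WSS producer requested; null means "not set".
    static class RequestedSettings {
        Boolean enableBatching;
        String compressionType;
    }

    // Hypothetical holder for what is actually applied to the internal producer.
    static class EffectiveSettings {
        final boolean enableBatching = false;   // server-side batching is skipped
        final String compressionType = "NONE";  // server-side compression is skipped
        final boolean enableChunking = false;   // pinned in case the default changes later
        final List<String> discarded = new ArrayList<>();
    }

    // Ignores the requested values, records which ones were discarded, and logs them.
    static EffectiveSettings forClientSideEncryption(RequestedSettings requested) {
        EffectiveSettings effective = new EffectiveSettings();
        if (requested.enableBatching != null) {
            effective.discarded.add("enableBatching");
        }
        if (requested.compressionType != null) {
            effective.discarded.add("compressionType");
        }
        for (String name : effective.discarded) {
            System.out.println("Discarded parameter for client-side encryption producer: " + name);
        }
        return effective;
    }
}
```

Returning the list of discarded parameter names keeps the forced values and the logging decision in one place, which makes the behavior easy to assert in a test.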

For the client-side encryption consumers:

  • To avoid too many warning logs: when the consumer's cryptoFailureAction is set to CONSUME, print only a DEBUG-level log when an encrypted message is received and the consumer cannot decrypt it (the original log level is WARN).

Public API

Define a new mode for the parameter encryptionKeys:

| param name | description | structure (before encoding) |
| --- | --- | --- |
| encryptionKeys | Base64 encoded and URL encoded and JSON formatted encryption keys | Map<String, EncryptionKey> |
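
A minimal sketch of how a client might build the encryptionKeys query parameter, assuming the JSON is Base64 encoded first and the result is then URL encoded (this order is inferred from the description above and from the helper name base64AndURLEncode used in the producer demo). The class and method names here are hypothetical.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class EncryptionKeysParamSketch {

    // JSON text -> Base64 -> URL-encoded string that is safe inside a query string.
    static String base64AndUrlEncode(String json) {
        String base64 = Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
        return URLEncoder.encode(base64, StandardCharsets.UTF_8);
    }

    // The reverse direction: URL-decode, then Base64-decode back to the JSON text.
    static String urlAndBase64Decode(String param) {
        String base64 = URLDecoder.decode(param, StandardCharsets.UTF_8);
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }
}
```

The URL-encoding step matters because standard Base64 output can contain +, / and =, which would otherwise be misread in a query string.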

Add the following JSON attributes to the message that a WSS producer sends:

| param name | description | type (before encoding) |
| --- | --- | --- |
| compressionType | Compression type. Do not set it if compression is not performed | CompressionType |
| uncompressedMessageSize | The size of the payload before compression. Do not set it if compression is not performed | int |
| encryptionParam | Base64 encoded serialized initialization vector used when the client encrypts | byte[] |

A demo for client-side encryption producer

// Assumes the Jetty WebSocket client and Java 15+ (text blocks). The helpers compress, encrypt,
// getEncryptionParam, base64Encode, base64AndURLEncode and toJSON are supplied by the user.
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

public void connect() throws Exception {
    String protocolAndHostPort = "ws://localhost:55217";
    // The URL path below already contains "persistent", so only tenant/namespace/topic goes here.
    String topicName = "public/default/tp1";
    String keys = """
        {
          "client-ecdsa.pem": {
            "keyValue": "BDJf/72DhLRs0C0/U+vkykeIBfXaaJiwpqPVgWJvV7B7GwqIMvY6OFXdFvi0gx7Co/0xO7vKTHLQP8GZAt8DWrsCb8W1jhxmOjpThHBaksXG0kN+Iw==",
            "metadata": {
              "k1": "v1"
            }
          }
        }
        """;
    StringBuilder producerUrl = new StringBuilder(protocolAndHostPort)
        .append("/ws/v2/producer/persistent/")
        .append(topicName)
        .append("?")
        .append("encryptionKeys=").append(base64AndURLEncode(keys));
    WebSocketClient wssClient = new WebSocketClient();
    wssClient.start();
    // Keep the session in a field so that sendMessage() can use it.
    this.session = wssClient.connect(this, URI.create(producerUrl.toString()), new ClientUpgradeRequest()).get();
}

public void sendMessage() throws Exception {
    byte[] payload = "msg-123".getBytes(StandardCharsets.UTF_8); // [109, 115, 103, 45, 49, 50, 51]
    String msgKey = "client-ecdsa.pem";
    // Compress if needed (optional). Record the payload size before compression.
    CompressionType compressionType = CompressionType.LZ4;
    int uncompressedMessageSize = payload.length; // 7
    byte[] compressedPayload = compress(payload);
    // Encrypt the compressed payload.
    byte[] encryptionParam = getEncryptionParam(); // [-10, -5, -124, 23, 14, -122, 30, 127, 64, 63, 85, -79]
    String base64EncodedEncryptionParam = base64Encode(encryptionParam); // 9vuEFw6GHn9AP1Wx
    byte[] encryptedPayload = encrypt(compressedPayload, encryptionParam);
    // Do send. The encrypted payload travels as a Base64 string.
    ProducerMessage msg = new ProducerMessage();
    msg.key = msgKey;
    msg.payload = base64Encode(encryptedPayload); // e.g. H2RbToHyfXrAUJq3kCC81wlmpGRU5l4=
    msg.encryptionParam = base64EncodedEncryptionParam;
    msg.compressionType = compressionType;
    msg.uncompressedMessageSize = uncompressedMessageSize;
    this.session.getRemote().sendString(toJSON(msg));
}

A demo for client-side encryption consumer

// Assumes the Jetty WebSocket client. subscriptionName and subscriptionType are fields set
// elsewhere; parseJsonToObject, base64Decode, decrypt and unCompressIfNeeded are user-supplied helpers.
public void connect() throws Exception {
    String protocolAndHostPort = "ws://localhost:55217";
    // The URL path below already contains "persistent", so only tenant/namespace/topic goes here.
    String topicName = "public/default/tp1";
    StringBuilder consumerUri = new StringBuilder(protocolAndHostPort)
        .append("/ws/v2/consumer/persistent/")
        .append(topicName)
        .append("/")
        .append(subscriptionName)
        .append("?")
        .append("subscriptionType=").append(subscriptionType.toString())
        // Set "cryptoFailureAction" to "CONSUME".
        .append("&").append("cryptoFailureAction=CONSUME");
    WebSocketClient wssClient = new WebSocketClient();
    wssClient.start();
    this.session = wssClient.connect(this, URI.create(consumerUri.toString()), new ClientUpgradeRequest()).get();
}

public byte[] messageReceived(String text) {
    /**
     * A demo of the parameter "text":
     * {
     *   "messageId": "CAcQADAA",
     *   "payload": "ApU16CsV0iHO2zbX7T22jhGMzdjE5drm",
     *   "properties": {},
     *   "publishTime": "2023-08-22T02:40:32.856+08:00",
     *   "redeliveryCount": 0,
     *   "encryptionContext": {
     *     "keys": {
     *       "client-ecdsa.pem": {
     *         "keyValue": "BMQKA==",
     *         "metadata": {
     *           "k1": "v1"
     *         }
     *       }
     *     },
     *     "param": "SnqNyjPetp1dGBa6",
     *     "compressionType": "LZ4",
     *     "uncompressedMessageSize": 7,
     *     "batchSize": null
     *   }
     * }
     */
    ConsumerMessage msg = parseJsonToObject(text);
    /**
     * The structure of encryptionContext.keys:
     * {
     *  "client-ecdsa.pem": {
     *    "keyValue": "BMQKA==",
     *    "metadata": {
     *      "k1": "v1"
     *    }
     *  }
     * }
     */
    EncryptionContext encryptionContext = msg.encryptionContext;
    // Base64-decode and decrypt the message payload.
    byte[] decryptedPayload = decrypt(base64Decode(msg.payload), encryptionContext);
    // Decompress if needed; compressionType and uncompressedMessageSize come from encryptionContext.
    byte[] unCompressedPayload = unCompressIfNeeded(decryptedPayload);
    // If batchSize is not null, the result is a batched payload that still has to be split.
    return unCompressedPayload;
}

Test cases

  • Pub & Sub with WSS producer and consumer.
    • compression & decryption.
  • Pub with Java client library and Sub with WSS consumers.
    • non-compression & decryption.
    • compression & decryption.
    • compression & decryption & batch send.
  • Pub with WSS protocol and Sub with Java client library (verify that it can decompress and decrypt automatically).
    • non-compression & decryption.
    • compression & decryption.

Footnotes

[1]: A workaround that avoids exposing the private key to Web Socket Proxy Server (the public key must still be exposed to Web Socket Proxy Server). A quick background: there are three policies for the case where a consumer cannot decrypt the message payload:

  • CONSUME: return the original (still encrypted) message payload to the user and print a warning log.
  • DISCARD: discard this message.
  • FAIL: add this message into unackMessagesTracker. How this message is ultimately handled depends on the policy for unacknowledged messages.

Workaround

  • Set cryptoFailureAction to CONSUME for the WSS consumer.
  • Make the CryptoKeyReader return null as the EncryptionKeyInfo. This makes the internal consumer of Web Socket Proxy Server fail to decrypt the message payload.

Then the flow of Pub & Sub will be executed like the following:

  • Users do not encrypt the message payload before the WSS producer sends messages.
  • The internal producer of Web Socket Proxy Server encrypts the message payload through the feature Support encryption in Web Socket Proxy Server.
  • The internal consumer of Web Socket Proxy Server fails to decrypt the message payload and just sends the original message payload to the users.
  • Users decrypt the message payload themselves.