Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encryption support producer #560

Merged
merged 18 commits into from
Sep 3, 2021
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/google/uuid v1.1.2
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why is the mod and sum file changing? Can these changes be done in a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me recheck again :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synced with master branch

github.com/kr/pretty v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
Expand All @@ -27,7 +26,6 @@ require (
github.com/stretchr/testify v1.5.1
go.uber.org/atomic v1.7.0
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)

replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
15 changes: 0 additions & 15 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
Expand All @@ -32,8 +31,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
Expand Down Expand Up @@ -73,16 +70,13 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down Expand Up @@ -169,8 +163,6 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
Expand All @@ -190,7 +182,6 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
Expand All @@ -212,7 +203,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -227,7 +217,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
Expand Down Expand Up @@ -259,9 +248,5 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
36 changes: 36 additions & 0 deletions pulsar/encryption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import "github.com/apache/pulsar-client-go/pulsar/crypto"

// ProducerEncryptionInfo encryption related fields required by the producer
type ProducerEncryptionInfo struct {
// KeyReader read RSA public/private key pairs
Keyreader crypto.KeyReader
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyReader

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


// MessageCrypto used to encrypt and decrypt the data and session keys
MessageCrypto crypto.MessageCrypto

// Keys list of encryption key names to encrypt session key
Keys []string

// ProducerCryptoFailureAction action to be taken on failure of message encryption
// default is ProducerCryptoFailureActionFail
ProducerCryptoFailureAction int
}
15 changes: 10 additions & 5 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)
Expand All @@ -35,7 +36,7 @@ type BuffersPool interface {
type BatcherBuilderProvider func(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger,
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error)

// BatchBuilder is a interface of batch builders
Expand Down Expand Up @@ -93,13 +94,15 @@ type batchContainer struct {
buffersPool BuffersPool

log log.Logger

encryptor crypto.Encryptor
}

// newBatchContainer init a batchContainer
func newBatchContainer(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger,
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) batchContainer {

bc := batchContainer{
Expand All @@ -122,6 +125,7 @@ func newBatchContainer(
compressionProvider: getCompressionProvider(compressionType, level),
buffersPool: bufferPool,
log: logger,
encryptor: encryptor,
}

if compressionType != pb.CompressionType_NONE {
Expand All @@ -135,12 +139,12 @@ func newBatchContainer(
func NewBatchBuilder(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger,
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error) {

bc := newBatchContainer(
maxMessages, maxBatchSize, producerName, producerID, compressionType,
level, bufferPool, logger,
level, bufferPool, logger, encryptor,
)

return &bc, nil
Expand Down Expand Up @@ -229,8 +233,9 @@ func (bc *batchContainer) Flush() (
if buffer == nil {
buffer = NewBuffer(int(uncompressedSize * 3 / 2))
}

serializeBatch(
buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider,
buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor,
)

callbacks = bc.callbacks
Expand Down
25 changes: 17 additions & 8 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

Expand Down Expand Up @@ -221,9 +222,21 @@ func serializeBatch(wb Buffer,
cmdSend *pb.BaseCommand,
msgMetadata *pb.MessageMetadata,
uncompressedPayload Buffer,
compressionProvider compression.Provider) {
compressionProvider compression.Provider,
encryptor crypto.Encryptor) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]

// compress the payload
compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to compress before encrypting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per java implementation => Yes compress and then encrypt.


// encrypt the compressed payload
encryptedPayload, err := encryptor.Encrypt(compressedPayload, crypto.NewMessageMetadataSupplier(msgMetadata))
if err != nil {
// error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
panic(fmt.Sprintf("Encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err))
GPrabhudas marked this conversation as resolved.
Show resolved Hide resolved
}

cmdSize := uint32(proto.Size(cmdSend))
msgMetadataSize := uint32(proto.Size(msgMetadata))

Expand All @@ -234,7 +247,7 @@ func serializeBatch(wb Buffer,
// Write cmd
wb.WriteUint32(cmdSize)
wb.ResizeIfNeeded(cmdSize)
_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
_, err = cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
if err != nil {
panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
}
Expand All @@ -255,12 +268,8 @@ func serializeBatch(wb Buffer,
}
wb.WrittenBytes(msgMetadataSize)

// Make sure the buffer has enough space to hold the compressed data
// and perform the compression in-place
maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
wb.ResizeIfNeeded(maxSize)
b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
wb.WrittenBytes(uint32(len(b)))
// add payload to the buffer
wb.Write(encryptedPayload)

// Write checksum at created checksum-placeholder
frameEndIdx := wb.WriterIndex()
Expand Down
33 changes: 33 additions & 0 deletions pulsar/internal/crypto/encryptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package crypto

import (
"github.com/apache/pulsar-client-go/pulsar/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

// Encryptor support encryption
type Encryptor interface {
Encrypt([]byte, crypto.MessageMetadataSupplier) ([]byte, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is internal we can pass the *pb.MessageMetadata and avoid having to create a supplier for each message when encryption is not being used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

// NewMessageMetadataSupplier returns wrapper for message metadata
func NewMessageMetadataSupplier(msgMetadata *pb.MessageMetadata) crypto.MessageMetadataSupplier {
return crypto.NewMessageMetadataSupplier(msgMetadata)
}
31 changes: 31 additions & 0 deletions pulsar/internal/crypto/noop_encryptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package crypto

import "github.com/apache/pulsar-client-go/pulsar/crypto"

type noopEncryptor struct{}

func NewNoopEncryptor() Encryptor {
return &noopEncryptor{}
}

// Encrypt Noop ecryptor
func (e *noopEncryptor) Encrypt(data []byte, msgMetadata crypto.MessageMetadataSupplier) ([]byte, error) {
return data, nil
}
78 changes: 78 additions & 0 deletions pulsar/internal/crypto/producer_encryptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package crypto

import (
"fmt"

"github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/log"
)

type producerEncryptor struct {
keys []string
keyReader crypto.KeyReader
messageCrypto crypto.MessageCrypto
logger log.Logger
producerCryptoFailureAction int
}

func NewProducerEncryptor(keys []string,
keyReader crypto.KeyReader,
messageCrypto crypto.MessageCrypto,
producerCryptoFailureAction int,
logger log.Logger) Encryptor {
return &producerEncryptor{
keys: keys,
keyReader: keyReader,
messageCrypto: messageCrypto,
logger: logger,
producerCryptoFailureAction: producerCryptoFailureAction,
}
}

// Encrypt producer encryptor
func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata crypto.MessageMetadataSupplier) ([]byte, error) {
// encryption is enabled but KeyReader interface is not implemented
if e.keyReader == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be detected and an error raised while setting up the producer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// crypto failure action is set to send
// send unencrypted message
if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
return payload, nil
}
return nil, fmt.Errorf("KeyReader interface is not implemented and ProducerCryptoFailureAction is set to fail")
}

// encrypt payload
encryptedPayload, err := e.messageCrypto.Encrypt(e.keys, e.keyReader, msgMetadata, payload)

// error encryping the payload
if err != nil {
// error occurred in encrypting the payload
// crypto ProducerCryptoFailureAction is set to send
// send unencrypted message
if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
e.logger.Errorf("Encryption of payload failed : %v", err)
e.logger.Warn("ProducerCryptoFailureAction is set to send, sending unecrypted message")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would log only one warning message here.

logger.WithError(err).Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

return payload, nil
}

return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
}
return encryptedPayload, nil
}