Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar PubSub: allow TLS configuration for hostname and insecure connections #3270

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0f4fa30
feat: allow tls configuration for hostname and insecure connections
toneill-newinnov Dec 14, 2023
79e084b
test: add pulsar SSL/TLS tests
toneill-newinnov Dec 19, 2023
d2c96cf
Merge branch 'dapr:main' into feature/expand-pulsar-tls-configuration
toneill818 Jan 3, 2024
ba47779
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 Jan 4, 2024
c9f8d5a
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 Jan 8, 2024
02b97ab
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 Jan 8, 2024
e64c120
Merge branch 'main' into feature/expand-pulsar-tls-configuration
toneill818 Jan 9, 2024
11da5e5
test: remove unsed file and use same image as other tests
toneill-newinnov Jan 9, 2024
8a7fd1b
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 Jan 16, 2024
745c2ed
test: remove unsed file and use same image as other tests
toneill-newinnov Jan 9, 2024
22879b8
fix: PR changes requests and renamed volumes in new compose file
toneill-newinnov Jan 22, 2024
58087d9
Avro Schema registry kafka pubsub implementation (#3292)
passuied Jan 10, 2024
de8bcc8
Kafka: Support for AWS IAM role auth (#3310)
eunicecompra Jan 10, 2024
9595b20
left uppercase in metadata (#3312)
passuied Jan 10, 2024
bcfb5b4
Kafka: Removed the AWS creds check from kafka metadata (#3313)
eunicecompra Jan 10, 2024
9987605
azappconfig SDk upgrade (#3283)
pravinpushkar Jan 11, 2024
5f1ec2f
Adding deleteWithPrefix for in-memory statestore (#3314)
RyanLettieri Jan 12, 2024
0d2c0a5
Add ExecuteInTransaction method for db.SQL (#3309)
ItalyPaleAle Jan 16, 2024
d5911ad
secret store: AWS connection validation for parameter store and secre…
elena-kolevska Jan 16, 2024
894f057
Metadata TryGetTTL: adds time.ParseDuration support (#3122)
robertojrojas Jan 16, 2024
95cf7ef
Merge branch 'feature/expand-pulsar-tls-configuration' of github.com:…
toneill-newinnov Jan 22, 2024
4d04b3e
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 Jan 22, 2024
34c51d3
ci: set port 8080 for pulsar api to match call in tests
toneill-newinnov Jan 22, 2024
ad480c3
Merge branch 'feature/expand-pulsar-tls-configuration' of github.com:…
toneill-newinnov Jan 22, 2024
22b05ba
ci: fix eof error on http request to create multipartitioned topic
toneill-newinnov Jan 23, 2024
7480c32
Merge branch 'main' into feature/expand-pulsar-tls-configuration
berndverst Jan 25, 2024
fbf5a15
Update dapr/kit to the same version used in runtime
berndverst Apr 30, 2024
df41d65
ci: fix eof error on http request to create multipartitioned topic
toneill-newinnov May 1, 2024
1e946ee
Merge branch 'dapr:main' into feature/expand-pulsar-tls-configuration
toneill818 May 7, 2024
8b2b0a8
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 May 14, 2024
a549c27
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 May 15, 2024
5da4586
revert: Revert go mod changes
toneill-newinnov May 16, 2024
9ebd09f
add newline
toneill-newinnov May 16, 2024
c20651a
Merge branch 'main' into feature/expand-pulsar-tls-configuration
yaron2 May 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@ import (
)

type pulsarMetadata struct {
Host string `mapstructure:"host"`
ConsumerID string `mapstructure:"consumerID"`
EnableTLS bool `mapstructure:"enableTLS"`
DisableBatching bool `mapstructure:"disableBatching"`
BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"`
BatchingMaxSize uint `mapstructure:"batchingMaxSize"`
BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"`
Tenant string `mapstructure:"tenant"`
Namespace string `mapstructure:"namespace"`
Persistent bool `mapstructure:"persistent"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
Host string `mapstructure:"host"`
ConsumerID string `mapstructure:"consumerID"`
EnableTLS bool `mapstructure:"enableTLS"`
DisableBatching bool `mapstructure:"disableBatching"`
BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"`
BatchingMaxSize uint `mapstructure:"batchingMaxSize"`
BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"`
Tenant string `mapstructure:"tenant"`
Namespace string `mapstructure:"namespace"`
Persistent bool `mapstructure:"persistent"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
TLSAllowInsecureConnection bool `mapstructure:"tlsAllowInsecureConnection"`
TLSValidateHostname bool `mapstructure:"tlsValidateHostname"`

Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
Expand Down
14 changes: 13 additions & 1 deletion pubsub/pulsar/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ metadata:
sensitive: true
description: "Address of the Pulsar broker."
example: |
"localhost:6650", "http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080"
"localhost:6650", "http://pulsar-pj54qwwdpz4b-pulsar.ap-sg.public.pulsar.com:8080", "pulsar+ssl://localhost:6651"
- name: consumerID
type: string
description: "Used to set the subscription name or consumer ID."
Expand All @@ -74,6 +74,18 @@ metadata:
Enable TLS.
default: 'false'
example: '"true", "false"'
- name: tlsAllowInsecureConnection
type: bool
description: |
Allow insecure TLS connections
default: 'false'
example: '"true", "false"'
- name: tlsValidateHostname
type: bool
description: |
Validate hostname when using TLS
default: 'false'
example: '"true", "false"'
- name: tenant
description: |
The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread
Expand Down
100 changes: 62 additions & 38 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,34 @@ import (
)

const (
host = "host"
consumerID = "consumerID"
enableTLS = "enableTLS"
deliverAt = "deliverAt"
deliverAfter = "deliverAfter"
disableBatching = "disableBatching"
batchingMaxPublishDelay = "batchingMaxPublishDelay"
batchingMaxSize = "batchingMaxSize"
batchingMaxMessages = "batchingMaxMessages"
tenant = "tenant"
namespace = "namespace"
persistent = "persistent"
redeliveryDelay = "redeliveryDelay"
avroProtocol = "avro"
jsonProtocol = "json"
protoProtocol = "proto"
partitionKey = "partitionKey"

defaultTenant = "public"
defaultNamespace = "default"
cachedNumProducer = 10
pulsarPrefix = "pulsar://"
pulsarToken = "token"
host = "host"
consumerID = "consumerID"
enableTLS = "enableTLS"
deliverAt = "deliverAt"
deliverAfter = "deliverAfter"
disableBatching = "disableBatching"
batchingMaxPublishDelay = "batchingMaxPublishDelay"
batchingMaxSize = "batchingMaxSize"
batchingMaxMessages = "batchingMaxMessages"
tenant = "tenant"
namespace = "namespace"
persistent = "persistent"
redeliveryDelay = "redeliveryDelay"
avroProtocol = "avro"
jsonProtocol = "json"
protoProtocol = "proto"
partitionKey = "partitionKey"
tlsAllowInsecureConnection = "tlsAllowInsecureConnection"
tlsValidateHostname = "tlsValidateHostname"

defaultTenant = "public"
defaultNamespace = "default"
defaultTLSAllowInsecureConnection = false
defaultTLSValidateHostname = false
cachedNumProducer = 10
pulsarPrefix = "pulsar://"
pulsarSSLPrefix = "pulsar+ssl://"
pulsarToken = "token"
// topicFormat is the format for pulsar, which have a well-defined structure: {persistent|non-persistent}://tenant/namespace/topic,
// see https://pulsar.apache.org/docs/en/concepts-messaging/#topics for details.
topicFormat = "%s://%s/%s/%s"
Expand Down Expand Up @@ -113,15 +118,17 @@ func NewPulsar(l logger.Logger) pubsub.PubSub {

func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
m := pulsarMetadata{
Persistent: true,
Tenant: defaultTenant,
Namespace: defaultNamespace,
internalTopicSchemas: map[string]schemaMetadata{},
DisableBatching: false,
BatchingMaxPublishDelay: defaultBatchingMaxPublishDelay,
BatchingMaxMessages: defaultMaxMessages,
BatchingMaxSize: defaultMaxBatchSize,
RedeliveryDelay: defaultRedeliveryDelay,
Persistent: true,
Tenant: defaultTenant,
Namespace: defaultNamespace,
internalTopicSchemas: map[string]schemaMetadata{},
DisableBatching: false,
BatchingMaxPublishDelay: defaultBatchingMaxPublishDelay,
BatchingMaxMessages: defaultMaxMessages,
BatchingMaxSize: defaultMaxBatchSize,
RedeliveryDelay: defaultRedeliveryDelay,
TLSAllowInsecureConnection: defaultTLSAllowInsecureConnection,
TLSValidateHostname: defaultTLSValidateHostname,
}

if err := kitmd.DecodeMetadata(meta.Properties, &m); err != nil {
Expand All @@ -132,6 +139,11 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
return nil, errors.New("pulsar error: missing pulsar host")
}

// If tls is disabled allow insecure connections
if !m.EnableTLS {
m.TLSAllowInsecureConnection = true
}

for k, v := range meta.Properties {
switch {
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
Expand Down Expand Up @@ -163,16 +175,13 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
if err != nil {
return err
}
pulsarURL := m.Host
if !strings.HasPrefix(m.Host, "http://") &&
!strings.HasPrefix(m.Host, "https://") {
pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, m.Host)
}
pulsarURL := getPulsarURL(m)
options := pulsar.ClientOptions{
URL: pulsarURL,
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
TLSAllowInsecureConnection: !m.EnableTLS,
TLSAllowInsecureConnection: m.TLSAllowInsecureConnection,
TLSValidateHostname: m.TLSValidateHostname,
}

switch {
Expand Down Expand Up @@ -538,3 +547,18 @@ func isValidPEM(val string) bool {
block, _ := pem.Decode([]byte(val))
return block != nil
}

func getPulsarURL(m *pulsarMetadata) string {
pulsarURL := m.Host
if !strings.HasPrefix(m.Host, "http://") &&
!strings.HasPrefix(m.Host, "https://") &&
!strings.HasPrefix(m.Host, "pulsar+ssl://") &&
!strings.HasPrefix(m.Host, "pulsar://") {
if m.EnableTLS {
pulsarURL = fmt.Sprintf("%s%s", pulsarSSLPrefix, m.Host)
toneill818 marked this conversation as resolved.
Show resolved Hide resolved
} else {
pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, m.Host)
toneill818 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return pulsarURL
}
2 changes: 2 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func TestParsePulsarMetadata(t *testing.T) {
assert.Equal(t, uint(100), meta.BatchingMaxSize)
assert.Equal(t, uint(200), meta.BatchingMaxMessages)
assert.Empty(t, meta.internalTopicSchemas)
assert.True(t, meta.TLSAllowInsecureConnection)
assert.False(t, meta.TLSValidateHostname)
}

func TestParsePulsarSchemaMetadata(t *testing.T) {
Expand Down
Loading