Skip to content

Commit

Permalink
[eclipse-kanto#8] MQTTS support in the container-management's local c…
Browse files Browse the repository at this point in the history
…onnection

Signed-off-by: Kristiyan Gostev <kristiyan.gostev@bosch.io>
  • Loading branch information
Kristiyan Gostev committed Oct 24, 2022
1 parent 7296c78 commit edd57a6
Show file tree
Hide file tree
Showing 15 changed files with 370 additions and 10 deletions.
3 changes: 3 additions & 0 deletions containerm/daemon/daemon_command.go
Expand Up @@ -86,4 +86,7 @@ func setupCommandFlags(cmd *cobra.Command) {
flagSet.Int64Var(&cfg.ThingsConfig.ThingsConnectionConfig.AcknowledgeTimeout, "things-conn-ack-timeout", cfg.ThingsConfig.ThingsConnectionConfig.AcknowledgeTimeout, "Specify the acknowledgement timeout for the MQTT requests in milliseconds")
flagSet.Int64Var(&cfg.ThingsConfig.ThingsConnectionConfig.SubscribeTimeout, "things-conn-sub-timeout", cfg.ThingsConfig.ThingsConnectionConfig.SubscribeTimeout, "Specify the subscribe timeout for the MQTT requests in milliseconds")
flagSet.Int64Var(&cfg.ThingsConfig.ThingsConnectionConfig.UnsubscribeTimeout, "things-conn-unsub-timeout", cfg.ThingsConfig.ThingsConnectionConfig.UnsubscribeTimeout, "Specify the unsubscribe timeout for the MQTT requests in milliseconds")
flagSet.StringVar(&cfg.ThingsConfig.ThingsConnectionConfig.RootCA, "things-conn-root-ca", cfg.ThingsConfig.ThingsConnectionConfig.RootCA, "Specify the PEM encoded CA certificates file")
flagSet.StringVar(&cfg.ThingsConfig.ThingsConnectionConfig.ClientCert, "things-conn-client-cert", cfg.ThingsConfig.ThingsConnectionConfig.ClientCert, "Specify the PEM encoded certificate file to authenticate to the MQTT server/broker")
flagSet.StringVar(&cfg.ThingsConfig.ThingsConnectionConfig.ClientKey, "things-conn-client-key", cfg.ThingsConfig.ThingsConnectionConfig.ClientKey, "Specify the PEM encoded unencrypted private key file to authenticate to the MQTT server/broker")
}
6 changes: 5 additions & 1 deletion containerm/daemon/daemon_config.go
Expand Up @@ -14,8 +14,9 @@ package main

import (
"encoding/json"
"github.com/eclipse-kanto/container-management/containerm/log"
"time"

"github.com/eclipse-kanto/container-management/containerm/log"
)

// config refers to daemon's whole configurations.
Expand Down Expand Up @@ -149,4 +150,7 @@ type thingsConnectionConfig struct {
AcknowledgeTimeout int64 `json:"acknowledge_timeout,omitempty"`
SubscribeTimeout int64 `json:"subscribe_timeout,omitempty"`
UnsubscribeTimeout int64 `json:"unsubscribe_timeout,omitempty"`
RootCA string `json:"root_ca,omitempty"`
ClientCert string `json:"client_cert,omitempty"`
ClientKey string `json:"client_key,omitempty"`
}
5 changes: 4 additions & 1 deletion containerm/daemon/daemon_config_util.go
Expand Up @@ -14,11 +14,11 @@ package main

import (
"encoding/json"
"github.com/eclipse-kanto/container-management/containerm/containers/types"
"io/ioutil"
"os"
"time"

"github.com/eclipse-kanto/container-management/containerm/containers/types"
"github.com/eclipse-kanto/container-management/containerm/ctr"
"github.com/eclipse-kanto/container-management/containerm/log"
"github.com/eclipse-kanto/container-management/containerm/mgr"
Expand Down Expand Up @@ -105,6 +105,9 @@ func extractThingsOptions(daemonConfig *config) []things.ContainerThingsManagerO
things.WithConnectionAcknowledgeTimeout(time.Duration(daemonConfig.ThingsConfig.ThingsConnectionConfig.AcknowledgeTimeout)*time.Millisecond),
things.WithConnectionSubscribeTimeout(time.Duration(daemonConfig.ThingsConfig.ThingsConnectionConfig.SubscribeTimeout)*time.Millisecond),
things.WithConnectionUnsubscribeTimeout(time.Duration(daemonConfig.ThingsConfig.ThingsConnectionConfig.UnsubscribeTimeout)*time.Millisecond),
things.WithRootCA(daemonConfig.ThingsConfig.ThingsConnectionConfig.RootCA),
things.WithClientCert(daemonConfig.ThingsConfig.ThingsConnectionConfig.ClientCert),
things.WithClientKey(daemonConfig.ThingsConfig.ThingsConnectionConfig.ClientKey),
)
return thingsOpts
}
Expand Down
14 changes: 13 additions & 1 deletion containerm/daemon/daemon_test.go
Expand Up @@ -370,7 +370,19 @@ func TestSetCommandFlags(t *testing.T) {
"test_flags_things-conn-unsub-timeout": {
flag: "things-conn-unsub-timeout",
expectedType: reflect.Int64.String(),
},
},
"test_flags_things-conn-root-ca": {
flag: "things-conn-root-ca",
expectedType: reflect.String.String(),
},
"test_flags_things-conn-client-cert": {
flag: "things-conn-client-cert",
expectedType: reflect.String.String(),
},
"test_flags_things-conn-client-key": {
flag: "things-conn-client-key",
expectedType: reflect.String.String(),
},
}

for testName, testCase := range tests {
Expand Down
15 changes: 12 additions & 3 deletions containerm/things/things_containers_service_init.go
Expand Up @@ -36,7 +36,10 @@ func newThingsContainerManager(mgr mgr.ContainerManager, eventsMgr events.Contai
connectTimeout time.Duration,
acknowledgeTimeout time.Duration,
subscribeTimeout time.Duration,
unsubscribeTimeout time.Duration) *containerThingsMgr {
unsubscribeTimeout time.Duration,
rootCA string,
clientCert string,
clientKey string) *containerThingsMgr {
thingsMgr := &containerThingsMgr{
storageRoot: storagePath,
mgr: mgr,
Expand All @@ -56,7 +59,10 @@ func newThingsContainerManager(mgr mgr.ContainerManager, eventsMgr events.Contai
WithConnectTimeout(connectTimeout).
WithAcknowledgeTimeout(acknowledgeTimeout).
WithSubscribeTimeout(subscribeTimeout).
WithUnsubscribeTimeout(unsubscribeTimeout)
WithUnsubscribeTimeout(unsubscribeTimeout).
WithRootCA(rootCA).
WithClientCert(clientCert).
WithClientKey(clientKey)

thingsMgr.thingsClient = client.NewClient(thingsClientOpts)
return thingsMgr
Expand Down Expand Up @@ -92,5 +98,8 @@ func registryInit(registryCtx *registry.ServiceRegistryContext) (interface{}, er
tOpts.connectTimeout,
tOpts.acknowledgeTimeout,
tOpts.subscribeTimeout,
tOpts.unsubscribeTimeout), nil
tOpts.unsubscribeTimeout,
tOpts.rootCA,
tOpts.clientCert,
tOpts.clientKey,), nil
}
31 changes: 30 additions & 1 deletion containerm/things/things_containers_service_opts.go
Expand Up @@ -12,7 +12,9 @@

package things

import "time"
import (
"time"
)

// ContainerThingsManagerOpt represents the available configuration options for the ContainerThingsManager service
type ContainerThingsManagerOpt func(thingsOptions *thingsOpts) error
Expand All @@ -29,6 +31,9 @@ type thingsOpts struct {
acknowledgeTimeout time.Duration
subscribeTimeout time.Duration
unsubscribeTimeout time.Duration
rootCA string
clientCert string
clientKey string
}

func applyOptsThings(thingsOpts *thingsOpts, opts ...ContainerThingsManagerOpt) error {
Expand Down Expand Up @@ -127,3 +132,27 @@ func WithConnectionUnsubscribeTimeout(unsubscribeTimeout time.Duration) Containe
return nil
}
}

// WithRootCA configures the CA certificate for TLS communication
func WithRootCA(rootCA string) ContainerThingsManagerOpt {
return func(thingsOptions *thingsOpts) error {
thingsOptions.rootCA = rootCA
return nil
}
}

// WithClientCert configures certificate to authenticate to the MQTT server/broker
func WithClientCert(clientCert string) ContainerThingsManagerOpt {
return func(thingsOptions *thingsOpts) error {
thingsOptions.clientCert = clientCert
return nil
}
}

// WithClientKey configures the private key to authenticate to the MQTT server/broker
func WithClientKey(clientKey string) ContainerThingsManagerOpt {
return func(thingsOptions *thingsOpts) error {
thingsOptions.clientKey = clientKey
return nil
}
}
10 changes: 8 additions & 2 deletions containerm/things/things_containers_service_test.go
Expand Up @@ -48,7 +48,10 @@ func TestThingsContainerServiceConnectWithCredentials(t *testing.T) {
0,
0,
0,
0)
0,
"",
"",
"")
setupThingMock(controller)

listener, err := net.Listen("tcp4", testMQTTBrokerURL)
Expand Down Expand Up @@ -93,7 +96,10 @@ func TestThingsContainerServiceConnectNoCredentials(t *testing.T) {
0,
0,
0,
0)
0,
"",
"",
"")
setupThingMock(controller)

listener, err := net.Listen("tcp4", testMQTTBrokerURL)
Expand Down
5 changes: 4 additions & 1 deletion containerm/things/things_test_base_test.go
Expand Up @@ -84,5 +84,8 @@ func setupThingsContainerManager(controller *gomock.Controller) {
0,
0,
0,
0)
0,
"",
"",
"")
}
3 changes: 3 additions & 0 deletions go.mod
Expand Up @@ -31,6 +31,7 @@ require (
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.0.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand All @@ -52,6 +53,7 @@ require (
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.18+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set v1.8.0 // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-events v0.0.0-20190806004212-e31b211e4f1c // indirect
Expand Down Expand Up @@ -94,6 +96,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencontainers/selinux v1.10.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
Expand Down
33 changes: 33 additions & 0 deletions things/client/client.go
Expand Up @@ -14,11 +14,14 @@ package client

import (
"fmt"
"net/url"
"sync"
"time"

"github.com/eclipse-kanto/container-management/things/api/handlers"
"github.com/eclipse-kanto/container-management/things/api/model"
"github.com/eclipse-kanto/container-management/things/client/config"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -67,6 +70,10 @@ func (client *Client) Connect() error {
})
}

if err := setupLocalTLS(pahoOpts, client.cfg); err != nil {
return err
}

//create and start a client using the created ClientOptions
client.pahoClient = MQTT.NewClient(pahoOpts)

Expand All @@ -76,6 +83,32 @@ func (client *Client) Connect() error {
return nil
}

func setupLocalTLS(pahoOpts *MQTT.ClientOptions, configuration *Configuration,) error {
u, err := url.Parse(configuration.broker)
if err != nil {
return err
}

if isConnectionSecure(u.Scheme) {
tlsConfig, err := config.NewLocalTLSConfig(configuration.rootCA, configuration.clientCert, configuration.clientKey)
if err != nil {
return err
}
pahoOpts.TLSConfig = tlsConfig
}

return nil
}

func isConnectionSecure(schema string) bool {
switch schema {
case "wss", "ssl", "tls", "mqtts", "mqtt+ssl", "tcps":
return true
default:
}
return false
}

// Disconnect unsubscribes and disconects the client
func (client *Client) Disconnect() {
if client.pahoClient.IsConnectionOpen() {
Expand Down
35 changes: 35 additions & 0 deletions things/client/client_config.go
Expand Up @@ -39,6 +39,9 @@ type Configuration struct {
unsubscribeTimeout time.Duration
initHook InitializedHook
thingsRegistryChangedHandler handlers.ThingsRegistryChangedHandler
rootCA string
clientCert string
clientKey string
}

// NewConfiguration creates a new Configuration instance
Expand Down Expand Up @@ -134,6 +137,21 @@ func (cfg *Configuration) RegistryChangedHandler() handlers.ThingsRegistryChange
return cfg.thingsRegistryChangedHandler
}

// RootCA provides the currently configured CA Certificate
func (cfg *Configuration) RootCA() string {
return cfg.rootCA
}

// ClientCert provides the currently configured certificate used to authenticate to the MQTT server/broker
func (cfg *Configuration) ClientCert() string {
return cfg.clientCert
}

// Cert provides the currently configured key used to authenticate to the MQTT server/broker
func (cfg *Configuration) ClientKey() string {
return cfg.clientKey
}

// WithBroker configures the MQTT's broker the Client to connect to
func (cfg *Configuration) WithBroker(broker string) *Configuration {
cfg.broker = broker
Expand Down Expand Up @@ -229,3 +247,20 @@ func (cfg *Configuration) WithUnsubscribeTimeout(unsubscribeTimeout time.Duratio
cfg.unsubscribeTimeout = unsubscribeTimeout
return cfg
}

// WithRootCA configures the CA certificate for TLS communication
func (cfg *Configuration) WithRootCA(rootCA string) *Configuration {
cfg.rootCA = rootCA
return cfg
}
// WithClientCert configures certificate to authenticate to the MQTT server/broker
func (cfg *Configuration) WithClientCert(cert string) *Configuration {
cfg.clientCert = cert
return cfg
}

// WithClientKey configures the private key to authenticate to the MQTT server/broker
func (cfg *Configuration) WithClientKey(clientKey string) *Configuration {
cfg.clientKey = clientKey
return cfg
}
27 changes: 27 additions & 0 deletions things/client/config/testdata/certificate.pem
@@ -0,0 +1,27 @@
-----BEGIN CERTIFICATE-----
MIIEjzCCAncCFAEWn/QU1vzg07IeNzhaX/6pNlTeMA0GCSqGSIb3DQEBCwUAMIGD
MQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ2FsaWZvcm5pYTEWMBQGA1UEBwwNU2Fu
IEZyYW5jaXNjbzEVMBMGA1UECgwMRXhhbXBsZSBJbmMuMRYwFAYDVQQLDA1JVCBE
ZXBhcnRtZW50MRgwFgYDVQQDDA93d3cuZXhhbXBsZS5jb20wHhcNMjEwNTEzMDg0
NTU3WhcNMjExMDEwMDg0NTU3WjCBgzELMAkGA1UEBhMCVVMxFjAUBgNVBAgMDU1h
c3NhY2h1c2V0dHMxDzANBgNVBAcMBkJvc3RvbjEVMBMGA1UECgwMRXhhbXBsZSBJ
bmMuMRYwFAYDVQQLDA1IUiBEZXBhcnRtZW50MRwwGgYDVQQDDBN3d3cuZXhhbXBs
ZS1pbmMuY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApmF7J4WB
BujYYNt86YHSWrv3ffX3Odt57y4kqhWuMM8VcA2RXTlJO3SXX8xLF/+lsaZZJmfg
xR1tJ0hKfBYt78H03bjrylWLOQoRqlyVuz0SF3ueR9vx3mPIO0F0E3Q2mC4SLHbr
5kW4aj3/NszLzgvZPbOchfcktdCd1vME+pM8lPPY6z8qWJzlWjOYdymWUV8z9qlA
c3VCnYQ+UwJY51vTcMPpalByrMPWkiNic+9Onl8KHCik27vNMIVVLYLa6763UwU8
qCKT+jZj6nlCT8w5oqoJNsaSd/EGGFtGR+qZj6V2TrsI2cNknyY7Qf/QN0+zH0nm
XsOxK6jW8q7RGwIDAQABMA0GCSqGSIb3DQEBCwUAA4ICAQBe/kcT2L54PxZqb3GU
liwYJGjB+9fkqTyMwglt8dAm3it9F/POyXtoKB8a1AuaZ/FJlR+AUOFv+f3i0ZnE
Ek0OAsllVPclv7HhywD1HzrbLh0PreGsBnYgyrW7qZKAfevus0U0GrjhcrY7zCoA
EBFWWqcWqhRCFXYwgI13ZNLhYl7r+NIWLza1bPcnWVfY2g19/nctR53ZFFiVkvlk
FCYGat0SPQWvjFIKaCNQQL6IZSxqk95W87kEWrac9A+bQzENpWLfwu86O5r6vKNK
pHmd47Sy8hyhf75/SEOQuWBEkgT+sPXU7TykvFB8kzO1Wmsz7D2/d5pvkjZF31dQ
m4ZHIuclPOETtAwiY13dI94vAhgruK0FRFn7jyfePn20CFqUCOO9cEQytysCO/n7
4xJPbIVcvUO825Kbos71OWfNkLEi1tlLkFpe73/rSXnZRWweqAThrY7jxGxhveI4
iYrOOYEqGdM6VfLvVhYXhsc/MDqiqJLdbhQAS/lE8bJrnVnVRYnnb0ExfNTwqVU3
8YZB6JbT+j9556c675j0sfa0J8qgDRHsWR7EG5u5wENnDH/s142dYcjJ5S6tCumM
K/GExzmrJFHmLfqKTSxAquMYvDVufICcklL067DacRJKHkyx5KvKkgc3R8rTaX2o
D+0ioElFJXVQ7rULiWzxZs+Gdw==
-----END CERTIFICATE-----
27 changes: 27 additions & 0 deletions things/client/config/testdata/key.pem
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEApmF7J4WBBujYYNt86YHSWrv3ffX3Odt57y4kqhWuMM8VcA2R
XTlJO3SXX8xLF/+lsaZZJmfgxR1tJ0hKfBYt78H03bjrylWLOQoRqlyVuz0SF3ue
R9vx3mPIO0F0E3Q2mC4SLHbr5kW4aj3/NszLzgvZPbOchfcktdCd1vME+pM8lPPY
6z8qWJzlWjOYdymWUV8z9qlAc3VCnYQ+UwJY51vTcMPpalByrMPWkiNic+9Onl8K
HCik27vNMIVVLYLa6763UwU8qCKT+jZj6nlCT8w5oqoJNsaSd/EGGFtGR+qZj6V2
TrsI2cNknyY7Qf/QN0+zH0nmXsOxK6jW8q7RGwIDAQABAoIBAH09m6qgQAOnellO
XrSW2HUcUKwsXjDbGOoF3et57mknOIfkbqux14I9vUSLT2t9MIiNI0ZZo0Q9ZlDP
heHqACId6eiMrlDcG7SP88Q9dShATEII95g349T3X13bYzjRndbntx5pViE8EhlH
GblyZ2duW9SqQwREiQmjQ2zt+a1zuKUAGdysS+4101UHUj6tC/RjiNN/TCXXRJIX
GOJ4WFLY+f2bXgSmqbK7wqN9nPxmxl/+bv4hO32Gsv06ejuy/6+GFJZa+n3ASU2r
/ptr4vgCK+t6I0OWVTpvYUboEwAXam4JfAu12zLtczfXVFJjzklUvSClIG9aJ6DP
2B3LEuECgYEA1RenCuVl6sM2o858X8iOLTMuCZWX5VFKY1+vvLwB/+CBfy7/dDYw
lbv+xaots0rY9Wn784ewi9zJbdnXE1YNgj0utIMHzylXvTolnDYsoO7SYpIwqtpa
PyzPcAV3Khkd5LGe1hf9VmOJfTF/563ztLXip0HUeIvzgB/maqfAQW8CgYEAx+H4
GZ3ycdL03x7Zvp5g5yDPZGvxqVIEIFmliagEFyBgqogbXuOvsvZDTg+gKV5/QyVg
FWokz5VtC6U9UcWfF7LJses/Hsedh9IeICd9UIzkey8UmS2mDBeGfTHVTNuczml5
VzmTK8jGUO5aRVRyOt0tqVf6Oozo8ImdI1fOPRUCgYBI4p4wC+agNcUqoiXIXUDE
FQ1aGeCqfvOCqefiFixY6OFiLyERDrfvfy3VTi/zc1ZiGq4izfaE4C/Fcw0tf/F+
6o5fD7JMGUf5YTocBCufoBA1xur+hVD46srI9hWcQJsI7ff2Ip50Pfd46sVk6QrC
dLPhoZKa6MOQv1iAgoAv4QKBgQDAfiO6N9vmNizQOxujcU8NBxHzOekvEOccaHj9
Cqt1wh6V3CHPziHEjVjf8jhh3rlcZsATn3b32oV7c5SMDW9bGTkYeN7+u2pABOAy
QxVx312iLAMASW/hsT45jyZFsDFgrz7F+5J51g72nbSdk+e2PI7eyPUYMd+a1kxY
XxUkyQKBgBde+jg2UjGHAfW6ZWSPHWvi74oD42VzbJg6/o/KFaZGEyHdKWgrwBW7
8wNyNTMOMSHyuvtMnc5oLd492aO2a+yurgzt7yIelVuphSKda0wd/0PljKb0lwlb
VzldR4bXolbyjNXky1KWmvSNkmbPFNGSCeyHmKQgSLXKGuWbj4ic
-----END RSA PRIVATE KEY-----

0 comments on commit edd57a6

Please sign in to comment.