Skip to content

Commit

Permalink
feat: Remove deprecated MQTTSend pipeline function (#592)
Browse files Browse the repository at this point in the history
closes #547

Signed-off-by: lenny <leonard.goodell@intel.com>
  • Loading branch information
lenny-goodell committed Dec 10, 2020
1 parent d5e638f commit c9ed7d5
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 444 deletions.
79 changes: 0 additions & 79 deletions appsdk/configurable.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/edgexfoundry/app-functions-sdk-go/appcontext"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/transforms"
"github.com/edgexfoundry/app-functions-sdk-go/pkg/util"
"github.com/edgexfoundry/go-mod-core-contracts/models"
)

const (
Expand Down Expand Up @@ -338,84 +337,6 @@ func (dynamic AppFunctionsSDKConfigurable) HTTPPutXML(parameters map[string]stri
return dynamic.HTTPPut(parameters)
}

// MQTTSend sends data from the previous function to the specified MQTT broker.
// If no previous function exists, then the event that triggered the pipeline will be used.
// This function is a configuration function and returns a function pointer.
func (dynamic AppFunctionsSDKConfigurable) MQTTSend(parameters map[string]string, addr models.Addressable) appcontext.AppFunction {
var err error
qos := 0
retain := false
autoReconnect := false
// optional string params
cert := parameters[Cert]
key := parameters[Key]
skipVerify := parameters[SkipVerify]

qosVal, ok := parameters[Qos]
if ok {
qos, err = strconv.Atoi(qosVal)
if err != nil {
dynamic.Sdk.LoggingClient.Error("Unable to parse " + Qos + " value")
return nil
}
}
retainVal, ok := parameters[Retain]
if ok {
retain, err = strconv.ParseBool(retainVal)
if err != nil {
dynamic.Sdk.LoggingClient.Error("Unable to parse " + Retain + " value")
return nil
}
}
autoreconnectVal, ok := parameters[AutoReconnect]
if ok {
autoReconnect, err = strconv.ParseBool(autoreconnectVal)
if err != nil {
dynamic.Sdk.LoggingClient.Error("Unable to parse " + AutoReconnect + " value")
return nil
}
}
dynamic.Sdk.LoggingClient.Debug("MQTT Send Parameters", "Address", addr, Qos, qosVal, Retain, retainVal, AutoReconnect, autoreconnectVal, Cert, cert, Key, key)

var pair *transforms.KeyCertPair

if len(cert) > 0 && len(key) > 0 {
pair = &transforms.KeyCertPair{
CertFile: cert,
KeyFile: key,
}
}

// PersistOnError os optional and is false by default.
persistOnError := false
value, ok := parameters[PersistOnError]
if ok {
persistOnError, err = strconv.ParseBool(value)
if err != nil {
dynamic.Sdk.LoggingClient.Error(fmt.Sprintf("Could not parse '%s' to a bool for '%s' parameter", value, PersistOnError), "error", err)
return nil
}
}

mqttConfig := transforms.MqttConfig{}
mqttConfig.Qos = byte(qos)
mqttConfig.Retain = retain
mqttConfig.AutoReconnect = autoReconnect

if skipVerify != "" {
skipCertVerify, err := strconv.ParseBool(skipVerify)
if err != nil {
dynamic.Sdk.LoggingClient.Error(fmt.Sprintf("Could not parse '%s' to a bool for '%s' parameter", skipVerify, SkipVerify), "error", err)
return nil
}

mqttConfig.SkipCertVerify = skipCertVerify
}

sender := transforms.NewMQTTSender(dynamic.Sdk.LoggingClient, addr, pair, mqttConfig, persistOnError)
return sender.MQTTSend
}

// SetOutputData sets the output data to that passed in from the previous function.
// It will return an error and stop the pipeline if data passed in is not of type []byte, string or json.Mashaler
// This function is a configuration function and returns a function pointer.
Expand Down
23 changes: 4 additions & 19 deletions appsdk/configurable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package appsdk
import (
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -164,20 +163,6 @@ func TestConfigurableHTTPPostXML(t *testing.T) {
assert.NotNil(t, trx, "return result from HTTPPostXML should not be nil")
}

func TestConfigurableMQTTSend(t *testing.T) {
configurable := AppFunctionsSDKConfigurable{
Sdk: &AppFunctionsSDK{
LoggingClient: lc,
},
}

params := make(map[string]string)
addr := models.Addressable{}
params[PersistOnError] = "true"
trx := configurable.MQTTSend(params, addr)
assert.NotNil(t, trx, "return result from MQTTSend should not be nil")
}

func TestConfigurableSetOutputData(t *testing.T) {
configurable := AppFunctionsSDKConfigurable{
Sdk: &AppFunctionsSDK{
Expand Down Expand Up @@ -224,7 +209,7 @@ func TestBatchByCount(t *testing.T) {
params := make(map[string]string)
params[BatchThreshold] = "30"
trx := configurable.BatchByCount(params)
assert.NotNil(t, trx, "return result from MQTTSend should not be nil")
assert.NotNil(t, trx, "return result from BatchByCount should not be nil")
}
func TestBatchByTime(t *testing.T) {
configurable := AppFunctionsSDKConfigurable{
Expand All @@ -236,7 +221,7 @@ func TestBatchByTime(t *testing.T) {
params := make(map[string]string)
params[TimeInterval] = "10"
trx := configurable.BatchByTime(params)
assert.NotNil(t, trx, "return result from MQTTSend should not be nil")
assert.NotNil(t, trx, "return result from BatchByTime should not be nil")
}
func TestBatchByTimeAndCount(t *testing.T) {
configurable := AppFunctionsSDKConfigurable{
Expand All @@ -250,7 +235,7 @@ func TestBatchByTimeAndCount(t *testing.T) {
params[TimeInterval] = "10"

trx := configurable.BatchByTimeAndCount(params)
assert.NotNil(t, trx, "return result from MQTTSend should not be nil")
assert.NotNil(t, trx, "return result from BatchByTimeAndCount should not be nil")
}

func TestJSONLogic(t *testing.T) {
Expand Down Expand Up @@ -286,7 +271,7 @@ func TestConfigurableMQTTSecretSend(t *testing.T) {
params[AuthMode] = "none"

trx := configurable.MQTTSecretSend(params)
assert.NotNil(t, trx, "return result from MQTTSend should not be nil")
assert.NotNil(t, trx, "return result from MQTTSecretSend should not be nil")
}

func TestAppFunctionsSDKConfigurable_AddTags(t *testing.T) {
Expand Down
9 changes: 3 additions & 6 deletions appsdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"context"
"errors"
"fmt"
"github.com/edgexfoundry/go-mod-messaging/messaging"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"
nethttp "net/http"
"net/url"
"os"
Expand All @@ -32,9 +30,11 @@ import (
"sync"
"syscall"

"github.com/edgexfoundry/go-mod-messaging/messaging"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"

"github.com/edgexfoundry/go-mod-core-contracts/clients"
"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/edgexfoundry/go-mod-registry/registry"
"github.com/gorilla/mux"

Expand Down Expand Up @@ -270,9 +270,6 @@ func (sdk *AppFunctionsSDK) LoadConfigurablePipeline() ([]appcontext.AppFunction
case reflect.TypeOf(map[string]string{}):
inputParameters[index] = reflect.ValueOf(configuration.Parameters)

case reflect.TypeOf(models.Addressable{}):
inputParameters[index] = reflect.ValueOf(configuration.Addressable)

default:
return nil, fmt.Errorf(
"function %s has an unsupported parameter type: %s",
Expand Down
40 changes: 4 additions & 36 deletions appsdk/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,22 @@ package appsdk

import (
"fmt"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
"os"
"reflect"
"testing"

"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/app-functions-sdk-go/appcontext"
"github.com/edgexfoundry/app-functions-sdk-go/internal/common"
"github.com/edgexfoundry/app-functions-sdk-go/internal/runtime"
triggerHttp "github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/http"
"github.com/edgexfoundry/app-functions-sdk-go/internal/trigger/messagebus"
"github.com/edgexfoundry/app-functions-sdk-go/internal/webserver"
"github.com/edgexfoundry/go-mod-core-contracts/clients/logger"
)

var lc logger.LoggingClient
Expand Down Expand Up @@ -267,37 +266,6 @@ func TestLoadConfigurablePipelineNotABuiltInSdkFunction(t *testing.T) {
assert.Nil(t, appFunctions, "expected app functions list to be nil")
}

func TestLoadConfigurablePipelineAddressableConfig(t *testing.T) {
functionName := "MQTTSend"
functions := make(map[string]common.PipelineFunction)
functions[functionName] = common.PipelineFunction{
Parameters: map[string]string{"qos": "0", "autoreconnect": "false"},
Addressable: models.Addressable{
Address: "localhost",
Port: 1883,
Protocol: "tcp",
Publisher: "MyApp",
Topic: "sampleTopic",
},
}

sdk := AppFunctionsSDK{
LoggingClient: lc,
config: &common.ConfigurationStruct{
Writable: common.WritableInfo{
Pipeline: common.PipelineInfo{
ExecutionOrder: functionName,
Functions: functions,
},
},
},
}

appFunctions, err := sdk.LoadConfigurablePipeline()
require.NoError(t, err)
assert.NotNil(t, appFunctions, "expected app functions list to be set")
}

func TestLoadConfigurablePipelineNumFunctions(t *testing.T) {
functions := make(map[string]common.PipelineFunction)
functions["FilterByDeviceName"] = common.PipelineFunction{
Expand Down
4 changes: 1 addition & 3 deletions internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

bootstrapConfig "github.com/edgexfoundry/go-mod-bootstrap/config"

"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/edgexfoundry/go-mod-messaging/pkg/types"

"github.com/edgexfoundry/app-functions-sdk-go/internal/store/db"
Expand Down Expand Up @@ -127,8 +126,7 @@ type PipelineInfo struct {

type PipelineFunction struct {
// Name string
Parameters map[string]string
Addressable models.Addressable
Parameters map[string]string
}

type StoreAndForwardInfo struct {
Expand Down
Loading

0 comments on commit c9ed7d5

Please sign in to comment.