Skip to content

Commit

Permalink
feat(metadata)!: replace REST device validation callback with Message…
Browse files Browse the repository at this point in the history
…Bus (#4366)

* feat(metadata)!: replace REST device validation callback with MessageBus

BREAKING CHANGE: adding new device requires device service running

closes #4236

Signed-off-by: Chris Hung <chris@iotechsys.com>
  • Loading branch information
Chris Hung committed Feb 22, 2023
1 parent 1e6b5b5 commit 15b3f24
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 132 deletions.
4 changes: 2 additions & 2 deletions internal/core/metadata/application/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func AddDevice(d models.Device, ctx context.Context, dic *di.Container) (id stri
dbClient := container.DBClientFrom(dic.Get)
lc := bootstrapContainer.LoggingClientFrom(dic.Get)

err := validateDeviceCallback(ctx, dic, dtos.FromDeviceModelToDTO(d))
err := validateDeviceCallback(dtos.FromDeviceModelToDTO(d), dic)
if err != nil {
return "", errors.NewCommonEdgeXWrapper(err)
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func PatchDevice(dto dtos.UpdateDevice, ctx context.Context, dic *di.Container)
requests.ReplaceDeviceModelFieldsWithDTO(&device, dto)

deviceDTO := dtos.FromDeviceModelToDTO(device)
err = validateDeviceCallback(ctx, dic, deviceDTO)
err = validateDeviceCallback(deviceDTO, dic)
if err != nil {
return errors.NewCommonEdgeXWrapper(err)
}
Expand Down
49 changes: 22 additions & 27 deletions internal/core/metadata/application/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ package application
import (
"context"
"encoding/json"
"net/http"
"fmt"
"time"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
clients "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/http"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/interfaces"
"github.com/edgexfoundry/go-mod-core-contracts/v3/common"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos"
"github.com/edgexfoundry/go-mod-core-contracts/v3/dtos/requests"
Expand All @@ -23,37 +22,33 @@ import (
"github.com/edgexfoundry/edgex-go/internal/core/metadata/container"
)

func newDeviceServiceCallbackClient(ctx context.Context, dic *di.Container, deviceServiceName string) (interfaces.DeviceServiceCallbackClient, errors.EdgeX) {
ds, err := DeviceServiceByName(deviceServiceName, ctx, dic)
if err != nil {
return nil, errors.NewCommonEdgeXWrapper(err)
}
return clients.NewDeviceServiceCallbackClient(ds.BaseAddress), nil
}

// validateDeviceCallback invoke device service's validation function for validating new or updated device
func validateDeviceCallback(ctx context.Context, dic *di.Container, device dtos.Device) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
deviceServiceCallbackClient, err := newDeviceServiceCallbackClient(ctx, dic, device.ServiceName)
func validateDeviceCallback(device dtos.Device, dic *di.Container) errors.EdgeX {
configuration := container.ConfigurationFrom(dic.Get)
messagingClient := bootstrapContainer.MessagingClientFrom(dic.Get)

requestTimeout, err := time.ParseDuration(configuration.Service.RequestTimeout)
if err != nil {
lc.Errorf("fail to create a device service callback client by serviceName %s, err: %v", device.ServiceName, err)
return err
return errors.NewCommonEdgeX(errors.KindServerError, "failed to parse service.RequestTimeout", err)
}

// reusing AddDeviceRequest here as it contains the protocols field and opens up
// to other validation beyond protocols if ever needed
request := requests.NewAddDeviceRequest(device)
_, err = deviceServiceCallbackClient.ValidateDeviceCallback(ctx, request)
addDeviceRequest := requests.NewAddDeviceRequest(device)
requestBytes, err := json.Marshal(addDeviceRequest)
if err != nil {
// TODO: reconsider the validity in v3
// allow this case for the backward-compatability in v2
if err.Code() == http.StatusServiceUnavailable {
lc.Warnf("Skipping device validation for device %s (device service %s unavailable)", device.Name, device.ServiceName)
} else if err.Code() == http.StatusNotFound {
lc.Warnf("Skipping device validation for device %s (device service %s < v2.2)", device.Name, device.ServiceName)
} else {
return errors.NewCommonEdgeX(errors.KindServerError, "device validation failed", err)
}
return errors.NewCommonEdgeX(errors.KindServerError, "failed to JSON encoding AddDeviceRequest", err)
}

baseTopic := configuration.MessageBus.GetBaseTopicPrefix()
requestTopic := common.BuildTopic(baseTopic, device.ServiceName, common.ValidateDeviceSubscribeTopic)
responseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, device.ServiceName)
requestEnvelope := types.NewMessageEnvelopeForRequest(requestBytes, nil)
res, err := messagingClient.Request(requestEnvelope, requestTopic, responseTopicPrefix, requestTimeout)
if err != nil {
return errors.NewCommonEdgeX(errors.KindServiceUnavailable, fmt.Sprintf("Error sending request to topic '%s'", requestTopic), err)
} else if res.ErrorCode == 1 {
return errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("Device %s validation failed: %s", device.Name, res.Payload), nil)
}

return nil
Expand Down
Loading

0 comments on commit 15b3f24

Please sign in to comment.