Skip to content

Commit

Permalink
feat(bootstrap): Integrate go-mod-bootstrap for common bootstraping
Browse files Browse the repository at this point in the history
closes #264 & #149

Signed-off-by: lenny <leonard.goodell@intel.com>
  • Loading branch information
lenny committed Apr 17, 2020
1 parent 818d309 commit 1034e84
Show file tree
Hide file tree
Showing 36 changed files with 1,461 additions and 1,417 deletions.
139 changes: 0 additions & 139 deletions Attribution.txt

This file was deleted.

1 change: 0 additions & 1 deletion Makefile
Expand Up @@ -7,5 +7,4 @@ test:
$(GO) vet ./...
gofmt -l .
[ "`gofmt -l .`" = "" ]
./bin/test-attribution-txt.sh
./bin/test-go-mod-tidy.sh
8 changes: 6 additions & 2 deletions appcontext/context.go
@@ -1,5 +1,5 @@
//
// Copyright (c) 2019 Intel Corporation
// Copyright (c) 2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,7 @@ type Context struct {
// OutputData is used for specifying the data that is to be outputted. Leverage the .Complete() function to set.
OutputData []byte
// This holds the configuration for your service. This is the preferred way to access your custom application settings that have been set in the configuration.
Configuration common.ConfigurationStruct
Configuration *common.ConfigurationStruct
// LoggingClient is exposed to allow logging following the preferred logging strategy within EdgeX.
LoggingClient logger.LoggingClient
// EventClient exposes Core Data's EventClient API
Expand Down Expand Up @@ -99,6 +99,10 @@ func (context *Context) SetRetryData(payload []byte) {
// CoreServices then your deviceName and readingName must exist in the CoreMetadata and be properly registered in EdgeX.
func (context *Context) PushToCoreData(deviceName string, readingName string, value interface{}) (*models.Event, error) {
context.LoggingClient.Debug("Pushing to CoreData")
if context.EventClient == nil {
return nil, fmt.Errorf("unable to Push To CoreData: '%s' is missing from Clients configuration", common.CoreDataClientName)
}

now := time.Now().UnixNano()
val, err := util.CoerceType(value)
if err != nil {
Expand Down
185 changes: 185 additions & 0 deletions appsdk/configupdates.go
@@ -0,0 +1,185 @@
//
// Copyright (c) 2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package appsdk

import (
"context"
"fmt"
"reflect"
"sync"

"github.com/edgexfoundry/go-mod-bootstrap/bootstrap/config"

"github.com/edgexfoundry/app-functions-sdk-go/internal/bootstrap/handlers"
)

// ConfigUpdateProcessor contains the data need to process configuration updates
type ConfigUpdateProcessor struct {
sdk *AppFunctionsSDK
}

// NewConfigUpdateProcessor creates a new ConfigUpdateProcessor which process configuration updates triggered from
// the Configuration Provider
func NewConfigUpdateProcessor(sdk *AppFunctionsSDK) *ConfigUpdateProcessor {
return &ConfigUpdateProcessor{sdk: sdk}
}

// WaitForConfigUpdates waits for signal that configuration has been updated (triggered from by Configuration Provider)
// and then determines what was updated and does any special processing, if needed, for the updates.
func (processor *ConfigUpdateProcessor) WaitForConfigUpdates(configUpdated config.UpdatedStream) {
sdk := processor.sdk
sdk.appWg.Add(1)

go func() {
defer sdk.appWg.Done()

sdk.LoggingClient.Info("Waiting for App Service configuration updates...")

previousWriteable := sdk.config.Writable

for {
select {
case <-sdk.appCtx.Done():
sdk.LoggingClient.Info("Exiting waiting for App Service configuration updates")
return

case <-configUpdated:
currentWritable := sdk.config.Writable
// Verify something actually changed, if not no need to process anything.
// This can happen on initial startup
if reflect.DeepEqual(currentWritable, previousWriteable) {
sdk.LoggingClient.Info("No actual configuration changes detected. Skipping processing updates.")
continue
}

sdk.LoggingClient.Info("Processing App Service configuration updates")

// Note: Updates occur one setting at a time so only have to look for single changes
switch {
case previousWriteable.LogLevel != currentWritable.LogLevel:
_ = sdk.LoggingClient.SetLogLevel(currentWritable.LogLevel)
sdk.LoggingClient.Info(fmt.Sprintf("Logging level changed to %s", currentWritable.LogLevel))

case !reflect.DeepEqual(previousWriteable.InsecureSecrets, currentWritable.InsecureSecrets):
sdk.LoggingClient.Info("Insecure Secrets have been updated")

case previousWriteable.StoreAndForward.MaxRetryCount != currentWritable.StoreAndForward.MaxRetryCount:
if currentWritable.StoreAndForward.MaxRetryCount < 0 {
sdk.LoggingClient.Warn(
fmt.Sprintf("StoreAndForward MaxRetryCount can not be less than 0, defaulting to 1"))
currentWritable.StoreAndForward.MaxRetryCount = 1
}
sdk.LoggingClient.Info(
fmt.Sprintf(
"StoreAndForward MaxRetryCount changed to %d",
currentWritable.StoreAndForward.MaxRetryCount))

case previousWriteable.StoreAndForward.RetryInterval != currentWritable.StoreAndForward.RetryInterval:
processor.processConfigChangedStoreForwardRetryInterval()

case previousWriteable.StoreAndForward.Enabled != currentWritable.StoreAndForward.Enabled:
processor.processConfigChangedStoreForwardEnabled()

case !reflect.DeepEqual(previousWriteable.Pipeline, currentWritable.Pipeline):
processor.processConfigChangedPipeline()

default:
// Nothing of interest changed
sdk.LoggingClient.Info("No configuration changes detected that require special processing.")
}

// grab new copy of the writeable configuration for comparing against when next update occurs
previousWriteable = currentWritable
}
}
}()
}

// processConfigChangedStoreForwardRetryInterval handles when the Store and Forward RetryInterval setting has been updated
func (processor *ConfigUpdateProcessor) processConfigChangedStoreForwardRetryInterval() {
sdk := processor.sdk

if sdk.config.Writable.StoreAndForward.Enabled {
sdk.stopStoreForward()
sdk.startStoreForward()
}
}

// processConfigChangedStoreForwardEnabled handles when the Store and Forward Enabled setting has been updated
func (processor *ConfigUpdateProcessor) processConfigChangedStoreForwardEnabled() {
sdk := processor.sdk

if sdk.config.Writable.StoreAndForward.Enabled {
// StoreClient must be set up for StoreAndForward
if sdk.storeClient == nil {
var err error
sdk.storeClient, err = handlers.InitializeStoreClient(sdk.secretProvider, sdk.config)
if err != nil {
// Error already logged
sdk.config.Writable.StoreAndForward.Enabled = false
return
}

sdk.runtime.Initialize(sdk.storeClient, sdk.secretProvider)
}

sdk.startStoreForward()
} else {
sdk.stopStoreForward()
}
}

// processConfigChangedPipeline handles when any of the Pipeline settings have been updated
func (processor *ConfigUpdateProcessor) processConfigChangedPipeline() {
sdk := processor.sdk

if sdk.usingConfigurablePipeline {
transforms, err := sdk.LoadConfigurablePipeline()
if err != nil {
sdk.LoggingClient.Error("unable to reload Configurable Pipeline from Registry: " + err.Error())
return
}
err = sdk.SetFunctionsPipeline(transforms...)
if err != nil {
sdk.LoggingClient.Error("unable to set Configurable Pipeline from Registry: " + err.Error())
return
}

sdk.LoggingClient.Info("Reloaded Configurable Pipeline from Registry")
}
}

// startStoreForward starts the Store and Forward processing
func (sdk *AppFunctionsSDK) startStoreForward() {
var storeForwardEnabledCtx context.Context
sdk.storeForwardWg = &sync.WaitGroup{}
storeForwardEnabledCtx, sdk.storeForwardCancelCtx = context.WithCancel(context.Background())
sdk.runtime.StartStoreAndForward(
sdk.appWg,
sdk.appCtx,
sdk.storeForwardWg,
storeForwardEnabledCtx,
sdk.ServiceKey,
sdk.config,
sdk.edgexClients)
}

// stopStoreForward stops the Store and Forward processing
func (sdk *AppFunctionsSDK) stopStoreForward() {
sdk.LoggingClient.Info("Canceling Store and Forward retry loop")
sdk.storeForwardCancelCtx()
sdk.storeForwardWg.Wait()
}

0 comments on commit 1034e84

Please sign in to comment.