Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor webhooks package #7040

Merged
merged 9 commits into from Apr 19, 2024
3 changes: 3 additions & 0 deletions api/ttn/lorawan/v3/applicationserver.proto
Expand Up @@ -102,6 +102,9 @@ message AsConfiguration {
message Webhooks {
int64 unhealthy_attempts_threshold = 1;
google.protobuf.Duration unhealthy_retry_interval = 2;

reserved 3;
reserved "queue";
}
Webhooks webhooks = 2;
}
Expand Down
5 changes: 4 additions & 1 deletion api/ttn/lorawan/v3/applicationserver_web.proto
Expand Up @@ -224,7 +224,10 @@ message ApplicationWebhook {

google.protobuf.FieldMask field_mask = 21;

// next: 23
reserved 24;
reserved "queue";

// next: 25
}

message ApplicationWebhooks {
Expand Down
38 changes: 19 additions & 19 deletions config/messages.json
Expand Up @@ -2681,6 +2681,24 @@
"file": "registry.go"
}
},
"error:pkg/applicationserver/io/web/sink:request": {
"translations": {
"en": "request"
},
"description": {
"package": "pkg/applicationserver/io/web/sink",
"file": "http.go"
}
},
"error:pkg/applicationserver/io/web/sink:webhook_disabled": {
"translations": {
"en": "webhook disabled"
},
"description": {
"package": "pkg/applicationserver/io/web/sink",
"file": "health.go"
}
},
"error:pkg/applicationserver/io/web:decode_body": {
"translations": {
"en": "decode body"
Expand Down Expand Up @@ -2735,15 +2753,6 @@
"file": "webhooks.go"
}
},
"error:pkg/applicationserver/io/web:request": {
"translations": {
"en": "request"
},
"description": {
"package": "pkg/applicationserver/io/web",
"file": "webhooks.go"
}
},
"error:pkg/applicationserver/io/web:template_not_found": {
"translations": {
"en": "template `{template_id}` not found"
Expand All @@ -2762,15 +2771,6 @@
"file": "webhooks.go"
}
},
"error:pkg/applicationserver/io/web:webhook_disabled": {
"translations": {
"en": "webhook disabled"
},
"description": {
"package": "pkg/applicationserver/io/web",
"file": "health_sink.go"
}
},
"error:pkg/applicationserver/io/web:webhook_not_found": {
"translations": {
"en": "webhook not found"
Expand Down Expand Up @@ -9994,7 +9994,7 @@
"en": "fail to send webhook"
},
"description": {
"package": "pkg/applicationserver/io/web",
"package": "pkg/applicationserver/io/web/sink",
"file": "observability.go"
}
},
Expand Down
8 changes: 2 additions & 6 deletions pkg/applicationserver/applicationserver.go
Expand Up @@ -1499,14 +1499,10 @@ func (as *ApplicationServer) decryptDownlinkMessage(ctx context.Context, ids *tt
return as.decryptAndDecodeDownlink(ctx, dev, msg, link.DefaultFormatters)
}

type ctxConfigKeyType struct{}

// GetConfig returns the Application Server config based on the context.
func (as *ApplicationServer) GetConfig(ctx context.Context) (*Config, error) {
if val, ok := ctx.Value(&ctxConfigKeyType{}).(*Config); ok {
return val, nil
}
return as.config, nil
c := *as.config
return &c, nil
}

// GetMQTTConfig returns the MQTT frontend configuration based on the context.
Expand Down
13 changes: 6 additions & 7 deletions pkg/applicationserver/config.go
Expand Up @@ -26,6 +26,7 @@ import (
loracloudgeolocationv3 "go.thethings.network/lorawan-stack/v3/pkg/applicationserver/io/packages/loragls/v3"
"go.thethings.network/lorawan-stack/v3/pkg/applicationserver/io/pubsub"
"go.thethings.network/lorawan-stack/v3/pkg/applicationserver/io/web"
"go.thethings.network/lorawan-stack/v3/pkg/applicationserver/io/web/sink"
"go.thethings.network/lorawan-stack/v3/pkg/applicationserver/lastseen"
"go.thethings.network/lorawan-stack/v3/pkg/applicationserver/metadata"
"go.thethings.network/lorawan-stack/v3/pkg/component"
Expand Down Expand Up @@ -242,7 +243,7 @@ type ApplicationPackagesConfig struct {
// NewWebhooks returns a new web.Webhooks based on the configuration.
// If Target is empty, this method returns nil.
func (c WebhooksConfig) NewWebhooks(ctx context.Context, server io.Server) (web.Webhooks, error) {
var sink web.Sink
var target sink.Sink
switch c.Target {
case "":
return nil, nil
Expand All @@ -252,9 +253,7 @@ func (c WebhooksConfig) NewWebhooks(ctx context.Context, server io.Server) (web.
return nil, err
}
client.Timeout = c.Timeout
sink = &web.HTTPClientSink{
Client: client,
}
target = sink.NewHTTPClientSink(client)
default:
return nil, errWebhooksTarget.WithAttributes("target", c.Target)
}
Expand All @@ -264,12 +263,12 @@ func (c WebhooksConfig) NewWebhooks(ctx context.Context, server io.Server) (web.
if c.UnhealthyAttemptsThreshold > 0 || c.UnhealthyRetryInterval > 0 {
registry := web.NewHealthStatusRegistry(c.Registry)
registry = web.NewCachedHealthStatusRegistry(registry)
sink = web.NewHealthCheckSink(sink, registry, c.UnhealthyAttemptsThreshold, c.UnhealthyRetryInterval)
target = sink.NewHealthCheckSink(target, registry, c.UnhealthyAttemptsThreshold, c.UnhealthyRetryInterval)
}
if c.QueueSize > 0 || c.Workers > 0 {
sink = web.NewPooledSink(ctx, server, sink, c.Workers, c.QueueSize)
target = sink.NewPooledSink(ctx, server, target, c.Workers, c.QueueSize)
}
return web.NewWebhooks(ctx, server, c.Registry, sink, c.Downlinks)
return web.NewWebhooks(ctx, server, c.Registry, target, c.Downlinks)
}

// NewPubSub returns a new pubsub.PubSub based on the configuration.
Expand Down
37 changes: 37 additions & 0 deletions pkg/applicationserver/io/web/config.go
Expand Up @@ -16,6 +16,7 @@ package web

import (
"context"
"fmt"
"net/url"
"os"

Expand Down Expand Up @@ -82,3 +83,39 @@ type DownlinksConfig struct {
PublicAddress string `name:"public-address" description:"Public address of the HTTP webhooks frontend"`
PublicTLSAddress string `name:"public-tls-address" description:"Public address of the HTTPS webhooks frontend"`
}

// URL returns the URL for the downlink operation.
func (c DownlinksConfig) URL(
_ context.Context,
webhookID *ttnpb.ApplicationWebhookIdentifiers,
devID *ttnpb.EndDeviceIdentifiers,
op string,
) string {
deriv := c
baseURL := deriv.PublicTLSAddress
if baseURL == "" {
baseURL = deriv.PublicAddress
}
return fmt.Sprintf(
"%s/as/applications/%s/webhooks/%s/devices/%s/down/%s",
baseURL,
webhookID.ApplicationIds.ApplicationId,
webhookID.WebhookId,
devID.DeviceId,
op,
)
}

// Domain returns the domain of the public address.
func (c DownlinksConfig) Domain(_ context.Context) string {
deriv := c
baseURL := deriv.PublicTLSAddress
if baseURL == "" {
baseURL = deriv.PublicAddress
}
u, err := url.Parse(baseURL)
if err != nil {
return ""
}
return u.Host
}
91 changes: 91 additions & 0 deletions pkg/applicationserver/io/web/health.go
@@ -0,0 +1,91 @@
// Copyright © 2024 The Things Network Foundation, The Things Industries B.V.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package web

import (
"context"

"go.thethings.network/lorawan-stack/v3/pkg/applicationserver/io/web/internal"
"go.thethings.network/lorawan-stack/v3/pkg/applicationserver/io/web/sink"
"go.thethings.network/lorawan-stack/v3/pkg/ttnpb"
)

type healthStatusRegistry struct {
registry WebhookRegistry
}

// Get implements HealthStatusRegistry.
func (reg *healthStatusRegistry) Get(ctx context.Context) (*ttnpb.ApplicationWebhookHealth, error) {
ids := internal.WebhookIDFromContext(ctx)
web, err := reg.registry.Get(ctx, ids, []string{"health_status"})
if err != nil {
return nil, err
}
return web.HealthStatus, nil
}

// Set implements HealthStatusRegistry.
func (reg *healthStatusRegistry) Set(
ctx context.Context, f func(*ttnpb.ApplicationWebhookHealth) (*ttnpb.ApplicationWebhookHealth, error),
) error {
ids := internal.WebhookIDFromContext(ctx)
_, err := reg.registry.Set(
ctx,
ids,
[]string{"health_status"},
func(wh *ttnpb.ApplicationWebhook) (*ttnpb.ApplicationWebhook, []string, error) {
if wh == nil {
// The webhook has been deleted during execution.
return nil, nil, nil
}
updated, err := f(wh.HealthStatus)
if err != nil {
return nil, nil, err
}
wh.HealthStatus = updated
return wh, []string{"health_status"}, nil
},
)
return err
}

// NewHealthStatusRegistry constructs a HealthStatusRegistry on top of the provided WebhookRegistry.
func NewHealthStatusRegistry(registry WebhookRegistry) sink.HealthStatusRegistry {
return &healthStatusRegistry{registry}
}

type cachedHealthStatusRegistry struct {
registry sink.HealthStatusRegistry
}

// Get implements HealthStatusRegistry.
func (reg *cachedHealthStatusRegistry) Get(ctx context.Context) (*ttnpb.ApplicationWebhookHealth, error) {
if h, ok := internal.WebhookHealthFromContext(ctx); ok {
return h, nil
}
return reg.registry.Get(ctx)
}

// Set implements HealthStatusRegistry.
func (reg *cachedHealthStatusRegistry) Set(
ctx context.Context, f func(*ttnpb.ApplicationWebhookHealth) (*ttnpb.ApplicationWebhookHealth, error),
) error {
return reg.registry.Set(ctx, f)
}

// NewCachedHealthStatusRegistry constructs a HealthStatusRegistry which allows the Get response to be cached.
func NewCachedHealthStatusRegistry(registry sink.HealthStatusRegistry) sink.HealthStatusRegistry {
return &cachedHealthStatusRegistry{registry}
}