Skip to content

Commit

Permalink
Merge branch 'master' into jwks-ca
Browse files Browse the repository at this point in the history
  • Loading branch information
ItalyPaleAle committed Jan 12, 2024
2 parents 1535370 + 38b0511 commit 3106a14
Show file tree
Hide file tree
Showing 20 changed files with 1,119 additions and 855 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
diff --git a/tests/integration/suite/actors/healthz/healthz.go b/tests/integration/suite/actors/healthz/healthz.go
index 00142601d..528e91117 100644
--- a/tests/integration/suite/actors/healthz/healthz.go
+++ b/tests/integration/suite/actors/healthz/healthz.go
@@ -39,15 +39,17 @@ func init() {
// initerror tests that Daprd will block actor calls until actors have been
// initialized.
type initerror struct {
- daprd *daprd.Daprd
- place *placement.Placement
- configCalled chan struct{}
- blockConfig chan struct{}
+ daprd *daprd.Daprd
+ place *placement.Placement
+ configCalled chan struct{}
+ blockConfig chan struct{}
+ healthzCalled chan struct{}
}

func (i *initerror) Setup(t *testing.T) []framework.Option {
i.configCalled = make(chan struct{})
i.blockConfig = make(chan struct{})
+ i.healthzCalled = make(chan struct{})

handler := http.NewServeMux()
handler.HandleFunc("/dapr/config", func(w http.ResponseWriter, r *http.Request) {
@@ -55,6 +57,10 @@ func (i *initerror) Setup(t *testing.T) []framework.Option {
<-i.blockConfig
w.Write([]byte(`{"entities": ["myactortype"]}`))
})
+ handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ close(i.healthzCalled)
+ })
handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`OK`))
})
@@ -119,6 +125,12 @@ func (i *initerror) Run(t *testing.T, ctx context.Context) {

close(i.blockConfig)

+ select {
+ case <-i.healthzCalled:
+ case <-time.After(time.Second * 15):
+ t.Fatal("timed out waiting for healthz call")
+ }
+
req, err = http.NewRequestWithContext(ctx, http.MethodPost, daprdURL, nil)
require.NoError(t, err)
resp, err = client.Do(req)
diff --git a/tests/integration/suite/actors/http/ttl.go b/tests/integration/suite/actors/http/ttl.go
index 47dbd8ff6..5135d424c 100644
--- a/tests/integration/suite/actors/http/ttl.go
+++ b/tests/integration/suite/actors/http/ttl.go
@@ -21,6 +21,7 @@ import (
"path/filepath"
"strconv"
"strings"
+ "sync"
"testing"
"time"

@@ -40,11 +41,13 @@ func init() {
}

type ttl struct {
- daprd *daprd.Daprd
- place *placement.Placement
+ daprd *daprd.Daprd
+ place *placement.Placement
+ healthzCalled chan struct{}
}

func (l *ttl) Setup(t *testing.T) []framework.Option {
+ l.healthzCalled = make(chan struct{})
configFile := filepath.Join(t.TempDir(), "config.yaml")
require.NoError(t, os.WriteFile(configFile, []byte(`
apiVersion: dapr.io/v1alpha1
@@ -61,6 +64,13 @@ spec:
handler.HandleFunc("/dapr/config", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"entities": ["myactortype"]}`))
})
+ var once sync.Once
+ handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ once.Do(func() {
+ close(l.healthzCalled)
+ })
+ })
handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`OK`))
})
@@ -93,6 +103,12 @@ func (l *ttl) Run(t *testing.T, ctx context.Context) {
l.place.WaitUntilRunning(t, ctx)
l.daprd.WaitUntilRunning(t, ctx)

+ select {
+ case <-l.healthzCalled:
+ case <-time.After(time.Second * 15):
+ t.Fatal("timed out waiting for healthz call")
+ }
+
client := util.HTTPClient(t)

daprdURL := "http://localhost:" + strconv.Itoa(l.daprd.HTTPPort())
diff --git a/tests/integration/suite/actors/reminders/rebalancing.go b/tests/integration/suite/actors/reminders/rebalancing.go
index d64e73457..53f8e4fa6 100644
--- a/tests/integration/suite/actors/reminders/rebalancing.go
+++ b/tests/integration/suite/actors/reminders/rebalancing.go
@@ -394,6 +394,7 @@ func (i *rebalancing) reportStatusToPlacement(ctx context.Context, stream placem
Port: 1234,
Entities: entities,
Id: "invalidapp",
+ ApiLevel: 10,
})
if err != nil {
return fmt.Errorf("failed to send message: %w", err)
diff --git a/tests/integration/suite/placement/quorum/insecure.go b/tests/integration/suite/placement/quorum/insecure.go
index d0531bed4..df526cb29 100644
--- a/tests/integration/suite/placement/quorum/insecure.go
+++ b/tests/integration/suite/placement/quorum/insecure.go
@@ -124,7 +124,10 @@ func (i *insecure) Run(t *testing.T, ctx context.Context) {
if err != nil {
return false
}
- err = stream.Send(new(v1pb.Host))
+ err = stream.Send(&v1pb.Host{
+ Id: "app-1",
+ ApiLevel: 10,
+ })
if err != nil {
return false
}
@@ -133,7 +136,7 @@ func (i *insecure) Run(t *testing.T, ctx context.Context) {
return false
}
return true
- }, time.Second*10, time.Millisecond*100)
+ }, time.Second*30, time.Millisecond*100)

err = stream.Send(&v1pb.Host{
Name: "app-1",
diff --git a/tests/integration/suite/placement/quorum/jwks.go b/tests/integration/suite/placement/quorum/jwks.go
index 2d555299c..4ef55eb5c 100644
--- a/tests/integration/suite/placement/quorum/jwks.go
+++ b/tests/integration/suite/placement/quorum/jwks.go
@@ -169,7 +169,10 @@ func (j *jwks) Run(t *testing.T, ctx context.Context) {
if err != nil {
return false
}
- err = stream.Send(new(v1pb.Host))
+ err = stream.Send(&v1pb.Host{
+ Id: "app-1",
+ ApiLevel: 10,
+ })
if err != nil {
return false
}
1 change: 1 addition & 0 deletions .github/workflows/version-skew.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ jobs:

- name: Build & download binaries
run: |
go mod tidy -v
make build
mkdir -p downloads && cd downloads
curl -so daprd_linux_amd64.tar.gz -L https://github.com/dapr/dapr/releases/download/v${{ env.DAPR_PREV_VERSION }}/daprd_linux_amd64.tar.gz
Expand Down
181 changes: 181 additions & 0 deletions cmd/daprd/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package app

import (
"context"
"fmt"
"os"

"go.uber.org/automaxprocs/maxprocs"

// Register all components
_ "github.com/dapr/dapr/cmd/daprd/components"

"github.com/dapr/dapr/cmd/daprd/options"
"github.com/dapr/dapr/pkg/buildinfo"
bindingsLoader "github.com/dapr/dapr/pkg/components/bindings"
configurationLoader "github.com/dapr/dapr/pkg/components/configuration"
cryptoLoader "github.com/dapr/dapr/pkg/components/crypto"
lockLoader "github.com/dapr/dapr/pkg/components/lock"
httpMiddlewareLoader "github.com/dapr/dapr/pkg/components/middleware/http"
nrLoader "github.com/dapr/dapr/pkg/components/nameresolution"
pubsubLoader "github.com/dapr/dapr/pkg/components/pubsub"
secretstoresLoader "github.com/dapr/dapr/pkg/components/secretstores"
stateLoader "github.com/dapr/dapr/pkg/components/state"
workflowsLoader "github.com/dapr/dapr/pkg/components/workflows"
"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/runtime/registry"
"github.com/dapr/dapr/pkg/security"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/signals"

"github.com/dapr/dapr/pkg/runtime"
"github.com/dapr/kit/logger"
)

var (
log = logger.NewLogger("dapr.runtime")
logContrib = logger.NewLogger("dapr.contrib")
)

func Run() {
// set GOMAXPROCS
_, _ = maxprocs.Set()

opts := options.New(os.Args[1:])

if opts.RuntimeVersion {
//nolint:forbidigo
fmt.Println(buildinfo.Version())
os.Exit(0)
}

if opts.BuildInfo {
//nolint:forbidigo
fmt.Printf("Version: %s\nGit Commit: %s\nGit Version: %s\n", buildinfo.Version(), buildinfo.Commit(), buildinfo.GitVersion())
os.Exit(0)
}

if opts.WaitCommand {
runtime.WaitUntilDaprOutboundReady(opts.DaprHTTPPort)
os.Exit(0)
}

// Apply options to all loggers.
opts.Logger.SetAppID(opts.AppID)

if err := logger.ApplyOptionsToLoggers(&opts.Logger); err != nil {
log.Fatal(err)
}

log.Infof("Starting Dapr Runtime -- version %s -- commit %s", buildinfo.Version(), buildinfo.Commit())
log.Infof("Log level set to: %s", opts.Logger.OutputLevel)

secretstoresLoader.DefaultRegistry.Logger = logContrib
stateLoader.DefaultRegistry.Logger = logContrib
cryptoLoader.DefaultRegistry.Logger = logContrib
configurationLoader.DefaultRegistry.Logger = logContrib
lockLoader.DefaultRegistry.Logger = logContrib
pubsubLoader.DefaultRegistry.Logger = logContrib
nrLoader.DefaultRegistry.Logger = logContrib
bindingsLoader.DefaultRegistry.Logger = logContrib
workflowsLoader.DefaultRegistry.Logger = logContrib
httpMiddlewareLoader.DefaultRegistry.Logger = log // Note this uses log on purpose

reg := registry.NewOptions().
WithSecretStores(secretstoresLoader.DefaultRegistry).
WithStateStores(stateLoader.DefaultRegistry).
WithConfigurations(configurationLoader.DefaultRegistry).
WithLocks(lockLoader.DefaultRegistry).
WithPubSubs(pubsubLoader.DefaultRegistry).
WithNameResolutions(nrLoader.DefaultRegistry).
WithBindings(bindingsLoader.DefaultRegistry).
WithCryptoProviders(cryptoLoader.DefaultRegistry).
WithHTTPMiddlewares(httpMiddlewareLoader.DefaultRegistry).
WithWorkflows(workflowsLoader.DefaultRegistry)

ctx := signals.Context()
secProvider, err := security.New(ctx, security.Options{
SentryAddress: opts.SentryAddress,
ControlPlaneTrustDomain: opts.ControlPlaneTrustDomain,
ControlPlaneNamespace: opts.ControlPlaneNamespace,
TrustAnchors: opts.TrustAnchors,
AppID: opts.AppID,
MTLSEnabled: opts.EnableMTLS,
Mode: modes.DaprMode(opts.Mode),
})
if err != nil {
log.Fatal(err)
}

err = concurrency.NewRunnerManager(
secProvider.Run,
func(ctx context.Context) error {
sec, serr := secProvider.Handler(ctx)
if serr != nil {
return serr
}

rt, rerr := runtime.FromConfig(ctx, &runtime.Config{
AppID: opts.AppID,
PlacementServiceHostAddr: opts.PlacementServiceHostAddr,
AllowedOrigins: opts.AllowedOrigins,
ResourcesPath: opts.ResourcesPath,
ControlPlaneAddress: opts.ControlPlaneAddress,
AppProtocol: opts.AppProtocol,
Mode: opts.Mode,
DaprHTTPPort: opts.DaprHTTPPort,
DaprInternalGRPCPort: opts.DaprInternalGRPCPort,
DaprAPIGRPCPort: opts.DaprAPIGRPCPort,
DaprAPIListenAddresses: opts.DaprAPIListenAddresses,
DaprPublicPort: opts.DaprPublicPort,
ApplicationPort: opts.AppPort,
ProfilePort: opts.ProfilePort,
EnableProfiling: opts.EnableProfiling,
AppMaxConcurrency: opts.AppMaxConcurrency,
EnableMTLS: opts.EnableMTLS,
SentryAddress: opts.SentryAddress,
DaprHTTPMaxRequestSize: opts.DaprHTTPMaxRequestSize,
UnixDomainSocket: opts.UnixDomainSocket,
DaprHTTPReadBufferSize: opts.DaprHTTPReadBufferSize,
DaprGracefulShutdownSeconds: opts.DaprGracefulShutdownSeconds,
DaprBlockShutdownDuration: opts.DaprBlockShutdownDuration,
DisableBuiltinK8sSecretStore: opts.DisableBuiltinK8sSecretStore,
EnableAppHealthCheck: opts.EnableAppHealthCheck,
AppHealthCheckPath: opts.AppHealthCheckPath,
AppHealthProbeInterval: opts.AppHealthProbeInterval,
AppHealthProbeTimeout: opts.AppHealthProbeTimeout,
AppHealthThreshold: opts.AppHealthThreshold,
AppChannelAddress: opts.AppChannelAddress,
EnableAPILogging: opts.EnableAPILogging,
Config: opts.Config,
Metrics: opts.Metrics,
AppSSL: opts.AppSSL,
ComponentsPath: opts.ComponentsPath,
Registry: reg,
Security: sec,
})
if rerr != nil {
return rerr
}

return rt.Run(ctx)
},
).Run(ctx)
if err != nil {
log.Fatalf("Fatal error from runtime: %s", err)
}
log.Info("Daprd shutdown gracefully")
}

0 comments on commit 3106a14

Please sign in to comment.