Skip to content

Commit

Permalink
Test for httptrigger and functions container/newdeploy (#2861)
Browse files Browse the repository at this point in the history
* Test for httptrigger and functions
* Fixes with multierror
* review changes

Signed-off-by: Sanket Sudake <sanketsudake@gmail.com>
  • Loading branch information
sanketsudake committed Oct 27, 2023
1 parent 2eb2eba commit 2223081
Show file tree
Hide file tree
Showing 24 changed files with 292 additions and 166 deletions.
21 changes: 11 additions & 10 deletions cmd/preupgradechecks/checks.go
Expand Up @@ -18,11 +18,10 @@ package main

import (
"context"
"errors"
"fmt"
"strings"

multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"go.uber.org/zap"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -52,17 +51,18 @@ const (
)

func makePreUpgradeTaskClient(clientGen crd.ClientGeneratorInterface, logger *zap.Logger) (*PreUpgradeTaskClient, error) {
var err error
fissionClient, err := clientGen.GetFissionClient()
if err != nil {
return nil, errors.Wrap(err, "failed to get fission client")
return nil, fmt.Errorf("failed to get fission client: %w", err)
}
k8sClient, err := clientGen.GetKubernetesClient()
if err != nil {
return nil, errors.Wrap(err, "failed to get kubernetes client")
return nil, fmt.Errorf("failed to get kubernetes client: %w", err)
}
apiExtClient, err := clientGen.GetApiExtensionsClient()
if err != nil {
return nil, errors.Wrap(err, "failed to get apiextensions client")
return nil, fmt.Errorf("failed to get apiextensions client: %w", err)
}

return &PreUpgradeTaskClient{
Expand Down Expand Up @@ -129,7 +129,7 @@ func (client *PreUpgradeTaskClient) VerifyFunctionSpecReferences(ctx context.Con

var err error
var fList *fv1.FunctionList
errs := &multierror.Error{}
var errs error

for _, namespace := range utils.DefaultNSResolver().FissionResourceNS {
for i := 0; i < maxRetries; i++ {
Expand All @@ -150,24 +150,25 @@ func (client *PreUpgradeTaskClient) VerifyFunctionSpecReferences(ctx context.Con
secrets := fn.Spec.Secrets
for _, secret := range secrets {
if secret.Namespace != "" && secret.Namespace != fn.ObjectMeta.Namespace {
errs = multierror.Append(errs, fmt.Errorf("function : %s.%s cannot reference a secret : %s in namespace : %s", fn.ObjectMeta.Name, fn.ObjectMeta.Namespace, secret.Name, secret.Namespace))
errs = errors.Join(errs, fmt.Errorf("function : %s.%s cannot reference a secret : %s in namespace : %s", fn.ObjectMeta.Name, fn.ObjectMeta.Namespace, secret.Name, secret.Namespace))
}
}

configmaps := fn.Spec.ConfigMaps
for _, configmap := range configmaps {
if configmap.Namespace != "" && configmap.Namespace != fn.ObjectMeta.Namespace {
errs = multierror.Append(errs, fmt.Errorf("function : %s.%s cannot reference a configmap : %s in namespace : %s", fn.ObjectMeta.Name, fn.ObjectMeta.Namespace, configmap.Name, configmap.Namespace))
errs = errors.Join(errs, fmt.Errorf("function : %s.%s cannot reference a configmap : %s in namespace : %s", fn.ObjectMeta.Name, fn.ObjectMeta.Namespace, configmap.Name, configmap.Namespace))

}
}

if fn.Spec.Package.PackageRef.Namespace != "" && fn.Spec.Package.PackageRef.Namespace != fn.ObjectMeta.Namespace {
errs = multierror.Append(errs, fmt.Errorf("function : %s.%s cannot reference a package : %s in namespace : %s", fn.ObjectMeta.Name, fn.ObjectMeta.Namespace, fn.Spec.Package.PackageRef.Name, fn.Spec.Package.PackageRef.Namespace))
errs = errors.Join(errs, fmt.Errorf("function : %s.%s cannot reference a package : %s in namespace : %s", fn.ObjectMeta.Name, fn.ObjectMeta.Namespace, fn.Spec.Package.PackageRef.Name, fn.Spec.Package.PackageRef.Namespace))
}
}
}

if errs.ErrorOrNil() != nil {
if errs != nil {
client.logger.Fatal("installation failed",
zap.Error(errs),
zap.String("summary", "a function cannot reference secrets, configmaps and packages outside it's own namespace"))
Expand Down
13 changes: 6 additions & 7 deletions pkg/apis/core/v1/validation.go
Expand Up @@ -129,25 +129,24 @@ func ValidateKubeLabel(field string, labels map[string]string) error {
}

func ValidateKubePort(field string, port int) error {
result := &multierror.Error{}
var err error

e := validation.IsValidPortNum(port)
if len(e) > 0 {
result = multierror.Append(result, MakeValidationErr(ErrorInvalidValue, field, port, e...))
err = errors.Join(err, MakeValidationErr(ErrorInvalidValue, field, port, e...))
}

return result.ErrorOrNil()
return err
}

func ValidateKubeName(field string, val string) error {
result := &multierror.Error{}
var err error

e := validation.IsDNS1123Label(val)
if len(e) > 0 {
result = multierror.Append(result, MakeValidationErr(ErrorInvalidValue, field, val, e...))
err = errors.Join(err, MakeValidationErr(ErrorInvalidValue, field, val, e...))
}

return result.ErrorOrNil()
return err
}

// validateNS is to match the k8s behaviour. Where it is not mandatory to provide a NS. And so we validate it if user has provided one.
Expand Down
3 changes: 2 additions & 1 deletion pkg/buildermgr/buildermgr.go
Expand Up @@ -18,6 +18,7 @@ package buildermgr

import (
"context"
"os"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -54,7 +55,7 @@ func Start(ctx context.Context, clientGen crd.ClientGeneratorInterface, logger *
}

podSpecPatch, err := util.GetSpecFromConfigMap(fv1.BuilderPodSpecPath)
if err != nil {
if err != nil && !os.IsNotExist(err) {
logger.Warn("error reading data for pod spec patch", zap.String("path", fv1.BuilderPodSpecPath), zap.Error(err))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/canaryconfigmgr/config.go
Expand Up @@ -35,7 +35,7 @@ func ConfigureFeatures(ctx context.Context, logger *zap.Logger, unitTestMode boo
}

// get the featureConfig from config map mounted onto the file system
featureConfig, err := config.GetFeatureConfig()
featureConfig, err := config.GetFeatureConfig(logger)
if err != nil {
logger.Error("error getting feature config", zap.Error(err))
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/crd/key.go
Expand Up @@ -62,6 +62,13 @@ func CacheKeyURFromMeta(metadata *metav1.ObjectMeta) CacheKeyUR {
}
}

func CacheKeyURFromObject(obj metav1.Object) CacheKeyUR {
return CacheKeyUR{
UID: obj.GetUID(),
ResourceVersion: obj.GetResourceVersion(),
}
}

// CacheKeyURGFromMeta : Given metadata, create a key that uniquely identifies the contents
// of the object. Since resourceVersion changes on every update and
// UIDs are unique, uid+resourceVersion identifies the
Expand Down
18 changes: 8 additions & 10 deletions pkg/executor/api.go
Expand Up @@ -19,15 +19,14 @@ package executor
import (
"context"
"encoding/json"
"errors"
"fmt"
"html"
"io"
"net/http"
"strings"

"github.com/gorilla/mux"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"go.uber.org/zap"

fv1 "github.com/fission/fission/pkg/apis/core/v1"
Expand Down Expand Up @@ -201,27 +200,26 @@ func (executor *Executor) tapServices(w http.ResponseWriter, r *http.Request) {
return
}

errs := &multierror.Error{}
var errs error
for _, req := range tapSvcReqs {
svcHost := strings.TrimPrefix(req.ServiceURL, "http://")

et, exists := executor.executorTypes[req.FnExecutorType]
if !exists {
errs = multierror.Append(errs,
errors.Errorf("error tapping service due to unknown executor type '%v' found",
errs = errors.Join(errs,
fmt.Errorf("error tapping service due to unknown executor type '%s' found",
req.FnExecutorType))
continue
}

err = et.TapService(ctx, svcHost)
if err != nil {
errs = multierror.Append(errs,
errors.Wrapf(err, "'%v' failed to tap function '%v' in '%v' with service url '%v'",
req.FnMetadata.Name, req.FnMetadata.Namespace, req.ServiceURL, req.FnExecutorType))
errs = errors.Join(errs,
fmt.Errorf("error tapping function '%s/%s' with executor '%s' and service url '%s': %w", req.FnMetadata.Namespace, req.FnMetadata.Name, req.FnExecutorType, req.ServiceURL, err))
}
}

if errs.ErrorOrNil() != nil {
if errs != nil {
logger.Error("error tapping function service", zap.Error(errs))
http.Error(w, "Not found", http.StatusNotFound)
return
Expand Down Expand Up @@ -249,7 +247,7 @@ func (executor *Executor) unTapService(w http.ResponseWriter, r *http.Request) {
}
t := tapSvcReq.FnExecutorType
if t != fv1.ExecutorTypePoolmgr {
msg := fmt.Sprintf("Unknown executor type '%v'", t)
msg := fmt.Sprintf("Unknown executor type '%s'", t)
http.Error(w, html.EscapeString(msg), http.StatusBadRequest)
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/cms/cmscontroller.go
Expand Up @@ -72,7 +72,7 @@ func refreshPods(ctx context.Context, logger *zap.Logger, funcs []fv1.Function,
if exists {
err = et.RefreshFuncPods(ctx, logger, f)
} else {
err = errors.Errorf("Unknown executor type '%v'", f.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType)
err = errors.Errorf("Unknown executor type '%s'", f.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType)
}

if err != nil {
Expand Down
22 changes: 12 additions & 10 deletions pkg/executor/executor.go
Expand Up @@ -110,7 +110,9 @@ func MakeExecutor(ctx context.Context, logger *zap.Logger, cms *cms.ConfigSecret
func (executor *Executor) serveCreateFuncServices() {
for {
req := <-executor.requestChan
fnMetadata := &req.function.ObjectMeta
function := req.function
fnName := k8sCache.MetaObjectToName(function)
fnkeyUR := crd.CacheKeyURFromObject(function)

if req.function.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType == fv1.ExecutorTypePoolmgr {
go func() {
Expand Down Expand Up @@ -138,13 +140,13 @@ func (executor *Executor) serveCreateFuncServices() {
}

// Cache miss -- is this first one to request the func?
wg, found := executor.fsCreateWg.Load(crd.CacheKeyURFromMeta(fnMetadata))
wg, found := executor.fsCreateWg.Load(fnkeyUR)
if !found {
// create a waitgroup for other requests for
// the same function to wait on
wg := &sync.WaitGroup{}
wg.Add(1)
executor.fsCreateWg.Store(crd.CacheKeyURFromMeta(fnMetadata), wg)
executor.fsCreateWg.Store(fnkeyUR, wg)

// launch a goroutine for each request, to parallelize
// the specialization of different functions
Expand Down Expand Up @@ -176,17 +178,17 @@ func (executor *Executor) serveCreateFuncServices() {
funcSvc: fsvc,
err: err,
}
executor.fsCreateWg.Delete(crd.CacheKeyURFromMeta(fnMetadata))
executor.fsCreateWg.Delete(fnkeyUR)
wg.Done()
}()
} else {
// There's an existing request for this function, wait for it to finish
go func() {
executor.logger.Debug("waiting for concurrent request for the same function",
zap.Any("function", fnMetadata))
zap.String("function", fnName.String()))
wg, ok := wg.(*sync.WaitGroup)
if !ok {
err := fmt.Errorf("could not convert value to workgroup for function %v in namespace %v", fnMetadata.Name, fnMetadata.Namespace)
err := fmt.Errorf("could not convert value to workgroup for function %s", fnName)
req.respChan <- &createFuncServiceResponse{
funcSvc: nil,
err: err,
Expand All @@ -201,7 +203,7 @@ func (executor *Executor) serveCreateFuncServices() {
// It normally happened if there are multiple requests are
// waiting for the same function and executor failed to cre-
// ate service for function.
err = errors.Wrapf(err, "error getting service for function %v in namespace %v", fnMetadata.Name, fnMetadata.Namespace)
err = errors.Wrapf(err, "error getting service for function %s", fnName)
req.respChan <- &createFuncServiceResponse{
funcSvc: fsvc,
err: err,
Expand All @@ -221,7 +223,7 @@ func (executor *Executor) createServiceForFunction(ctx context.Context, fn *fv1.
t := fn.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType
e, ok := executor.executorTypes[t]
if !ok {
return nil, errors.Errorf("Unknown executor type '%v'", t)
return nil, errors.Errorf("Unknown executor type '%s'", t)
}

fsvc, fsvcErr := e.GetFuncSvc(ctx, fn)
Expand All @@ -242,7 +244,7 @@ func (executor *Executor) getFunctionServiceFromCache(ctx context.Context, fn *f
t := fn.Spec.InvokeStrategy.ExecutionStrategy.ExecutorType
e, ok := executor.executorTypes[t]
if !ok {
return nil, errors.Errorf("Unknown executor type '%v'", t)
return nil, errors.Errorf("Unknown executor type '%s'", t)
}
return e.GetFuncSvcFromCache(ctx, fn)
}
Expand Down Expand Up @@ -277,7 +279,7 @@ func StartExecutor(ctx context.Context, clientGen crd.ClientGeneratorInterface,
executorInstanceID := strings.ToLower(uniuri.NewLen(8))

podSpecPatch, err := util.GetSpecFromConfigMap(fv1.RuntimePodSpecPath)
if err != nil {
if err != nil && !os.IsNotExist(err) {
logger.Warn("error reading data for pod spec patch", zap.String("path", fv1.RuntimePodSpecPath), zap.Error(err))
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/executortype/container/common.go
Expand Up @@ -18,9 +18,9 @@ package container

import (
"context"
"errors"
"strconv"

multierror "github.com/hashicorp/go-multierror"
"go.uber.org/zap"
apiv1 "k8s.io/api/core/v1"
k8s_err "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -66,15 +66,15 @@ func (cn *Container) getResources(fn *fv1.Function) apiv1.ResourceRequirements {

// cleanupContainer cleans all kubernetes objects related to function
func (cn *Container) cleanupContainer(ctx context.Context, ns string, name string) error {
result := &multierror.Error{}
var result error

err := cn.deleteSvc(ctx, ns, name)
if err != nil && !k8s_err.IsNotFound(err) {
cn.logger.Error("error deleting service for Container function",
zap.Error(err),
zap.String("function_name", name),
zap.String("function_namespace", ns))
result = multierror.Append(result, err)
result = errors.Join(result, err)
}

err = cn.hpaops.DeleteHpa(ctx, ns, name)
Expand All @@ -83,7 +83,7 @@ func (cn *Container) cleanupContainer(ctx context.Context, ns string, name strin
zap.Error(err),
zap.String("function_name", name),
zap.String("function_namespace", ns))
result = multierror.Append(result, err)
result = errors.Join(result, err)
}

err = cn.deleteDeployment(ctx, ns, name)
Expand All @@ -92,10 +92,10 @@ func (cn *Container) cleanupContainer(ctx context.Context, ns string, name strin
zap.Error(err),
zap.String("function_name", name),
zap.String("function_namespace", ns))
result = multierror.Append(result, err)
result = errors.Join(result, err)
}

return result.ErrorOrNil()
return result
}

// referencedResourcesRVSum returns the sum of resource version of all resources the function references to.
Expand Down

0 comments on commit 2223081

Please sign in to comment.