diff --git a/gateways/common/config.go b/gateways/common/config.go new file mode 100644 index 0000000000..212e7aa06f --- /dev/null +++ b/gateways/common/config.go @@ -0,0 +1,42 @@ +package common + +import ( + "errors" + "path" + "regexp" +) + +type WatchPathConfig struct { + // Directory to watch for events + Directory string `json:"directory"` + // Path is relative path of object to watch with respect to the directory + Path string `json:"path,omitempty"` + // PathRegexp is regexp of relative path of object to watch with respect to the directory + PathRegexp string `json:"pathRegexp,omitempty"` +} + +// Validate validates WatchPathConfig +func (c *WatchPathConfig) Validate() error { + if c.Directory == "" { + return errors.New("directory is required") + } + if !path.IsAbs(c.Directory) { + return errors.New("directory must be an absolute file path") + } + if c.Path == "" && c.PathRegexp == "" { + return errors.New("either path or pathRegexp must be specified") + } + if c.Path != "" && c.PathRegexp != "" { + return errors.New("path and pathRegexp cannot be specified together") + } + if c.Path != "" && path.IsAbs(c.Path) { + return errors.New("path must be a relative file path") + } + if c.PathRegexp != "" { + _, err := regexp.Compile(c.PathRegexp) + if err != nil { + return err + } + } + return nil +} diff --git a/gateways/common/config_test.go b/gateways/common/config_test.go new file mode 100644 index 0000000000..78d75d3a5b --- /dev/null +++ b/gateways/common/config_test.go @@ -0,0 +1,61 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPathConfigValidate(t *testing.T) { + var validConfigs []WatchPathConfig + validConfigs = append(validConfigs, WatchPathConfig{ + Directory: "/foo", + Path: "bar", + PathRegexp: "", + }) + validConfigs = append(validConfigs, WatchPathConfig{ + Directory: "/foo", + Path: "", + PathRegexp: "bar", + }) + for _, config := range validConfigs { + err := config.Validate() + assert.NoError(t, err) + } + + var invalidConfigs []WatchPathConfig + // empty dir + invalidConfigs = append(invalidConfigs, WatchPathConfig{ + Directory: "", + Path: "", + PathRegexp: "", + }) + // relative dir + invalidConfigs = append(invalidConfigs, WatchPathConfig{ + Directory: "foo", + Path: "bar", + PathRegexp: "", + }) + // both path and path regexp + invalidConfigs = append(invalidConfigs, WatchPathConfig{ + Directory: "/foo", + Path: "bar", + PathRegexp: "bar", + }) + // absolute path + invalidConfigs = append(invalidConfigs, WatchPathConfig{ + Directory: "/foo", + Path: "/bar", + PathRegexp: "", + }) + // invalid regexp + invalidConfigs = append(invalidConfigs, WatchPathConfig{ + Directory: "/foo", + Path: "", + PathRegexp: "][", + }) + for _, config := range invalidConfigs { + err := config.Validate() + assert.Error(t, err) + } +} diff --git a/gateways/community/hdfs/config.go b/gateways/community/hdfs/config.go index 00408eb84f..beb745a0e1 100644 --- a/gateways/community/hdfs/config.go +++ b/gateways/community/hdfs/config.go @@ -1,10 +1,14 @@ package hdfs import ( + "errors" + "github.com/ghodss/yaml" "github.com/rs/zerolog" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" + + "github.com/argoproj/argo-events/gateways/common" ) // EventSourceExecutor implements Eventing @@ -18,10 +22,8 @@ type EventSourceExecutor struct { // GatewayConfig contains information to setup a HDFS integration type GatewayConfig struct { - // Directory to watch for events - Directory string `json:"directory"` - // Path is relative path of object to watch with respect to the directory - Path string `json:"path"` + common.WatchPathConfig `json:",inline"` + // Type of file operations to watch Type string `json:"type"` // CheckInterval is a string that describes an interval duration to check the directory state, e.g. 1s, 30m, 2h... (defaults to 1m) @@ -72,3 +74,25 @@ func parseEventSource(eventSource string) (interface{}, error) { } return f, err } + +// Validate validates GatewayClientConfig +func (c *GatewayClientConfig) Validate() error { + if len(c.Addresses) == 0 { + return errors.New("addresses is required") + } + + hasKrbCCache := c.KrbCCacheSecret != nil + hasKrbKeytab := c.KrbKeytabSecret != nil + + if c.HDFSUser == "" && !hasKrbCCache && !hasKrbKeytab { + return errors.New("either hdfsUser, krbCCacheSecret or krbKeytabSecret is required") + } + if hasKrbKeytab && (c.KrbServicePrincipalName == "" || c.KrbConfigConfigMap == nil || c.KrbUsername == "" || c.KrbRealm == "") { + return errors.New("krbServicePrincipalName, krbConfigConfigMap, krbUsername and krbRealm are required with krbKeytabSecret") + } + if hasKrbCCache && (c.KrbServicePrincipalName == "" || c.KrbConfigConfigMap == nil) { + return errors.New("krbServicePrincipalName and krbConfigConfigMap are required with krbCCacheSecret") + } + + return nil +} diff --git a/gateways/community/hdfs/start.go b/gateways/community/hdfs/start.go index 6bce64cab0..5639a6230b 100644 --- a/gateways/community/hdfs/start.go +++ b/gateways/community/hdfs/start.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "regexp" "strings" "time" @@ -98,6 +99,14 @@ func (ese *EventSourceExecutor) listenEvents(config *GatewayConfig, eventSource } op := naivewatcher.NewOp(config.Type) + var pathRegexp *regexp.Regexp + if config.PathRegexp != "" { + pathRegexp, err = regexp.Compile(config.PathRegexp) + if err != nil { + errorCh <- err + return + } + } ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to watch to HDFS notifications") for { select { @@ -108,7 +117,14 @@ func (ese *EventSourceExecutor) listenEvents(config *GatewayConfig, eventSource errorCh <- fmt.Errorf("HDFS watcher stopped") return } - if config.Path == strings.TrimPrefix(event.Name, config.Directory) && (op&event.Op != 0) { + matched := false + relPath := strings.TrimPrefix(event.Name, config.Directory) + if config.Path != "" && config.Path == relPath { + matched = true + } else if pathRegexp != nil && pathRegexp.MatchString(relPath) { + matched = true + } + if matched && (op&event.Op != 0) { ese.Log.Debug().Str("config-key", eventSource.Name).Str("event-type", event.Op.String()).Str("descriptor-name", event.Name).Msg("HDFS event") dataCh <- []byte(fmt.Sprintf("%v", event)) } diff --git a/gateways/community/hdfs/validate.go b/gateways/community/hdfs/validate.go index dee0954bff..62fa7ca882 100644 --- a/gateways/community/hdfs/validate.go +++ b/gateways/community/hdfs/validate.go @@ -19,7 +19,6 @@ package hdfs import ( "context" "errors" - "path" "time" "github.com/argoproj/argo-events/gateways/common/naivewatcher" @@ -38,18 +37,6 @@ func validateGatewayConfig(config interface{}) error { if gwc == nil { return gwcommon.ErrNilEventSource } - if gwc.Directory == "" { - return errors.New("directory is required") - } - if !path.IsAbs(gwc.Directory) { - return errors.New("directory must be an absolute file path") - } - if gwc.Path == "" { - return errors.New("path is required") - } - if path.IsAbs(gwc.Path) { - return errors.New("path must be a relative file path") - } if gwc.Type == "" { return errors.New("type is required") } @@ -63,29 +50,10 @@ func validateGatewayConfig(config interface{}) error { return errors.New("failed to parse interval") } } - - err := validateHDFSConfig(gwc) - - return err -} - -func validateHDFSConfig(gwc *GatewayConfig) error { - if len(gwc.Addresses) == 0 { - return errors.New("addresses is required") - } - - hasKrbCCache := gwc.KrbCCacheSecret != nil - hasKrbKeytab := gwc.KrbKeytabSecret != nil - - if gwc.HDFSUser == "" && !hasKrbCCache && !hasKrbKeytab { - return errors.New("either hdfsUser, krbCCacheSecret or krbKeytabSecret is required") - } - if hasKrbKeytab && (gwc.KrbServicePrincipalName == "" || gwc.KrbConfigConfigMap == nil || gwc.KrbUsername == "" || gwc.KrbRealm == "") { - return errors.New("krbServicePrincipalName, krbConfigConfigMap, krbUsername and krbRealm are required with krbKeytabSecret") - } - if hasKrbCCache && (gwc.KrbServicePrincipalName == "" || gwc.KrbConfigConfigMap == nil) { - return errors.New("krbServicePrincipalName and krbConfigConfigMap are required with krbCCacheSecret") + err := gwc.WatchPathConfig.Validate() + if err != nil { + return err } - - return nil + err = gwc.GatewayClientConfig.Validate() + return err } diff --git a/gateways/core/file/config.go b/gateways/core/file/config.go index f609e10de4..e5fbebdf8b 100644 --- a/gateways/core/file/config.go +++ b/gateways/core/file/config.go @@ -19,6 +19,8 @@ package file import ( "github.com/ghodss/yaml" "github.com/rs/zerolog" + + "github.com/argoproj/argo-events/gateways/common" ) // FileEventSourceExecutor implements Eventing @@ -29,10 +31,8 @@ type FileEventSourceExecutor struct { // fileWatcher contains configuration information for this gateway // +k8s:openapi-gen=true type fileWatcher struct { - // Directory to watch for events - Directory string `json:"directory"` - // Path is relative path of object to watch with respect to the directory - Path string `json:"path"` + common.WatchPathConfig `json:",inline"` + // Type of file operations to watch // Refer https://github.com/fsnotify/fsnotify/blob/master/fsnotify.go for more information Type string `json:"type"` diff --git a/gateways/core/file/start.go b/gateways/core/file/start.go index 17de7d161a..2eb99b6e0f 100644 --- a/gateways/core/file/start.go +++ b/gateways/core/file/start.go @@ -18,6 +18,7 @@ package file import ( "fmt" + "regexp" "strings" "github.com/argoproj/argo-events/gateways" @@ -60,6 +61,14 @@ func (ese *FileEventSourceExecutor) listenEvents(fwc *fileWatcher, eventSource * return } + var pathRegexp *regexp.Regexp + if fwc.PathRegexp != "" { + pathRegexp, err = regexp.Compile(fwc.PathRegexp) + if err != nil { + errorCh <- err + return + } + } ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to watch to file notifications") for { select { @@ -71,7 +80,14 @@ func (ese *FileEventSourceExecutor) listenEvents(fwc *fileWatcher, eventSource * return } // fwc.Path == event.Name is required because we don't want to send event when .swp files are created - if fwc.Path == strings.TrimPrefix(event.Name, fwc.Directory) && fwc.Type == event.Op.String() { + matched := false + relPath := strings.TrimPrefix(event.Name, fwc.Directory) + if fwc.Path != "" && fwc.Path == relPath { + matched = true + } else if pathRegexp != nil && pathRegexp.MatchString(relPath) { + matched = true + } + if matched && fwc.Type == event.Op.String() { ese.Log.Debug().Str("config-key", eventSource.Name).Str("event-type", event.Op.String()).Str("descriptor-name", event.Name).Msg("fs event") dataCh <- []byte(fmt.Sprintf("%v", event)) } diff --git a/gateways/core/file/validate.go b/gateways/core/file/validate.go index 3fa502aec6..ada546b2c7 100644 --- a/gateways/core/file/validate.go +++ b/gateways/core/file/validate.go @@ -37,11 +37,6 @@ func validateFileWatcher(config interface{}) error { if fwc.Type == "" { return fmt.Errorf("type must be specified") } - if fwc.Directory == "" { - return fmt.Errorf("directory must be specified") - } - if fwc.Path == "" { - return fmt.Errorf("path must be specified") - } - return nil + err := fwc.WatchPathConfig.Validate() + return err }