Skip to content

Commit

Permalink
Support regexp path (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtaniwaki authored and VaibhavPage committed Mar 2, 2019
1 parent f2cf46f commit 7c89f75
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 54 deletions.
42 changes: 42 additions & 0 deletions gateways/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package common

import (
"errors"
"path"
"regexp"
)

type WatchPathConfig struct {
// Directory to watch for events
Directory string `json:"directory"`
// Path is relative path of object to watch with respect to the directory
Path string `json:"path,omitempty"`
// PathRegexp is regexp of relative path of object to watch with respect to the directory
PathRegexp string `json:"pathRegexp,omitempty"`
}

// Validate validates WatchPathConfig
func (c *WatchPathConfig) Validate() error {
if c.Directory == "" {
return errors.New("directory is required")
}
if !path.IsAbs(c.Directory) {
return errors.New("directory must be an absolute file path")
}
if c.Path == "" && c.PathRegexp == "" {
return errors.New("either path or pathRegexp must be specified")
}
if c.Path != "" && c.PathRegexp != "" {
return errors.New("path and pathRegexp cannot be specified together")
}
if c.Path != "" && path.IsAbs(c.Path) {
return errors.New("path must be a relative file path")
}
if c.PathRegexp != "" {
_, err := regexp.Compile(c.PathRegexp)
if err != nil {
return err
}
}
return nil
}
61 changes: 61 additions & 0 deletions gateways/common/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package common

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPathConfigValidate(t *testing.T) {
var validConfigs []WatchPathConfig
validConfigs = append(validConfigs, WatchPathConfig{
Directory: "/foo",
Path: "bar",
PathRegexp: "",
})
validConfigs = append(validConfigs, WatchPathConfig{
Directory: "/foo",
Path: "",
PathRegexp: "bar",
})
for _, config := range validConfigs {
err := config.Validate()
assert.NoError(t, err)
}

var invalidConfigs []WatchPathConfig
// empty dir
invalidConfigs = append(invalidConfigs, WatchPathConfig{
Directory: "",
Path: "",
PathRegexp: "",
})
// relative dir
invalidConfigs = append(invalidConfigs, WatchPathConfig{
Directory: "foo",
Path: "bar",
PathRegexp: "",
})
// both path and path regexp
invalidConfigs = append(invalidConfigs, WatchPathConfig{
Directory: "/foo",
Path: "bar",
PathRegexp: "bar",
})
// absolute path
invalidConfigs = append(invalidConfigs, WatchPathConfig{
Directory: "/foo",
Path: "/bar",
PathRegexp: "",
})
// invalid regexp
invalidConfigs = append(invalidConfigs, WatchPathConfig{
Directory: "/foo",
Path: "",
PathRegexp: "][",
})
for _, config := range invalidConfigs {
err := config.Validate()
assert.Error(t, err)
}
}
32 changes: 28 additions & 4 deletions gateways/community/hdfs/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package hdfs

import (
"errors"

"github.com/ghodss/yaml"
"github.com/rs/zerolog"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/argoproj/argo-events/gateways/common"
)

// EventSourceExecutor implements Eventing
Expand All @@ -18,10 +22,8 @@ type EventSourceExecutor struct {

// GatewayConfig contains information to setup a HDFS integration
type GatewayConfig struct {
// Directory to watch for events
Directory string `json:"directory"`
// Path is relative path of object to watch with respect to the directory
Path string `json:"path"`
common.WatchPathConfig `json:",inline"`

// Type of file operations to watch
Type string `json:"type"`
// CheckInterval is a string that describes an interval duration to check the directory state, e.g. 1s, 30m, 2h... (defaults to 1m)
Expand Down Expand Up @@ -72,3 +74,25 @@ func parseEventSource(eventSource string) (interface{}, error) {
}
return f, err
}

// Validate validates GatewayClientConfig
func (c *GatewayClientConfig) Validate() error {
if len(c.Addresses) == 0 {
return errors.New("addresses is required")
}

hasKrbCCache := c.KrbCCacheSecret != nil
hasKrbKeytab := c.KrbKeytabSecret != nil

if c.HDFSUser == "" && !hasKrbCCache && !hasKrbKeytab {
return errors.New("either hdfsUser, krbCCacheSecret or krbKeytabSecret is required")
}
if hasKrbKeytab && (c.KrbServicePrincipalName == "" || c.KrbConfigConfigMap == nil || c.KrbUsername == "" || c.KrbRealm == "") {
return errors.New("krbServicePrincipalName, krbConfigConfigMap, krbUsername and krbRealm are required with krbKeytabSecret")
}
if hasKrbCCache && (c.KrbServicePrincipalName == "" || c.KrbConfigConfigMap == nil) {
return errors.New("krbServicePrincipalName and krbConfigConfigMap are required with krbCCacheSecret")
}

return nil
}
18 changes: 17 additions & 1 deletion gateways/community/hdfs/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -98,6 +99,14 @@ func (ese *EventSourceExecutor) listenEvents(config *GatewayConfig, eventSource
}

op := naivewatcher.NewOp(config.Type)
var pathRegexp *regexp.Regexp
if config.PathRegexp != "" {
pathRegexp, err = regexp.Compile(config.PathRegexp)
if err != nil {
errorCh <- err
return
}
}
ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to watch to HDFS notifications")
for {
select {
Expand All @@ -108,7 +117,14 @@ func (ese *EventSourceExecutor) listenEvents(config *GatewayConfig, eventSource
errorCh <- fmt.Errorf("HDFS watcher stopped")
return
}
if config.Path == strings.TrimPrefix(event.Name, config.Directory) && (op&event.Op != 0) {
matched := false
relPath := strings.TrimPrefix(event.Name, config.Directory)
if config.Path != "" && config.Path == relPath {
matched = true
} else if pathRegexp != nil && pathRegexp.MatchString(relPath) {
matched = true
}
if matched && (op&event.Op != 0) {
ese.Log.Debug().Str("config-key", eventSource.Name).Str("event-type", event.Op.String()).Str("descriptor-name", event.Name).Msg("HDFS event")
dataCh <- []byte(fmt.Sprintf("%v", event))
}
Expand Down
42 changes: 5 additions & 37 deletions gateways/community/hdfs/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package hdfs
import (
"context"
"errors"
"path"
"time"

"github.com/argoproj/argo-events/gateways/common/naivewatcher"
Expand All @@ -38,18 +37,6 @@ func validateGatewayConfig(config interface{}) error {
if gwc == nil {
return gwcommon.ErrNilEventSource
}
if gwc.Directory == "" {
return errors.New("directory is required")
}
if !path.IsAbs(gwc.Directory) {
return errors.New("directory must be an absolute file path")
}
if gwc.Path == "" {
return errors.New("path is required")
}
if path.IsAbs(gwc.Path) {
return errors.New("path must be a relative file path")
}
if gwc.Type == "" {
return errors.New("type is required")
}
Expand All @@ -63,29 +50,10 @@ func validateGatewayConfig(config interface{}) error {
return errors.New("failed to parse interval")
}
}

err := validateHDFSConfig(gwc)

return err
}

func validateHDFSConfig(gwc *GatewayConfig) error {
if len(gwc.Addresses) == 0 {
return errors.New("addresses is required")
}

hasKrbCCache := gwc.KrbCCacheSecret != nil
hasKrbKeytab := gwc.KrbKeytabSecret != nil

if gwc.HDFSUser == "" && !hasKrbCCache && !hasKrbKeytab {
return errors.New("either hdfsUser, krbCCacheSecret or krbKeytabSecret is required")
}
if hasKrbKeytab && (gwc.KrbServicePrincipalName == "" || gwc.KrbConfigConfigMap == nil || gwc.KrbUsername == "" || gwc.KrbRealm == "") {
return errors.New("krbServicePrincipalName, krbConfigConfigMap, krbUsername and krbRealm are required with krbKeytabSecret")
}
if hasKrbCCache && (gwc.KrbServicePrincipalName == "" || gwc.KrbConfigConfigMap == nil) {
return errors.New("krbServicePrincipalName and krbConfigConfigMap are required with krbCCacheSecret")
err := gwc.WatchPathConfig.Validate()
if err != nil {
return err
}

return nil
err = gwc.GatewayClientConfig.Validate()
return err
}
8 changes: 4 additions & 4 deletions gateways/core/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package file
import (
"github.com/ghodss/yaml"
"github.com/rs/zerolog"

"github.com/argoproj/argo-events/gateways/common"
)

// FileEventSourceExecutor implements Eventing
Expand All @@ -29,10 +31,8 @@ type FileEventSourceExecutor struct {
// fileWatcher contains configuration information for this gateway
// +k8s:openapi-gen=true
type fileWatcher struct {
// Directory to watch for events
Directory string `json:"directory"`
// Path is relative path of object to watch with respect to the directory
Path string `json:"path"`
common.WatchPathConfig `json:",inline"`

// Type of file operations to watch
// Refer https://github.com/fsnotify/fsnotify/blob/master/fsnotify.go for more information
Type string `json:"type"`
Expand Down
18 changes: 17 additions & 1 deletion gateways/core/file/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package file

import (
"fmt"
"regexp"
"strings"

"github.com/argoproj/argo-events/gateways"
Expand Down Expand Up @@ -60,6 +61,14 @@ func (ese *FileEventSourceExecutor) listenEvents(fwc *fileWatcher, eventSource *
return
}

var pathRegexp *regexp.Regexp
if fwc.PathRegexp != "" {
pathRegexp, err = regexp.Compile(fwc.PathRegexp)
if err != nil {
errorCh <- err
return
}
}
ese.Log.Info().Str("event-source-name", eventSource.Name).Msg("starting to watch to file notifications")
for {
select {
Expand All @@ -71,7 +80,14 @@ func (ese *FileEventSourceExecutor) listenEvents(fwc *fileWatcher, eventSource *
return
}
// fwc.Path == event.Name is required because we don't want to send event when .swp files are created
if fwc.Path == strings.TrimPrefix(event.Name, fwc.Directory) && fwc.Type == event.Op.String() {
matched := false
relPath := strings.TrimPrefix(event.Name, fwc.Directory)
if fwc.Path != "" && fwc.Path == relPath {
matched = true
} else if pathRegexp != nil && pathRegexp.MatchString(relPath) {
matched = true
}
if matched && fwc.Type == event.Op.String() {
ese.Log.Debug().Str("config-key", eventSource.Name).Str("event-type", event.Op.String()).Str("descriptor-name", event.Name).Msg("fs event")
dataCh <- []byte(fmt.Sprintf("%v", event))
}
Expand Down
9 changes: 2 additions & 7 deletions gateways/core/file/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ func validateFileWatcher(config interface{}) error {
if fwc.Type == "" {
return fmt.Errorf("type must be specified")
}
if fwc.Directory == "" {
return fmt.Errorf("directory must be specified")
}
if fwc.Path == "" {
return fmt.Errorf("path must be specified")
}
return nil
err := fwc.WatchPathConfig.Validate()
return err
}

0 comments on commit 7c89f75

Please sign in to comment.