Commit
passing entire s3 notification payload as event data (#78)
* adding s3 with param example

* fixing tests and adding logging to s3 signal
magaldima committed Aug 1, 2018
1 parent 29f06e4 commit ab9b1b6
Showing 8 changed files with 237 additions and 256 deletions.
60 changes: 60 additions & 0 deletions examples/s3-sensor-with-param.yaml
@@ -0,0 +1,60 @@
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: s3-example
  labels:
    sensors.argoproj.io/controller-instanceid: axis
spec:
  signals:
    - name: minioS3
      artifact:
        s3:
          bucket: hello
          event: s3:ObjectCreated:Put
          endpoint: artifacts-minio.default:9000
        target:
          type: NATS
          url: nats://example-nats-cluster:4222
          attributes:
            subject: hello
  triggers:
    - name: argo-workflow
      resource:
        namespace: default
        group: argoproj.io
        version: v1alpha1
        kind: Workflow
        # The artifact key from the workflow is overridden by the s3 notification's object key
        parameters:
          - src:
              signal: minioS3
              path: s3.object.key
            dest: spec.templates.0.inputs.artifacts.0.key
        source:
          inline: |
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: input-artifact-s3-
            spec:
              entrypoint: input-artifact-s3-example
              templates:
                - name: input-artifact-s3-example
                  inputs:
                    artifacts:
                      - name: my-art
                        path: /my-artifact
                        s3:
                          endpoint: artifacts-minio.default:9000
                          bucket: hello
                          key: path/in/bucket
                          accessKey:
                            key: accesskey
                            name: artifacts-minio
                          secretKey:
                            key: secretkey
                            name: artifacts-minio
                  container:
                    image: debian:latest
                    command: [sh, -c]
                    args: ["ls -l /my-artifact"]
2 changes: 1 addition & 1 deletion pkg/apis/sensor/v1alpha1/types.go
@@ -466,7 +466,7 @@ type ArtifactLocation struct {

// S3Artifact contains information about an artifact in S3
type S3Artifact struct {
S3Bucket `json:",inline" protobuf:"bytes,4,opt,name=s3Bucket"`
S3Bucket `json:",inline"`
Key string `json:"key,omitempty" protobuf:"bytes,1,opt,name=key"`
Event minio.NotificationEventType `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"`
Filter *S3Filter `json:"filter,omitempty" protobuf:"bytes,3,opt,name=filter"`
78 changes: 78 additions & 0 deletions sdk/fake/signal-client.go
@@ -0,0 +1,78 @@
package fake

import (
	"context"
	"io"

	"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
	"github.com/argoproj/argo-events/sdk"
	"github.com/micro/go-micro/client"
)

// SignalClient implements the sdk.SignalClient and adds a Generate() method
// for producing fake events. Generate() sends from its own goroutine, so
// single-threaded tests will not block on the unbuffered events channel.
type SignalClient interface {
	sdk.SignalClient
	Generate(*v1alpha1.Event)
}

// implements sdk.SignalService_ListenService as a simple loop
type simpleLoopListenService struct {
	events chan *v1alpha1.Event
}

func (*simpleLoopListenService) SendMsg(interface{}) error {
	return nil
}

func (*simpleLoopListenService) RecvMsg(interface{}) error {
	return nil
}

func (f *simpleLoopListenService) Close() error {
	close(f.events)
	return nil
}

func (*simpleLoopListenService) Send(*sdk.SignalContext) error {
	return nil
}

func (f *simpleLoopListenService) Recv() (*sdk.EventContext, error) {
	event, ok := <-f.events
	if !ok {
		return nil, io.EOF
	}
	return &sdk.EventContext{Event: event}, nil
}

type fakeSignalClient struct {
	loop *simpleLoopListenService
}

// NewClient returns a new fake signal client
func NewClient() SignalClient {
	return &fakeSignalClient{
		loop: &simpleLoopListenService{
			events: make(chan *v1alpha1.Event),
		},
	}
}

func (*fakeSignalClient) Ping(context.Context) error {
	return nil
}

func (f *fakeSignalClient) Listen(context.Context, *v1alpha1.Signal, ...client.CallOption) (sdk.SignalService_ListenService, error) {
	return f.loop, nil
}

func (*fakeSignalClient) Handshake(*v1alpha1.Signal, sdk.SignalService_ListenService) error {
	return nil
}

// Generate allows us to produce fake events for testing purposes
func (f *fakeSignalClient) Generate(e *v1alpha1.Event) {
	go func() { f.loop.events <- e.DeepCopy() }()
}
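A sketch of how a test might drive this fake client; the test name and the "hello" payload are illustrative, not taken from the commit:

package fake_test

import (
	"context"
	"testing"

	"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
	"github.com/argoproj/argo-events/sdk/fake"
)

func TestFakeSignalClient(t *testing.T) {
	client := fake.NewClient()
	stream, err := client.Listen(context.Background(), &v1alpha1.Signal{Name: "test"})
	if err != nil {
		t.Fatal(err)
	}
	// Generate sends from its own goroutine, so this single-threaded
	// test does not block on the unbuffered events channel.
	client.Generate(&v1alpha1.Event{Data: []byte("hello")})

	eventCtx, err := stream.Recv()
	if err != nil {
		t.Fatal(err)
	}
	if string(eventCtx.Event.Data) != "hello" {
		t.Errorf("unexpected event data: %q", eventCtx.Event.Data)
	}
}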
11 changes: 1 addition & 10 deletions sdk/interface.go
@@ -46,20 +46,11 @@ type Listener interface {
	Listen(*v1alpha1.Signal, <-chan struct{}) (<-chan *v1alpha1.Event, error)
}

// ArtifactListener is the interface for listening with artifacts
// In addition to including the basic Listener interface, this also
// enables access to read an artifact object to include in the event data payload
type ArtifactListener interface {
	Listener
	// TODO: change to use io.Reader and io.Closer interfaces?
	Read(loc *v1alpha1.ArtifactLocation, key string) ([]byte, error)
}

// SignalClient is the interface for signal clients
type SignalClient interface {
	Ping(context.Context) error
	Listen(context.Context, *v1alpha1.Signal, ...client.CallOption) (SignalService_ListenService, error)
	handshake(*v1alpha1.Signal, SignalService_ListenService) error
	Handshake(*v1alpha1.Signal, SignalService_ListenService) error
}

// SignalServer is the interface for signal servers
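Exporting Handshake (and dropping the unexported handshake) lets implementations outside the sdk package, such as the fake client above, satisfy SignalClient. A hedged compile-time check, with a hypothetical test package name:

package sdktest

import (
	"github.com/argoproj/argo-events/sdk"
	"github.com/argoproj/argo-events/sdk/fake"
)

// Compile-time check: the fake client can satisfy sdk.SignalClient now that
// Handshake is exported; this was impossible while handshake was unexported.
var _ sdk.SignalClient = fake.NewClient()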
4 changes: 2 additions & 2 deletions sdk/micro_client.go
@@ -35,15 +35,15 @@ func (m *microSignalClient) Listen(ctx context.Context, signal *v1alpha1.Signal,
	if err != nil {
		return nil, err
	}
	err = m.handshake(signal, stream)
	err = m.Handshake(signal, stream)
	if err != nil {
		return nil, err
	}
	return stream, nil
}

// Handshake performs the initial signal handshaking with the server
func (m *microSignalClient) handshake(signal *v1alpha1.Signal, stream SignalService_ListenService) error {
func (m *microSignalClient) Handshake(signal *v1alpha1.Signal, stream SignalService_ListenService) error {
	err := stream.Send(&SignalContext{Signal: signal})
	if err != nil {
		return err
16 changes: 2 additions & 14 deletions signals/artifact/micro/artifact_service.go
@@ -19,38 +19,26 @@ package main
import (
	"os"

	"github.com/argoproj/argo-events/common"
	"github.com/argoproj/argo-events/sdk"
	"github.com/argoproj/argo-events/signals/artifact"
	"github.com/micro/go-micro"
	k8s "github.com/micro/kubernetes/go/micro"
	"k8s.io/client-go/kubernetes"
)

func main() {
	svc := k8s.NewService(micro.Name("artifact"), micro.Metadata(sdk.SignalMetadata))
	svc.Init()

	// kubernetes configuration
	kubeConfig, _ := os.LookupEnv(common.EnvVarKubeConfig)
	rest, err := common.GetClientConfig(kubeConfig)
	if err != nil {
		panic(err)
	}
	kubeclient := kubernetes.NewForConfigOrDie(rest)

	// namespace configuration
	nm := common.DefaultSensorControllerNamespace

	// stream configuration
	// TODO: make this configurable while running through github.com/micro/go-config
	// or use google/go-cloud runtimeVars
	stream, ok := os.LookupEnv("stream-signal")
	if !ok {
		stream = "nats"
	}
	streamClient := sdk.NewMicroSignalClient(stream, svc.Client())

	sdk.RegisterSignalServiceHandler(svc.Server(), sdk.NewMicroSignalServer(artifact.New(streamClient, kubeclient, nm)))
	sdk.RegisterSignalServiceHandler(svc.Server(), sdk.NewMicroSignalServer(artifact.New(streamClient)))

	if err := svc.Run(); err != nil {
		panic(err)
66 changes: 23 additions & 43 deletions signals/artifact/s3.go
@@ -22,19 +22,15 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"strconv"
"strings"
"time"

"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/argoproj/argo-events/sdk"
"github.com/argoproj/argo-events/store"
"github.com/golang/protobuf/proto"
minio "github.com/minio/minio-go"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

const (
@@ -46,18 +42,12 @@ const (
// receive struct to save or modify state.
// Listen() methods CAN retrieve fields from the s3 struct.
type s3 struct {
	kubeClient   kubernetes.Interface
	namespace    string
	streamClient sdk.SignalClient
}

// New creates a new S3 signal
func New(client sdk.SignalClient, kubeClient kubernetes.Interface, nm string) sdk.ArtifactListener {
	return &s3{
		streamClient: client,
		kubeClient:   kubeClient,
		namespace:    nm,
	}
func New(client sdk.SignalClient) sdk.Listener {
	return &s3{streamClient: client}
}

func (s *s3) Listen(signal *v1alpha1.Signal, done <-chan struct{}) (<-chan *v1alpha1.Event, error) {
@@ -95,7 +85,6 @@ func (s *s3) Listen(signal *v1alpha1.Signal, done <-chan struct{}) (<-chan *v1al
	// wait for stop signal
	go func() {
		<-done
		close(events)
		// TODO: should we cancel or gracefully shutdown and first send Terminate msg
		err := stream.Send(sdk.Terminate)
		if err != nil {
@@ -105,31 +94,40 @@
		if err != nil {
			log.Panicf("failed to close stream: %s", err)
		}
		log.Printf("shut down signal '%s'", signal.Name)
	}()
	log.Printf("signal '%s' listening for S3 [%s] for bucket [%s]...", signal.Name, signal.Artifact.S3.Event, signal.Artifact.S3.Bucket)
	return events, nil
}

// method should be invoked as a separate go routine within the artifact Start method
// intercepts the receive-only msgs off the stream, filters them, and writes artifact events
// to the sendCh.
func (s *s3) interceptFilterAndEnhanceEvents(sig *v1alpha1.Signal, sendCh chan *v1alpha1.Event, recvCh <-chan *v1alpha1.Event) {
	loc := sig.Artifact.ArtifactLocation
	defer close(sendCh)
	for streamEvent := range recvCh {
		// todo: apply general filtering on cloudEvents
		event := proto.Clone(streamEvent).(*v1alpha1.Event)

		notification := &minio.NotificationInfo{}
		err := json.Unmarshal(streamEvent.Data, notification)
		if err != nil {
			// we ignore this - as this stream could be in use by another publisher of different notifications
			log.Warnf("failed to unmarshal notification %s: %s", streamEvent.Data, err)
			continue
		}
		if notification.Err != nil {
			event := streamEvent.DeepCopy()
			event.Context.Extensions[sdk.ContextExtensionErrorKey] = notification.Err.Error()
			sendCh <- event
		}
		for _, record := range notification.Records {
			if ok := applyFilter(&record, sig.Artifact.ArtifactLocation); !ok {
			event := streamEvent.DeepCopy()

			if ok := applyFilter(&record, loc); !ok {
				// this record failed to pass the filter so we ignore it
				log.Debugf("filtered event - record metadata [bucket: %s, event: %s, key: %s] "+
					"does not match expected s3 [bucket: %s, event: %s, filter: %v]",
					record.S3.Bucket.Name, record.EventName, record.S3.Object.Key,
					loc.S3.Bucket, loc.S3.Event, loc.S3.Filter)
				continue
			}
			port, _ := strconv.ParseInt(record.Source.Port, 10, 32)
@@ -146,13 +144,17 @@
				Scheme: record.S3.SchemaVersion,
			}
			event.Context.EventID = record.S3.Object.ETag
			event.Context.ContentType = "application/json"

			// read the actual s3 artifact to put into the event data
			b, err := s.Read(&sig.Artifact.ArtifactLocation, record.S3.Object.Key)
			// re-marshal each record back into json
			recordEvent := record
			recordEventBytes, err := json.Marshal(recordEvent)
			if err != nil {
				event.Context.Extensions[sdk.ContextExtensionErrorKey] = err.Error()
				log.Warnf("failed to re-marshal notification event into json: %s. falling back to the stream event's original data", err)
				event.Data = streamEvent.Data
			} else {
				event.Data = recordEventBytes
			}
			event.Data = b
			sendCh <- event
		}
	}
Expand Down Expand Up @@ -195,25 +197,3 @@ func getMetaTimestamp(tStr string) metav1.Time {
	}
	return metav1.Time{Time: t}
}

func (s *s3) Read(loc *v1alpha1.ArtifactLocation, key string) ([]byte, error) {
	creds, err := store.GetCredentials(s.kubeClient, s.namespace, loc)
	if err != nil {
		return nil, err
	}
	client, err := store.NewMinioClient(loc.S3, *creds)
	if err != nil {
		return nil, err
	}

	obj, err := client.GetObject(loc.S3.Bucket, key, minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	defer obj.Close()
	b, err := ioutil.ReadAll(obj)
	if err != nil {
		return nil, err
	}
	return b, nil
}
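Since the signal no longer reads the S3 object itself, a consumer that still needs the artifact contents can fetch them downstream with the same store helpers the deleted Read method used. A minimal sketch under that assumption; the package name and function are hypothetical, but the store and minio calls mirror the removed code above:

package consumer

import (
	"io/ioutil"

	"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
	"github.com/argoproj/argo-events/store"
	minio "github.com/minio/minio-go"
	"k8s.io/client-go/kubernetes"
)

// readArtifact reproduces the behavior of the deleted Read method outside the
// signal: resolve credentials, build a minio client, and read the object
// named by the notification's key.
func readArtifact(kubeClient kubernetes.Interface, namespace string, loc *v1alpha1.ArtifactLocation, key string) ([]byte, error) {
	creds, err := store.GetCredentials(kubeClient, namespace, loc)
	if err != nil {
		return nil, err
	}
	client, err := store.NewMinioClient(loc.S3, *creds)
	if err != nil {
		return nil, err
	}
	obj, err := client.GetObject(loc.S3.Bucket, key, minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	defer obj.Close()
	return ioutil.ReadAll(obj)
}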
