diff --git a/api/event-source.html b/api/event-source.html index e51776f001..bcbccbb5ee 100644 --- a/api/event-source.html +++ b/api/event-source.html @@ -1767,6 +1767,19 @@

PubSubEventSource +enableWorkflowIdentity
+ +bool + + + +(Optional) +

EnableWorkflowIdentity determines if your project authenticates to GCP with WorkflowIdentity or CredentialsFile. +If true, authentication is done with WorkflowIdentity. If false or omited, authentication is done with CredentialsFile.

+ + + + deleteSubscriptionOnFinish
bool diff --git a/api/event-source.md b/api/event-source.md index c107294300..78ddafa3e3 100644 --- a/api/event-source.md +++ b/api/event-source.md @@ -3462,6 +3462,31 @@ for GCP +enableWorkflowIdentity
bool + + + + + +(Optional) + +

+ +EnableWorkflowIdentity determines if your project authenticates to GCP +with WorkflowIdentity or CredentialsFile. If true, authentication is +done with WorkflowIdentity. If false or omited, authentication is done +with CredentialsFile. + +

+ + + + + + + + + deleteSubscriptionOnFinish
bool diff --git a/examples/event-sources/gcp-pubsub.yaml b/examples/event-sources/gcp-pubsub.yaml index cc84511522..53c5bf8dca 100644 --- a/examples/event-sources/gcp-pubsub.yaml +++ b/examples/event-sources/gcp-pubsub.yaml @@ -17,3 +17,18 @@ spec: topic: test # Refers to path of the credential file that is mounted in the gateway pod. credentialsFile: /creds/key.json + +# example-workload-identity: +# # jsonBody specifies that all event body payload coming from this +# # source will be JSON +# jsonBody: true +# # id of your project +# projectID: argo-events-XXXXX +# # (optional) id of project for topic, same as projectID by default +# # topicProjectID: "project-id" +# # topic name +# topic: test +# # Empty credentials file when using Workflow Identity +# credentialsFile: "" +# # If enableWorkflowIdentity is true, the projects uses Workflow Identity for authentication +# enableWorkflowIdentity: true diff --git a/gateways/server/gcp-pubsub/start.go b/gateways/server/gcp-pubsub/start.go index 127a4e96c6..c298c3dc1b 100644 --- a/gateways/server/gcp-pubsub/start.go +++ b/gateways/server/gcp-pubsub/start.go @@ -85,31 +85,35 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Create a new topic with the given name if none exists logger.Infoln("setting up a client to connect to PubSub...") - client, err := pubsub.NewClient(ctx, pubsubEventSource.ProjectID, option.WithCredentialsFile(pubsubEventSource.CredentialsFile)) - if err != nil { - return errors.Wrapf(err, "failed to set up client for %s", eventSource.Name) + + var opt []option.ClientOption + projectId := pubsubEventSource.ProjectID + + if !pubsubEventSource.EnableWorkflowIdentity { + opt = append(opt, option.WithCredentialsFile(pubsubEventSource.CredentialsFile)) } - // use same client for topic and subscription by default - topicClient := client + // Use default ProjectID unless TopicProjectID exists if pubsubEventSource.TopicProjectID != "" && pubsubEventSource.TopicProjectID != pubsubEventSource.ProjectID { - topicClient, err = pubsub.NewClient(ctx, pubsubEventSource.TopicProjectID, option.WithCredentialsFile(pubsubEventSource.CredentialsFile)) - if err != nil { - return errors.Wrapf(err, "failed to set up client for %s", eventSource.Name) - } + projectId = pubsubEventSource.TopicProjectID + } + + // Create a new topic with the given name if none exists + client, err := pubsub.NewClient(ctx, projectId, opt...) + if err != nil { + return errors.Wrapf(err, "failed to set up client for %s", eventSource.Name) } logger.Infoln("getting topic information from PubSub...") - topic := topicClient.Topic(pubsubEventSource.Topic) + topic := client.Topic(pubsubEventSource.Topic) exists, err := topic.Exists(ctx) if err != nil { return errors.Wrapf(err, "failed to get status of the topic %s for %s", pubsubEventSource.Topic, eventSource.Name) } if !exists { logger.Infoln("topic doesn't exist, creating the PubSub topic...") - if _, err := topicClient.CreateTopic(ctx, pubsubEventSource.Topic); err != nil { + if _, err := client.CreateTopic(ctx, pubsubEventSource.Topic); err != nil { return errors.Wrapf(err, "failed to create the topic %s for %s", pubsubEventSource.Topic, eventSource.Name) } } diff --git a/gateways/server/gcp-pubsub/validate.go b/gateways/server/gcp-pubsub/validate.go index 4f3d95dc65..b4c9461c2c 100644 --- a/gateways/server/gcp-pubsub/validate.go +++ b/gateways/server/gcp-pubsub/validate.go @@ -19,6 +19,7 @@ package pubsub import ( "context" "fmt" + "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -65,8 +66,8 @@ func validate(eventSource *v1alpha1.PubSubEventSource) error { if eventSource.Topic == "" { return fmt.Errorf("must specify topic") } - if eventSource.CredentialsFile == "" { - return fmt.Errorf("must specify credentials file path") + if !eventSource.EnableWorkflowIdentity && eventSource.CredentialsFile == "" { + return fmt.Errorf("must specify credentials file path if not using Workflow Identity") } return nil } diff --git a/pkg/apis/eventsources/v1alpha1/types.go b/pkg/apis/eventsources/v1alpha1/types.go index 723c24dc57..ed6a60d8df 100644 --- a/pkg/apis/eventsources/v1alpha1/types.go +++ b/pkg/apis/eventsources/v1alpha1/types.go @@ -314,9 +314,13 @@ type PubSubEventSource struct { Topic string `json:"topic" protobuf:"bytes,3,name=topic"` // CredentialsFile is the file that contains credentials to authenticate for GCP CredentialsFile string `json:"credentialsFile" protobuf:"bytes,4,name=credentialsFile"` + // EnableWorkflowIdentity determines if your project authenticates to GCP with WorkflowIdentity or CredentialsFile. + // If true, authentication is done with WorkflowIdentity. If false or omited, authentication is done with CredentialsFile. + // +optional + EnableWorkflowIdentity bool `json:"enableWorkflowIdentity,omitempty" protobuf:"bytes,5,opt,name=enableWorkflowIdentity"` // DeleteSubscriptionOnFinish determines whether to delete the GCP PubSub subscription once the event source is stopped. // +optional - DeleteSubscriptionOnFinish bool `json:"deleteSubscriptionOnFinish,omitempty" protobuf:"bytes,1,opt,name=deleteSubscriptionOnFinish"` + DeleteSubscriptionOnFinish bool `json:"deleteSubscriptionOnFinish,omitempty" protobuf:"bytes,6,opt,name=deleteSubscriptionOnFinish"` // JSONBody specifies that all event body payload coming from this // source will be JSON // +optional