Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Adding Topic Controller #35

Merged
merged 24 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ required = [

[[override]]
name = "github.com/knative/eventing"
# HEAD as of 2919-06-26
# HEAD as of 2019-06-26
revision = "677d3d17e84b7337f006220d6bb465af4f416f81"

[[override]]
grantr marked this conversation as resolved.
Show resolved Hide resolved
name = "github.com/knative/serving"
# HEAD as of 2919-06-26
revision = "50cdf8762deae58ed89763adef26a88e7efecac7"
grantr marked this conversation as resolved.
Show resolved Hide resolved

[[override]]
name = "go.uber.org/zap"
revision = "67bc79d13d155c02fd008f721863ff8cc5f30659"
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"github.com/GoogleCloudPlatform/cloud-run-events/pkg/reconciler/pullsubscription"
"github.com/GoogleCloudPlatform/cloud-run-events/pkg/reconciler/topic"

"knative.dev/pkg/injection/sharedmain"
)

func main() {
sharedmain.Main("controller",
pullsubscription.NewController,
topic.NewController,
)
}
77 changes: 77 additions & 0 deletions cmd/pubsub/publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"flag"
"log"

"cloud.google.com/go/compute/metadata"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/GoogleCloudPlatform/cloud-run-events/pkg/pubsub/publisher"
)

type envConfig struct {
// Environment variable containing project id.
Project string `envconfig:"PROJECT_ID"`
grantr marked this conversation as resolved.
Show resolved Hide resolved

// Topic is the environment variable containing the PubSub Topic being
// subscribed to's name. In the form that is unique within the project.
// E.g. 'laconia', not 'projects/my-gcp-project/topics/laconia'.
Topic string `envconfig:"PUBSUB_TOPIC_ID" required:"true"`
grantr marked this conversation as resolved.
Show resolved Hide resolved
}

func main() {
flag.Parse()

ctx := context.Background()
logCfg := zap.NewProductionConfig() // TODO: to replace with a dynamically updating logger.
logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
logger, err := logCfg.Build()
if err != nil {
log.Fatalf("Unable to create logger: %v", err)
}

var env envConfig
if err := envconfig.Process("", &env); err != nil {
logger.Fatal("Failed to process env var", zap.Error(err))
}

if env.Project == "" {
project, err := metadata.ProjectID()
if err != nil {
logger.Fatal("failed to find project id. ", zap.Error(err))
}
env.Project = project
}

logger.Info("using project.", zap.String("project", env.Project))

startable := &publisher.Publisher{
ProjectID: env.Project,
TopicID: env.Topic,
}

logger.Info("Starting Pub/Sub Publisher.", zap.Any("publisher", startable))
if err := startable.Start(ctx); err != nil {
logger.Fatal("failed to start publisher: ", zap.Error(err))
}
}
6 changes: 6 additions & 0 deletions config/201-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ rules:
- deployments
verbs: *everything

- apiGroups:
- serving.knative.dev
resources:
- services
verbs: *everything

- apiGroups:
- batch
resources:
Expand Down
26 changes: 10 additions & 16 deletions pkg/apis/pubsub/v1alpha1/topic_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package v1alpha1

import (
appsv1 "k8s.io/api/apps/v1"

"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck/v1alpha1"
)
Expand Down Expand Up @@ -55,21 +53,17 @@ func (ts *TopicStatus) SetAddress(url *apis.URL) {
}
}

func (ts *TopicStatus) PropagateDeploymentAvailability(d *appsv1.Deployment) {
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable {
switch cond.Status {
case "True":
ts.MarkDeployed()
case "False":
ts.MarkNotDeployed(cond.Reason, cond.Message)
default:
ts.MarkDeploying(cond.Reason, cond.Message)
}
return
}
func (ts *TopicStatus) PropagatePublisherStatus(ready *apis.Condition) {
switch {
case ready == nil:
ts.MarkDeploying("PublisherStatus", "Publisher has no Ready type status.")
case ready.IsTrue():
ts.MarkDeployed()
case ready.IsFalse():
ts.MarkNotDeployed(ready.Reason, ready.Message)
default:
ts.MarkDeploying(ready.Reason, ready.Message)
}
ts.MarkDeploying("DeploymentStatus", "Failed to inspect Deployment status.")
}

// MarkDeployed sets the condition that the publisher has been deployed.
Expand Down
2 changes: 1 addition & 1 deletion pkg/pubsub/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ limitations under the License.
package adapter

import (
"context"
"fmt"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
"go.uber.org/zap"
"golang.org/x/net/context"
"knative.dev/pkg/logging"

"github.com/GoogleCloudPlatform/cloud-run-events/pkg/apis/pubsub/v1alpha1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/pubsub/invoker/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ limitations under the License.
package invoker

import (
"context"
"errors"
"fmt"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/cloudevents/sdk-go/pkg/cloudevents/transport"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"
"golang.org/x/net/context"

"github.com/GoogleCloudPlatform/cloud-run-events/pkg/apis/pubsub/v1alpha1"
"github.com/GoogleCloudPlatform/cloud-run-events/pkg/kncloudevents"
Expand Down
92 changes: 92 additions & 0 deletions pkg/pubsub/publisher/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package publisher

import (
"fmt"

"context"

cloudevents "github.com/cloudevents/sdk-go"
cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub"

"github.com/GoogleCloudPlatform/cloud-run-events/pkg/kncloudevents"
)

// Publisher implements the Pub/Sub adapter to deliver Pub/Sub messages from a
// pre-existing topic/subscription to a Sink.
type Publisher struct {
// ProjectID is the pre-existing eventing project id to use.
ProjectID string
grantr marked this conversation as resolved.
Show resolved Hide resolved
// TopicID is the pre-existing eventing pub/sub topic id to use.
TopicID string

// inbound is the cloudevents client to use to receive events.
inbound cloudevents.Client
// outbound is the cloudevents client to use to send events.
outbound cloudevents.Client
}

func (a *Publisher) Start(ctx context.Context) error {
var err error

// Receive events on HTTP.
if a.inbound == nil {
if a.inbound, err = kncloudevents.NewDefaultClient(); err != nil {
return fmt.Errorf("failed to create inbound cloudevent client: %s", err.Error())
}
}

// Send Events on Pub/Sub.
if a.outbound == nil {
if a.outbound, err = a.newPubSubClient(ctx); err != nil {
return fmt.Errorf("failed to create outbound cloudevent client: %s", err.Error())
}
}

return a.inbound.StartReceiver(ctx, a.receive)
}

func (a *Publisher) receive(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
if r, err := a.outbound.Send(ctx, event); err != nil {
return err
} else if r != nil {
resp.RespondWith(200, r)
}

return nil
}

func (a *Publisher) newPubSubClient(ctx context.Context) (cloudevents.Client, error) {
tOpts := []cepubsub.Option{
cepubsub.WithBinaryEncoding(),
cepubsub.WithProjectID(a.ProjectID),
cepubsub.WithTopicID(a.TopicID),
}

// Make a pubsub transport for the CloudEvents client.
t, err := cepubsub.New(ctx, tOpts...)
if err != nil {
return nil, err
}

// Use the transport to make a new CloudEvents client.
return cloudevents.NewClient(t,
cloudevents.WithUUIDs(),
cloudevents.WithTimeNow(),
)
}
Loading