From 4534d3c87472609cd7a9578ec02d00e21388b6ba Mon Sep 17 00:00:00 2001 From: justinsb Date: Thu, 11 Apr 2024 11:09:29 -0400 Subject: [PATCH] mockgcp: Implement pubsub Aiming to support PubSubTopic --- config/tests/samples/create/harness.go | 3 + mockgcp/Makefile | 1 + mockgcp/fixup-third-party.sh | 4 + mockgcp/mock_http_roundtrip.go | 2 + mockgcp/mockpubsub/schema.go | 142 ++++++++++++++++++++ mockgcp/mockpubsub/service.go | 75 +++++++++++ mockgcp/mockpubsub/topic.go | 178 +++++++++++++++++++++++++ tests/e2e/unified_test.go | 4 + 8 files changed, 409 insertions(+) create mode 100644 mockgcp/mockpubsub/schema.go create mode 100644 mockgcp/mockpubsub/service.go create mode 100644 mockgcp/mockpubsub/topic.go diff --git a/config/tests/samples/create/harness.go b/config/tests/samples/create/harness.go index a9cec0b7b4..9b29834e35 100644 --- a/config/tests/samples/create/harness.go +++ b/config/tests/samples/create/harness.go @@ -593,6 +593,9 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured case schema.GroupKind{Group: "privateca.cnrm.cloud.google.com", Kind: "PrivateCACAPool"}: + case schema.GroupKind{Group: "pubsub.cnrm.cloud.google.com", Kind: "PubSubSchema"}: + case schema.GroupKind{Group: "pubsub.cnrm.cloud.google.com", Kind: "PubSubTopic"}: + case schema.GroupKind{Group: "redis.cnrm.cloud.google.com", Kind: "RedisInstance"}: case schema.GroupKind{Group: "resourcemanager.cnrm.cloud.google.com", Kind: "Project"}: diff --git a/mockgcp/Makefile b/mockgcp/Makefile index a223d48383..577ea6aab0 100644 --- a/mockgcp/Makefile +++ b/mockgcp/Makefile @@ -32,6 +32,7 @@ gen-proto: ./third_party/googleapis/mockgcp/devtools/artifactregistry/v1/*.proto \ ./third_party/googleapis/mockgcp/iam/admin/v1/*.proto \ ./third_party/googleapis/mockgcp/logging/v2/*.proto \ + ./third_party/googleapis/mockgcp/pubsub/v1/*.proto \ ./third_party/googleapis/mockgcp/cloud/aiplatform/v1beta1/annotation*.proto \ ./third_party/googleapis/mockgcp/cloud/aiplatform/v1beta1/data_item.proto \ ./third_party/googleapis/mockgcp/cloud/aiplatform/v1beta1/dataset*.proto \ diff --git a/mockgcp/fixup-third-party.sh b/mockgcp/fixup-third-party.sh index b0d361df8e..369b1122cb 100755 --- a/mockgcp/fixup-third-party.sh +++ b/mockgcp/fixup-third-party.sh @@ -27,6 +27,7 @@ mv google/cloud/ mockgcp/ mv google/container/ mockgcp/ mv google/iam/ mockgcp/ mv google/logging/ mockgcp/ +mv google/pubsub/ mockgcp/ mv google/storage/ mockgcp/ mv google/monitoring/ mockgcp/ mv google/api/apikeys/ mockgcp/api/ @@ -48,6 +49,9 @@ find . -type f -print0 | xargs -0 sed -i -e "s@google\.iam@mockgcp.iam@g" find . -type f -print0 | xargs -0 sed -i -e "s@google/logging/@mockgcp/logging/@g" find . -type f -print0 | xargs -0 sed -i -e "s@google\.logging@mockgcp.logging@g" +find . -type f -print0 | xargs -0 sed -i -e "s@google/pubsub/@mockgcp/pubsub/@g" +find . -type f -print0 | xargs -0 sed -i -e "s@google\.pubsub@mockgcp.pubsub@g" + find . -type f -print0 | xargs -0 sed -i -e "s@google/monitoring/@mockgcp/monitoring/@g" find . -type f -print0 | xargs -0 sed -i -e "s@google\.monitoring@mockgcp.monitoring@g" diff --git a/mockgcp/mock_http_roundtrip.go b/mockgcp/mock_http_roundtrip.go index 7efdf7e774..d57d1ed285 100644 --- a/mockgcp/mock_http_roundtrip.go +++ b/mockgcp/mock_http_roundtrip.go @@ -52,6 +52,7 @@ import ( "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockmonitoring" "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mocknetworkservices" "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockprivateca" + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockpubsub" "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockpubsublite" "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockredis" "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockresourcemanager" @@ -126,6 +127,7 @@ func NewMockRoundTripper(t *testing.T, k8sClient client.Client, storage storage. services = append(services, mockmonitoring.New(env, storage)) services = append(services, mockpubsublite.New(env, storage)) services = append(services, mocknetworkservices.New(env, storage)) + services = append(services, mockpubsub.New(env, storage)) services = append(services, mockredis.New(env, storage)) services = append(services, mockserviceusage.New(env, storage)) services = append(services, mocksql.New(env, storage)) diff --git a/mockgcp/mockpubsub/schema.go b/mockgcp/mockpubsub/schema.go new file mode 100644 index 0000000000..53aeac3706 --- /dev/null +++ b/mockgcp/mockpubsub/schema.go @@ -0,0 +1,142 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockpubsub + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/projects" + pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/pubsub/v1" + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/pkg/storage" + "github.com/golang/protobuf/ptypes/empty" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type schemaService struct { + *MockService + pb.UnimplementedSchemaServiceServer +} + +func (s *schemaService) CreateSchema(ctx context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error) { + reqName := req.Parent + "/schemas/" + req.GetSchemaId() + name, err := s.parseSchemaName(reqName) + if err != nil { + return nil, err + } + fqn := name.String() + now := time.Now() + obj := proto.Clone(req.GetSchema()).(*pb.Schema) + obj.Name = name.String() + obj.RevisionId = fmt.Sprintf("r%d", now.Unix()) + obj.RevisionCreateTime = timestamppb.New(now) + s.populateDefaultsForSchema(name, obj) + if err := s.storage.Create(ctx, fqn, obj); err != nil { + return nil, err + } + return obj, nil +} + +func (s *schemaService) populateDefaultsForSchema(name *schemaName, obj *pb.Schema) { + // TODO: populate any default values here +} + +func (s *schemaService) GetSchema(ctx context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error) { + name, err := s.parseSchemaName(req.Name) + if err != nil { + return nil, err + } + fqn := name.String() + obj := &pb.Schema{} + if err := s.storage.Get(ctx, fqn, obj); err != nil { + if status.Code(err) == codes.NotFound { + return nil, status.Errorf(codes.NotFound, "Resource not found (resource=%s).", name.String()) + } + return nil, err + } + return obj, nil +} + +func (s *schemaService) ListSchemas(ctx context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error) { + project, err := s.Projects.GetProjectByID(req.Parent) + if err != nil { + return nil, err + } + + findPrefix := fmt.Sprintf("projects/%v/", project.ID) + + var schemas []*pb.Schema + + schemaKind := (&pb.Schema{}).ProtoReflect().Descriptor() + if err := s.storage.List(ctx, schemaKind, storage.ListOptions{}, func(obj proto.Message) error { + schema := obj.(*pb.Schema) + if strings.HasPrefix(schema.GetName(), findPrefix) { + schemas = append(schemas, schema) + } + return nil + }); err != nil { + return nil, err + } + + return &pb.ListSchemasResponse{ + Schemas: schemas, + }, nil +} + +func (s *schemaService) DeleteSchema(ctx context.Context, req *pb.DeleteSchemaRequest) (*empty.Empty, error) { + name, err := s.parseSchemaName(req.Name) + if err != nil { + return nil, err + } + fqn := name.String() + deletedObj := &pb.Schema{} + if err := s.storage.Delete(ctx, fqn, deletedObj); err != nil { + return nil, err + } + return &empty.Empty{}, nil +} + +type schemaName struct { + Project *projects.ProjectData + ID string +} + +func (n *schemaName) String() string { + return "projects/" + n.Project.ID + "/schemas/" + n.ID +} + +// parseSchemaName parses a string into a schemaName. +// The expected form is `projects/*/schemas/*`. +func (s *MockService) parseSchemaName(name string) (*schemaName, error) { + tokens := strings.Split(name, "/") + if len(tokens) == 4 && tokens[0] == "projects" && tokens[2] == "schemas" { + project, err := s.Projects.GetProjectByID(tokens[1]) + if err != nil { + return nil, err + } + name := &schemaName{ + Project: project, + ID: tokens[3], + } + return name, nil + } else { + return nil, status.Errorf(codes.InvalidArgument, "name %q is not valid", name) + } +} diff --git a/mockgcp/mockpubsub/service.go b/mockgcp/mockpubsub/service.go new file mode 100644 index 0000000000..d64af357ea --- /dev/null +++ b/mockgcp/mockpubsub/service.go @@ -0,0 +1,75 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockpubsub + +import ( + "context" + "net/http" + + "google.golang.org/grpc" + + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common" + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/httpmux" + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/operations" + pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/pubsub/v1" + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/pkg/storage" +) + +// MockService represents a mocked pubsub service. +type MockService struct { + *common.MockEnvironment + storage storage.Storage + operations *operations.Operations +} + +// New creates a MockService. +func New(env *common.MockEnvironment, storage storage.Storage) *MockService { + s := &MockService{ + MockEnvironment: env, + storage: storage, + operations: operations.NewOperationsService(storage), + } + return s +} + +func (s *MockService) ExpectedHost() string { + return "pubsub.googleapis.com" +} + +func (s *MockService) Register(grpcServer *grpc.Server) { + pb.RegisterPublisherServer(grpcServer, &publisherService{MockService: s}) + // pb.RegisterSubscriberServer(grpcServer, &subscriberService{MockService: s}) + pb.RegisterSchemaServiceServer(grpcServer, &schemaService{MockService: s}) +} + +func (s *MockService) NewHTTPMux(ctx context.Context, conn *grpc.ClientConn) (http.Handler, error) { + mux, err := httpmux.NewServeMux(ctx, conn, httpmux.Options{}, + pb.RegisterPublisherHandler, + pb.RegisterSubscriberHandler, + pb.RegisterSchemaServiceHandler, + ) + if err != nil { + return nil, err + } + + // Returns slightly non-standard errors + mux.RewriteError = func(ctx context.Context, error *httpmux.ErrorResponse) { + if error.Code == 404 { + error.Errors = nil + } + } + + return mux, nil +} diff --git a/mockgcp/mockpubsub/topic.go b/mockgcp/mockpubsub/topic.go new file mode 100644 index 0000000000..1164bdb762 --- /dev/null +++ b/mockgcp/mockpubsub/topic.go @@ -0,0 +1,178 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockpubsub + +import ( + "context" + "fmt" + "strings" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/projects" + pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/pubsub/v1" + "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/pkg/storage" +) + +type publisherService struct { + *MockService + pb.UnimplementedPublisherServer +} + +func (s *publisherService) CreateTopic(ctx context.Context, req *pb.Topic) (*pb.Topic, error) { + name, err := s.parseTopicName(req.Name) + if err != nil { + return nil, err + } + fqn := name.String() + + obj := proto.Clone(req).(*pb.Topic) + obj.Name = name.String() + + s.populateDefaultsForTopic(obj) + if err := s.storage.Create(ctx, fqn, obj); err != nil { + return nil, err + } + return obj, nil +} + +func (s *publisherService) populateDefaultsForTopic(obj *pb.Topic) { + +} + +func (s *publisherService) UpdateTopic(ctx context.Context, req *pb.UpdateTopicRequest) (*pb.Topic, error) { + reqName := req.Topic.Name + name, err := s.parseTopicName(reqName) + if err != nil { + return nil, err + } + fqn := name.String() + existing := &pb.Topic{} + if err := s.storage.Get(ctx, fqn, existing); err != nil { + return nil, err + } + + updated := req.GetTopic() + updated.Name = name.String() + + // Required. The update mask applies to the resource. + paths := req.GetUpdateMask().GetPaths() + if len(paths) == 0 { + return nil, status.Errorf(codes.InvalidArgument, "update_mask is required") + } + // TODO: Some sort of helper for fieldmask? + for _, path := range paths { + switch path { + // case "description": + // updated.Description = req.GetTopic().GetDescription() + // case "labels": + // updated.Labels = req.GetTopic().GetLabels() + default: + return nil, status.Errorf(codes.InvalidArgument, "update_mask path %q not valid", path) + } + } + + if err := s.storage.Update(ctx, fqn, updated); err != nil { + return nil, err + } + return updated, nil +} + +func (s *publisherService) GetTopic(ctx context.Context, req *pb.GetTopicRequest) (*pb.Topic, error) { + name, err := s.parseTopicName(req.Topic) + if err != nil { + return nil, err + } + fqn := name.String() + obj := &pb.Topic{} + if err := s.storage.Get(ctx, fqn, obj); err != nil { + if status.Code(err) == codes.NotFound { + return nil, status.Errorf(codes.NotFound, "Resource not found (resource=%s).", name.ID) + } + return nil, err + } + return obj, nil +} + +func (s *publisherService) ListTopics(ctx context.Context, req *pb.ListTopicsRequest) (*pb.ListTopicsResponse, error) { + project, err := s.Projects.GetProjectByID(req.Project) + if err != nil { + return nil, err + } + + findPrefix := fmt.Sprintf("projects/%v/", project.ID) + + var topics []*pb.Topic + + topicKind := (&pb.Topic{}).ProtoReflect().Descriptor() + if err := s.storage.List(ctx, topicKind, storage.ListOptions{}, func(obj proto.Message) error { + topic := obj.(*pb.Topic) + if strings.HasPrefix(topic.Name, findPrefix) { + topics = append(topics, topic) + } + return nil + }); err != nil { + return nil, err + } + + return &pb.ListTopicsResponse{ + Topics: topics, + NextPageToken: "", + }, nil +} + +func (s *publisherService) DeleteTopic(ctx context.Context, req *pb.DeleteTopicRequest) (*emptypb.Empty, error) { + name, err := s.parseTopicName(req.Topic) + if err != nil { + return nil, err + } + fqn := name.String() + deletedObj := &pb.Topic{} + if err := s.storage.Delete(ctx, fqn, deletedObj); err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} + +type topicName struct { + Project *projects.ProjectData + ID string +} + +func (n *topicName) String() string { + return fmt.Sprintf("projects/%s/topics/%s", n.Project.ID, n.ID) +} + +// parseTopicName parses a string into a topicName. +// The expected form is `projects/*/topics/*`. +func (s *MockService) parseTopicName(name string) (*topicName, error) { + tokens := strings.Split(name, "/") + if len(tokens) == 4 && tokens[0] == "projects" && tokens[2] == "topics" { + project, err := s.Projects.GetProjectByID(tokens[1]) + if err != nil { + return nil, err + } + name := &topicName{ + Project: project, + ID: tokens[3], + } + return name, nil + } else { + return nil, status.Errorf(codes.InvalidArgument, "name %q is not valid", name) + } +} diff --git a/tests/e2e/unified_test.go b/tests/e2e/unified_test.go index 75ca2529e5..e4149b0e5e 100644 --- a/tests/e2e/unified_test.go +++ b/tests/e2e/unified_test.go @@ -490,6 +490,10 @@ func testFixturesInSeries(ctx context.Context, t *testing.T, testPause bool, can // Specific to BigQuery addSetStringReplacement(".access[].userByEmail", "user@google.com") + // Specific to pubsub + addReplacement("revisionCreateTime", "2024-04-01T12:34:56.123456Z") + addReplacement("revisionId", "revision-id-placeholder") + // Replace any empty values in LROs; this is surprisingly difficult to fix in mockgcp // // "response": {