Skip to content

Commit

Permalink
mockgcp: Implement pubsub
Browse files Browse the repository at this point in the history
Aiming to support PubSubTopic
  • Loading branch information
justinsb committed May 8, 2024
1 parent d0a0087 commit 4534d3c
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 0 deletions.
3 changes: 3 additions & 0 deletions config/tests/samples/create/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,9 @@ func MaybeSkip(t *testing.T, name string, resources []*unstructured.Unstructured

case schema.GroupKind{Group: "privateca.cnrm.cloud.google.com", Kind: "PrivateCACAPool"}:

case schema.GroupKind{Group: "pubsub.cnrm.cloud.google.com", Kind: "PubSubSchema"}:
case schema.GroupKind{Group: "pubsub.cnrm.cloud.google.com", Kind: "PubSubTopic"}:

case schema.GroupKind{Group: "redis.cnrm.cloud.google.com", Kind: "RedisInstance"}:

case schema.GroupKind{Group: "resourcemanager.cnrm.cloud.google.com", Kind: "Project"}:
Expand Down
1 change: 1 addition & 0 deletions mockgcp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ gen-proto:
./third_party/googleapis/mockgcp/devtools/artifactregistry/v1/*.proto \
./third_party/googleapis/mockgcp/iam/admin/v1/*.proto \
./third_party/googleapis/mockgcp/logging/v2/*.proto \
./third_party/googleapis/mockgcp/pubsub/v1/*.proto \
./third_party/googleapis/mockgcp/cloud/aiplatform/v1beta1/annotation*.proto \
./third_party/googleapis/mockgcp/cloud/aiplatform/v1beta1/data_item.proto \
./third_party/googleapis/mockgcp/cloud/aiplatform/v1beta1/dataset*.proto \
Expand Down
4 changes: 4 additions & 0 deletions mockgcp/fixup-third-party.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mv google/cloud/ mockgcp/
mv google/container/ mockgcp/
mv google/iam/ mockgcp/
mv google/logging/ mockgcp/
mv google/pubsub/ mockgcp/
mv google/storage/ mockgcp/
mv google/monitoring/ mockgcp/
mv google/api/apikeys/ mockgcp/api/
Expand All @@ -48,6 +49,9 @@ find . -type f -print0 | xargs -0 sed -i -e "s@google\.iam@mockgcp.iam@g"
find . -type f -print0 | xargs -0 sed -i -e "s@google/logging/@mockgcp/logging/@g"
find . -type f -print0 | xargs -0 sed -i -e "s@google\.logging@mockgcp.logging@g"

find . -type f -print0 | xargs -0 sed -i -e "s@google/pubsub/@mockgcp/pubsub/@g"
find . -type f -print0 | xargs -0 sed -i -e "s@google\.pubsub@mockgcp.pubsub@g"

find . -type f -print0 | xargs -0 sed -i -e "s@google/monitoring/@mockgcp/monitoring/@g"
find . -type f -print0 | xargs -0 sed -i -e "s@google\.monitoring@mockgcp.monitoring@g"

Expand Down
2 changes: 2 additions & 0 deletions mockgcp/mock_http_roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockmonitoring"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mocknetworkservices"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockprivateca"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockpubsub"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockpubsublite"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockredis"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/mockresourcemanager"
Expand Down Expand Up @@ -126,6 +127,7 @@ func NewMockRoundTripper(t *testing.T, k8sClient client.Client, storage storage.
services = append(services, mockmonitoring.New(env, storage))
services = append(services, mockpubsublite.New(env, storage))
services = append(services, mocknetworkservices.New(env, storage))
services = append(services, mockpubsub.New(env, storage))
services = append(services, mockredis.New(env, storage))
services = append(services, mockserviceusage.New(env, storage))
services = append(services, mocksql.New(env, storage))
Expand Down
142 changes: 142 additions & 0 deletions mockgcp/mockpubsub/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockpubsub

import (
"context"
"fmt"
"strings"
"time"

"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/projects"
pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/pubsub/v1"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/pkg/storage"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)

type schemaService struct {
*MockService
pb.UnimplementedSchemaServiceServer
}

func (s *schemaService) CreateSchema(ctx context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error) {
reqName := req.Parent + "/schemas/" + req.GetSchemaId()
name, err := s.parseSchemaName(reqName)
if err != nil {
return nil, err
}
fqn := name.String()
now := time.Now()
obj := proto.Clone(req.GetSchema()).(*pb.Schema)
obj.Name = name.String()
obj.RevisionId = fmt.Sprintf("r%d", now.Unix())
obj.RevisionCreateTime = timestamppb.New(now)
s.populateDefaultsForSchema(name, obj)
if err := s.storage.Create(ctx, fqn, obj); err != nil {
return nil, err
}
return obj, nil
}

func (s *schemaService) populateDefaultsForSchema(name *schemaName, obj *pb.Schema) {
// TODO: populate any default values here
}

func (s *schemaService) GetSchema(ctx context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error) {
name, err := s.parseSchemaName(req.Name)
if err != nil {
return nil, err
}
fqn := name.String()
obj := &pb.Schema{}
if err := s.storage.Get(ctx, fqn, obj); err != nil {
if status.Code(err) == codes.NotFound {
return nil, status.Errorf(codes.NotFound, "Resource not found (resource=%s).", name.String())
}
return nil, err
}
return obj, nil
}

func (s *schemaService) ListSchemas(ctx context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error) {
project, err := s.Projects.GetProjectByID(req.Parent)
if err != nil {
return nil, err
}

findPrefix := fmt.Sprintf("projects/%v/", project.ID)

var schemas []*pb.Schema

schemaKind := (&pb.Schema{}).ProtoReflect().Descriptor()
if err := s.storage.List(ctx, schemaKind, storage.ListOptions{}, func(obj proto.Message) error {
schema := obj.(*pb.Schema)
if strings.HasPrefix(schema.GetName(), findPrefix) {
schemas = append(schemas, schema)
}
return nil
}); err != nil {
return nil, err
}

return &pb.ListSchemasResponse{
Schemas: schemas,
}, nil
}

func (s *schemaService) DeleteSchema(ctx context.Context, req *pb.DeleteSchemaRequest) (*empty.Empty, error) {
name, err := s.parseSchemaName(req.Name)
if err != nil {
return nil, err
}
fqn := name.String()
deletedObj := &pb.Schema{}
if err := s.storage.Delete(ctx, fqn, deletedObj); err != nil {
return nil, err
}
return &empty.Empty{}, nil
}

type schemaName struct {
Project *projects.ProjectData
ID string
}

func (n *schemaName) String() string {
return "projects/" + n.Project.ID + "/schemas/" + n.ID
}

// parseSchemaName parses a string into a schemaName.
// The expected form is `projects/*/schemas/*`.
func (s *MockService) parseSchemaName(name string) (*schemaName, error) {
tokens := strings.Split(name, "/")
if len(tokens) == 4 && tokens[0] == "projects" && tokens[2] == "schemas" {
project, err := s.Projects.GetProjectByID(tokens[1])
if err != nil {
return nil, err
}
name := &schemaName{
Project: project,
ID: tokens[3],
}
return name, nil
} else {
return nil, status.Errorf(codes.InvalidArgument, "name %q is not valid", name)
}
}
75 changes: 75 additions & 0 deletions mockgcp/mockpubsub/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mockpubsub

import (
"context"
"net/http"

"google.golang.org/grpc"

"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/httpmux"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/common/operations"
pb "github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/generated/mockgcp/pubsub/v1"
"github.com/GoogleCloudPlatform/k8s-config-connector/mockgcp/pkg/storage"
)

// MockService represents a mocked pubsub service.
type MockService struct {
*common.MockEnvironment
storage storage.Storage
operations *operations.Operations
}

// New creates a MockService.
func New(env *common.MockEnvironment, storage storage.Storage) *MockService {
s := &MockService{
MockEnvironment: env,
storage: storage,
operations: operations.NewOperationsService(storage),
}
return s
}

func (s *MockService) ExpectedHost() string {
return "pubsub.googleapis.com"
}

func (s *MockService) Register(grpcServer *grpc.Server) {
pb.RegisterPublisherServer(grpcServer, &publisherService{MockService: s})
// pb.RegisterSubscriberServer(grpcServer, &subscriberService{MockService: s})
pb.RegisterSchemaServiceServer(grpcServer, &schemaService{MockService: s})
}

func (s *MockService) NewHTTPMux(ctx context.Context, conn *grpc.ClientConn) (http.Handler, error) {
mux, err := httpmux.NewServeMux(ctx, conn, httpmux.Options{},
pb.RegisterPublisherHandler,
pb.RegisterSubscriberHandler,
pb.RegisterSchemaServiceHandler,
)
if err != nil {
return nil, err
}

// Returns slightly non-standard errors
mux.RewriteError = func(ctx context.Context, error *httpmux.ErrorResponse) {
if error.Code == 404 {
error.Errors = nil
}
}

return mux, nil
}
Loading

0 comments on commit 4534d3c

Please sign in to comment.