Skip to content
Permalink
Browse files
feat(pubsublite): Move internal implementation details to internal/wi…
…re subpackage (#3123)

This makes it possible for the bulk of the Lite library internal implementation to be used from different packages.
Some cleanups so that the wire package compiles.
pubsublite.Message was deleted and the file was renamed to message_router.go.
  • Loading branch information
tmdiep committed Nov 6, 2020
1 parent 8c135e0 commit ed3fd1aed7dbc9396aecc70622ccfd302bbb4265
@@ -16,6 +16,7 @@ package pubsublite
import (
"context"

"cloud.google.com/go/pubsublite/internal/wire"
"google.golang.org/api/option"

vkit "cloud.google.com/go/pubsublite/apiv1"
@@ -34,10 +35,10 @@ type AdminClient struct {
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Cloud Pub/Sub Lite is available.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
if err := validateRegion(region); err != nil {
if err := wire.ValidateRegion(region); err != nil {
return nil, err
}
admin, err := newAdminClient(ctx, region, opts...)
admin, err := wire.NewAdminClient(ctx, region, opts...)
if err != nil {
return nil, err
}
@@ -0,0 +1,17 @@
# Wire

This directory contains internal implementation details for Cloud Pub/Sub Lite.
Its exported interface can change at any time.

## Conventions

The following are general conventions used in this package:

* Capitalized methods and fields of a struct denotes its public interface. They
are safe to call from outside the struct (e.g. accesses immutable fields or
guarded by a mutex). All other methods are considered internal implementation
details that should not be called from outside the struct.
* unsafeFoo() methods indicate that the caller is expected to have already
acquired the struct's mutex. Since Go does not support re-entrant locks, they
do not acquire the mutex. These are typically common util methods that need
to be atomic with other operations.
@@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite
package wire

import "errors"

@@ -11,66 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite
package wire

import (
"crypto/sha256"
"fmt"
"math/big"
"math/rand"
"time"

"github.com/golang/protobuf/ptypes"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AttributeValues is a slice of strings.
type AttributeValues [][]byte

// Message represents a Pub/Sub message.
type Message struct {
// Data is the actual data in the message.
Data []byte

// Attributes can be used to label the message. A key may have multiple
// values.
Attributes map[string]AttributeValues

// EventTime is an optional, user-specified event time for this message.
EventTime time.Time

// OrderingKey identifies related messages for which publish order should
// be respected. Messages with the same ordering key are published to the
// same topic partition and subscribers will receive the messages in order.
// If the ordering key is empty, the message will be sent to an arbitrary
// partition.
OrderingKey []byte
}

func (m *Message) toProto() (*pb.PubSubMessage, error) {
msgpb := &pb.PubSubMessage{
Data: m.Data,
Key: m.OrderingKey,
}

if len(m.Attributes) > 0 {
msgpb.Attributes = make(map[string]*pb.AttributeValues)
for key, values := range m.Attributes {
msgpb.Attributes[key] = &pb.AttributeValues{Values: values}
}
}

if !m.EventTime.IsZero() {
ts, err := ptypes.TimestampProto(m.EventTime)
if err != nil {
return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err)
}
msgpb.EventTime = ts
}
return msgpb, nil
}

// messageRouter outputs a partition number, given an ordering key. Results are
// undefined when:
// - setPartitionCount() is called with count <= 0.
@@ -11,27 +11,16 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite
package wire

import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/golang/protobuf/proto"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
"cloud.google.com/go/pubsublite/internal/test"
)

type fakeSource struct {
ret int64
}

func (f *fakeSource) Int63() int64 { return f.ret }
func (f *fakeSource) Seed(seed int64) {}

type fakeMsgRouter struct {
multiplier int
partitionCount int
@@ -45,67 +34,10 @@ func (f *fakeMsgRouter) Route(orderingKey []byte) int {
return f.partitionCount * f.multiplier
}

func TestMessageToProto(t *testing.T) {
for _, tc := range []struct {
desc string
msg *Message
want *pb.PubSubMessage
}{
{
desc: "valid: minimal",
msg: &Message{
Data: []byte("Hello world"),
},
want: &pb.PubSubMessage{
Data: []byte("Hello world"),
},
},
{
desc: "valid: filled",
msg: &Message{
Data: []byte("foo"),
Attributes: map[string]AttributeValues{
"attr1": [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
EventTime: time.Unix(1555593697, 154358*1000),
OrderingKey: []byte("order"),
},
want: &pb.PubSubMessage{
Data: []byte("foo"),
Attributes: map[string]*pb.AttributeValues{
"attr1": {
Values: [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
},
EventTime: &tspb.Timestamp{
Seconds: 1555593697,
Nanos: 154358 * 1000,
},
Key: []byte("order"),
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
got, err := tc.msg.toProto()
if err != nil {
t.Errorf("toProto() err = %v", err)
} else if !proto.Equal(got, tc.want) {
t.Errorf("toProto() got = %v\nwant = %v", got, tc.want)
}
})
}
}

func TestRoundRobinMsgRouter(t *testing.T) {
// Using the same msgRouter for each test run ensures that it reinitializes
// when the partition count changes.
source := &fakeSource{}
source := &test.FakeSource{}
msgRouter := &roundRobinMsgRouter{rng: rand.New(source)}

for _, tc := range []struct {
@@ -125,7 +57,7 @@ func TestRoundRobinMsgRouter(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
source.ret = tc.source
source.Ret = tc.source
msgRouter.SetPartitionCount(tc.partitionCount)
for i, want := range tc.want {
got := msgRouter.Route([]byte("IGNORED"))
@@ -0,0 +1,51 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"fmt"
"strings"
)

// ValidateZone verifies that the `input` string has the format of a valid
// Google Cloud zone. An example zone is "europe-west1-b".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
func ValidateZone(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 3 {
return fmt.Errorf("pubsublite: invalid zone %q", input)
}
return nil
}

// ValidateRegion verifies that the `input` string has the format of a valid
// Google Cloud region. An example region is "europe-west1".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
func ValidateRegion(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 2 {
return fmt.Errorf("pubsublite: invalid region %q", input)
}
return nil
}

type topicPartition struct {
Path string
Partition int
}

type subscriptionPartition struct {
Path string
Partition int
}
@@ -0,0 +1,78 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import "testing"

func TestValidateZone(t *testing.T) {
for _, tc := range []struct {
desc string
input string
wantErr bool
}{
{
desc: "valid",
input: "us-central1-a",
wantErr: false,
},
{
desc: "invalid: insufficient dashes",
input: "us-central1",
wantErr: true,
},
{
desc: "invalid: excess dashes",
input: "us-central1-a-b",
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateZone(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
}

func TestValidateRegion(t *testing.T) {
for _, tc := range []struct {
desc string
input string
wantErr bool
}{
{
desc: "valid",
input: "europe-west1",
wantErr: false,
},
{
desc: "invalid: insufficient dashes",
input: "europewest1",
wantErr: true,
},
{
desc: "invalid: excess dashes",
input: "europe-west1-b",
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateRegion(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
}

0 comments on commit ed3fd1a

Please sign in to comment.