Skip to content

Commit

Permalink
GOCBC-483: Update transcoder API
Browse files Browse the repository at this point in the history
Motivation
----------
We should use composition for building the default transcoder with
a serializer. We also shouldn't expose the serializer from the
transcoder.

Changes
-------
Update the signature of NewDefaultTranscoder to take serializer
as a parameter. Update subdoc and services options to
take a serializer directly. Add a serializer to cluster and
options.

Change-Id: I518b41fc9b0c9be1709d95d39be276702a633507
Reviewed-on: http://review.couchbase.org/110355
Reviewed-by: Mike Goldsmith <goldsmith.mike@gmail.com>
Reviewed-by: Brett Lawson <brett19@gmail.com>
Tested-by: Charles Dixon <chvckd@gmail.com>
  • Loading branch information
chvck committed Jul 25, 2019
1 parent f51a5ce commit 5d837e1
Show file tree
Hide file tree
Showing 15 changed files with 63 additions and 68 deletions.
5 changes: 3 additions & 2 deletions analyticsquery_options.go
Expand Up @@ -24,8 +24,9 @@ type AnalyticsQueryOptions struct {
// Experimental: This API is subject to change at any time.
Deferred bool

// Serializer is used to deserialize each row in the result. This should be a JSON deserializer as results are JSON.
Serializer Serializer
// JSONSerializer is used to deserialize each row in the result. This should be a JSON deserializer as results are JSON.
// NOTE: if not set then query will always default to DefaultJSONSerializer.
Serializer JSONSerializer
}

func (opts *AnalyticsQueryOptions) toMap(statement string) (map[string]interface{}, error) {
Expand Down
1 change: 1 addition & 0 deletions bucket.go
Expand Up @@ -29,6 +29,7 @@ func newBucket(sb *stateBlock, bucketName string, opts BucketOptions) *Bucket {
client: sb.client,

Transcoder: sb.Transcoder,
Serializer: sb.Serializer,
},
}
}
Expand Down
12 changes: 10 additions & 2 deletions cluster.go
Expand Up @@ -38,7 +38,11 @@ type ClusterOptions struct {
SearchTimeout time.Duration
ManagementTimeout time.Duration
EnableTracing bool
Transcoder Transcoder
// Transcoder is used for trancoding data used in KV operations.
Transcoder Transcoder
// Serializer is used for deserialization of data used in query, analytics, view and search operations. This
// will default to DefaultJSONSerializer. NOTE: This is entirely independent of Transcoder.
Serializer JSONSerializer
}

// ClusterCloseOptions is the set of options available when disconnecting from a Cluster.
Expand Down Expand Up @@ -104,7 +108,10 @@ func Connect(connStr string, opts ClusterOptions) (*Cluster, error) {
searchTimeout = opts.SearchTimeout
}
if opts.Transcoder == nil {
opts.Transcoder = NewDefaultTranscoder()
opts.Transcoder = NewDefaultTranscoder(&DefaultJSONSerializer{})
}
if opts.Serializer == nil {
opts.Serializer = &DefaultJSONSerializer{}
}

cluster := &Cluster{
Expand All @@ -124,6 +131,7 @@ func Connect(connStr string, opts ClusterOptions) (*Cluster, error) {
DuraTimeout: 40000 * time.Millisecond,
DuraPollTimeout: 100 * time.Millisecond,
Transcoder: opts.Transcoder,
Serializer: opts.Serializer,
},

queryCache: make(map[string]*n1qlCache),
Expand Down
7 changes: 3 additions & 4 deletions cluster_analyticsquery.go
Expand Up @@ -77,7 +77,7 @@ type AnalyticsResults struct {
httpProvider httpProvider
ctx context.Context

serializer Serializer
serializer JSONSerializer
}

// Next assigns the next result from the results into the value pointer, returning whether the read was successful.
Expand Down Expand Up @@ -367,7 +367,7 @@ func (c *Cluster) analyticsQuery(ctx context.Context, statement string, opts *An
// TODO: clientcontextid?

if opts.Serializer == nil {
opts.Serializer = c.sb.Transcoder.Serializer()
opts.Serializer = c.sb.Serializer
}

var retries uint
Expand Down Expand Up @@ -398,8 +398,7 @@ func (c *Cluster) analyticsQuery(ctx context.Context, statement string, opts *An
}

func (c *Cluster) executeAnalyticsQuery(ctx context.Context, opts map[string]interface{},
provider httpProvider, cancel context.CancelFunc, serializer Serializer) (*AnalyticsResults, error) {

provider httpProvider, cancel context.CancelFunc, serializer JSONSerializer) (*AnalyticsResults, error) {
// priority is sent as a header not in the body
priority, priorityCastOK := opts["priority"].(int)
if priorityCastOK {
Expand Down
2 changes: 1 addition & 1 deletion cluster_analyticsquery_test.go
Expand Up @@ -374,7 +374,7 @@ func TestBasicAnalyticsQuerySerializer(t *testing.T) {

queryOptions := &AnalyticsQueryOptions{
PositionalParameters: []interface{}{"brewery"},
Serializer: &DefaultSerializer{},
Serializer: &DefaultJSONSerializer{},
}

statement := "select `beer-sample`.* from `beer-sample` WHERE `type` = ? ORDER BY brewery_id, name"
Expand Down
13 changes: 6 additions & 7 deletions cluster_query.go
Expand Up @@ -75,7 +75,7 @@ type QueryResults struct {
ctx context.Context
enhancedStatements bool

serializer Serializer
serializer JSONSerializer
}

// Next assigns the next result from the results into the value pointer, returning whether the read was successful.
Expand Down Expand Up @@ -356,7 +356,7 @@ func (c *Cluster) query(ctx context.Context, statement string, opts *QueryOption
}

if opts.Serializer == nil {
opts.Serializer = c.sb.Transcoder.Serializer()
opts.Serializer = c.sb.Serializer
}

var res *QueryResults
Expand All @@ -378,7 +378,7 @@ func (c *Cluster) query(ctx context.Context, statement string, opts *QueryOption
}

func (c *Cluster) doPreparedN1qlQuery(ctx context.Context, queryOpts map[string]interface{},
provider httpProvider, cancel context.CancelFunc, serializer Serializer) (*QueryResults, error) {
provider httpProvider, cancel context.CancelFunc, serializer JSONSerializer) (*QueryResults, error) {

if capabilitySupporter, ok := provider.(clusterCapabilityProvider); ok {
if !c.supportsEnhancedPreparedStatements() &&
Expand Down Expand Up @@ -449,7 +449,7 @@ func (c *Cluster) prepareN1qlQuery(ctx context.Context, opts map[string]interfac

// There's no need to pass cancel here, if there's an error then we'll cancel further up the stack
// and if there isn't then we run another query later where we will cancel
prepRes, err := c.doRetryableQuery(ctx, prepOpts, provider, nil, &DefaultSerializer{})
prepRes, err := c.doRetryableQuery(ctx, prepOpts, provider, nil, &DefaultJSONSerializer{})
if err != nil {
return nil, err
}
Expand All @@ -467,7 +467,7 @@ func (c *Cluster) prepareN1qlQuery(ctx context.Context, opts map[string]interfac
}

func (c *Cluster) doRetryableQuery(ctx context.Context, queryOpts map[string]interface{},
provider httpProvider, cancel context.CancelFunc, serializer Serializer) (*QueryResults, error) {
provider httpProvider, cancel context.CancelFunc, serializer JSONSerializer) (*QueryResults, error) {
var res *QueryResults
var err error
var retries uint
Expand Down Expand Up @@ -507,8 +507,7 @@ type n1qlPrepData struct {
// settings. This function will inject any additional connection or request-level
// settings into the `opts` map.
func (c *Cluster) executeN1qlQuery(ctx context.Context, opts map[string]interface{},
provider httpProvider, cancel context.CancelFunc, endpoint string, serializer Serializer) (*QueryResults, error) {

provider httpProvider, cancel context.CancelFunc, endpoint string, serializer JSONSerializer) (*QueryResults, error) {
reqJSON, err := json.Marshal(opts)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal query request body")
Expand Down
3 changes: 2 additions & 1 deletion cluster_query_test.go
Expand Up @@ -1377,7 +1377,8 @@ func testGetClusterForHTTP(provider *mockHTTPProvider, n1qlTimeout, analyticsTim
c.sb.AnalyticsTimeout = analyticsTimeout
c.sb.SearchTimeout = searchTimeout

c.sb.Transcoder = NewDefaultTranscoder()
c.sb.Transcoder = NewDefaultTranscoder(&DefaultJSONSerializer{})
c.sb.Serializer = &DefaultJSONSerializer{}

return c
}
24 changes: 12 additions & 12 deletions collection_subdoc.go
Expand Up @@ -22,7 +22,7 @@ type LookupInOptions struct {
Context context.Context
Timeout time.Duration
WithExpiry bool
Transcoder Transcoder
Serializer JSONSerializer
}

// LookupInSpecGetOptions are the options available to LookupIn subdoc Get operations.
Expand Down Expand Up @@ -171,9 +171,9 @@ func (c *Collection) lookupIn(ctx context.Context, key string, ops []LookupInOp,
return nil, errors.New("too many lookupIn ops specified, maximum 16")
}

transcoder := opts.Transcoder
if transcoder == nil {
transcoder = c.sb.Transcoder
serializer := opts.Serializer
if serializer == nil {
serializer = &DefaultJSONSerializer{}
}

ctrl := c.newOpManager(ctx)
Expand All @@ -191,7 +191,7 @@ func (c *Collection) lookupIn(ctx context.Context, key string, ops []LookupInOp,

if res != nil {
resSet := &LookupInResult{}
resSet.serializer = transcoder.Serializer()
resSet.serializer = serializer
resSet.cas = Cas(res.Cas)
resSet.contents = make([]lookupInPartial, len(subdocs))

Expand Down Expand Up @@ -255,12 +255,12 @@ type MutateInOptions struct {
DurabilityLevel DurabilityLevel
InsertDocument bool
UpsertDocument bool
Transcoder Transcoder
Serializer JSONSerializer
// Internal: This should never be used and is not supported.
AccessDeleted bool
}

func (c *Collection) encodeMultiArray(in interface{}, serializer Serializer) ([]byte, error) {
func (c *Collection) encodeMultiArray(in interface{}, serializer JSONSerializer) ([]byte, error) {
out, err := serializer.Serialize(in)
if err != nil {
return nil, err
Expand Down Expand Up @@ -710,9 +710,9 @@ func (c *Collection) mutate(ctx context.Context, key string, ops []MutateInOp, o
flags |= SubdocDocFlagAccessDeleted
}

transcoder := opts.Transcoder
if transcoder == nil {
transcoder = c.sb.Transcoder
serializer := opts.Serializer
if serializer == nil {
serializer = &DefaultJSONSerializer{}
}

var subdocs []gocbcore.SubDocOp
Expand All @@ -730,9 +730,9 @@ func (c *Collection) mutate(ctx context.Context, key string, ops []MutateInOp, o
var marshaled []byte
var err error
if op.op.MultiValue {
marshaled, err = c.encodeMultiArray(op.op.Value, transcoder.Serializer())
marshaled, err = c.encodeMultiArray(op.op.Value, serializer)
} else {
marshaled, err = transcoder.Serializer().Serialize(op.op.Value)
marshaled, err = serializer.Serialize(op.op.Value)
}
if err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions queryoptions.go
Expand Up @@ -52,8 +52,9 @@ type QueryOptions struct {
// Custom allows specifying custom query options.
Custom map[string]interface{}

// Serializer is used to deserialize each row in the result. This should be a JSON deserializer as results are JSON.
Serializer Serializer
// JSONSerializer is used to deserialize each row in the result. This should be a JSON deserializer as results are JSON.
// NOTE: if not set then query will always default to DefaultJSONSerializer.
Serializer JSONSerializer
}

func (opts *QueryOptions) toMap(statement string) (map[string]interface{}, error) {
Expand Down
4 changes: 2 additions & 2 deletions results.go
Expand Up @@ -191,7 +191,7 @@ func (d *GetResult) set(paths []subdocPath, content interface{}, value interface
// LookupInResult is the return type for LookupIn.
type LookupInResult struct {
Result
serializer Serializer
serializer JSONSerializer
contents []lookupInPartial
pathMap map[string]int
}
Expand All @@ -201,7 +201,7 @@ type lookupInPartial struct {
err error
}

func (pr *lookupInPartial) as(valuePtr interface{}, serializer Serializer) error {
func (pr *lookupInPartial) as(valuePtr interface{}, serializer JSONSerializer) error {
if pr.err != nil {
return pr.err
}
Expand Down
8 changes: 4 additions & 4 deletions results_test.go
Expand Up @@ -61,7 +61,7 @@ func TestGetResultContent(t *testing.T) {

res := GetResult{
contents: dataset,
transcoder: NewDefaultTranscoder(),
transcoder: NewDefaultTranscoder(&DefaultJSONSerializer{}),
}

var doc testBeerDocument
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestGetResultFromSubDoc(t *testing.T) {

results := &LookupInResult{
contents: make([]lookupInPartial, 3),
serializer: NewDefaultTranscoder().Serializer(),
serializer: &DefaultJSONSerializer{},
}

var err error
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestGetResultFromSubDoc(t *testing.T) {
Address address `json:"address"`
}
var doc person
getResult := GetResult{transcoder: NewDefaultTranscoder()}
getResult := GetResult{transcoder: NewDefaultTranscoder(&DefaultJSONSerializer{})}
err = getResult.fromSubDoc([]LookupInOp{
{op: ops[0]},
{op: ops[1]},
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestLookupInResultContentAt(t *testing.T) {
err: errors.New("error"),
},
},
serializer: NewDefaultTranscoder().Serializer(),
serializer: &DefaultJSONSerializer{},
}

var name string
Expand Down
1 change: 1 addition & 0 deletions stateblock.go
Expand Up @@ -45,6 +45,7 @@ type stateBlock struct {
client func(*clientStateBlock) client

Transcoder Transcoder
Serializer JSONSerializer
}

func (sb *stateBlock) getCachedClient() client {
Expand Down
36 changes: 10 additions & 26 deletions transcoding.go
Expand Up @@ -14,16 +14,10 @@ type Transcoder interface {

// Encodes a Go type into bytes for storage.
Encode(interface{}) ([]byte, uint32, error)

// SetSerializer sets the Serializer to be used by the Transcoder.
SetSerializer(serializer Serializer)

// Serializer returns the Serializer being used by the Transcoder.
Serializer() Serializer
}

// Serializer is used a Transcoder for serialization/deserialization of JSON datatype values.
type Serializer interface {
// JSONSerializer is used a Transcoder for serialization/deserialization of JSON datatype values.
type JSONSerializer interface {
// Serialize serializes an interface into bytes.
Serialize(value interface{}) ([]byte, error)

Expand All @@ -34,13 +28,13 @@ type Serializer interface {
// DefaultTranscoder implements the default transcoding behaviour of
// all Couchbase SDKs.
type DefaultTranscoder struct {
serializer Serializer
serializer JSONSerializer
}

// NewDefaultTranscoder returns a new DefaultTranscoder initialized to use DefaultSerializer.
func NewDefaultTranscoder() *DefaultTranscoder {
// NewDefaultTranscoder returns a new DefaultTranscoder initialized to use DefaultSerializer.
func NewDefaultTranscoder(serializer JSONSerializer) *DefaultTranscoder {
return &DefaultTranscoder{
serializer: &DefaultSerializer{},
serializer: &DefaultJSONSerializer{},
}
}

Expand Down Expand Up @@ -121,22 +115,12 @@ func (t *DefaultTranscoder) Encode(value interface{}) ([]byte, uint32, error) {
return bytes, flags, nil
}

// Serializer returns the current serializer being used by the Transcoder.
func (t *DefaultTranscoder) Serializer() Serializer {
return t.serializer
}

// SetSerializer set the current serializer to be used by the Transcoder.
func (t *DefaultTranscoder) SetSerializer(serializer Serializer) {
t.serializer = serializer
}

// DefaultSerializer implements the Serializer interface using json.Marshal/Unmarshal.
type DefaultSerializer struct {
// DefaultJSONSerializer implements the JSONSerializer interface using json.Marshal/Unmarshal.
type DefaultJSONSerializer struct {
}

// Serialize applies the json.Marshal behaviour to serialize a Go type
func (s *DefaultSerializer) Serialize(value interface{}) ([]byte, error) {
func (s *DefaultJSONSerializer) Serialize(value interface{}) ([]byte, error) {
bytes, err := json.Marshal(value)
if err != nil {
return nil, err
Expand All @@ -146,7 +130,7 @@ func (s *DefaultSerializer) Serialize(value interface{}) ([]byte, error) {
}

// Deserialize applies the json.Unmarshal behaviour to deserialize into a Go type
func (s *DefaultSerializer) Deserialize(bytes []byte, out interface{}) error {
func (s *DefaultJSONSerializer) Deserialize(bytes []byte, out interface{}) error {
err := json.Unmarshal(bytes, &out)
if err != nil {
return err
Expand Down

0 comments on commit 5d837e1

Please sign in to comment.