forked from mongodb/mongo-go-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reply.go
149 lines (132 loc) · 4.09 KB
/
reply.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package wiremessage
import (
"errors"
"fmt"
"strings"
"github.com/mongodb/mongo-go-driver/bson"
)
// Reply represents the OP_REPLY message of the MongoDB wire protocol.
//
// On the wire the fields appear in declaration order: the header, then
// ResponseFlags, CursorID, StartingFrom and NumberReturned, followed by the
// documents laid out back to back (see AppendWireMessage and
// UnmarshalWireMessage).
type Reply struct {
// MsgHeader is the message header; it occupies the first 16 bytes.
MsgHeader Header
// ResponseFlags holds the ReplyFlag bits, encoded as an int32 at offset 16.
ResponseFlags ReplyFlag
// CursorID is encoded as an int64 at offset 20.
CursorID int64
// StartingFrom is encoded as an int32 at offset 28.
StartingFrom int32
// NumberReturned is encoded as an int32 at offset 32.
// NOTE(review): presumably the number of entries in Documents; nothing in
// this type enforces that — confirm against callers.
NumberReturned int32
// Documents holds the raw documents that follow the fixed fields, starting
// at offset 36.
Documents []bson.Reader
}
// MarshalWireMessage implements the Marshaler and WireMessage interfaces.
//
// It encodes the reply into a freshly allocated buffer whose capacity is
// r.Len() and delegates the actual encoding to AppendWireMessage; see that
// method for the rules applied to the header.
func (r Reply) MarshalWireMessage() ([]byte, error) {
	return r.AppendWireMessage(make([]byte, 0, r.Len()))
}
// ValidateWireMessage implements the Validator and WireMessage interfaces.
//
// It checks the header against the body: the header's MessageLength must
// equal r.Len() and its OpCode must be OpReply. The length is checked first,
// so a reply that fails both checks reports the length error.
func (r Reply) ValidateWireMessage() error {
	switch {
	case int(r.MsgHeader.MessageLength) != r.Len():
		return errors.New("incorrect header: message length is not correct")
	case r.MsgHeader.OpCode != OpReply:
		return errors.New("incorrect header: op code is not OpReply")
	default:
		return nil
	}
}
// AppendWireMessage implements the Appender and WireMessage interfaces.
//
// Before encoding, the header is passed through SetDefaults with r.Len() and
// OpReply, which fills in the MessageLength if it is zero and the OpCode if
// it is zero. If either of these properties is non-zero and not correct, the
// wire message is still appended to b and returned together with an invalid
// header error.
//
// Because the receiver is a value, the defaults are applied to a copy; the
// caller's Reply is not modified.
func (r Reply) AppendWireMessage(b []byte) ([]byte, error) {
	// The header error is deliberately held back so the full message is
	// still appended before it is reported.
	headerErr := r.MsgHeader.SetDefaults(r.Len(), OpReply)

	// Fixed-size section: header, flags, cursor ID, starting offset, count.
	b = r.MsgHeader.AppendHeader(b)
	b = appendInt32(b, int32(r.ResponseFlags))
	b = appendInt64(b, r.CursorID)
	b = appendInt32(b, r.StartingFrom)
	b = appendInt32(b, r.NumberReturned)

	// Variable-size section: the raw documents, back to back.
	for i := range r.Documents {
		b = append(b, r.Documents[i]...)
	}

	return b, headerErr
}
// String implements the fmt.Stringer interface.
//
// The result is a single-line, human-readable rendering of every field of
// the reply.
func (r Reply) String() string {
	const layout = `OP_REPLY{MsgHeader: %s, ResponseFlags: %s, CursorID: %d, StartingFrom: %d, NumberReturned: %d, Documents: %v}`

	return fmt.Sprintf(
		layout,
		r.MsgHeader,
		r.ResponseFlags,
		r.CursorID,
		r.StartingFrom,
		r.NumberReturned,
		r.Documents,
	)
}
// Len implements the WireMessage interface.
//
// It returns the encoded size of the reply in bytes: the fixed-size fields
// plus the combined length of all documents.
func (r Reply) Len() int {
	// Header (16) + Flags (4) + CursorID (8) + StartingFrom (4) + NumberReturned (4).
	total := 16 + 4 + 8 + 4 + 4
	for i := range r.Documents {
		total += len(r.Documents[i])
	}
	return total
}
// UnmarshalWireMessage implements the Unmarshaler interface.
//
// It decodes the header and the fixed OP_REPLY fields from b, then reads
// documents back to back starting at offset 36 until the end of b, appending
// each one to r.Documents.
//
// An error is returned if the header cannot be read, if the header's
// MessageLength is smaller than 36 or larger than len(b), if a document
// cannot be read, or if a document reports a non-positive size. On error r
// may have been partially populated.
func (r *Reply) UnmarshalWireMessage(b []byte) error {
	var err error
	r.MsgHeader, err = ReadHeader(b, 0)
	if err != nil {
		return err
	}
	// 36 bytes is the size of the header plus the fixed OP_REPLY fields.
	if r.MsgHeader.MessageLength < 36 {
		return errors.New("invalid OP_REPLY: header length too small")
	}
	if len(b) < int(r.MsgHeader.MessageLength) {
		return errors.New("invalid OP_REPLY: []byte too small")
	}

	r.ResponseFlags = ReplyFlag(readInt32(b, 16))
	r.CursorID = readInt64(b, 20)
	r.StartingFrom = readInt32(b, 28)
	r.NumberReturned = readInt32(b, 32)

	pos := 36
	for pos < len(b) {
		rdr, size, docErr := readDocument(b, int32(pos))
		if docErr.Message != "" {
			docErr.Type = ErrOpReply
			return docErr
		}
		// The loop only terminates if pos advances. A document that claims
		// a non-positive size would otherwise spin forever while growing
		// r.Documents, so reject it explicitly.
		if size <= 0 {
			return errors.New("invalid OP_REPLY: document size must be positive")
		}
		r.Documents = append(r.Documents, rdr)
		pos += size
	}

	return nil
}
// ReplyFlag represents the flags of an OP_REPLY message.
type ReplyFlag int32

// These constants represent the individual flags of an OP_REPLY message.
// Each flag occupies its own bit, starting from the lowest one.
const (
	CursorNotFound ReplyFlag = 1 << iota
	QueryFailure
	ShardConfigStale
	AwaitCapable
)

// String implements the fmt.Stringer interface.
//
// The result lists the names of the flags that are set, in ascending bit
// order, separated by ", " and wrapped in square brackets. Bits that do not
// correspond to a named flag are ignored, so a value with no named flag set
// renders as "[]".
func (rf ReplyFlag) String() string {
	known := [...]struct {
		flag ReplyFlag
		name string
	}{
		{CursorNotFound, "CursorNotFound"},
		{QueryFailure, "QueryFailure"},
		{ShardConfigStale, "ShardConfigStale"},
		{AwaitCapable, "AwaitCapable"},
	}

	names := make([]string, 0, len(known))
	for _, k := range known {
		// Every flag is a single bit, so a non-zero intersection means set.
		if rf&k.flag != 0 {
			names = append(names, k.name)
		}
	}

	return "[" + strings.Join(names, ", ") + "]"
}