/
frontend.go
131 lines (118 loc) · 2.73 KB
/
frontend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package pgproto3
import (
"encoding/binary"
"io"
"github.com/jackc/pgx/chunkreader"
"github.com/pkg/errors"
)
// Frontend reads backend messages from a chunk reader and writes frontend
// messages to an io.Writer. Use NewFrontend to construct one.
type Frontend struct {
	// cr supplies the incoming bytes; Receive takes message headers and bodies from it.
	cr *chunkreader.ChunkReader
	// w is where Send writes encoded messages.
	w io.Writer
	// Backend message flyweights: one value per backend message type.
	// Receive decodes into the matching field and returns a pointer to it, so
	// the same value is reused every time a message of that type is received.
	authentication Authentication
	backendKeyData BackendKeyData
	bindComplete BindComplete
	closeComplete CloseComplete
	commandComplete CommandComplete
	copyBothResponse CopyBothResponse
	copyData CopyData
	copyInResponse CopyInResponse
	copyOutResponse CopyOutResponse
	copyDone CopyDone
	copyFail CopyFail
	dataRow DataRow
	emptyQueryResponse EmptyQueryResponse
	errorResponse ErrorResponse
	functionCallResponse FunctionCallResponse
	noData NoData
	noticeResponse NoticeResponse
	notificationResponse NotificationResponse
	parameterDescription ParameterDescription
	parameterStatus ParameterStatus
	parseComplete ParseComplete
	readyForQuery ReadyForQuery
	rowDescription RowDescription
	portalSuspended PortalSuspended
	// bodyLen is the body size of the message whose header was read last
	// (the header's length field minus 4).
	bodyLen int
	// msgType is the type byte of the message whose header was read last.
	msgType byte
	// partialMsg is true from the moment a header has been read until the
	// matching body has been read; while set, Receive skips reading a header.
	partialMsg bool
}
// NewFrontend returns a Frontend that receives messages from r, wrapped in a
// chunkreader.ChunkReader, and sends messages to w. The returned error is
// always nil.
func NewFrontend(r io.Reader, w io.Writer) (*Frontend, error) {
	frontend := &Frontend{
		cr: chunkreader.NewChunkReader(r),
		w:  w,
	}
	return frontend, nil
}
// Send encodes msg and writes the resulting bytes to the underlying writer
// with a single Write call. It returns the error reported by that Write, if
// any.
func (b *Frontend) Send(msg FrontendMessage) error {
	// Encode appends to the buffer it is given; nil makes it start a fresh one.
	encoded := msg.Encode(nil)
	if _, err := b.w.Write(encoded); err != nil {
		return err
	}
	return nil
}
// Receive reads the next backend message from the chunk reader, decodes it
// and returns it.
//
// The returned message is a pointer to a field of the Frontend, so it is
// reused the next time a message of the same type is received.
//
// Errors:
//   - An error reading the header or the body is returned as is. When the body
//     read fails, the header already read is remembered, so the next call
//     retries the body without reading a new header.
//   - A header whose length field is smaller than 4 (or too large for int)
//     yields an error and leaves the Frontend state untouched.
//   - An unknown message type yields an error; the remembered header is kept,
//     so later calls report the same error.
//   - A Decode error is returned together with the message it was decoding into.
func (b *Frontend) Receive() (BackendMessage, error) {
	if !b.partialMsg {
		header, err := b.cr.Next(5)
		if err != nil {
			return nil, err
		}

		// The length field counts its own four bytes, so a valid value is never
		// below 4. Validate before storing anything so a malformed header can
		// never cause a negative length to be requested from the chunk reader.
		// The bodyLen < 0 test catches overflow where int is 32 bits wide.
		msgLen := binary.BigEndian.Uint32(header[1:])
		bodyLen := int(msgLen - 4)
		if msgLen < 4 || bodyLen < 0 {
			return nil, errors.Errorf("invalid message length %d for message type %c", msgLen, header[0])
		}

		b.msgType = header[0]
		b.bodyLen = bodyLen
		b.partialMsg = true
	}

	// Select the flyweight that matches the message type byte.
	var msg BackendMessage
	switch b.msgType {
	case '1':
		msg = &b.parseComplete
	case '2':
		msg = &b.bindComplete
	case '3':
		msg = &b.closeComplete
	case 'A':
		msg = &b.notificationResponse
	case 'c':
		msg = &b.copyDone
	case 'f':
		msg = &b.copyFail
	case 'C':
		msg = &b.commandComplete
	case 'd':
		msg = &b.copyData
	case 'D':
		msg = &b.dataRow
	case 'E':
		msg = &b.errorResponse
	case 'G':
		msg = &b.copyInResponse
	case 'H':
		msg = &b.copyOutResponse
	case 'I':
		msg = &b.emptyQueryResponse
	case 'K':
		msg = &b.backendKeyData
	case 'n':
		msg = &b.noData
	case 'N':
		msg = &b.noticeResponse
	case 'R':
		msg = &b.authentication
	case 'S':
		msg = &b.parameterStatus
	case 't':
		msg = &b.parameterDescription
	case 'T':
		msg = &b.rowDescription
	case 'V':
		msg = &b.functionCallResponse
	case 'W':
		msg = &b.copyBothResponse
	case 'Z':
		msg = &b.readyForQuery
	case 's':
		msg = &b.portalSuspended
	default:
		return nil, errors.Errorf("unknown message type: %c", b.msgType)
	}

	msgBody, err := b.cr.Next(b.bodyLen)
	if err != nil {
		// partialMsg stays true so the next call retries the body read.
		return nil, err
	}

	// The whole message has been read; the next call starts with a new header.
	b.partialMsg = false

	err = msg.Decode(msgBody)
	return msg, err
}