Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First steps towards implementing the PostgreSQL protocol. #2491

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/gossip"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/security"
"github.com/cockroachdb/cockroach/sql"
"github.com/cockroachdb/cockroach/storage"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/ts"
Expand Down Expand Up @@ -131,6 +132,11 @@ func (ts *TestServer) EventFeed() *util.Feed {
return nil
}

// SQLExecutor returns the sql.Executor used by the TestServer.
func (ts *TestServer) SQLExecutor() sql.Executor {
return ts.Server.sqlServer.Executor
}

// Start starts the TestServer by bootstrapping an in-memory store
// (defaults to maximum of 100M). The server is started, launching the
// node RPC server and all HTTP endpoints. Use the value of
Expand Down
31 changes: 31 additions & 0 deletions sql/pgwire/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Ben Darnell

package pgwire

import (
"github.com/cockroachdb/cockroach/base"
"github.com/cockroachdb/cockroach/sql"
"github.com/cockroachdb/cockroach/util/stop"
)

// Context holds parameters needed to setup a postgres-compatible server.
type Context struct {
*base.Context
Executor sql.Executor
Stopper *stop.Stopper
}
98 changes: 98 additions & 0 deletions sql/pgwire/encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Ben Darnell

package pgwire

import (
"encoding/binary"
"io"

"github.com/cockroachdb/cockroach/util"
)

const maxMessageSize = 1 << 24

// bufferedReader is implemented by bufio.Reader and bytes.Buffer.
type bufferedReader interface {
io.Reader
ReadString(delim byte) (string, error)
ReadByte() (byte, error)
}

// readMsg reads a length-prefixed message. It is only used directly
// during the authentication phase of the protocol; readTypedMsg is
// used at all other times.
func readUntypedMsg(rd io.Reader) ([]byte, error) {
var size int32
if err := binary.Read(rd, binary.BigEndian, &size); err != nil {
return nil, err
}
// size includes itself.
size -= 4
if size > maxMessageSize || size < 0 {
return nil, util.Errorf("message size %d out of bounds (0..%d)",
size, maxMessageSize)
}
data := make([]byte, size)
if _, err := io.ReadFull(rd, data); err != nil {
return nil, err
}
return data, nil
}

// readTypedMsg reads a message, returning its type code and body.
func readTypedMsg(rd bufferedReader) (messageType, []byte, error) {
typ, err := rd.ReadByte()
if err != nil {
return 0, nil, err
}
msg, err := readUntypedMsg(rd)
return messageType(typ), msg, err
}

// writeTypedMsg writes a message with the given type.
func writeTypedMsg(w io.Writer, typ messageType, msg []byte) error {
if _, err := w.Write([]byte{byte(typ)}); err != nil {
return err
}
if err := binary.Write(w, binary.BigEndian, int32(len(msg)+4)); err != nil {
return nil
}
_, err := w.Write(msg)
return err
}

// readString reads a null-terminated string.
func readString(rd bufferedReader) (string, error) {
s, err := rd.ReadString(0)
if err != nil {
return "", err
}
// Chop off the trailing delimiter.
return s[:len(s)-1], nil
}

// writeString writes a null-terminated string.
func writeString(w io.Writer, s string) error {
if _, err := io.WriteString(w, s); err != nil {
return err
}
if _, err := w.Write([]byte{0}); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You seem to always be passing a bytes.Buffer to writeString which supports a WriteByte method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe for writeString, but other write methods are used directly on a net.Conn. WriteByte seems like a small convenience to narrow the inputs to this method.

return err
}
return nil
}
36 changes: 36 additions & 0 deletions sql/pgwire/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Ben Darnell

package pgwire

import (
"testing"

"github.com/cockroachdb/cockroach/security"
"github.com/cockroachdb/cockroach/security/securitytest"
"github.com/cockroachdb/cockroach/util/leaktest"
)

func init() {
security.SetReadFileFn(securitytest.Asset)
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go

func TestMain(m *testing.M) {
leaktest.TestMainWithLeakCheck(m)
}
122 changes: 122 additions & 0 deletions sql/pgwire/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Ben Darnell

package pgwire

import (
"bytes"
"net"

"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/log"
)

var (
versionSSL = []byte{0x12, 0x34, 0x56, 0x79}
version30 = []byte{0x00, 0x03, 0x00, 0x00}
)

// Server implements the server side of the PostgreSQL wire protocol.
type Server struct {
context *Context
listener net.Listener
conns []net.Conn
}

// NewServer creates a Server.
func NewServer(context *Context) *Server {
s := &Server{
context: context,
}
return s
}

// Start a server on the given address.
func (s *Server) Start(addr net.Addr) error {
ln, err := net.Listen(addr.Network(), addr.String())
if err != nil {
return err
}
s.listener = ln

s.context.Stopper.RunWorker(func() {
s.serve(ln)
})

s.context.Stopper.RunWorker(func() {
<-s.context.Stopper.ShouldStop()
s.close()
})
return nil
}

// Addr returns this Server's address.
func (s *Server) Addr() net.Addr {
return s.listener.Addr()
}

// serve connections on this listener until it is closed.
func (s *Server) serve(ln net.Listener) {
for {
conn, err := ln.Accept()
if err != nil {
// TODO(bdarnell): error handling
log.Error(err)
return
}

s.conns = append(s.conns, conn)
go func() {
if err := s.serveConn(conn); err != nil {
log.Error(err)
}
}()
}
}

// close this server, and all client connections.
func (s *Server) close() {
s.listener.Close()
for _, conn := range s.conns {
conn.Close()
}
}

// serveConn serves a single connection, driving the handshake process
// and delegating to the appropriate connection type.
func (s *Server) serveConn(conn net.Conn) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you be doing something like defer conn.Close() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably. This could use some work across the board to make sure we handle connection closing (on both sides) gracefully.

defer conn.Close()
msg, err := readUntypedMsg(conn)
if err != nil {
return err
}
version := msg[:4]
rest := msg[4:]
if bytes.Compare(version, versionSSL) == 0 {
if len(rest) > 0 {
return util.Errorf("unexpected data after SSL request")
}
panic("TODO(bdarnell): ssl mode")
} else if bytes.Compare(version, version30) == 0 {
v3conn, err := newV3Conn(conn, rest, s.context.Executor)
if err != nil {
return err
}
return v3conn.serve()
}
return util.Errorf("unknown protocol version %q", version)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to return this error across the connection? Or is that not feasible since we don't support the protocol version. Just closing the connection seems a bit abrupt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not much we can do - the client could be speaking an older or newer version of the postgres protocol, or something else entirely. Anything we try to write here will be interpreted in unpredictable ways.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the postgres v3 protocol arrived with postgres 7.4 which was release 10/4/2010. We probably don't have anything to worry about.

}
Loading