Skip to content

Commit

Permalink
pgwire: get rid of conn.execCfg
Browse files Browse the repository at this point in the history
The reference from a connection to an ExecutorConfig is no longer
needed.

Release note: None
  • Loading branch information
andreimatei committed Nov 15, 2018
1 parent 941e8fe commit b2a3ee2
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pkg/server/authentication.go
Expand Up @@ -265,7 +265,7 @@ func (s *authenticationServer) verifyPassword(
ctx context.Context, username string, password string,
) (bool, error) {
exists, hashedPassword, err := sql.GetUserHashedPassword(
ctx, s.server.execCfg, s.memMetrics, username,
ctx, s.server.execCfg.InternalExecutor, s.memMetrics, username,
)
if err != nil {
return false, err
Expand Down
43 changes: 9 additions & 34 deletions pkg/sql/pgwire/conn.go
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -66,7 +65,6 @@ type conn struct {

sessionArgs sql.SessionArgs
resultsBufferBytes int64
execCfg *sql.ExecutorConfig
metrics *ServerMetrics

// rd is a buffered reader consuming conn. All reads from conn go through
Expand Down Expand Up @@ -96,27 +94,6 @@ type conn struct {
msgBuilder writeBuffer
}

// ATTENTION: After changing this value in a unit test, you probably want to
// open a new connection pool since the connections in the existing one are not
// affected.
//
// TODO(andrei): This setting is under "sql.defaults", but there's no way to
// control the setting on a per-connection basis. We should introduce a
// corresponding session variable.
var connResultsBufferSize = settings.RegisterByteSizeSetting(
"sql.defaults.conn_results_buffer_size",
"the size of each connection's results buffer. Within a query (or a batch "+
"of queries), CRDB buffers results until the buffer overflows, at which point "+
"the results are delivered to the client. Once any results are delivered, "+
"CRDB can no longer perform automatic retries for the transactions in "+
"question. On the other hand, increasing the buffer size can increase the "+
"latency the client perceives until the first result row for a query is "+
"delivered.\n"+
"Updating the setting only affects future connections.\n"+
"Setting to 0 disables any buffering.",
16<<10, // 16 KiB
)

// serveConn creates a conn that will serve the netConn. It returns once the
// network connection is closed.
//
Expand Down Expand Up @@ -159,11 +136,12 @@ func serveConn(
ctx context.Context,
netConn net.Conn,
sArgs sql.SessionArgs,
resultsBufferBytes int64,
metrics *ServerMetrics,
reserved mon.BoundAccount,
sqlServer *sql.Server,
draining func() bool,
execCfg *sql.ExecutorConfig,
ie *sql.InternalExecutor,
stopper *stop.Stopper,
insecure bool,
) error {
Expand All @@ -173,9 +151,9 @@ func serveConn(
log.Infof(ctx, "new connection with options: %+v", sArgs)
}

c := newConn(netConn, sArgs, metrics, execCfg, connResultsBufferSize.Get(&execCfg.Settings.SV))
c := newConn(netConn, sArgs, metrics, resultsBufferBytes)

if err := c.handleAuthentication(ctx, insecure); err != nil {
if err := c.handleAuthentication(ctx, insecure, ie); err != nil {
_ = c.conn.Close()
reserved.Close(ctx)
return err
Expand All @@ -187,17 +165,12 @@ func serveConn(
}

func newConn(
netConn net.Conn,
sArgs sql.SessionArgs,
metrics *ServerMetrics,
execCfg *sql.ExecutorConfig,
resultsBufferBytes int64,
netConn net.Conn, sArgs sql.SessionArgs, metrics *ServerMetrics, resultsBufferBytes int64,
) *conn {
c := &conn{
conn: netConn,
sessionArgs: sArgs,
resultsBufferBytes: resultsBufferBytes,
execCfg: execCfg,
metrics: metrics,
rd: *bufio.NewReader(netConn),
}
Expand Down Expand Up @@ -1289,7 +1262,9 @@ func (r *pgwireReader) ReadByte() (byte, error) {
// name, if different from the one given initially. Note: at this
// point the sql.Session does not exist yet! If need exists to access the
// database to look up authentication data, use the internal executor.
func (c *conn) handleAuthentication(ctx context.Context, insecure bool) error {
func (c *conn) handleAuthentication(
ctx context.Context, insecure bool, ie *sql.InternalExecutor,
) error {
sendError := func(err error) error {
_ /* err */ = writeErr(err, &c.msgBuilder, c.conn)
return err
Expand All @@ -1298,7 +1273,7 @@ func (c *conn) handleAuthentication(ctx context.Context, insecure bool) error {
// Check that the requested user exists and retrieve the hashed
// password in case password authentication is needed.
exists, hashedPassword, err := sql.GetUserHashedPassword(
ctx, c.execCfg, &c.metrics.SQLMemMetrics, c.sessionArgs.User,
ctx, ie, &c.metrics.SQLMemMetrics, c.sessionArgs.User,
)
if err != nil {
return sendError(err)
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/pgwire/conn_test.go
Expand Up @@ -332,8 +332,7 @@ func waitForClientConn(ln net.Listener) (*conn, error) {
}

metrics := makeServerMetrics(sql.MemoryMetrics{} /* sqlMemMetrics */, metric.TestSampleInterval)
pgwireConn := newConn(
conn, sql.SessionArgs{}, &metrics, &sql.ExecutorConfig{}, 16<<10 /* resultsBufferBytes */)
pgwireConn := newConn(conn, sql.SessionArgs{}, &metrics, 16<<10 /* resultsBufferBytes */)
return pgwireConn, nil
}

Expand Down Expand Up @@ -783,7 +782,7 @@ func TestMaliciousInputs(t *testing.T) {
metrics := makeServerMetrics(sqlMetrics, time.Second /* histogramWindow */)

conn := newConn(
r, sql.SessionArgs{}, &metrics, nil, /* execCfg */
r, sql.SessionArgs{}, &metrics,
// resultsBufferBytes - really small so that it overflows when we
// produce a few results.
10,
Expand Down
29 changes: 27 additions & 2 deletions pkg/sql/pgwire/server.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -41,6 +42,27 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// ATTENTION: After changing this value in a unit test, you probably want to
// open a new connection pool since the connections in the existing one are not
// affected.
//
// TODO(andrei): This setting is under "sql.defaults", but there's no way to
// control the setting on a per-connection basis. We should introduce a
// corresponding session variable.
var connResultsBufferSize = settings.RegisterByteSizeSetting(
"sql.defaults.conn_results_buffer_size",
"the size of each connection's results buffer. Within a query (or a batch "+
"of queries), CRDB buffers results until the buffer overflows, at which point "+
"the results are delivered to the client. Once any results are delivered, "+
"CRDB can no longer perform automatic retries for the transactions in "+
"question. On the other hand, increasing the buffer size can increase the "+
"latency the client perceives until the first result row for a query is "+
"delivered.\n"+
"Updating the setting only affects future connections.\n"+
"Setting to 0 disables any buffering.",
16<<10, // 16 KiB
)

const (
// ErrSSLRequired is returned when a client attempts to connect to a
// secure server in cleartext.
Expand Down Expand Up @@ -452,8 +474,11 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn) error {
return errors.Errorf("unable to pre-allocate %d bytes for this connection: %v",
baseSQLMemoryBudget, err)
}
return serveConn(ctx, conn, sArgs, &s.metrics, reserved, s.SQLServer,
s.IsDraining, s.execCfg, s.stopper, s.cfg.Insecure)
return serveConn(
ctx, conn, sArgs,
connResultsBufferSize.Get(&s.execCfg.Settings.SV),
&s.metrics, reserved, s.SQLServer,
s.IsDraining, s.execCfg.InternalExecutor, s.stopper, s.cfg.Insecure)
}

func parseOptions(ctx context.Context, data []byte) (sql.SessionArgs, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/user.go
Expand Up @@ -27,7 +27,7 @@ import (
// GetUserHashedPassword returns the hashedPassword for the given username if
// found in system.users.
func GetUserHashedPassword(
ctx context.Context, execCfg *ExecutorConfig, metrics *MemoryMetrics, username string,
ctx context.Context, ie *InternalExecutor, metrics *MemoryMetrics, username string,
) (bool, []byte, error) {
normalizedUsername := tree.Name(username).Normalize()
// Always return no password for the root user, even if someone manually inserts one.
Expand All @@ -37,7 +37,7 @@ func GetUserHashedPassword(

const getHashedPassword = `SELECT "hashedPassword" FROM system.users ` +
`WHERE username=$1 AND "isRole" = false`
values, err := execCfg.InternalExecutor.QueryRow(
values, err := ie.QueryRow(
ctx, "get-hashed-pwd", nil /* txn */, getHashedPassword, normalizedUsername)
if err != nil {
return false, nil, errors.Wrapf(err, "error looking up user %s", normalizedUsername)
Expand Down

0 comments on commit b2a3ee2

Please sign in to comment.