Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mysql: Batch select queries when required. #1605

Merged
merged 6 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions politeiad/backendv2/tstorebe/store/mysql/encrypt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,14 @@ package mysql
import (
"bytes"
"testing"

"github.com/decred/politeia/util"
)

func TestEncryptDecrypt(t *testing.T) {
password := "passwordsosikrit"
blob := []byte("encryptmeyo")

// setup fake context
s := &mysql{
testing: true,
}
s.argon2idKey(password, util.NewArgon2Params())
// Setup a mysql struct
s, cleanup := newTestMySQL(t)
defer cleanup()

// Encrypt and make sure cleartext isn't the same as the encypted blob.
eb, err := s.encrypt(nil, nil, blob)
Expand Down
177 changes: 126 additions & 51 deletions politeiad/backendv2/tstorebe/store/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/decred/politeia/politeiad/backendv2/tstorebe/store"
"github.com/decred/politeia/util"
"github.com/pkg/errors"

// MySQL driver.
_ "github.com/go-sql-driver/mysql"
Expand All @@ -28,6 +31,11 @@ const (
// Database table names
tableNameKeyValue = "kv"
tableNameNonce = "nonce"

// maxPlaceholders is the maximum number of placeholders, "(?, ?, ?)", that
// can be used in a prepared statement. MySQL uses an uint16 for this, so
// the limit is the the maximum value of an uint16.
maxPlaceholders = 65535
)

// tableKeyValue defines the key-value table.
Expand All @@ -50,7 +58,10 @@ type mysql struct {
shutdown uint64
db *sql.DB
key [32]byte
testing bool // Only set during unit tests

// The following fields are only used during unit tests.
testing bool
mock sqlmock.Sqlmock
}

func ctxWithTimeout() (context.Context, func()) {
Expand Down Expand Up @@ -201,66 +212,51 @@ func (s *mysql) Get(keys []string) (map[string][]byte, error) {
return nil, store.ErrShutdown
}

ctx, cancel := ctxWithTimeout()
defer cancel()
// Build the select statements
statements := buildSelectStatements(keys, maxPlaceholders)

// Build query. A placeholder parameter (?) is required for each
// key being requested.
//
// Ex 3 keys: "SELECT k, v FROM kv WHERE k IN (?, ?, ?)"
sql := "SELECT k, v FROM kv WHERE k IN ("
for i := 0; i < len(keys); i++ {
sql += "?"
// Don't add a comma on the last one
if i < len(keys)-1 {
sql += ","
}
}
sql += ");"

log.Tracef("%v", sql)
log.Debugf("Get %v blobs using %v prepared statements",
len(keys), len(statements))

// The keys must be converted to []interface{} for the query method
// to accept them.
args := make([]interface{}, len(keys))
for i, v := range keys {
args[i] = v
}
// Execute the statements
reply := make(map[string][]byte, len(keys))
for i, e := range statements {
log.Debugf("Executing select statement %v/%v", i+1, len(statements))

// Get blobs
rows, err := s.db.QueryContext(ctx, sql, args...)
if err != nil {
return nil, fmt.Errorf("query: %v", err)
}
defer rows.Close()
ctx, cancel := ctxWithTimeout()
defer cancel()

reply := make(map[string][]byte, len(keys))
for rows.Next() {
var k string
var v []byte
err = rows.Scan(&k, &v)
rows, err := s.db.QueryContext(ctx, e.Query, e.Args...)
if err != nil {
return nil, fmt.Errorf("scan: %v", err)
return nil, errors.WithStack(err)
}
reply[k] = v
}
err = rows.Err()
if err != nil {
return nil, fmt.Errorf("next: %v", err)
}
defer rows.Close()

// Decrypt data blobs
for k, v := range reply {
encrypted := isEncrypted(v)
log.Tracef("Blob is encrypted: %v", encrypted)
if !encrypted {
continue
// Unpack the reply
for rows.Next() {
var k string
var v []byte
err = rows.Scan(&k, &v)
if err != nil {
return nil, errors.WithStack(err)
}

// Decrypt the blob if required
if isEncrypted(v) {
log.Tracef("Encrypted blob: %v", k)
v, _, err = s.decrypt(v)
if err != nil {
return nil, err
}
}

// Save the blob
reply[k] = v
}
b, _, err := s.decrypt(v)
err = rows.Err()
if err != nil {
return nil, fmt.Errorf("decrypt: %v", err)
return nil, errors.WithStack(err)
}
reply[k] = b
}

return reply, nil
Expand All @@ -279,6 +275,85 @@ func (s *mysql) Close() {
s.db.Close()
}

// selectStatement contains the query string and arguments for a SELECT
// statement.
type selectStatement struct {
Query string
Args []interface{}
}

// buildSelectStatements builds the SELECT statements that can be executed
// against the MySQL key-value store. The maximum number of records that will
// be retrieved in any individual SELECT statement is determined by the size
// argument. The keys are split up into multiple statements if they exceed this
// limit.
func buildSelectStatements(keys []string, size int) []selectStatement {
statements := make([]selectStatement, 0, (len(keys)/size)+1)
var startIdx int
for startIdx < len(keys) {
// Find the end index
endIdx := startIdx + size
if endIdx > len(keys) {
// We've reached the end of the slice
endIdx = len(keys)
}

// startIdx is included. endIdx is excluded.
statementKeys := keys[startIdx:endIdx]

// Build the query
q := buildSelectQuery(len(statementKeys))
log.Tracef("%v", q)

// Convert the keys to interfaces. The sql query
// methods require arguments be interfaces.
args := make([]interface{}, len(statementKeys))
for i, v := range statementKeys {
args[i] = v
}

// Save the statement
statements = append(statements, selectStatement{
Query: q,
Args: args,
})

// Update the start index
startIdx = endIdx
}

return statements
}

// buildSelectQuery returns a query string for the MySQL key-value store.
//
// Example: "SELECT k, v FROM kv WHERE k IN (?,?);"
func buildSelectQuery(placeholders int) string {
return fmt.Sprintf("SELECT k, v FROM kv WHERE k IN %v;",
buildPlaceholders(placeholders))
}

// buildPlaceholders builds and returns a parameter placeholder string with the
// specified number of placeholders.
//
// Input: 1 Output: "(?)"
// Input: 3 Output: "(?,?,?)"
func buildPlaceholders(placeholders int) string {
var b strings.Builder

b.WriteString("(")
for i := 0; i < placeholders; i++ {
b.WriteString("?")
// Don't add a comma on the last one
if i < placeholders-1 {
b.WriteString(",")
}
}
b.WriteString(")")

return b.String()
}

// New connects to a mysql instance using the given connection params,
// and returns pointer to the created mysql struct.
func New(host, user, password, dbname string) (*mysql, error) {
Expand Down
Loading