Skip to content

Commit

Permalink
adding observer to query (#1023)
Browse files Browse the repository at this point in the history
* introduced query observer

* PR Feedback

* TestObserve_Pagination

* TestObserve_Pagination fixed

* query.attempt removing unnecessary iter.pos

* Batch observation

* tidy up

* PR Feedback
  • Loading branch information
Javier Zunzunegui authored and Zariel committed Jan 11, 2018
1 parent 6c01199 commit 2f405b2
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 15 deletions.
3 changes: 2 additions & 1 deletion AUTHORS
Expand Up @@ -99,4 +99,5 @@ Ben Krebsbach <ben.krebsbach@gmail.com>
Vivian Mathews <vivian.mathews.3@gmail.com>
Sascha Steinbiss <satta@debian.org>
Seth Rosenblum <seth.t.rosenblum@gmail.com>
Luke Hines <lukehines@protonmail.com>
Javier Zunzunegui <javier.zunzunegui.b@gmail.com>
Luke Hines <lukehines@protonmail.com>
212 changes: 212 additions & 0 deletions cassandra_test.go
Expand Up @@ -5,6 +5,8 @@ package gocql
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"math/big"
Expand Down Expand Up @@ -185,6 +187,151 @@ func TestTracing(t *testing.T) {
}
}

func TestObserve(t *testing.T) {
session := createSession(t)
defer session.Close()

if err := createTable(session, `CREATE TABLE gocql_test.observe (id int primary key)`); err != nil {
t.Fatal("create:", err)
}

var (
observedErr error
observedKeyspace string
observedStmt string
)

const keyspace = "gocql_test"

resetObserved := func() {
observedErr = errors.New("placeholder only") // used to distinguish err=nil cases
observedKeyspace = ""
observedStmt = ""
}

observer := funcQueryObserver(func(ctx context.Context, o ObservedQuery) {
observedKeyspace = o.Keyspace
observedStmt = o.Statement
observedErr = o.Err
})

// select before inserted, will error but the reporting is err=nil as the query is valid
resetObserved()
var value int
if err := session.Query(`SELECT id FROM observe WHERE id = ?`, 43).Observer(observer).Scan(&value); err == nil {
t.Fatal("select: expected error")
} else if observedErr != nil {
t.Fatalf("select: observed error expected nil, got %q", observedErr)
} else if observedKeyspace != keyspace {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
}

resetObserved()
if err := session.Query(`INSERT INTO observe (id) VALUES (?)`, 42).Observer(observer).Exec(); err != nil {
t.Fatal("insert:", err)
} else if observedErr != nil {
t.Fatal("insert:", observedErr)
} else if observedKeyspace != keyspace {
t.Fatal("insert: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `INSERT INTO observe (id) VALUES (?)` {
t.Fatal("insert: unexpected observed stmt", observedStmt)
}

resetObserved()
value = 0
if err := session.Query(`SELECT id FROM observe WHERE id = ?`, 42).Observer(observer).Scan(&value); err != nil {
t.Fatal("select:", err)
} else if value != 42 {
t.Fatalf("value: expected %d, got %d", 42, value)
} else if observedErr != nil {
t.Fatal("select:", observedErr)
} else if observedKeyspace != keyspace {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
}

// also works from session observer
resetObserved()
oSession := createSession(t, func(config *ClusterConfig) { config.QueryObserver = observer })
if err := oSession.Query(`SELECT id FROM observe WHERE id = ?`, 42).Scan(&value); err != nil {
t.Fatal("select:", err)
} else if observedErr != nil {
t.Fatal("select:", err)
} else if observedKeyspace != keyspace {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
}

// reports errors when the query is poorly formed
resetObserved()
value = 0
if err := session.Query(`SELECT id FROM unknown_table WHERE id = ?`, 42).Observer(observer).Scan(&value); err == nil {
t.Fatal("select: expecting error")
} else if observedErr == nil {
t.Fatal("select: expecting observed error")
} else if observedKeyspace != keyspace {
t.Fatal("select: unexpected observed keyspace", observedKeyspace)
} else if observedStmt != `SELECT id FROM unknown_table WHERE id = ?` {
t.Fatal("select: unexpected observed stmt", observedStmt)
}
}

func TestObserve_Pagination(t *testing.T) {
session := createSession(t)
defer session.Close()

if err := createTable(session, `CREATE TABLE gocql_test.observe2 (id int, PRIMARY KEY (id))`); err != nil {
t.Fatal("create:", err)
}

var observedRows int

resetObserved := func() {
observedRows = -1
}

observer := funcQueryObserver(func(ctx context.Context, o ObservedQuery) {
observedRows = o.Rows
})

// insert 100 entries, relevant for pagination
for i := 0; i < 50; i++ {
if err := session.Query(`INSERT INTO observe2 (id) VALUES (?)`, i).Exec(); err != nil {
t.Fatal("insert:", err)
}
}

resetObserved()

// read the 100 entries in paginated entries of size 10. Expecting 5 observations, each with 10 rows
scanner := session.Query(`SELECT id FROM observe2 LIMIT 100`).
Observer(observer).
PageSize(10).
Iter().Scanner()
for i := 0; i < 50; i++ {
if !scanner.Next() {
t.Fatalf("next: should still be true: %d", i)
}
if i%10 == 0 {
if observedRows != 10 {
t.Fatalf("next: expecting a paginated query with 10 entries, got: %d (%d)", observedRows, i)
}
} else if observedRows != -1 {
t.Fatalf("next: not expecting paginated query (-1 entries), got: %d", observedRows)
}

resetObserved()
}

if scanner.Next() {
t.Fatal("next: no more entries where expected")
}
}

func TestPaging(t *testing.T) {
session := createSession(t)
defer session.Close()
Expand Down Expand Up @@ -1644,6 +1791,71 @@ func TestBatchStats(t *testing.T) {
}
}

type funcBatchObserver func(context.Context, ObservedBatch)

func (f funcBatchObserver) ObserveBatch(ctx context.Context, o ObservedBatch) {
f(ctx, o)
}

func TestBatchObserve(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion == 1 {
t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
}

if err := createTable(session, `CREATE TABLE gocql_test.batch_observe_table (id int, other int, PRIMARY KEY (id))`); err != nil {
t.Fatal("create table:", err)
}

type observation struct {
observedErr error
observedKeyspace string
observedStmts []string
}

var observedBatch *observation

batch := NewBatch(LoggedBatch)
batch.Observer(funcBatchObserver(func(ctx context.Context, o ObservedBatch) {
if observedBatch != nil {
t.Fatal("batch observe called more than once")
}

observedBatch = &observation{
observedKeyspace: o.Keyspace,
observedStmts: o.Statements,
observedErr: o.Err,
}
}))
for i := 0; i < 100; i++ {
// hard coding 'i' into one of the values for better testing of observation
batch.Query(fmt.Sprintf(`INSERT INTO batch_observe_table (id,other) VALUES (?,%d)`, i), i)
}

if err := session.ExecuteBatch(batch); err != nil {
t.Fatal("execute batch:", err)
}
if observedBatch == nil {
t.Fatal("batch observation has not been called")
}
if len(observedBatch.observedStmts) != 100 {
t.Fatal("expecting 100 observed statements, got", len(observedBatch.observedStmts))
}
if observedBatch.observedErr != nil {
t.Fatal("not expecting to observe an error", observedBatch.observedErr)
}
if observedBatch.observedKeyspace != "gocql_test" {
t.Fatalf("expecting keyspace 'gocql_test', got %q", observedBatch.observedKeyspace)
}
for i, stmt := range observedBatch.observedStmts {
if stmt != fmt.Sprintf(`INSERT INTO batch_observe_table (id,other) VALUES (?,%d)`, i) {
t.Fatal("unexpected query", stmt)
}
}
}

//TestNilInQuery tests to see that a nil value passed to a query is handled by Cassandra
//TODO validate the nil value by reading back the nil. Need to fix Unmarshalling.
func TestNilInQuery(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions cluster.go
Expand Up @@ -115,6 +115,14 @@ type ClusterConfig struct {
// See https://issues.apache.org/jira/browse/CASSANDRA-10786
DisableSkipMetadata bool

// QueryObserver will set the provided query observer on all queries created from this session.
// Use it to collect metrics / stats from queries by providing an implementation of QueryObserver.
QueryObserver QueryObserver

// BatchObserver will set the provided batch observer on all queries created from this session.
// Use it to collect metrics / stats from batche queries by providing an implementation of BatchObserver.
BatchObserver BatchObserver

// internal config for testing
disableControlConn bool
}
Expand Down
2 changes: 1 addition & 1 deletion cluster_test.go
@@ -1,9 +1,9 @@
package gocql

import (
"net"
"testing"
"time"
"net"
)

func TestNewCluster_Defaults(t *testing.T) {
Expand Down
11 changes: 8 additions & 3 deletions common_test.go
Expand Up @@ -70,7 +70,7 @@ func createTable(s *Session, table string) error {
return nil
}

func createCluster() *ClusterConfig {
func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
cluster := NewCluster(clusterHosts...)
cluster.ProtoVersion = *flagProto
cluster.CQLVersion = *flagCQL
Expand All @@ -90,6 +90,11 @@ func createCluster() *ClusterConfig {
}

cluster = addSslOptions(cluster)

for _, opt := range opts {
opt(cluster)
}

return cluster
}

Expand Down Expand Up @@ -140,8 +145,8 @@ func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
return session
}

func createSession(tb testing.TB) *Session {
cluster := createCluster()
func createSession(tb testing.TB, opts ...func(config *ClusterConfig)) *Session {
cluster := createCluster(opts...)
return createSessionFromCluster(cluster, tb)
}

Expand Down
5 changes: 3 additions & 2 deletions query_executor.go
Expand Up @@ -6,7 +6,7 @@ import (

type ExecutableQuery interface {
execute(conn *Conn) *Iter
attempt(time.Duration)
attempt(keyspace string, end, start time.Time, iter *Iter)
retryPolicy() RetryPolicy
GetRoutingKey() ([]byte, error)
Keyspace() string
Expand All @@ -21,8 +21,9 @@ type queryExecutor struct {
func (q *queryExecutor) attemptQuery(qry ExecutableQuery, conn *Conn) *Iter {
start := time.Now()
iter := qry.execute(conn)
end := time.Now()

qry.attempt(time.Since(start))
qry.attempt(q.pool.keyspace, end, start, iter)

return iter
}
Expand Down

0 comments on commit 2f405b2

Please sign in to comment.