Skip to content

Commit

Permalink
feat: add client side statement parser (#38)
Browse files Browse the repository at this point in the history
* feat: add client side statement parser

* feat: statement parser

* feat: support PDML

* add tests for DML batches

* test: add more tests and documentation

* chore: rename Dml and Ddl to DML and DDL

* fix: add json file as string variable

* chore: remove unused file
  • Loading branch information
olavloite committed Oct 8, 2021
1 parent 2d698b7 commit 969bf52
Show file tree
Hide file tree
Showing 13 changed files with 2,152 additions and 42 deletions.
257 changes: 257 additions & 0 deletions aborted_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,44 @@ func TestUpdateAborted(t *testing.T) {
}
}

func TestBatchUpdateAborted(t *testing.T) {
t.Parallel()

db, server, teardown := setupTestDBConnection(t)
defer teardown()

ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
t.Fatalf("begin failed: %v", err)
}
server.TestSpanner.PutExecutionTime(testutil.MethodExecuteBatchDml, testutil.SimulatedExecutionTime{
Errors: []error{status.Error(codes.Aborted, "Aborted")},
})
if _, err := tx.ExecContext(ctx, "START BATCH DML"); err != nil {
t.Fatalf("start batch failed: %v", err)
}
if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
t.Fatalf("update failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "RUN BATCH"); err != nil {
t.Fatalf("run batch failed: %v", err)
}
err = tx.Commit()
if err != nil {
t.Fatalf("commit failed: %v", err)
}
reqs := drainRequestsFromServer(server.TestSpanner)
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteBatchDmlRequest{}))
if g, w := len(execReqs), 2; g != w {
t.Fatalf("batch request count mismatch\nGot: %v\nWant: %v", g, w)
}
commitReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.CommitRequest{}))
if g, w := len(commitReqs), 1; g != w {
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestQueryAborted(t *testing.T) {
testRetryReadWriteTransactionWithQueryWithRetrySuccess(t, func(server testutil.InMemSpannerServer) {
server.PutExecutionTime(testutil.MethodExecuteStreamingSql, testutil.SimulatedExecutionTime{
Expand Down Expand Up @@ -577,6 +615,58 @@ func TestSecondUpdateAborted(t *testing.T) {
}
}

func TestSecondBatchUpdateAborted(t *testing.T) {
t.Parallel()

db, server, teardown := setupTestDBConnection(t)
defer teardown()

ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
t.Fatalf("begin failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "START BATCH DML"); err != nil {
t.Fatalf("failed to start batch: %v", err)
}
if _, err := tx.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
t.Fatalf("update singers failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "RUN BATCH"); err != nil {
t.Fatalf("failed to run batch: %v", err)
}

server.TestSpanner.PutExecutionTime(testutil.MethodExecuteBatchDml, testutil.SimulatedExecutionTime{
Errors: []error{status.Error(codes.Aborted, "Aborted")},
})
if _, err := tx.ExecContext(ctx, "START BATCH DML"); err != nil {
t.Fatalf("failed to start batch: %v", err)
}
if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
t.Fatalf("update bar failed: %v", err)
}
// This statement will return Aborted, the transaction will be retried internally and the statement is
// then executed once more and should return the correct value.
if _, err := tx.ExecContext(ctx, "RUN BATCH"); err != nil {
t.Fatalf("failed to run batch: %v", err)
}

if err := tx.Commit(); err != nil {
t.Fatalf("commit failed: %v", err)
}
reqs := drainRequestsFromServer(server.TestSpanner)
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteBatchDmlRequest{}))
// The server should receive 4 batch statements, as each update statement should
// be executed twice.
if g, w := len(execReqs), 4; g != w {
t.Fatalf("batch request count mismatch\nGot: %v\nWant: %v", g, w)
}
commitReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.CommitRequest{}))
if g, w := len(commitReqs), 1; g != w {
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestSecondUpdateAborted_FirstStatementWithSameError(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -707,6 +797,173 @@ func testSecondUpdateAborted_FirstResultChanged(t *testing.T, firstResult *testu
}
}

func TestBatchUpdateAbortedWithError(t *testing.T) {
t.Parallel()

db, server, teardown := setupTestDBConnection(t)
defer teardown()

// Make sure that one of the DML statements will return an error.
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
Type: testutil.StatementResultError,
Err: status.Error(codes.NotFound, "Table not found"),
})

ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
t.Fatalf("begin failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "START BATCH DML"); err != nil {
t.Fatalf("failed to start batch: %v", err)
}
if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
t.Fatalf("dml statement failed: %v", err)
}
// This statement should fail with NotFound when the batch is executed.
// That will also be the result during the retry. Note that the error
// will not be returned now, but when the batch is executed.
if _, err = tx.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
t.Fatalf("dml statement failed: %v", err)
}
// Note that even though Spanner returns the row count for a Batch DML that
// fails halfway, go/sql does not do that, so result will be nil for batch
// statements that fail. Internally, the driver still keeps track of the row
// counts that were returned, and uses that in the retry strategy.
if _, err := tx.ExecContext(ctx, "RUN BATCH"); spanner.ErrCode(err) != codes.NotFound {
t.Fatalf("error code mismatch\nGot: %v\nWant: %v", spanner.ErrCode(err), codes.NotFound)
}

// Abort the transaction. The internal retry should succeed as teh same error
// and the same row count is returned during the retry.
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{
Errors: []error{status.Error(codes.Aborted, "Aborted")},
})
err = tx.Commit()
if err != nil {
t.Fatalf("commit failed: %v", err)
}
reqs := drainRequestsFromServer(server.TestSpanner)
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteBatchDmlRequest{}))
if g, w := len(execReqs), 2; g != w {
t.Fatalf("batch request count mismatch\nGot: %v\nWant: %v", g, w)
}
commitReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.CommitRequest{}))
if g, w := len(commitReqs), 2; g != w {
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestBatchUpdateAbortedWithError_DifferentRowCountDuringRetry(t *testing.T) {
t.Parallel()

db, server, teardown := setupTestDBConnection(t)
defer teardown()

// Make sure that one of the DML statements will return an error.
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
Type: testutil.StatementResultError,
Err: status.Error(codes.NotFound, "Table not found"),
})

ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
t.Fatalf("begin failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "START BATCH DML"); err != nil {
t.Fatalf("failed to start batch: %v", err)
}
if _, err := tx.ExecContext(ctx, testutil.UpdateBarSetFoo); err != nil {
t.Fatalf("dml statement failed: %v", err)
}
// This statement should fail with NotFound when the batch is executed.
// That will also be the result during the retry. Note that the error
// will not be returned now, but when the batch is executed.
if _, err = tx.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
t.Fatalf("dml statement failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "RUN BATCH"); spanner.ErrCode(err) != codes.NotFound {
t.Fatalf("error code mismatch\nGot: %v\nWant: %v", spanner.ErrCode(err), codes.NotFound)
}

// Change the returned row count of the first DML statement. This will cause
// the retry to fail.
server.TestSpanner.PutStatementResult(testutil.UpdateBarSetFoo, &testutil.StatementResult{
Type: testutil.StatementResultUpdateCount,
UpdateCount: testutil.UpdateBarSetFooRowCount + 1,
})
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{
Errors: []error{status.Error(codes.Aborted, "Aborted")},
})
err = tx.Commit()
if err != ErrAbortedDueToConcurrentModification {
t.Fatalf("commit error mismatch\nGot: %v\nWant: %v", err, ErrAbortedDueToConcurrentModification)
}
reqs := drainRequestsFromServer(server.TestSpanner)
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteBatchDmlRequest{}))
if g, w := len(execReqs), 2; g != w {
t.Fatalf("batch request count mismatch\nGot: %v\nWant: %v", g, w)
}
commitReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.CommitRequest{}))
// The commit should be attempted only once.
if g, w := len(commitReqs), 1; g != w {
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func TestBatchUpdateAbortedWithError_DifferentErrorDuringRetry(t *testing.T) {
t.Parallel()

db, server, teardown := setupTestDBConnection(t)
defer teardown()

// Make sure that one of the DML statements will return an error.
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
Type: testutil.StatementResultError,
Err: status.Error(codes.NotFound, "Table not found"),
})

ctx := context.Background()
tx, err := db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
t.Fatalf("begin failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "START BATCH DML"); err != nil {
t.Fatalf("failed to start batch: %v", err)
}
if _, err = tx.ExecContext(ctx, testutil.UpdateSingersSetLastName); err != nil {
t.Fatalf("dml statement failed: %v", err)
}
if _, err := tx.ExecContext(ctx, "RUN BATCH"); spanner.ErrCode(err) != codes.NotFound {
t.Fatalf("error code mismatch\nGot: %v\nWant: %v", spanner.ErrCode(err), codes.NotFound)
}

// Remove the error for the DML statement and cause a retry. The missing
// error for the DML statement should fail the retry.
server.TestSpanner.PutStatementResult(testutil.UpdateSingersSetLastName, &testutil.StatementResult{
Type: testutil.StatementResultUpdateCount,
UpdateCount: testutil.UpdateSingersSetLastNameRowCount,
})
server.TestSpanner.PutExecutionTime(testutil.MethodCommitTransaction, testutil.SimulatedExecutionTime{
Errors: []error{status.Error(codes.Aborted, "Aborted")},
})
err = tx.Commit()
if err != ErrAbortedDueToConcurrentModification {
t.Fatalf("commit error mismatch\nGot: %v\nWant: %v", err, ErrAbortedDueToConcurrentModification)
}
reqs := drainRequestsFromServer(server.TestSpanner)
execReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.ExecuteBatchDmlRequest{}))
if g, w := len(execReqs), 2; g != w {
t.Fatalf("batch request count mismatch\nGot: %v\nWant: %v", g, w)
}
commitReqs := requestsOfType(reqs, reflect.TypeOf(&sppb.CommitRequest{}))
// The commit should be attempted only once.
if g, w := len(commitReqs), 1; g != w {
t.Fatalf("commit request count mismatch\nGot: %v\nWant: %v", g, w)
}
}

func firstNonZero(values ...int) int {
for _, v := range values {
if v > 0 {
Expand Down
Loading

0 comments on commit 969bf52

Please sign in to comment.