diff --git a/plc4go/spi/transactions/RequestTransaction.go b/plc4go/spi/transactions/RequestTransaction.go new file mode 100644 index 00000000000..de8c5bed842 --- /dev/null +++ b/plc4go/spi/transactions/RequestTransaction.go @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package transactions + +import ( + "context" + "fmt" + "github.com/apache/plc4x/plc4go/spi/pool" + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "sync" + "time" +) + +// RequestTransaction represents a transaction +type RequestTransaction interface { + fmt.Stringer + // FailRequest signals that this transaction has failed + FailRequest(err error) error + // EndRequest signals that this transaction is done + EndRequest() error + // Submit submits a RequestTransactionRunnable to the RequestTransactionManager + Submit(operation RequestTransactionRunnable) + // AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful + AwaitCompletion(ctx context.Context) error + // IsCompleted indicates that the that this RequestTransaction is completed + IsCompleted() bool +} + +/////////////////////////////////////// +/////////////////////////////////////// +// +// Internal section +// + +type requestTransaction struct { + parent *requestTransactionManager + transactionId int32 + + /** The initial operation to perform to kick off the request */ + operation pool.Runnable + completionFuture pool.CompletionFuture + + stateChangeMutex sync.Mutex + completed bool + + transactionLog zerolog.Logger +} + +// +// Internal section +// +/////////////////////////////////////// +/////////////////////////////////////// + +func (t *requestTransaction) FailRequest(err error) error { + t.stateChangeMutex.Lock() + defer t.stateChangeMutex.Unlock() + if t.completed { + return errors.Wrap(err, "calling fail on a already completed transaction") + } + t.transactionLog.Trace().Msg("Fail the request") + t.completed = true + return t.parent.failRequest(t, err) +} + +func (t *requestTransaction) EndRequest() error { + t.stateChangeMutex.Lock() + defer t.stateChangeMutex.Unlock() + if t.completed { + return errors.New("calling end on a already completed transaction") + } + t.transactionLog.Trace().Msg("Ending the request") + t.completed = true + // Remove it from Running Requests + return t.parent.endRequest(t) +} + +func (t *requestTransaction) Submit(operation RequestTransactionRunnable) { + t.stateChangeMutex.Lock() + defer t.stateChangeMutex.Unlock() + if t.completed { + t.transactionLog.Warn().Msg("calling submit on a already completed transaction") + return + } + if t.operation != nil { + t.transactionLog.Warn().Msg("Operation already set") + } + t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId) + t.operation = func() { + t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId) + operation(t) + t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId) + } + t.parent.submitTransaction(t) +} + +func (t *requestTransaction) AwaitCompletion(ctx context.Context) error { + timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high + defer cancelFunc() + for t.completionFuture == nil { + time.Sleep(time.Millisecond * 10) + if err := timeout.Err(); err != nil { + log.Error().Msg("Timout after a long time. This means something is very of here") + return errors.Wrap(err, "Error waiting for completion future to be set") + } + } + if err := t.completionFuture.AwaitCompletion(ctx); err != nil { + return err + } + stillActive := true + for stillActive { + stillActive = false + for _, runningRequest := range t.parent.runningRequests { + if runningRequest.transactionId == t.transactionId { + stillActive = true + break + } + } + } + return nil +} + +func (t *requestTransaction) IsCompleted() bool { + return t.completed +} + +func (t *requestTransaction) String() string { + return fmt.Sprintf("Transaction{tid:%d}", t.transactionId) +} diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go index c06e0de71e4..8a93dc6f874 100644 --- a/plc4go/spi/transactions/RequestTransactionManager.go +++ b/plc4go/spi/transactions/RequestTransactionManager.go @@ -22,7 +22,6 @@ package transactions import ( "container/list" "context" - "fmt" "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/pool" "io" @@ -46,21 +45,6 @@ func init() { type RequestTransactionRunnable func(RequestTransaction) -// RequestTransaction represents a transaction -type RequestTransaction interface { - fmt.Stringer - // FailRequest signals that this transaction has failed - FailRequest(err error) error - // EndRequest signals that this transaction is done - EndRequest() error - // Submit submits a RequestTransactionRunnable to the RequestTransactionManager - Submit(operation RequestTransactionRunnable) - // AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful - AwaitCompletion(ctx context.Context) error - // IsCompleted indicates that the that this RequestTransaction is completed - IsCompleted() bool -} - // RequestTransactionManager handles transactions type RequestTransactionManager interface { io.Closer @@ -109,20 +93,6 @@ type withCustomExecutor struct { executor pool.Executor } -type requestTransaction struct { - parent *requestTransactionManager - transactionId int32 - - /** The initial operation to perform to kick off the request */ - operation pool.Runnable - completionFuture pool.CompletionFuture - - stateChangeMutex sync.Mutex - completed bool - - transactionLog zerolog.Logger -} - type requestTransactionManager struct { runningRequests []*requestTransaction // How many transactions are allowed to run at the same time? @@ -189,18 +159,6 @@ func (r *requestTransactionManager) processWorklog() { } } -type completedFuture struct { - err error -} - -func (c completedFuture) AwaitCompletion(_ context.Context) error { - return c.err -} - -func (completedFuture) Cancel(_ bool, _ error) { - // No op -} - func (r *requestTransactionManager) StartTransaction() RequestTransaction { r.transactionMutex.Lock() defer r.transactionMutex.Unlock() @@ -288,74 +246,3 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error { r.runningRequests = nil return r.executor.Close() } - -func (t *requestTransaction) FailRequest(err error) error { - t.stateChangeMutex.Lock() - defer t.stateChangeMutex.Unlock() - if t.completed { - return errors.Wrap(err, "calling fail on a already completed transaction") - } - t.transactionLog.Trace().Msg("Fail the request") - t.completed = true - return t.parent.failRequest(t, err) -} - -func (t *requestTransaction) EndRequest() error { - t.stateChangeMutex.Lock() - defer t.stateChangeMutex.Unlock() - if t.completed { - return errors.New("calling end on a already completed transaction") - } - t.transactionLog.Trace().Msg("Ending the request") - t.completed = true - // Remove it from Running Requests - return t.parent.endRequest(t) -} - -func (t *requestTransaction) Submit(operation RequestTransactionRunnable) { - t.stateChangeMutex.Lock() - defer t.stateChangeMutex.Unlock() - if t.completed { - t.transactionLog.Warn().Msg("calling submit on a already completed transaction") - return - } - if t.operation != nil { - t.transactionLog.Warn().Msg("Operation already set") - } - t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId) - t.operation = func() { - t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId) - operation(t) - t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId) - } - t.parent.submitTransaction(t) -} - -func (t *requestTransaction) AwaitCompletion(ctx context.Context) error { - for t.completionFuture == nil { - time.Sleep(time.Millisecond * 10) - // TODO: this should timeout and not loop infinite... - } - if err := t.completionFuture.AwaitCompletion(ctx); err != nil { - return err - } - stillActive := true - for stillActive { - stillActive = false - for _, runningRequest := range t.parent.runningRequests { - if runningRequest.transactionId == t.transactionId { - stillActive = true - break - } - } - } - return nil -} - -func (t *requestTransaction) IsCompleted() bool { - return t.completed -} - -func (t *requestTransaction) String() string { - return fmt.Sprintf("Transaction{tid:%d}", t.transactionId) -} diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go index 7384f878713..4445edad1e3 100644 --- a/plc4go/spi/transactions/RequestTransactionManager_test.go +++ b/plc4go/spi/transactions/RequestTransactionManager_test.go @@ -26,10 +26,8 @@ import ( "github.com/apache/plc4x/plc4go/spi/options" "github.com/apache/plc4x/plc4go/spi/pool" "github.com/apache/plc4x/plc4go/spi/testutils" - "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "testing" "time" ) @@ -453,292 +451,6 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) { } } -func Test_requestTransaction_AwaitCompletion(t1 *testing.T) { - type fields struct { - parent *requestTransactionManager - transactionId int32 - operation pool.Runnable - completionFuture pool.CompletionFuture - transactionLog zerolog.Logger - } - type args struct { - ctx context.Context - } - tests := []struct { - name string - fields fields - args args - mockSetup func(t *testing.T, fields *fields, args *args) - wantErr bool - }{ - { - name: "just wait", - fields: fields{ - parent: &requestTransactionManager{ - runningRequests: []*requestTransaction{ - func() *requestTransaction { - r := &requestTransaction{} - go func() { - time.Sleep(100 * time.Millisecond) - // We fake an ending transaction like that - r.transactionId = 1 - }() - return r - }(), - }, - }, - }, - args: args{ - ctx: func() context.Context { - ctx, cancelFunc := context.WithCancel(context.Background()) - cancelFunc() - return ctx - }(), - }, - mockSetup: func(t *testing.T, fields *fields, args *args) { - completionFuture := NewMockCompletionFuture(t) - expect := completionFuture.EXPECT() - expect.AwaitCompletion(mock.Anything).Return(nil) - fields.completionFuture = completionFuture - }, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - if tt.mockSetup != nil { - tt.mockSetup(t1, &tt.fields, &tt.args) - } - t := &requestTransaction{ - parent: tt.fields.parent, - transactionId: tt.fields.transactionId, - operation: tt.fields.operation, - completionFuture: tt.fields.completionFuture, - transactionLog: tt.fields.transactionLog, - } - if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr { - t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func Test_requestTransaction_EndRequest(t1 *testing.T) { - type fields struct { - parent *requestTransactionManager - transactionId int32 - operation pool.Runnable - completionFuture pool.CompletionFuture - transactionLog zerolog.Logger - completed bool - } - tests := []struct { - name string - fields fields - wantErr bool - }{ - { - name: "just end it", - fields: fields{ - parent: &requestTransactionManager{}, - }, - wantErr: true, - }, - { - name: "end it completed", - fields: fields{ - parent: &requestTransactionManager{}, - completed: true, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - t := &requestTransaction{ - parent: tt.fields.parent, - transactionId: tt.fields.transactionId, - operation: tt.fields.operation, - completionFuture: tt.fields.completionFuture, - transactionLog: tt.fields.transactionLog, - completed: tt.fields.completed, - } - if err := t.EndRequest(); (err != nil) != tt.wantErr { - t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func Test_requestTransaction_FailRequest(t1 *testing.T) { - type fields struct { - parent *requestTransactionManager - transactionId int32 - operation pool.Runnable - completionFuture pool.CompletionFuture - transactionLog zerolog.Logger - completed bool - } - type args struct { - err error - } - tests := []struct { - name string - fields fields - args args - mockSetup func(t *testing.T, fields *fields, args *args) - wantErr assert.ErrorAssertionFunc - }{ - { - name: "just fail it", - fields: fields{ - parent: &requestTransactionManager{}, - }, - mockSetup: func(t *testing.T, fields *fields, args *args) { - completionFuture := NewMockCompletionFuture(t) - expect := completionFuture.EXPECT() - expect.Cancel(true, nil).Return() - fields.completionFuture = completionFuture - }, - wantErr: assert.Error, - }, - { - name: "just fail it (completed)", - args: args{ - err: errors.New("nope"), - }, - fields: fields{ - parent: &requestTransactionManager{}, - completed: true, - }, - wantErr: assert.Error, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t *testing.T) { - if tt.mockSetup != nil { - tt.mockSetup(t, &tt.fields, &tt.args) - } - r := &requestTransaction{ - parent: tt.fields.parent, - transactionId: tt.fields.transactionId, - operation: tt.fields.operation, - completionFuture: tt.fields.completionFuture, - transactionLog: tt.fields.transactionLog, - completed: tt.fields.completed, - } - tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err) - }) - } -} - -func Test_requestTransaction_String(t1 *testing.T) { - type fields struct { - parent *requestTransactionManager - transactionId int32 - operation pool.Runnable - completionFuture pool.CompletionFuture - transactionLog zerolog.Logger - } - tests := []struct { - name string - fields fields - want string - }{ - { - name: "give a string", - want: "Transaction{tid:0}", - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - t := &requestTransaction{ - parent: tt.fields.parent, - transactionId: tt.fields.transactionId, - operation: tt.fields.operation, - completionFuture: tt.fields.completionFuture, - transactionLog: tt.fields.transactionLog, - } - if got := t.String(); got != tt.want { - t1.Errorf("String() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_requestTransaction_Submit(t1 *testing.T) { - type fields struct { - parent *requestTransactionManager - transactionId int32 - operation pool.Runnable - completionFuture pool.CompletionFuture - transactionLog zerolog.Logger - completed bool - } - type args struct { - operation RequestTransactionRunnable - } - tests := []struct { - name string - fields fields - args args - }{ - { - name: "submit something", - fields: fields{ - parent: &requestTransactionManager{}, - }, - args: args{ - operation: func(_ RequestTransaction) { - // NOOP - }, - }, - }, - { - name: "submit something again", - fields: fields{ - parent: &requestTransactionManager{}, - operation: func() { - // NOOP - }, - }, - args: args{ - operation: func(_ RequestTransaction) { - // NOOP - }, - }, - }, - { - name: "submit completed", - fields: fields{ - parent: &requestTransactionManager{}, - operation: func() { - // NOOP - }, - completed: true, - }, - args: args{ - operation: func(_ RequestTransaction) { - // NOOP - }, - }, - }, - } - for _, tt := range tests { - t1.Run(tt.name, func(t1 *testing.T) { - t := &requestTransaction{ - parent: tt.fields.parent, - transactionId: tt.fields.transactionId, - operation: tt.fields.operation, - completionFuture: tt.fields.completionFuture, - transactionLog: tt.fields.transactionLog, - completed: tt.fields.completed, - } - t.Submit(tt.args.operation) - t.operation() - }) - } -} - func Test_requestTransactionManager_Close(t *testing.T) { type fields struct { runningRequests []*requestTransaction diff --git a/plc4go/spi/transactions/RequestTransaction_test.go b/plc4go/spi/transactions/RequestTransaction_test.go new file mode 100644 index 00000000000..5dad61f345c --- /dev/null +++ b/plc4go/spi/transactions/RequestTransaction_test.go @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package transactions + +import ( + "context" + "testing" + "time" + + "github.com/apache/plc4x/plc4go/spi/pool" + + "github.com/pkg/errors" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func Test_requestTransaction_EndRequest(t1 *testing.T) { + type fields struct { + parent *requestTransactionManager + transactionId int32 + operation pool.Runnable + completionFuture pool.CompletionFuture + transactionLog zerolog.Logger + completed bool + } + tests := []struct { + name string + fields fields + wantErr bool + }{ + { + name: "just end it", + fields: fields{ + parent: &requestTransactionManager{}, + }, + wantErr: true, + }, + { + name: "end it completed", + fields: fields{ + parent: &requestTransactionManager{}, + completed: true, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + t := &requestTransaction{ + parent: tt.fields.parent, + transactionId: tt.fields.transactionId, + operation: tt.fields.operation, + completionFuture: tt.fields.completionFuture, + transactionLog: tt.fields.transactionLog, + completed: tt.fields.completed, + } + if err := t.EndRequest(); (err != nil) != tt.wantErr { + t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func Test_requestTransaction_FailRequest(t1 *testing.T) { + type fields struct { + parent *requestTransactionManager + transactionId int32 + operation pool.Runnable + completionFuture pool.CompletionFuture + transactionLog zerolog.Logger + completed bool + } + type args struct { + err error + } + tests := []struct { + name string + fields fields + args args + mockSetup func(t *testing.T, fields *fields, args *args) + wantErr assert.ErrorAssertionFunc + }{ + { + name: "just fail it", + fields: fields{ + parent: &requestTransactionManager{}, + }, + mockSetup: func(t *testing.T, fields *fields, args *args) { + completionFuture := NewMockCompletionFuture(t) + expect := completionFuture.EXPECT() + expect.Cancel(true, nil).Return() + fields.completionFuture = completionFuture + }, + wantErr: assert.Error, + }, + { + name: "just fail it (completed)", + args: args{ + err: errors.New("nope"), + }, + fields: fields{ + parent: &requestTransactionManager{}, + completed: true, + }, + wantErr: assert.Error, + }, + } + for _, tt := range tests { + t1.Run(tt.name, func(t *testing.T) { + if tt.mockSetup != nil { + tt.mockSetup(t, &tt.fields, &tt.args) + } + r := &requestTransaction{ + parent: tt.fields.parent, + transactionId: tt.fields.transactionId, + operation: tt.fields.operation, + completionFuture: tt.fields.completionFuture, + transactionLog: tt.fields.transactionLog, + completed: tt.fields.completed, + } + tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err) + }) + } +} + +func Test_requestTransaction_String(t1 *testing.T) { + type fields struct { + parent *requestTransactionManager + transactionId int32 + operation pool.Runnable + completionFuture pool.CompletionFuture + transactionLog zerolog.Logger + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "give a string", + want: "Transaction{tid:0}", + }, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + t := &requestTransaction{ + parent: tt.fields.parent, + transactionId: tt.fields.transactionId, + operation: tt.fields.operation, + completionFuture: tt.fields.completionFuture, + transactionLog: tt.fields.transactionLog, + } + if got := t.String(); got != tt.want { + t1.Errorf("String() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_requestTransaction_Submit(t1 *testing.T) { + type fields struct { + parent *requestTransactionManager + transactionId int32 + operation pool.Runnable + completionFuture pool.CompletionFuture + transactionLog zerolog.Logger + completed bool + } + type args struct { + operation RequestTransactionRunnable + } + tests := []struct { + name string + fields fields + args args + }{ + { + name: "submit something", + fields: fields{ + parent: &requestTransactionManager{}, + }, + args: args{ + operation: func(_ RequestTransaction) { + // NOOP + }, + }, + }, + { + name: "submit something again", + fields: fields{ + parent: &requestTransactionManager{}, + operation: func() { + // NOOP + }, + }, + args: args{ + operation: func(_ RequestTransaction) { + // NOOP + }, + }, + }, + { + name: "submit completed", + fields: fields{ + parent: &requestTransactionManager{}, + operation: func() { + // NOOP + }, + completed: true, + }, + args: args{ + operation: func(_ RequestTransaction) { + // NOOP + }, + }, + }, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + t := &requestTransaction{ + parent: tt.fields.parent, + transactionId: tt.fields.transactionId, + operation: tt.fields.operation, + completionFuture: tt.fields.completionFuture, + transactionLog: tt.fields.transactionLog, + completed: tt.fields.completed, + } + t.Submit(tt.args.operation) + t.operation() + }) + } +} + +func Test_requestTransaction_AwaitCompletion(t1 *testing.T) { + type fields struct { + parent *requestTransactionManager + transactionId int32 + operation pool.Runnable + completionFuture pool.CompletionFuture + transactionLog zerolog.Logger + } + type args struct { + ctx context.Context + } + tests := []struct { + name string + fields fields + args args + mockSetup func(t *testing.T, fields *fields, args *args) + wantErr bool + }{ + { + name: "just wait", + fields: fields{ + parent: &requestTransactionManager{ + runningRequests: []*requestTransaction{ + func() *requestTransaction { + r := &requestTransaction{} + go func() { + time.Sleep(100 * time.Millisecond) + // We fake an ending transaction like that + r.transactionId = 1 + }() + return r + }(), + }, + }, + }, + args: args{ + ctx: func() context.Context { + ctx, cancelFunc := context.WithCancel(context.Background()) + cancelFunc() + return ctx + }(), + }, + mockSetup: func(t *testing.T, fields *fields, args *args) { + completionFuture := NewMockCompletionFuture(t) + expect := completionFuture.EXPECT() + expect.AwaitCompletion(mock.Anything).Return(nil) + fields.completionFuture = completionFuture + }, + }, + } + for _, tt := range tests { + t1.Run(tt.name, func(t1 *testing.T) { + if tt.mockSetup != nil { + tt.mockSetup(t1, &tt.fields, &tt.args) + } + t := &requestTransaction{ + parent: tt.fields.parent, + transactionId: tt.fields.transactionId, + operation: tt.fields.operation, + completionFuture: tt.fields.completionFuture, + transactionLog: tt.fields.transactionLog, + } + if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr { + t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/plc4go/spi/transactions/completedFuture.go b/plc4go/spi/transactions/completedFuture.go new file mode 100644 index 00000000000..ac8227f5797 --- /dev/null +++ b/plc4go/spi/transactions/completedFuture.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package transactions + +import "context" + +type completedFuture struct { + err error +} + +func (c completedFuture) AwaitCompletion(_ context.Context) error { + return c.err +} + +func (completedFuture) Cancel(_ bool, _ error) { + // No op +}