Skip to content

Commit

Permalink
Add context watchdog for batch send cancellation
Browse files Browse the repository at this point in the history
* batch: context watchdog

* fix context cancellation
  • Loading branch information
jkaflik committed Oct 12, 2023
1 parent 815b7eb commit dcf2b3f
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 0 deletions.
13 changes: 13 additions & 0 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,14 @@ func (b *batch) Column(idx int) driver.BatchColumn {
}

func (b *batch) Send() (err error) {
stopCW := contextWatchdog(b.ctx, func() {
// close TCP connection on context cancel. There is no other way simple way to interrupt underlying operations.
// as verified in the test, this is safe to do and cleanups resources later on
_ = b.conn.conn.Close()
})

defer func() {
stopCW()
b.sent = true
b.release(err)
}()
Expand All @@ -192,6 +199,12 @@ func (b *batch) Send() (err error) {
}
if b.block.Rows() != 0 {
if err = b.conn.sendData(b.block, ""); err != nil {
// there might be an error caused by context cancellation
// in this case we should return context error instead of net.OpError
if ctxErr := b.ctx.Err(); ctxErr != nil {
return ctxErr
}

return err
}
}
Expand Down
47 changes: 47 additions & 0 deletions context_watchdog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package clickhouse

import "context"

// contextWatchdog is a helper function to run a callback when the context is done.
// it has a cancellation function to prevent the callback from running.
// Useful for interrupting some logic when the context is done,
// but you want to not bother about context cancellation if your logic is already done.
// Example:
// stopCW := contextWatchdog(ctx, func() { /* do something */ })
// // do something else
// defer stopCW()
func contextWatchdog(ctx context.Context, callback func()) (cancel func()) {
exit := make(chan struct{})

go func() {
for {
select {
case <-exit:
return
case <-ctx.Done():
callback()
}
}
}()

return func() {
exit <- struct{}{}
}
}
52 changes: 52 additions & 0 deletions tests/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tests

import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"testing"
"time"
)

func TestBatchContextCancellation(t *testing.T) {
te, err := GetTestEnvironment(testSet)
require.NoError(t, err)
opts := ClientOptionsFromEnv(te, clickhouse.Settings{})
opts.MaxOpenConns = 1
conn, err := GetConnectionWithOptions(&opts)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

require.NoError(t, conn.Exec(context.Background(), "create table if not exists test_batch_cancellation (x String) engine=Memory"))
defer conn.Exec(context.Background(), "drop table if exists test_batch_cancellation")

b, err := conn.PrepareBatch(ctx, "insert into test_batch_cancellation")
require.NoError(t, err)
for i := 0; i < 1_000_000; i++ {
require.NoError(t, b.Append("value"))
}

require.Equal(t, context.DeadlineExceeded, b.Send())

// assert if connection is properly released after context cancellation
require.NoError(t, conn.Exec(context.Background(), "SELECT 1"))
}

0 comments on commit dcf2b3f

Please sign in to comment.