Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ commit:
test: unit-test e2e-test

unit-test: build
set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -v $$m; done
set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -gcflags=all=-l -v $$m; done

e2e-test: build
PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./test/...
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 // indirect
github.com/acomagu/bufpipe v1.0.3 // indirect
github.com/agiledragon/gomonkey v2.0.2+incompatible // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denisenkom/go-mssqldb v0.10.0 // indirect
github.com/emirpasic/gods v1.12.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 h1:YoJbenK9C6
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=
github.com/acomagu/bufpipe v1.0.3 h1:fxAGrHZTgQ9w5QqVItgzwj235/uYZYgbXitB+dLupOk=
github.com/acomagu/bufpipe v1.0.3/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw=
github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
Expand Down
25 changes: 18 additions & 7 deletions plugins/helper/api_async_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ func CreateAsyncApiClient(
}, nil
}

func (apiClient *ApiAsyncClient) GetMaxRetry() int {
return apiClient.maxRetry
}

func (apiClient *ApiAsyncClient) SetMaxRetry(
maxRetry int,
) {
apiClient.maxRetry = maxRetry
}

func (apiClient *ApiAsyncClient) DoAsync(
method string,
path string,
Expand All @@ -104,23 +114,24 @@ func (apiClient *ApiAsyncClient) DoAsync(
var body []byte
res, err = apiClient.Do(method, path, query, body, header)
if err == nil {
defer func(body io.ReadCloser) { body.Close() }(res.Body)
body, err = ioutil.ReadAll(res.Body)
res.Body.Close()
res.Body = io.NopCloser(bytes.NewBuffer(body))
if err == nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems res.Body will not be closed in the case of err != nil

res.Body = io.NopCloser(bytes.NewBuffer(body))
}
}

// it make sense to retry on request failure, but not error from handler and canceled error
if err != nil {
if retry < apiClient.maxRetry && err != context.Canceled {
apiClient.logError("retry #%d for %s", retry, err.Error())
retry++
return apiClient.scheduler.Submit(subFunc, apiClient.scheduler.subPool)
}
} else if res.StatusCode >= 400 {
err = fmt.Errorf("http code error[%d]:[%s]", res.StatusCode, body)
}
if err == nil {
if res.StatusCode >= 400 {
err = fmt.Errorf("http code error[%d]:[%s]", res.StatusCode, body)
}
}

// it is important to let handler have a chance to handle error, or it can hang indefinitely
// when error occurs
return handler(res, err)
Expand Down
313 changes: 313 additions & 0 deletions plugins/helper/api_async_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
package helper

import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"reflect"
"sync/atomic"
"testing"
"time"

"github.com/agiledragon/gomonkey"
"github.com/merico-dev/lake/plugins/core"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
)

// TestReadder for test io data
type TestReader struct {
Err error
}

func (r *TestReader) Read(p []byte) (n int, err error) {
return 0, r.Err
}

func (r *TestReader) Close() error {
return nil
}

// it is better to move some where more public.
var ErrUnitTest error = fmt.Errorf("ErrorForTest[%d]", time.Now().UnixNano())

func callback(_ *http.Response, err error) error {
if err == nil {
return nil
}
return ErrUnitTest
}

func GetConfigForTest(basepath string) *viper.Viper {
// create the object and load the .env file
v := viper.New()
envfile := ".env"
envbasefile := basepath + ".env.example"
bytesRead, err := ioutil.ReadFile(envbasefile)
if err != nil {
logrus.Warn("Failed to read ["+envbasefile+"] file:", err)
}
err = ioutil.WriteFile(envfile, bytesRead, 0644)

if err != nil {
logrus.Warn("Failed to write config file ["+envfile+"] file:", err)
}

v.SetConfigFile(envfile)
err = v.ReadInConfig()
if err != nil {
path, _ := os.Getwd()
logrus.Warn("Now in the path [" + path + "]")
logrus.Warn("Failed to read ["+envfile+"] file:", err)
}
v.AutomaticEnv()
// This line is essential for reading and writing
v.WatchConfig()
return v
}

// Create an AsyncApiClient object for test
func CreateTestAsyncApiClient(t *testing.T) (*ApiAsyncClient, error) {
// create rate limit calculator
rateLimiter := &ApiRateLimitCalculator{
UserRateLimitPerHour: 36000, // ten times each seconed
}

// set the function of create new default taskcontext for the AsyncApiClient
gm := gomonkey.ApplyFunc(NewDefaultTaskContext, func(
cfg *viper.Viper,
_ core.Logger,
db *gorm.DB,
_ context.Context,
name string,
subtasks map[string]bool,
progress chan core.RunningProgress,
) core.TaskContext {
return &DefaultTaskContext{
&defaultExecContext{
cfg: cfg,
logger: &DefaultLogger{},
db: db,
ctx: context.Background(),
name: "Test",
data: nil,
progress: progress,
},
subtasks,
make(map[string]*DefaultSubTaskContext),
}
})
defer gm.Reset()
taskCtx := NewDefaultTaskContext(GetConfigForTest("../../"), nil, nil, nil, "", nil, nil)

// create ApiClient
apiClient := &ApiClient{}
apiClient.Setup("", nil, 3*time.Second)
apiClient.SetContext(taskCtx.GetContext())

return CreateAsyncApiClient(taskCtx, apiClient, rateLimiter)
}

// go test -gcflags=all=-l -run ^TestWaitAsync_EmptyWork
func TestWaitAsync_EmptyWork(t *testing.T) {
asyncApiClient, _ := CreateTestAsyncApiClient(t)

err := asyncApiClient.WaitAsync()
assert.Equal(t, err, nil)
}

// go test -gcflags=all=-l -run ^TestWaitAsync_WithWork
func TestWaitAsync_WithWork(t *testing.T) {
asyncApiClient, _ := CreateTestAsyncApiClient(t)

gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) {
})
defer gm_info.Reset()

gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
apiClient *ApiClient,
method string,
path string,
query url.Values,
body interface{},
headers http.Header,
) (*http.Response, error) {
return &http.Response{
Body: &TestReader{Err: io.EOF},
StatusCode: 200,
}, nil
})
defer gm_do.Reset()

// check if the callback1 has been finished
waitSuc := false
callback1 := func(_ *http.Response, err error) error {
// wait 0.5 second for wait
time.Sleep(500 * time.Millisecond)
waitSuc = true
return nil
}

// begin to test
err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback1, 0)
assert.Equal(t, err, nil)

err = asyncApiClient.WaitAsync()
assert.Equal(t, err, nil)
assert.Equal(t, waitSuc, true)
}

// go test -gcflags=all=-l -run ^TestWaitAsync_MutiWork
func TestWaitAsync_MutiWork(t *testing.T) {
asyncApiClient, _ := CreateTestAsyncApiClient(t)

gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) {
})
defer gm_info.Reset()

gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
apiClient *ApiClient,
method string,
path string,
query url.Values,
body interface{},
headers http.Header,
) (*http.Response, error) {
return &http.Response{
Body: &TestReader{Err: io.EOF},
StatusCode: 200,
}, nil
})
defer gm_do.Reset()

// check if the callback2 has been finished
finishedCount := int64(0)
Comment thread
mappjzc marked this conversation as resolved.
callback2 := func(_ *http.Response, err error) error {
// wait 0.5 second for wait
time.Sleep(500 * time.Millisecond)
atomic.AddInt64(&finishedCount, 1)
return nil
}

testCount := int64(5)

// begin to test
for i := int64(0); i < testCount; i++ {
err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback2, 0)
assert.Equal(t, err, nil)
}

err := asyncApiClient.WaitAsync()
assert.Equal(t, err, nil)
assert.Equal(t, finishedCount, testCount)
}

// go test -gcflags=all=-l -run ^TestDoAsync_OnceSuceess
func TestDoAsync_OnceSuceess(t *testing.T) {
asyncApiClient, _ := CreateTestAsyncApiClient(t)
gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) {
})
defer gm_info.Reset()

gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
apiClient *ApiClient,
method string,
path string,
query url.Values,
body interface{},
headers http.Header,
) (*http.Response, error) {
return &http.Response{
Body: &TestReader{Err: io.EOF},
StatusCode: 200,
}, nil
})
defer gm_do.Reset()

// ready to test
err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback, 0)
assert.Equal(t, err, nil)

err = asyncApiClient.WaitAsync()
assert.Equal(t, err, nil)
}

// go test -gcflags=all=-l -run ^TestDoAsync_TryAndFail
func TestDoAsync_TryAndFail(t *testing.T) {
asyncApiClient, _ := CreateTestAsyncApiClient(t)
gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) {
})
defer gm_info.Reset()

gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
apiClient *ApiClient,
method string,
path string,
query url.Values,
body interface{},
headers http.Header,
) (*http.Response, error) {
return &http.Response{
Body: &TestReader{Err: ErrUnitTest},
StatusCode: 500,
}, nil
})
defer gm_do.Reset()

// ready to test
err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback, 0)
assert.Equal(t, err, nil)

err = asyncApiClient.WaitAsync()
// there must have err and the err must be ErrUnitTest
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), ErrUnitTest.Error())
}
}

// go test -gcflags=all=-l -run ^TestDoAsync_TryAndSuceess
func TestDoAsync_TryAndSuceess(t *testing.T) {
asyncApiClient, _ := CreateTestAsyncApiClient(t)
gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) {
})
defer gm_info.Reset()

// counting the retry times
times := 0
gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func(
apiClient *ApiClient,
method string,
path string,
query url.Values,
body interface{},
headers http.Header,
) (*http.Response, error) {
times++
if times <= 3 {
return &http.Response{
Body: &TestReader{Err: ErrUnitTest},
StatusCode: 301,
}, nil
} else {
return &http.Response{
Body: &TestReader{Err: io.EOF},
StatusCode: 200,
}, nil
}
})
defer gm_do.Reset()
asyncApiClient.SetMaxRetry(5)

// ready to test
err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback, 0)
assert.Equal(t, err, nil)

err = asyncApiClient.WaitAsync()
assert.Equal(t, err, nil)
}
1 change: 1 addition & 0 deletions plugins/helper/api_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (apiClient *ApiClient) Do(
if apiClient.afterReponse != nil {
err = apiClient.afterReponse(res)
if err != nil {
res.Body.Close()
return nil, err
}
}
Expand Down
Loading