From a5fd66e5dcee56f9c04493d31ca4d1d0c6e7bcd3 Mon Sep 17 00:00:00 2001 From: Nddtfjiang Date: Mon, 25 Apr 2022 09:13:58 +0000 Subject: [PATCH 1/5] feat: add set and get max retry Add SetMaxRetry and GetMaxRetry to ApiAsyncClient for change maxRetry after created. Nddtfjiang --- plugins/helper/api_async_client.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go index 17a1a8dd75e..f41c8c2bb4e 100644 --- a/plugins/helper/api_async_client.go +++ b/plugins/helper/api_async_client.go @@ -88,6 +88,16 @@ func CreateAsyncApiClient( }, nil } +func (apiClient *ApiAsyncClient) GetMaxRetry() int { + return apiClient.maxRetry +} + +func (apiClient *ApiAsyncClient) SetMaxRetry( + maxRetry int, +) { + apiClient.maxRetry = maxRetry +} + func (apiClient *ApiAsyncClient) DoAsync( method string, path string, From b9e7ee8ad9a15e8d26715eb0dec073565db14900 Mon Sep 17 00:00:00 2001 From: Nddtfjiang Date: Mon, 25 Apr 2022 09:16:20 +0000 Subject: [PATCH 2/5] fix: fix an error lost fix an error lost in DoAsync of ApiAsyncClient, after ReadAll. Nddtfjiang --- plugins/helper/api_async_client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go index f41c8c2bb4e..930e2a6eb67 100644 --- a/plugins/helper/api_async_client.go +++ b/plugins/helper/api_async_client.go @@ -115,8 +115,10 @@ func (apiClient *ApiAsyncClient) DoAsync( res, err = apiClient.Do(method, path, query, body, header) if err == nil { body, err = ioutil.ReadAll(res.Body) - res.Body.Close() - res.Body = io.NopCloser(bytes.NewBuffer(body)) + if err == nil { + res.Body.Close() + res.Body = io.NopCloser(bytes.NewBuffer(body)) + } } // it make sense to retry on request failure, but not error from handler and canceled error if err != nil { From 83e2fef0bc0f6e1e660fb5953e28e6701ff23613 Mon Sep 17 00:00:00 2001 From: Nddtfjiang Date: Mon, 25 Apr 2022 09:33:03 +0000 Subject: [PATCH 3/5] fix: check if the log nil befor use Check if the log is nil befor log print In Log of DefaultLogger. Nddtfjiang --- plugins/helper/default_logger.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/helper/default_logger.go b/plugins/helper/default_logger.go index d0563cbda57..728e3c6b7f4 100644 --- a/plugins/helper/default_logger.go +++ b/plugins/helper/default_logger.go @@ -17,6 +17,9 @@ func NewDefaultLogger(log *logrus.Logger, prefix string) *DefaultLogger { } func (l *DefaultLogger) IsLevelEnabled(level core.LogLevel) bool { + if l.log == nil { + return false + } return l.log.IsLevelEnabled(logrus.Level(level)) } From 4930f8f42fe160841f688cec8391df97152a8e48 Mon Sep 17 00:00:00 2001 From: Nddtfjiang Date: Mon, 25 Apr 2022 09:55:47 +0000 Subject: [PATCH 4/5] test: unit test for asyncapiclient TestWaitAsync_EmptyWork TestWaitAsync_WithWork TestWaitAsync_MutiWork TestDoAsync_OnceSuceess TestDoAsync_TryAndFail TestDoAsync_TryAndSuceess Add gomonkey Nddtfjiang --- Makefile | 2 +- go.mod | 1 + go.sum | 2 + plugins/helper/api_async_client_test.go | 313 ++++++++++++++++++++++++ 4 files changed, 317 insertions(+), 1 deletion(-) create mode 100644 plugins/helper/api_async_client_test.go diff --git a/Makefile b/Makefile index 63b5ffbaf20..4450fdd0c4f 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ commit: test: unit-test e2e-test unit-test: build - set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -v $$m; done + set -e; for m in $$(go list ./... | egrep -v 'test|models|e2e'); do echo $$m; go test -gcflags=all=-l -v $$m; done e2e-test: build PLUGIN_DIR=$(shell readlink -f bin/plugins) go test -v ./test/... diff --git a/go.mod b/go.mod index a5689c6c8e2..6165750dcdb 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/Microsoft/go-winio v0.5.0 // indirect github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 // indirect github.com/acomagu/bufpipe v1.0.3 // indirect + github.com/agiledragon/gomonkey v2.0.2+incompatible // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denisenkom/go-mssqldb v0.10.0 // indirect github.com/emirpasic/gods v1.12.0 // indirect diff --git a/go.sum b/go.sum index 73154ff3c00..261459d745e 100644 --- a/go.sum +++ b/go.sum @@ -47,6 +47,8 @@ github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 h1:YoJbenK9C6 github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo= github.com/acomagu/bufpipe v1.0.3 h1:fxAGrHZTgQ9w5QqVItgzwj235/uYZYgbXitB+dLupOk= github.com/acomagu/bufpipe v1.0.3/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4= +github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw= +github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= diff --git a/plugins/helper/api_async_client_test.go b/plugins/helper/api_async_client_test.go new file mode 100644 index 00000000000..3d706e729d8 --- /dev/null +++ b/plugins/helper/api_async_client_test.go @@ -0,0 +1,313 @@ +package helper + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "os" + "reflect" + "sync/atomic" + "testing" + "time" + + "github.com/agiledragon/gomonkey" + "github.com/merico-dev/lake/plugins/core" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "gorm.io/gorm" +) + +// TestReadder for test io data +type TestReader struct { + Err error +} + +func (r *TestReader) Read(p []byte) (n int, err error) { + return 0, r.Err +} + +func (r *TestReader) Close() error { + return nil +} + +// it is better to move some where more public. +var ErrUnitTest error = fmt.Errorf("ErrorForTest[%d]", time.Now().UnixNano()) + +func callback(_ *http.Response, err error) error { + if err == nil { + return nil + } + return ErrUnitTest +} + +func GetConfigForTest(basepath string) *viper.Viper { + // create the object and load the .env file + v := viper.New() + envfile := ".env" + envbasefile := basepath + ".env.example" + bytesRead, err := ioutil.ReadFile(envbasefile) + if err != nil { + logrus.Warn("Failed to read ["+envbasefile+"] file:", err) + } + err = ioutil.WriteFile(envfile, bytesRead, 0644) + + if err != nil { + logrus.Warn("Failed to write config file ["+envfile+"] file:", err) + } + + v.SetConfigFile(envfile) + err = v.ReadInConfig() + if err != nil { + path, _ := os.Getwd() + logrus.Warn("Now in the path [" + path + "]") + logrus.Warn("Failed to read ["+envfile+"] file:", err) + } + v.AutomaticEnv() + // This line is essential for reading and writing + v.WatchConfig() + return v +} + +// Create an AsyncApiClient object for test +func CreateTestAsyncApiClient(t *testing.T) (*ApiAsyncClient, error) { + // create rate limit calculator + rateLimiter := &ApiRateLimitCalculator{ + UserRateLimitPerHour: 36000, // ten times each seconed + } + + // set the function of create new default taskcontext for the AsyncApiClient + gm := gomonkey.ApplyFunc(NewDefaultTaskContext, func( + cfg *viper.Viper, + _ core.Logger, + db *gorm.DB, + _ context.Context, + name string, + subtasks map[string]bool, + progress chan core.RunningProgress, + ) core.TaskContext { + return &DefaultTaskContext{ + &defaultExecContext{ + cfg: cfg, + logger: &DefaultLogger{}, + db: db, + ctx: context.Background(), + name: "Test", + data: nil, + progress: progress, + }, + subtasks, + make(map[string]*DefaultSubTaskContext), + } + }) + defer gm.Reset() + taskCtx := NewDefaultTaskContext(GetConfigForTest("../../"), nil, nil, nil, "", nil, nil) + + // create ApiClient + apiClient := &ApiClient{} + apiClient.Setup("", nil, 3*time.Second) + apiClient.SetContext(taskCtx.GetContext()) + + return CreateAsyncApiClient(taskCtx, apiClient, rateLimiter) +} + +// go test -gcflags=all=-l -run ^TestWaitAsync_EmptyWork +func TestWaitAsync_EmptyWork(t *testing.T) { + asyncApiClient, _ := CreateTestAsyncApiClient(t) + + err := asyncApiClient.WaitAsync() + assert.Equal(t, err, nil) +} + +// go test -gcflags=all=-l -run ^TestWaitAsync_WithWork +func TestWaitAsync_WithWork(t *testing.T) { + asyncApiClient, _ := CreateTestAsyncApiClient(t) + + gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) { + }) + defer gm_info.Reset() + + gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func( + apiClient *ApiClient, + method string, + path string, + query url.Values, + body interface{}, + headers http.Header, + ) (*http.Response, error) { + return &http.Response{ + Body: &TestReader{Err: io.EOF}, + StatusCode: 200, + }, nil + }) + defer gm_do.Reset() + + // check if the callback1 has been finished + waitSuc := false + callback1 := func(_ *http.Response, err error) error { + // wait 0.5 second for wait + time.Sleep(500 * time.Millisecond) + waitSuc = true + return nil + } + + // begin to test + err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback1, 0) + assert.Equal(t, err, nil) + + err = asyncApiClient.WaitAsync() + assert.Equal(t, err, nil) + assert.Equal(t, waitSuc, true) +} + +// go test -gcflags=all=-l -run ^TestWaitAsync_MutiWork +func TestWaitAsync_MutiWork(t *testing.T) { + asyncApiClient, _ := CreateTestAsyncApiClient(t) + + gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) { + }) + defer gm_info.Reset() + + gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func( + apiClient *ApiClient, + method string, + path string, + query url.Values, + body interface{}, + headers http.Header, + ) (*http.Response, error) { + return &http.Response{ + Body: &TestReader{Err: io.EOF}, + StatusCode: 200, + }, nil + }) + defer gm_do.Reset() + + // check if the callback2 has been finished + finishedCount := int64(0) + callback2 := func(_ *http.Response, err error) error { + // wait 0.5 second for wait + time.Sleep(500 * time.Millisecond) + atomic.AddInt64(&finishedCount, 1) + return nil + } + + testCount := int64(5) + + // begin to test + for i := int64(0); i < testCount; i++ { + err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback2, 0) + assert.Equal(t, err, nil) + } + + err := asyncApiClient.WaitAsync() + assert.Equal(t, err, nil) + assert.Equal(t, finishedCount, testCount) +} + +// go test -gcflags=all=-l -run ^TestDoAsync_OnceSuceess +func TestDoAsync_OnceSuceess(t *testing.T) { + asyncApiClient, _ := CreateTestAsyncApiClient(t) + gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) { + }) + defer gm_info.Reset() + + gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func( + apiClient *ApiClient, + method string, + path string, + query url.Values, + body interface{}, + headers http.Header, + ) (*http.Response, error) { + return &http.Response{ + Body: &TestReader{Err: io.EOF}, + StatusCode: 200, + }, nil + }) + defer gm_do.Reset() + + // ready to test + err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback, 0) + assert.Equal(t, err, nil) + + err = asyncApiClient.WaitAsync() + assert.Equal(t, err, nil) +} + +// go test -gcflags=all=-l -run ^TestDoAsync_TryAndFail +func TestDoAsync_TryAndFail(t *testing.T) { + asyncApiClient, _ := CreateTestAsyncApiClient(t) + gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) { + }) + defer gm_info.Reset() + + gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func( + apiClient *ApiClient, + method string, + path string, + query url.Values, + body interface{}, + headers http.Header, + ) (*http.Response, error) { + return &http.Response{ + Body: &TestReader{Err: ErrUnitTest}, + StatusCode: 500, + }, nil + }) + defer gm_do.Reset() + + // ready to test + err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback, 0) + assert.Equal(t, err, nil) + + err = asyncApiClient.WaitAsync() + // there must have err and the err must be ErrUnitTest + if assert.NotNil(t, err) { + assert.Contains(t, err.Error(), ErrUnitTest.Error()) + } +} + +// go test -gcflags=all=-l -run ^TestDoAsync_TryAndSuceess +func TestDoAsync_TryAndSuceess(t *testing.T) { + asyncApiClient, _ := CreateTestAsyncApiClient(t) + gm_info := gomonkey.ApplyMethod(reflect.TypeOf(&DefaultLogger{}), "Info", func(_ *DefaultLogger, _ string, _ ...interface{}) { + }) + defer gm_info.Reset() + + // counting the retry times + times := 0 + gm_do := gomonkey.ApplyMethod(reflect.TypeOf(&ApiClient{}), "Do", func( + apiClient *ApiClient, + method string, + path string, + query url.Values, + body interface{}, + headers http.Header, + ) (*http.Response, error) { + times++ + if times <= 3 { + return &http.Response{ + Body: &TestReader{Err: ErrUnitTest}, + StatusCode: 301, + }, nil + } else { + return &http.Response{ + Body: &TestReader{Err: io.EOF}, + StatusCode: 200, + }, nil + } + }) + defer gm_do.Reset() + asyncApiClient.SetMaxRetry(5) + + // ready to test + err := asyncApiClient.DoAsync("", "", nil, nil, nil, callback, 0) + assert.Equal(t, err, nil) + + err = asyncApiClient.WaitAsync() + assert.Equal(t, err, nil) +} From 839d6777192e95bc21434cae4112ff06fb8a9cd6 Mon Sep 17 00:00:00 2001 From: Nddtfjiang Date: Tue, 26 Apr 2022 11:31:56 +0000 Subject: [PATCH 5/5] fix: fix http response body without close in sometimes at apiclient add defer in DoAsync for close the http response. add res.body.close() when afterReponse failed befor it return a nil res. Nddtfjiang --- plugins/helper/api_async_client.go | 11 +++++------ plugins/helper/api_client.go | 1 + 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/plugins/helper/api_async_client.go b/plugins/helper/api_async_client.go index 930e2a6eb67..a6e44569c9d 100644 --- a/plugins/helper/api_async_client.go +++ b/plugins/helper/api_async_client.go @@ -114,12 +114,13 @@ func (apiClient *ApiAsyncClient) DoAsync( var body []byte res, err = apiClient.Do(method, path, query, body, header) if err == nil { + defer func(body io.ReadCloser) { body.Close() }(res.Body) body, err = ioutil.ReadAll(res.Body) if err == nil { - res.Body.Close() res.Body = io.NopCloser(bytes.NewBuffer(body)) } } + // it make sense to retry on request failure, but not error from handler and canceled error if err != nil { if retry < apiClient.maxRetry && err != context.Canceled { @@ -127,12 +128,10 @@ func (apiClient *ApiAsyncClient) DoAsync( retry++ return apiClient.scheduler.Submit(subFunc, apiClient.scheduler.subPool) } + } else if res.StatusCode >= 400 { + err = fmt.Errorf("http code error[%d]:[%s]", res.StatusCode, body) } - if err == nil { - if res.StatusCode >= 400 { - err = fmt.Errorf("http code error[%d]:[%s]", res.StatusCode, body) - } - } + // it is important to let handler have a chance to handle error, or it can hang indefinitely // when error occurs return handler(res, err) diff --git a/plugins/helper/api_client.go b/plugins/helper/api_client.go index 53a8b64f3a9..3ebeba28864 100644 --- a/plugins/helper/api_client.go +++ b/plugins/helper/api_client.go @@ -204,6 +204,7 @@ func (apiClient *ApiClient) Do( if apiClient.afterReponse != nil { err = apiClient.afterReponse(res) if err != nil { + res.Body.Close() return nil, err } }