From cfe8ca43a22472ea8b7a9a0968617015ba2c9d63 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 14 Aug 2021 11:54:34 +0800 Subject: [PATCH 1/7] fix(transactin):#303 added min_confirmation in 0chain_blobber.yml; replaced native http.Client with resty for performance --- .gitignore | 2 + code/go/0chain.net/blobber/main.go | 16 + .../0chain.net/blobbercore/config/config.go | 9 +- code/go/0chain.net/core/transaction/http.go | 292 +++++------------- code/go/0chain.net/core/transaction/vars.go | 29 ++ config/0chain_blobber.yaml | 4 + go.mod | 3 + go.sum | 4 +- 8 files changed, 144 insertions(+), 215 deletions(-) create mode 100644 code/go/0chain.net/core/transaction/vars.go diff --git a/.gitignore b/.gitignore index 27beede31..6c3267d34 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,5 @@ docker.local/blobber1/files/e55/003/7d3/f56519dff06156f16a19a40389414f94cfd4c3a5 docker.local/blobber1/files/e55/003/7d3/f56519dff06156f16a19a40389414f94cfd4c3a548f46c218cb484e/refs/123/6e9/a3b/6c8bbc889679c68af4372b7d68e2f2343c03ae7aaa4b25fbb8257c3 docker.local/blobber1/files/e55/003/7d3/f56519dff06156f16a19a40389414f94cfd4c3a548f46c218cb484e/refs/388/12d/bd3/b084ff297706d7bd307996c3363c2df53acc49c97b83201f8dc7bd9 docker.local/blobber1/data/badgerdb/blobberstate/MANIFEST + +dev.local/data \ No newline at end of file diff --git a/code/go/0chain.net/blobber/main.go b/code/go/0chain.net/blobber/main.go index 071d0229b..e89aebf01 100644 --- a/code/go/0chain.net/blobber/main.go +++ b/code/go/0chain.net/blobber/main.go @@ -30,6 +30,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/core/logging" . "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/blobber/code/go/0chain.net/core/node" + "github.com/0chain/blobber/code/go/0chain.net/core/transaction" "github.com/0chain/gosdk/zcncore" "github.com/gorilla/handlers" @@ -139,6 +140,21 @@ func setupWorkerConfig() { config.Configuration.MaxStake = int64(viper.GetFloat64("max_stake") * 1e10) config.Configuration.NumDelegates = viper.GetInt("num_delegates") config.Configuration.ServiceCharge = viper.GetFloat64("service_charge") + + config.Configuration.MinSubmit = viper.GetInt("min_submit") + if config.Configuration.MinSubmit < 1 { + config.Configuration.MinSubmit = 50 + } else if config.Configuration.MinSubmit > 100 { + config.Configuration.MinSubmit = 100 + } + config.Configuration.MinConfirmation = viper.GetInt("min_confirmation") + if config.Configuration.MinConfirmation < 1 { + config.Configuration.MinConfirmation = 50 + } else if config.Configuration.MinConfirmation > 100 { + config.Configuration.MinConfirmation = 100 + } + + transaction.MinConfirmation = config.Configuration.MinConfirmation } func setupMinioConfig(reader io.Reader) error { diff --git a/code/go/0chain.net/blobbercore/config/config.go b/code/go/0chain.net/blobbercore/config/config.go index f3aae7093..332e65110 100644 --- a/code/go/0chain.net/blobbercore/config/config.go +++ b/code/go/0chain.net/blobbercore/config/config.go @@ -64,7 +64,7 @@ const ( ) type GeolocationConfig struct { - Latitude float64 `mapstructure:"latitude"` + Latitude float64 `mapstructure:"latitude"` Longitude float64 `mapstructure:"longitude"` } @@ -125,6 +125,11 @@ type Config struct { ServiceCharge float64 `json:"service_charge"` Geolocation GeolocationConfig `mapstructure:"geolocation"` + + // MinSubmit minial submit from miners + MinSubmit int + // MinConfirmation minial confirmation from sharders + MinConfirmation int } /*Configuration of the system */ @@ -145,7 +150,7 @@ func Geolocation() GeolocationConfig { g := Configuration.Geolocation if g.Latitude > 90.00 || g.Latitude < -90.00 || g.Longitude > 180.00 || g.Longitude < -180.00 { - panic("Fatal error in config file") + panic("Fatal error in config file") } return g diff --git a/code/go/0chain.net/core/transaction/http.go b/code/go/0chain.net/core/transaction/http.go index 8c8bd925b..5f355e90a 100644 --- a/code/go/0chain.net/core/transaction/http.go +++ b/code/go/0chain.net/core/transaction/http.go @@ -1,10 +1,10 @@ package transaction import ( + "context" "crypto/sha1" "encoding/hex" - //"encoding/json" "fmt" "io" "io/ioutil" @@ -12,17 +12,13 @@ import ( "net/http" "net/url" - //"sync" - "time" - "github.com/0chain/blobber/code/go/0chain.net/core/chain" "github.com/0chain/blobber/code/go/0chain.net/core/common" . "github.com/0chain/blobber/code/go/0chain.net/core/logging" - "github.com/0chain/gosdk/core/util" + "github.com/0chain/errors" + "github.com/0chain/gosdk/core/resty" "github.com/0chain/gosdk/zcncore" - //"github.com/0chain/blobber/code/go/0chain.net/core/util" - "go.uber.org/zap" ) @@ -33,68 +29,12 @@ const REGISTER_CLIENT = "v1/client/put" const ( SLEEP_FOR_TXN_CONFIRMATION = 5 - SC_REST_API_ATTEMPTS = 3 ) var ErrNoTxnDetail = common.NewError("missing_transaction_detail", "No transaction detail was found on any of the sharders") -// func SendTransaction(txn *Transaction, chain *chain.Chain) { -// // Get miners -// miners := chain.Miners.GetRandomNodes(chain.Miners.Size()) -// for _, miner := range miners { -// url := fmt.Sprintf("%v/%v", miner.GetURLBase(), TXN_SUBMIT_URL) -// go sendTransactionToURL(url, txn, nil) -// } -// } - type SCRestAPIHandler func(response map[string][]byte, numSharders int, err error) -// func SendTransactionSync(txn *Transaction, chain *chain.Chain) { -// wg := sync.WaitGroup{} -// wg.Add(chain.Miners.Size()) -// // Get miners -// miners := chain.Miners.GetRandomNodes(chain.Miners.Size()) -// for _, miner := range miners { -// url := fmt.Sprintf("%v/%v", miner.GetURLBase(), TXN_SUBMIT_URL) -// go sendTransactionToURL(url, txn, &wg) -// } -// wg.Wait() -// } - -// func SendPostRequestSync(relativeURL string, data []byte, chain *chain.Chain) { -// wg := sync.WaitGroup{} -// wg.Add(chain.Miners.Size()) -// // Get miners -// miners := chain.Miners.GetRandomNodes(chain.Miners.Size()) -// for _, miner := range miners { -// url := fmt.Sprintf("%v/%v", miner.GetURLBase(), relativeURL) -// go util.SendPostRequest(url, data, &wg) -// } -// wg.Wait() -// } - -// func SendPostRequestAsync(relativeURL string, data []byte, chain *chain.Chain) { -// // Get miners -// miners := chain.Miners.GetRandomNodes(chain.Miners.Size()) -// for _, miner := range miners { -// url := fmt.Sprintf("%v/%v", miner.GetURLBase(), relativeURL) -// go util.SendPostRequest(url, data, nil) -// } -// } - -// func sendTransactionToURL(url string, txn *Transaction, wg *sync.WaitGroup) ([]byte, error) { -// if wg != nil { -// defer wg.Done() -// } -// jsObj, err := json.Marshal(txn) -// if err != nil { -// Logger.Error("Error in serializing the transaction", zap.String("error", err.Error()), zap.Any("transaction", txn)) -// return nil, err -// } - -// return util.SendPostRequest(url, jsObj, nil) -// } - func VerifyTransaction(txnHash string, chain *chain.Chain) (*Transaction, error) { txn, err := NewTransactionEntity() if err != nil { @@ -107,177 +47,107 @@ func VerifyTransaction(txnHash string, chain *chain.Chain) (*Transaction, error) return nil, err } return txn, nil - - // numSharders := chain.Sharders.Size() - // numSuccess := 0 - // var retTxn *Transaction - // // Get sharders - // sharders := chain.Sharders.GetRandomNodes(numSharders) - // for _, sharder := range sharders { - // url := fmt.Sprintf("%v/%v%v", sharder.GetURLBase(), TXN_VERIFY_URL, txnHash) - // var netTransport = &http.Transport{ - // Dial: (&net.Dialer{ - // Timeout: 5 * time.Second, - // }).Dial, - // TLSHandshakeTimeout: 5 * time.Second, - // } - // var netClient = &http.Client{ - // Timeout: time.Second * 10, - // Transport: netTransport, - // } - // resp, err := netClient.Get(url) - // if err != nil { - // Logger.Error("Error getting transaction confirmation", zap.Any("error", err)) - // numSharders-- - // } else { - // if resp.StatusCode != 200 { - // continue - // } - // defer resp.Body.Close() - // contents, err := ioutil.ReadAll(resp.Body) - // if err != nil { - // Logger.Error("Error reading response from transaction confirmation", zap.Any("error", err)) - // continue - // } - // var objmap map[string]json.RawMessage - // err = json.Unmarshal(contents, &objmap) - // if err != nil { - // Logger.Error("Error unmarshalling response", zap.Any("error", err)) - // continue - // } - // if _, ok := objmap["txn"]; !ok { - // Logger.Info("Not transaction information. Only block summary.", zap.Any("sharder", url), zap.Any("output", string(contents))) - // if _, ok := objmap["block_hash"]; ok { - // numSuccess++ - // continue - // } - // Logger.Info("Sharder does not have the block summary", zap.Any("sharder", url), zap.Any("output", string(contents))) - // continue - // } - // txn := &Transaction{} - // err = json.Unmarshal(objmap["txn"], txn) - // if err != nil { - // Logger.Error("Error unmarshalling to get transaction response", zap.Any("error", err)) - // } - // if len(txn.Signature) > 0 { - // retTxn = txn - // } - - // numSuccess++ - // } - // } - // if numSharders == 0 || float64(numSuccess*1.0/numSharders) > float64(0.5) { - // if retTxn != nil { - // return retTxn, nil - // } - // return nil, ErrNoTxnDetail - // } - // return nil, common.NewError("transaction_not_found", "Transaction was not found on any of the sharders") } -func MakeSCRestAPICall(scAddress string, relativePath string, params map[string]string, chain *chain.Chain, handler SCRestAPIHandler) ([]byte, error) { +// MakeSCRestAPICall execute api reqeust from sharders, and parse and return result +func MakeSCRestAPICall(scAddress string, relativePath string, params map[string]string, chain *chain.Chain) ([]byte, error) { var resMaxCounterBody []byte - resBodies := make(map[string][]byte) var hashMaxCounter int hashCounters := make(map[string]int) network := zcncore.GetNetwork() numSharders := len(network.Sharders) - sharders := util.GetRandom(network.Sharders, numSharders) - - for _, sharder := range sharders { - // Make one or more requests (in case of unavailability, see 503/504 errors) - var err error - var resp *http.Response - var counter int = SC_REST_API_ATTEMPTS - - netTransport := &http.Transport{ - Dial: (&net.Dialer{ - Timeout: 5 * time.Second, - }).Dial, - TLSHandshakeTimeout: 5 * time.Second, + + if numSharders == 0 { + return nil, ErrNoAvailableSharder + } + + transport := &http.Transport{ + Dial: (&net.Dialer{ + Timeout: DefaultDialTimeout, + }).Dial, + TLSHandshakeTimeout: DefaultDialTimeout, + } + + r := resty.New(transport, func(req *http.Request, resp *http.Response, cancelFunc context.CancelFunc, err error) error { + + if err != nil { + return errors.Throw(ErrBadRequest, err.Error()) } - netClient := &http.Client{ - Timeout: 10 * time.Second, - Transport: netTransport, + if resp.StatusCode != http.StatusOK { + resBody, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + + Logger.Error("[sharder]"+resp.Status, zap.String("url", req.URL.String()), zap.String("response", string(resBody))) + + return errors.Throw(ErrBadRequest, req.URL.String()+" "+resp.Status) + } - uString := fmt.Sprintf("%v/%v%v%v", sharder, SC_REST_API_URL, scAddress, relativePath) - u, _ := url.Parse(uString) - q := u.Query() - for k, v := range params { - q.Add(k, v) + hash := sha1.New() + teeReader := io.TeeReader(resp.Body, hash) + resBody, err := ioutil.ReadAll(teeReader) + resp.Body.Close() + + if err != nil { + Logger.Error("[sharder]"+resp.Status, zap.String("url", req.URL.String()), zap.String("response", string(resBody))) + + return errors.Throw(ErrBadRequest, req.URL.String()+" "+err.Error()) + } - u.RawQuery = q.Encode() - - for counter > 0 { - resp, err = netClient.Get(u.String()) - if err != nil { - break - } - - // if it's not available, retry if there are any retry attempts - if resp.StatusCode == 503 || resp.StatusCode == 504 { - resp.Body.Close() - counter-- - } else { - break - } + + hashString := hex.EncodeToString(hash.Sum(nil)) + hashCounters[hashString]++ + + if hashCounters[hashString] > hashMaxCounter { + hashMaxCounter = hashCounters[hashString] + resMaxCounterBody = resBody } - if err != nil { - Logger.Error("Error getting response for sc rest api", zap.Any("error", err), zap.Any("sharder_url", sharder)) - numSharders-- - } else { - if resp.StatusCode != 200 { - resBody, _ := ioutil.ReadAll(resp.Body) - Logger.Error("Got error response from sc rest api", zap.Any("response", string(resBody))) - resp.Body.Close() - continue - } - - defer resp.Body.Close() // TODO: is it really needed here? or put it above and drop other "Body.Close"s - - hash := sha1.New() - teeReader := io.TeeReader(resp.Body, hash) - resBody, err := ioutil.ReadAll(teeReader) - - if err != nil { - Logger.Error("Error reading response", zap.Any("error", err)) - resp.Body.Close() - continue - } - - hashString := hex.EncodeToString(hash.Sum(nil)) - hashCounters[hashString]++ - - if hashCounters[hashString] > hashMaxCounter { - hashMaxCounter = hashCounters[hashString] - resMaxCounterBody = resBody - } - - resBodies[sharder] = resMaxCounterBody // TODO: check it! looks suspicious. assigned value is not set for some interations. maybe should be = resBody? - resp.Body.Close() + consensus := int(float64(hashMaxCounter) / float64(numSharders) * 100) + + // It is confirmed, and cancel other requests for performance + if consensus > 0 && consensus >= MinConfirmation { + cancelFunc() + return nil } - } - var err error + return nil + }, + resty.WithTimeout(DefaultRequestTimeout), + resty.WithRetry(DefaultRetry)) + + urls := make([]string, 0, len(network.Sharders)) - // is it less than or equal to 50% - if hashMaxCounter <= (numSharders / 2) { - err = common.NewError("invalid_response", "Sharder responses were invalid. Hash mismatch") + q := url.Values{} + for k, v := range params { + q.Add(k, v) } - if handler != nil { - handler(resBodies, numSharders, err) + for _, sharder := range network.Sharders { + + u := fmt.Sprintf("%v/%v%v%v", sharder, SC_REST_API_URL, scAddress, relativePath) + + urls = append(urls, u+"?"+q.Encode()) } - // is it more than 50% - if hashMaxCounter > (numSharders / 2) { - return resMaxCounterBody, nil + r.DoGet(context.Background(), urls...) + + errs := r.Wait() + + consensus := int(float64(hashMaxCounter) / float64(numSharders) * 100) + + if consensus < MinConfirmation { + msgList := make([]string, 0, len(errs)) + + for _, msg := range errs { + msgList = append(msgList, msg) + } + return errors.Throw(ErrTooLessConfirmation, msgList...) } - return nil, err + return resMaxCounterBody, nil + } diff --git a/code/go/0chain.net/core/transaction/vars.go b/code/go/0chain.net/core/transaction/vars.go new file mode 100644 index 000000000..57725bf8b --- /dev/null +++ b/code/go/0chain.net/core/transaction/vars.go @@ -0,0 +1,29 @@ +package transaction + +import ( + "errors" + "time" +) + +var ( + // DefaultDialTimeout default timeout of a dialer + DefaultDialTimeout = 5 * time.Second + // DefaultRequestTimeout default time out of a http request + DefaultRequestTimeout = 10 * time.Second + // DefaultRetry retry times if a request is failed with 5xx status code + DefaultRetry = 3 + + // MinConfirmation minial confirmation from sharders + MinConfirmation = 50 +) + +var ( + // ErrBadRequest bad request from sharder + ErrBadRequest = errors.New("[sharder]bad request") + + // ErrNoAvailableSharder no any available sharder + ErrNoAvailableSharder = errors.New("[txn] there is no any available sharder") + + // ErrTooLessConfirmation too less sharder to confirm transaction + ErrTooLessConfirmation = errors.New("[txn] too less sharders to confirm it") +) diff --git a/config/0chain_blobber.yaml b/config/0chain_blobber.yaml index 70a5fbc51..155ca445f 100755 --- a/config/0chain_blobber.yaml +++ b/config/0chain_blobber.yaml @@ -51,6 +51,10 @@ max_stake: 100.0 num_delegates: 50 # service charge of the blobber service_charge: 0.30 +# min submit from miners +min_submit: 50 +# min confirmation from sharder +min_confirmation: 50 block_worker: http://198.18.0.98:9091 diff --git a/go.mod b/go.mod index 3e2e79b10..feb12f76e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/0chain/blobber require ( + github.com/0chain/errors v1.0.1 github.com/0chain/gosdk v1.2.81 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect @@ -38,3 +39,5 @@ require ( ) go 1.13 + +replace github.com/0chain/gosdk => ../gosdk diff --git a/go.sum b/go.sum index 93c239bd3..5d82e51fc 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/0chain/gosdk v1.2.81 h1:whsNQ5M4NGHHV+wBPlTxNp3hNxxzkWkB1p89g7IVWjM= -github.com/0chain/gosdk v1.2.81/go.mod h1:Q+jKgS1tNJ9te1U2g91XB5sQ+sh/v2v8lRIzvgFXNUQ= +github.com/0chain/errors v1.0.1 h1:e1jyKmYtF5jQdFOeEAjdfyLe5D46HrNO46Z97tWvEh8= +github.com/0chain/errors v1.0.1/go.mod h1:5t76jLb56TKfg/K2VD+eUMmNZJ42QsIRI8KzWuztwU4= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= From 80c270d47e59f75621471c81dd6d69be24968157 Mon Sep 17 00:00:00 2001 From: Lz Date: Sat, 14 Aug 2021 15:40:23 +0800 Subject: [PATCH 2/7] fix(transaction):#303 removed SCRestAPIHandler from MakeSCRestAPICall --- code/go/0chain.net/blobbercore/allocation/protocol.go | 4 ++-- code/go/0chain.net/blobbercore/allocation/workers.go | 3 +-- code/go/0chain.net/blobbercore/challenge/worker.go | 2 +- code/go/0chain.net/blobbercore/readmarker/worker.go | 2 +- code/go/0chain.net/core/transaction/http.go | 4 ++-- code/go/0chain.net/validatorcore/storage/protocol.go | 2 +- 6 files changed, 8 insertions(+), 9 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/protocol.go b/code/go/0chain.net/blobbercore/allocation/protocol.go index a256435c4..5bd71ca5e 100644 --- a/code/go/0chain.net/blobbercore/allocation/protocol.go +++ b/code/go/0chain.net/blobbercore/allocation/protocol.go @@ -196,7 +196,7 @@ func RequestReadPools(clientID, allocationID string) ( "allocation_id": allocationID, "blobber_id": blobberID, }, - chain.GetServerChain(), nil) + chain.GetServerChain()) if err != nil { return nil, fmt.Errorf("requesting read pools stat: %v", err) } @@ -245,7 +245,7 @@ func RequestWritePools(clientID, allocationID string) ( "allocation_id": allocationID, "blobber_id": blobberID, }, - chain.GetServerChain(), nil) + chain.GetServerChain()) if err != nil { return nil, fmt.Errorf("requesting write pools stat: %v", err) } diff --git a/code/go/0chain.net/blobbercore/allocation/workers.go b/code/go/0chain.net/blobbercore/allocation/workers.go index 2d4c91cf8..e82dcadb6 100644 --- a/code/go/0chain.net/blobbercore/allocation/workers.go +++ b/code/go/0chain.net/blobbercore/allocation/workers.go @@ -178,8 +178,7 @@ func requestAllocation(allocID string) ( transaction.STORAGE_CONTRACT_ADDRESS, "/allocation", map[string]string{"allocation": allocID}, - chain.GetServerChain(), - nil) + chain.GetServerChain()) if err != nil { return } diff --git a/code/go/0chain.net/blobbercore/challenge/worker.go b/code/go/0chain.net/blobbercore/challenge/worker.go index 339730644..d33f7a66b 100644 --- a/code/go/0chain.net/blobbercore/challenge/worker.go +++ b/code/go/0chain.net/blobbercore/challenge/worker.go @@ -201,7 +201,7 @@ func FindChallenges(ctx context.Context) { var blobberChallenges BCChallengeResponse blobberChallenges.Challenges = make([]*ChallengeEntity, 0) - retBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain(), nil) + retBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain()) if err != nil { Logger.Error("Error getting the open challenges from the blockchain", zap.Error(err)) diff --git a/code/go/0chain.net/blobbercore/readmarker/worker.go b/code/go/0chain.net/blobbercore/readmarker/worker.go index ffabc4764..c6a5fb3fc 100644 --- a/code/go/0chain.net/blobbercore/readmarker/worker.go +++ b/code/go/0chain.net/blobbercore/readmarker/worker.go @@ -35,7 +35,7 @@ func RedeemReadMarker(ctx context.Context, rmEntity *ReadMarkerEntity) ( latestRMBytes, err = transaction.MakeSCRestAPICall( transaction.STORAGE_CONTRACT_ADDRESS, "/latestreadmarker", params, - chain.GetServerChain(), nil) + chain.GetServerChain()) if err != nil { Logger.Error("Error from sc rest api call", zap.Error(err)) diff --git a/code/go/0chain.net/core/transaction/http.go b/code/go/0chain.net/core/transaction/http.go index 5f355e90a..19710049a 100644 --- a/code/go/0chain.net/core/transaction/http.go +++ b/code/go/0chain.net/core/transaction/http.go @@ -143,9 +143,9 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] msgList := make([]string, 0, len(errs)) for _, msg := range errs { - msgList = append(msgList, msg) + msgList = append(msgList, msg.Error()) } - return errors.Throw(ErrTooLessConfirmation, msgList...) + return nil, errors.Throw(ErrTooLessConfirmation, msgList...) } return resMaxCounterBody, nil diff --git a/code/go/0chain.net/validatorcore/storage/protocol.go b/code/go/0chain.net/validatorcore/storage/protocol.go index f9175988b..c58319887 100644 --- a/code/go/0chain.net/validatorcore/storage/protocol.go +++ b/code/go/0chain.net/validatorcore/storage/protocol.go @@ -91,7 +91,7 @@ func (sp *ValidatorProtocolImpl) VerifyChallengeTransaction(ctx context.Context, params := make(map[string]string) params["blobber"] = blobberID params["challenge"] = challengeRequest.ChallengeID - challengeBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/getchallenge", params, chain.GetServerChain(), nil) + challengeBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/getchallenge", params, chain.GetServerChain()) if err != nil { return nil, common.NewError("invalid_challenge", "Invalid challenge id. Challenge not found in blockchain. "+err.Error()) From 3dda34dde4afd8f29ebb6d0e021a8aef2f563719 Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 16 Aug 2021 16:27:14 +0800 Subject: [PATCH 3/7] fix(transaction):#303 moved default http values to resty --- code/go/0chain.net/core/transaction/http.go | 8 ++++---- code/go/0chain.net/core/transaction/vars.go | 7 ------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/code/go/0chain.net/core/transaction/http.go b/code/go/0chain.net/core/transaction/http.go index 19710049a..a72303c80 100644 --- a/code/go/0chain.net/core/transaction/http.go +++ b/code/go/0chain.net/core/transaction/http.go @@ -65,9 +65,9 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] transport := &http.Transport{ Dial: (&net.Dialer{ - Timeout: DefaultDialTimeout, + Timeout: resty.DefaultDialTimeout, }).Dial, - TLSHandshakeTimeout: DefaultDialTimeout, + TLSHandshakeTimeout: resty.DefaultDialTimeout, } r := resty.New(transport, func(req *http.Request, resp *http.Response, cancelFunc context.CancelFunc, err error) error { @@ -116,8 +116,8 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] return nil }, - resty.WithTimeout(DefaultRequestTimeout), - resty.WithRetry(DefaultRetry)) + resty.WithTimeout(resty.DefaultRequestTimeout), + resty.WithRetry(resty.DefaultRetry)) urls := make([]string, 0, len(network.Sharders)) diff --git a/code/go/0chain.net/core/transaction/vars.go b/code/go/0chain.net/core/transaction/vars.go index 57725bf8b..e33c80a9a 100644 --- a/code/go/0chain.net/core/transaction/vars.go +++ b/code/go/0chain.net/core/transaction/vars.go @@ -2,16 +2,9 @@ package transaction import ( "errors" - "time" ) var ( - // DefaultDialTimeout default timeout of a dialer - DefaultDialTimeout = 5 * time.Second - // DefaultRequestTimeout default time out of a http request - DefaultRequestTimeout = 10 * time.Second - // DefaultRetry retry times if a request is failed with 5xx status code - DefaultRetry = 3 // MinConfirmation minial confirmation from sharders MinConfirmation = 50 From 4e68953c1c7813f5de434f6591fe1d04d1121f2a Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 16 Aug 2021 21:45:38 +0800 Subject: [PATCH 4/7] fix(conf):#303 tidy gomod --- go.mod | 6 +++--- go.sum | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index feb12f76e..ec089afe5 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,8 @@ module github.com/0chain/blobber require ( - github.com/0chain/errors v1.0.1 - github.com/0chain/gosdk v1.2.81 + github.com/0chain/errors v1.0.2 + github.com/0chain/gosdk v1.2.82 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect github.com/didip/tollbooth v4.0.2+incompatible @@ -40,4 +40,4 @@ require ( go 1.13 -replace github.com/0chain/gosdk => ../gosdk +//replace github.com/0chain/gosdk => ../gosdk diff --git a/go.sum b/go.sum index 5d82e51fc..5ba678e3d 100644 --- a/go.sum +++ b/go.sum @@ -36,8 +36,10 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/0chain/errors v1.0.1 h1:e1jyKmYtF5jQdFOeEAjdfyLe5D46HrNO46Z97tWvEh8= -github.com/0chain/errors v1.0.1/go.mod h1:5t76jLb56TKfg/K2VD+eUMmNZJ42QsIRI8KzWuztwU4= +github.com/0chain/errors v1.0.2 h1:IIUMeh/qFlqDcyHesjU92CpRMVz9dIQWAtZooqrYinA= +github.com/0chain/errors v1.0.2/go.mod h1:5t76jLb56TKfg/K2VD+eUMmNZJ42QsIRI8KzWuztwU4= +github.com/0chain/gosdk v1.2.82 h1:qm0BkPAaiZqKbrgCPf9csaZqTaBkPTrqy9Ehu3Q2D4M= +github.com/0chain/gosdk v1.2.82/go.mod h1:Bl/wsHdlktgXybdzkHPfaf6ATe7mPmew/xF3ki2gskQ= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= From cf894e78241519e9c2969e905295508cec813152 Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 16 Aug 2021 22:01:07 +0800 Subject: [PATCH 5/7] fix(tansaction):#303 updated random logic --- code/go/0chain.net/core/transaction/http.go | 80 ++++++++++++++++----- 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/code/go/0chain.net/core/transaction/http.go b/code/go/0chain.net/core/transaction/http.go index a72303c80..0f2c05a7c 100644 --- a/code/go/0chain.net/core/transaction/http.go +++ b/code/go/0chain.net/core/transaction/http.go @@ -4,6 +4,8 @@ import ( "context" "crypto/sha1" "encoding/hex" + "math" + "strconv" "fmt" "io" @@ -17,6 +19,7 @@ import ( . "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/errors" "github.com/0chain/gosdk/core/resty" + "github.com/0chain/gosdk/core/util" "github.com/0chain/gosdk/zcncore" "go.uber.org/zap" @@ -63,6 +66,33 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] return nil, ErrNoAvailableSharder } + minNumConfirmation := int(math.Ceil(float64(MinConfirmation*numSharders) / 100)) + + rand := util.NewRand(numSharders) + + selectedSharders := make([]string, 0, minNumConfirmation+1) + + // random pick minNumConfirmation+1 first + for i := 0; i <= minNumConfirmation; i++ { + n, err := rand.Next() + + if err != nil { + break + } + + selectedSharders = append(selectedSharders, network.Sharders[n]) + } + + numSuccess := 0 + + header := map[string]string{ + "Content-Type": "application/json; charset=utf-8", + "Access-Control-Allow-Origin": "*", + } + + //leave first item for ErrTooLessConfirmation + var msgList = make([]string, 1, numSharders) + transport := &http.Transport{ Dial: (&net.Dialer{ Timeout: resty.DefaultDialTimeout, @@ -72,16 +102,21 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] r := resty.New(transport, func(req *http.Request, resp *http.Response, cancelFunc context.CancelFunc, err error) error { - if err != nil { - return errors.Throw(ErrBadRequest, err.Error()) + if err != nil { //network issue + msgList = append(msgList, err.Error()) + return err } + url := req.URL.String() + if resp.StatusCode != http.StatusOK { resBody, _ := ioutil.ReadAll(resp.Body) resp.Body.Close() Logger.Error("[sharder]"+resp.Status, zap.String("url", req.URL.String()), zap.String("response", string(resBody))) + msgList = append(msgList, url+": ["+strconv.Itoa(resp.StatusCode)+"] "+string(resBody)) + return errors.Throw(ErrBadRequest, req.URL.String()+" "+resp.Status) } @@ -93,7 +128,7 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] if err != nil { Logger.Error("[sharder]"+resp.Status, zap.String("url", req.URL.String()), zap.String("response", string(resBody))) - + msgList = append(msgList, url+": "+err.Error()) return errors.Throw(ErrBadRequest, req.URL.String()+" "+err.Error()) } @@ -106,18 +141,11 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] resMaxCounterBody = resBody } - consensus := int(float64(hashMaxCounter) / float64(numSharders) * 100) - - // It is confirmed, and cancel other requests for performance - if consensus > 0 && consensus >= MinConfirmation { - cancelFunc() - return nil - } - return nil }, resty.WithTimeout(resty.DefaultRequestTimeout), - resty.WithRetry(resty.DefaultRetry)) + resty.WithRetry(resty.DefaultRetry), + resty.WithHeader(header)) urls := make([]string, 0, len(network.Sharders)) @@ -133,18 +161,32 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] urls = append(urls, u+"?"+q.Encode()) } - r.DoGet(context.Background(), urls...) + for { + r.DoGet(context.TODO(), urls...) + + r.Wait() - errs := r.Wait() + if numSuccess >= minNumConfirmation { + break + } - consensus := int(float64(hashMaxCounter) / float64(numSharders) * 100) + // pick more one sharder to query transaction + n, err := rand.Next() - if consensus < MinConfirmation { - msgList := make([]string, 0, len(errs)) + if errors.Is(err, util.ErrNoItem) { + break + } - for _, msg := range errs { - msgList = append(msgList, msg.Error()) + urls = []string{ + fmt.Sprintf("%v/%v%v%v", network.Sharders[n], SC_REST_API_URL, scAddress, relativePath) + "?" + q.Encode(), } + + } + + if numSuccess < minNumConfirmation { + + msgList[0] = fmt.Sprintf("min_confirmation is %v%%, but got %v/%v sharders", MinConfirmation, numSuccess, numSharders) + return nil, errors.Throw(ErrTooLessConfirmation, msgList...) } From fd69b030334965dc893d0efc9c7b2ef70ec996f4 Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 16 Aug 2021 22:54:50 +0800 Subject: [PATCH 6/7] fix(tansaction):#306 improved blobber register performance --- .../blobbercore/handler/protocol.go | 4 +--- code/go/0chain.net/core/transaction/http.go | 20 +++++++++---------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/code/go/0chain.net/blobbercore/handler/protocol.go b/code/go/0chain.net/blobbercore/handler/protocol.go index c3bfad812..6c9a38f44 100644 --- a/code/go/0chain.net/blobbercore/handler/protocol.go +++ b/code/go/0chain.net/blobbercore/handler/protocol.go @@ -94,7 +94,6 @@ func getStorageNode() (*transaction.StorageNode, error) { // Add or update blobber on blockchain func BlobberAdd(ctx context.Context) (string, error) { - time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second) // initialize storage node (ie blobber) txn, err := transaction.NewTransactionEntity() @@ -153,7 +152,6 @@ func BlobberHealthCheck(ctx context.Context) (string, error) { } func TransactionVerify(txnHash string) (t *transaction.Transaction, err error) { - time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second) for i := 0; i < util.MAX_RETRIES; i++ { time.Sleep(transaction.SLEEP_FOR_TXN_CONFIRMATION * time.Second) @@ -162,7 +160,7 @@ func TransactionVerify(txnHash string) (t *transaction.Transaction, err error) { } } - return + return nil, errors.New("[txn]max retries exceeded with " + txnHash) } func WalletRegister() error { diff --git a/code/go/0chain.net/core/transaction/http.go b/code/go/0chain.net/core/transaction/http.go index 0f2c05a7c..eb740fd0f 100644 --- a/code/go/0chain.net/core/transaction/http.go +++ b/code/go/0chain.net/core/transaction/http.go @@ -83,7 +83,12 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] selectedSharders = append(selectedSharders, network.Sharders[n]) } - numSuccess := 0 + transport := &http.Transport{ + Dial: (&net.Dialer{ + Timeout: resty.DefaultDialTimeout, + }).Dial, + TLSHandshakeTimeout: resty.DefaultDialTimeout, + } header := map[string]string{ "Content-Type": "application/json; charset=utf-8", @@ -93,13 +98,6 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] //leave first item for ErrTooLessConfirmation var msgList = make([]string, 1, numSharders) - transport := &http.Transport{ - Dial: (&net.Dialer{ - Timeout: resty.DefaultDialTimeout, - }).Dial, - TLSHandshakeTimeout: resty.DefaultDialTimeout, - } - r := resty.New(transport, func(req *http.Request, resp *http.Response, cancelFunc context.CancelFunc, err error) error { if err != nil { //network issue @@ -166,7 +164,7 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] r.Wait() - if numSuccess >= minNumConfirmation { + if hashMaxCounter >= minNumConfirmation { break } @@ -183,9 +181,9 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] } - if numSuccess < minNumConfirmation { + if hashMaxCounter < minNumConfirmation { - msgList[0] = fmt.Sprintf("min_confirmation is %v%%, but got %v/%v sharders", MinConfirmation, numSuccess, numSharders) + msgList[0] = fmt.Sprintf("min_confirmation is %v%%, but got %v/%v sharders", MinConfirmation, hashMaxCounter, numSharders) return nil, errors.Throw(ErrTooLessConfirmation, msgList...) } From beeabfee16a69df0952b79e502b45c170b656ad5 Mon Sep 17 00:00:00 2001 From: Lz Date: Mon, 16 Aug 2021 23:31:34 +0800 Subject: [PATCH 7/7] fix(tansaction):#303 fixed golangci-lint --- code/go/0chain.net/core/transaction/http.go | 29 ++++++++++----------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/code/go/0chain.net/core/transaction/http.go b/code/go/0chain.net/core/transaction/http.go index eb740fd0f..c9051bfc0 100644 --- a/code/go/0chain.net/core/transaction/http.go +++ b/code/go/0chain.net/core/transaction/http.go @@ -79,10 +79,23 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] if err != nil { break } - selectedSharders = append(selectedSharders, network.Sharders[n]) } + urls := make([]string, 0, len(network.Sharders)) + + q := url.Values{} + for k, v := range params { + q.Add(k, v) + } + + for _, sharder := range selectedSharders { + + u := fmt.Sprintf("%v/%v%v%v", sharder, SC_REST_API_URL, scAddress, relativePath) + + urls = append(urls, u+"?"+q.Encode()) + } + transport := &http.Transport{ Dial: (&net.Dialer{ Timeout: resty.DefaultDialTimeout, @@ -145,20 +158,6 @@ func MakeSCRestAPICall(scAddress string, relativePath string, params map[string] resty.WithRetry(resty.DefaultRetry), resty.WithHeader(header)) - urls := make([]string, 0, len(network.Sharders)) - - q := url.Values{} - for k, v := range params { - q.Add(k, v) - } - - for _, sharder := range network.Sharders { - - u := fmt.Sprintf("%v/%v%v%v", sharder, SC_REST_API_URL, scAddress, relativePath) - - urls = append(urls, u+"?"+q.Encode()) - } - for { r.DoGet(context.TODO(), urls...)