diff --git a/cmd/btfs/daemon.go b/cmd/btfs/daemon.go index 91f14ce38..ab770c05a 100644 --- a/cmd/btfs/daemon.go +++ b/cmd/btfs/daemon.go @@ -759,6 +759,7 @@ If the user need to start multiple nodes on the same machine, the configuration spin.Analytics(api, cctx.ConfigRoot, node, version.CurrentVersionNumber, hValue) spin.Hosts(node, env) spin.Contracts(node, req, env, nodepb.ContractStat_HOST.String()) + spin.RestartFixChequeCashOut() } // Give the user some immediate feedback when they hit C-c diff --git a/core/commands/cheque/cheque.go b/core/commands/cheque/cheque.go index b9bf4783c..7e578acc8 100644 --- a/core/commands/cheque/cheque.go +++ b/core/commands/cheque/cheque.go @@ -36,6 +36,20 @@ type ListChequeRet struct { Len int } +type fixCheque struct { + PeerID string + Token string + Beneficiary string + Vault string + TotalCashedAmount *big.Int + FixCashedAmount *big.Int +} + +type ListFixChequeRet struct { + FixCheques []fixCheque + Len int +} + type ReceiveCheque struct { PeerID string Token common.Address @@ -65,11 +79,12 @@ var ChequeCmd = &cmds.Command{ Vault services include issue cheque to peer, receive cheque and store operations.`, }, Subcommands: map[string]*cmds.Command{ - "cash": CashChequeCmd, - "cashstatus": ChequeCashStatusCmd, - "cashlist": ChequeCashListCmd, - "price": StorePriceCmd, - "price-all": StorePriceAllCmd, + "cash": CashChequeCmd, + "cashstatus": ChequeCashStatusCmd, + "cashlist": ChequeCashListCmd, + "price": StorePriceCmd, + "price-all": StorePriceAllCmd, + "fix_cheque_cashout": FixChequeCashOutCmd, "send": SendChequeCmd, "sendlist": ListSendChequesCmd, diff --git a/core/commands/cheque/fix_cheque_cashout.go b/core/commands/cheque/fix_cheque_cashout.go new file mode 100644 index 000000000..f4908454c --- /dev/null +++ b/core/commands/cheque/fix_cheque_cashout.go @@ -0,0 +1,75 @@ +package cheque + +import ( + "fmt" + cmds "github.com/bittorrent/go-btfs-cmds" + "github.com/bittorrent/go-btfs/chain" + "github.com/bittorrent/go-btfs/chain/tokencfg" + "github.com/bittorrent/go-btfs/utils" + "github.com/google/martian/log" + "golang.org/x/net/context" + "io" +) + +var FixChequeCashOutCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "List cheque(s) received from peers.", + }, + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + err := utils.CheckSimpleMode(env) + if err != nil { + return err + } + + listRet := ListFixChequeRet{} + listRet.FixCheques = make([]fixCheque, 0) + + for _, tokenAddr := range tokencfg.MpTokenAddr { + cheques, err := chain.SettleObject.SwapService.LastReceivedCheques(tokenAddr) + if err != nil { + return err + } + + for k, v := range cheques { + totalCashOutAmount, newCashOutAmount, err := chain.SettleObject.CashoutService.AdjustCashCheque( + context.Background(), v.Vault, v.Beneficiary, tokenAddr, false) + if err != nil { + return err + } + if newCashOutAmount != nil && newCashOutAmount.Uint64() > 0 { + var record fixCheque + record.PeerID = k + record.Token = v.Token.String() + record.Beneficiary = v.Beneficiary.String() + record.Vault = v.Vault.String() + record.TotalCashedAmount = totalCashOutAmount + record.FixCashedAmount = newCashOutAmount + + listRet.FixCheques = append(listRet.FixCheques, record) + } + } + } + listRet.Len = len(listRet.FixCheques) + + log.Infof("FixChequeCashOutCmd, listRet = %+v", listRet) + + return cmds.EmitOnce(res, &listRet) + }, + Type: ListFixChequeRet{}, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *ListFixChequeRet) error { + fmt.Fprintf(w, "fix: \n\t%-55s\t%-46s\t%-46s\t%-46s\tfix_cash_amount: \n", "peerID:", "vault:", "beneficiary:", "total_cash_amount:") + for iter := 0; iter < out.Len; iter++ { + fmt.Fprintf(w, "\t%-55s\t%-46s\t%-46s\t%d\t%d \n", + out.FixCheques[iter].PeerID, + out.FixCheques[iter].Vault, + out.FixCheques[iter].Beneficiary, + out.FixCheques[iter].TotalCashedAmount.Uint64(), + out.FixCheques[iter].FixCashedAmount.Uint64(), + ) + } + + return nil + }), + }, +} diff --git a/core/commands/commands_test.go b/core/commands/commands_test.go index 0a56d2d1b..aafe6fc7b 100644 --- a/core/commands/commands_test.go +++ b/core/commands/commands_test.go @@ -342,6 +342,13 @@ func TestCommands(t *testing.T) { "/bittorrent/scrape", "/bittorrent/metainfo", "/bittorrent/bencode", + "/multibase", + "/multibase/encode", + "/multibase/decode", + "/multibase/transcode", + "/multibase/list", + "/backup", + "/recovery", "/accesskey", "/accesskey/generate", "/accesskey/enable", @@ -350,13 +357,7 @@ func TestCommands(t *testing.T) { "/accesskey/delete", "/accesskey/get", "/accesskey/list", - "/multibase", - "/multibase/encode", - "/multibase/decode", - "/multibase/transcode", - "/multibase/list", - "/backup", - "/recovery", + "/cheque/fix_cheque_cashout", } cmdSet := make(map[string]struct{}) diff --git a/go.mod b/go.mod index fbc52759f..26cf9bcde 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/go-bindata/go-bindata/v3 v3.1.3 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.3 + github.com/google/martian v2.1.0+incompatible github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.7.3 github.com/hashicorp/go-multierror v1.1.1 diff --git a/go.sum b/go.sum index 1229c7e8e..17535d97b 100644 --- a/go.sum +++ b/go.sum @@ -572,6 +572,7 @@ github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= diff --git a/settlement/swap/swap_test.go b/settlement/swap/swap_test.go index 74c778f5b..a603ed26f 100644 --- a/settlement/swap/swap_test.go +++ b/settlement/swap/swap_test.go @@ -124,10 +124,12 @@ func (m *addressbookMock) PutVault(peer string, vault common.Address) error { } type cashoutMock struct { - cashCheque func(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error) - cashoutStatus func(ctx context.Context, vaultAddress common.Address, token common.Address) (*vault.CashoutStatus, error) - cashoutResults func() ([]vault.CashOutResult, error) - hasCashoutAction func(ctx context.Context, peer common.Address, token common.Address) (bool, error) + cashCheque func(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error) + cashoutStatus func(ctx context.Context, vaultAddress common.Address, token common.Address) (*vault.CashoutStatus, error) + cashoutResults func() ([]vault.CashOutResult, error) + hasCashoutAction func(ctx context.Context, peer common.Address, token common.Address) (bool, error) + adjustCashCheque func(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error) + restartFixChequeCashOut func() } func (m *cashoutMock) CashCheque(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error) { @@ -142,6 +144,12 @@ func (m *cashoutMock) CashoutResults() ([]vault.CashOutResult, error) { func (m *cashoutMock) HasCashoutAction(ctx context.Context, peer common.Address, token common.Address) (bool, error) { return m.hasCashoutAction(ctx, peer, token) } +func (m *cashoutMock) AdjustCashCheque(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error) { + return m.adjustCashCheque(ctx, vaultAddress, recipient, token, restartPassFlag) +} +func (m *cashoutMock) RestartFixChequeCashOut() { + m.restartFixChequeCashOut() +} func TestReceiveCheque(t *testing.T) { store := mockstore.NewStateStore() vaultService := mockvault.NewVault( diff --git a/settlement/swap/vault/cashout.go b/settlement/swap/vault/cashout.go index 89540735d..9d480beb5 100644 --- a/settlement/swap/vault/cashout.go +++ b/settlement/swap/vault/cashout.go @@ -28,8 +28,10 @@ type CashoutService interface { CashCheque(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error) // CashoutStatus gets the status of the latest cashout transaction for the vault CashoutStatus(ctx context.Context, vaultAddress common.Address, token common.Address) (*CashoutStatus, error) + AdjustCashCheque(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error) HasCashoutAction(ctx context.Context, peer common.Address, token common.Address) (bool, error) CashoutResults() ([]CashOutResult, error) + RestartFixChequeCashOut() } type cashoutService struct { @@ -178,6 +180,10 @@ func (s *cashoutService) CashoutResults() ([]CashOutResult, error) { // CashCheque sends a cashout transaction for the last cheque of the vault func (s *cashoutService) CashCheque(ctx context.Context, vault, recipient common.Address, token common.Address) (common.Hash, error) { + if RestartFixCashOutStatusLock { + return common.Hash{}, errors.New("Just started, it can not cash cheque, you will wait for about 40s to do it. ") + } + cheque, err := s.chequeStore.LastReceivedCheque(vault, token) if err != nil { return common.Hash{}, err @@ -215,6 +221,19 @@ func (s *cashoutService) CashCheque(ctx context.Context, vault, recipient common return common.Hash{}, err } + // 1.add cash out status + cashOutStateInfo := CashOutStatusStoreInfo{ + Token: token, + Vault: cheque.Vault, + Beneficiary: cheque.Beneficiary, + CumulativePayout: cheque.CumulativePayout, + TxHash: txHash.String(), + } + err = s.AddCashOutStatusStore(cashOutStateInfo) + if err != nil { + return common.Hash{}, err + } + // WaitForReceipt takes long time go func() { defer func() { @@ -223,6 +242,13 @@ func (s *cashoutService) CashCheque(ctx context.Context, vault, recipient common } }() s.storeCashResult(context.Background(), vault, txHash, cheque, token) + + // 2.delete cash out status + err = s.DeleteCashOutStatusStore(cashOutStateInfo) + if err != nil { + log.Errorf("delete cashout status, err = %v", err) + return + } }() return txHash, nil } @@ -304,6 +330,137 @@ func (s *cashoutService) storeCashResult(ctx context.Context, vault common.Addre return nil } +// AdjustCashCheque . +func (s *cashoutService) AdjustCashCheque(ctx context.Context, vaultAddress, recipient common.Address, token common.Address, restartPassFlag bool) (totalCashOutAmount, newCashOutAmount *big.Int, err error) { + if RestartFixCashOutStatusLock { + if !restartPassFlag { + return nil, nil, errors.New("Just started, it can not fix cash out status, you will wait for about 40s to do it. ") + } + } + + // 1.totalReceivedCashed + totalReceivedCashed := big.NewInt(0) + err = s.store.Get(tokencfg.AddToken(statestore.TotalReceivedCashedKey, token), &totalReceivedCashed) + if err != nil && err != storage.ErrNotFound { + return nil, nil, err + } + + // 2.alreadyPaidOut in renter contract + // blockchain calls below + contract := newVaultContractMuti(vaultAddress, s.transactionService) + alreadyPaidOutOnline, err := contract.PaidOut(ctx, recipient, token) + if err != nil { + return nil, nil, err + } + + // 3.compare it to fix. + diff := big.NewInt(0).Sub(alreadyPaidOutOnline, totalReceivedCashed) + log.Infof("AdjustCashCheque: diff > 0, vault=%s, recipient=%s, online=%s, local=%s, diff=%s", + vaultAddress.String(), recipient.String(), + alreadyPaidOutOnline.String(), totalReceivedCashed.String(), diff.String(), + ) + + if diff.Cmp(big.NewInt(0)) > 0 { + cashResult, err := s.fixStoreCashResult(vaultAddress, diff, token) + if err != nil { + return nil, nil, err + } + newCashOutAmount = cashResult.Amount + } + + return alreadyPaidOutOnline, newCashOutAmount, nil +} + +func (s *cashoutService) RestartFixChequeCashOut() { + if RestartFixCashOutStatusLock { + list, err := s.GetAllCashOutStatusStore() + if err != nil { + log.Infof("RestartFixChequeCashOut: GetAllCashOutStatusStore err = %v", err) + return + } + + if len(list) > 0 { + log.Infof("wait 30s, for fixing cash out status") + + // wait 30s, for online cashing out ok. + time.Sleep(time.Second * RestartWaitCashOutOnlineTime) + + for _, v := range list { + _, _, err := s.AdjustCashCheque(context.Background(), v.Vault, v.Beneficiary, v.Token, true) + if err != nil { + log.Infof("RestartFixChequeCashOut: AdjustCashCheque err = %v, info = %+v", err, v) + continue + } + + err = s.DeleteCashOutStatusStore(v) + if err != nil { + log.Infof("RestartFixChequeCashOut: DeleteCashOutStatusStore err = %v, info = %+v", err, v) + continue + } + } + } + RestartFixCashOutStatusLock = false + } + return +} + +func (s *cashoutService) fixStoreCashResult(vault common.Address, shouldPaidOut *big.Int, token common.Address) (cashResult *CashOutResult, err error) { + txHash := common.Hash{} //fix txHash: 0x0000... + cashResult = &CashOutResult{ + TxHash: txHash, + Vault: vault, + Token: token, + Amount: shouldPaidOut, + CashTime: time.Now().Unix(), + Status: "success", + } + + totalReceivedCashed := big.NewInt(0) + if err = s.store.Get(tokencfg.AddToken(statestore.TotalReceivedCashedKey, token), &totalReceivedCashed); err == nil || err == storage.ErrNotFound { + totalReceivedCashed = totalReceivedCashed.Add(totalReceivedCashed, shouldPaidOut) + err := s.store.Put(tokencfg.AddToken(statestore.TotalReceivedCashedKey, token), totalReceivedCashed) + if err != nil { + log.Infof("fixStoreCashResult:put totalReceivedCashdKey err:%+v", err) + } + } + + totalDailyReceivedCashed := big.NewInt(0) + if err = s.store.Get(statestore.GetTodayTotalDailyReceivedCashedKey(token), &totalDailyReceivedCashed); err == nil || err == storage.ErrNotFound { + totalDailyReceivedCashed = totalDailyReceivedCashed.Add(totalDailyReceivedCashed, shouldPaidOut) + err := s.store.Put(statestore.GetTodayTotalDailyReceivedCashedKey(token), totalDailyReceivedCashed) + if err != nil { + log.Infof("fixStoreCashResult:put totalReceivedDailyCashdKey err:%+v", err) + } + } + + // update TotalReceivedCountCashed + uncashed := 0 + err = s.store.Get(statestore.PeerReceivedUncashRecordsCountKey(vault, token), &uncashed) + if err != nil { + log.Infof("fixStoreCashResult:put totalReceivedCountCashed err:%+v", err) + } else { + cashedCount := 0 + err := s.store.Get(tokencfg.AddToken(statestore.TotalReceivedCashedCountKey, token), &cashedCount) + if err == nil || err == storage.ErrNotFound { + err := s.store.Put(tokencfg.AddToken(statestore.TotalReceivedCashedCountKey, token), cashedCount+uncashed) + if err != nil { + log.Infof("fixStoreCashResult:put totalReceivedCashedConuntKey err:%+v", err) + } else { + err := s.store.Put(statestore.PeerReceivedUncashRecordsCountKey(vault, token), 0) + if err != nil { + log.Infof("fixStoreCashResult:put totalReceivedCashedConuntKey err:%+v", err) + } + } + } + } + + err = s.store.Put(statestore.CashoutResultKey(vault), &cashResult) + if err != nil { + log.Infof("fixStoreCashResult:put cashoutResultKey err:%+v", err) + } + return +} + // CashoutStatus gets the status of the latest cashout transaction for the vault func (s *cashoutService) CashoutStatus(ctx context.Context, vaultAddress common.Address, token common.Address) (*CashoutStatus, error) { cheque, err := s.chequeStore.LastReceivedCheque(vaultAddress, token) diff --git a/settlement/swap/vault/cashout_status_store.go b/settlement/swap/vault/cashout_status_store.go new file mode 100644 index 000000000..c547670ab --- /dev/null +++ b/settlement/swap/vault/cashout_status_store.go @@ -0,0 +1,99 @@ +package vault + +import ( + "errors" + "fmt" + "github.com/ethereum/go-ethereum/common" + "math/big" + "time" +) + +var RestartFixCashOutStatusLock bool = true +var RestartWaitCashOutOnlineTime time.Duration = 30 //seconds + +// CashOutStatus from leveldb +var prefixKeyCashOutStatusStore = "keyCashOutStatusStore" // + txHash. +type CashOutStatusStoreInfo struct { + Token common.Address + Vault common.Address + Beneficiary common.Address + CumulativePayout *big.Int + TxHash string +} + +func getkeyCashOutStatusStore(txHash string) string { + return fmt.Sprintf("%s-%s", prefixKeyCashOutStatusStore, txHash) +} + +// AddCashOutStatusStore . +func (s *cashoutService) AddCashOutStatusStore(info CashOutStatusStoreInfo) (err error) { + if s.store == nil { + return errors.New("please start btfs node, at first! ") + } + + err = s.store.Put(getkeyCashOutStatusStore(info.TxHash), info) + if err != nil { + return err + } + + return nil +} + +// DeleteCashOutStatusStore . +func (s *cashoutService) DeleteCashOutStatusStore(info CashOutStatusStoreInfo) (err error) { + if s.store == nil { + return errors.New("please start btfs node, at first! ") + } + + err = s.store.Delete(getkeyCashOutStatusStore(info.TxHash)) + if err != nil { + if err.Error() == "storage: not found" { + return nil + } else { + return err + } + } + return +} + +// GetCashOutStatusStore . +func (s *cashoutService) GetCashOutStatusStore(txHash string) (bl bool, err error) { + if s.store == nil { + return bl, errors.New("please start btfs node, at first! ") + } + + var info CashOutStatusStoreInfo + err = s.store.Get(getkeyCashOutStatusStore(txHash), &info) + if err != nil { + if err.Error() == "storage: not found" { + return false, nil + } else { + return false, err + } + } + + return true, nil +} + +// GetAllCashOutStatusStore . +func (s *cashoutService) GetAllCashOutStatusStore() (infoList []CashOutStatusStoreInfo, err error) { + if s.store == nil { + return nil, errors.New("please start btfs node, at first! ") + } + + infoList = make([]CashOutStatusStoreInfo, 0) + err = s.store.Iterate(prefixKeyCashOutStatusStore, func(key, val []byte) (stop bool, err error) { + var info CashOutStatusStoreInfo + err = s.store.Get(string(key), &info) + if err != nil { + return false, err + } + infoList = append(infoList, info) + return false, nil + }) + if err != nil { + return nil, err + } + + return infoList, nil +} diff --git a/settlement/swap/vault/cashout_test.go b/settlement/swap/vault/cashout_test.go index 252d130f4..bca0f953b 100644 --- a/settlement/swap/vault/cashout_test.go +++ b/settlement/swap/vault/cashout_test.go @@ -23,6 +23,7 @@ var ( ) func TestCashout(t *testing.T) { + vault.RestartFixCashOutStatusLock = false vaultAddress := common.HexToAddress("abcd") recipientAddress := common.HexToAddress("efff") txHash := common.HexToHash("dddd") @@ -123,6 +124,7 @@ func TestCashout(t *testing.T) { } func TestCashoutBounced(t *testing.T) { + vault.RestartFixCashOutStatusLock = false vaultAddress := common.HexToAddress("abcd") recipientAddress := common.HexToAddress("efff") txHash := common.HexToHash("dddd") @@ -227,6 +229,7 @@ func TestCashoutBounced(t *testing.T) { } func TestCashoutStatusReverted(t *testing.T) { + vault.RestartFixCashOutStatusLock = false vaultAddress := common.HexToAddress("abcd") recipientAddress := common.HexToAddress("efff") txHash := common.HexToHash("dddd") @@ -307,6 +310,7 @@ func TestCashoutStatusReverted(t *testing.T) { } func TestCashoutStatusPending(t *testing.T) { + vault.RestartFixCashOutStatusLock = false vaultAddress := common.HexToAddress("abcd") recipientAddress := common.HexToAddress("efff") txHash := common.HexToHash("dddd") diff --git a/spin/cheque_cash_out.go b/spin/cheque_cash_out.go new file mode 100644 index 000000000..bfe5b2fd8 --- /dev/null +++ b/spin/cheque_cash_out.go @@ -0,0 +1,9 @@ +package spin + +import ( + "github.com/bittorrent/go-btfs/chain" +) + +func RestartFixChequeCashOut() { + chain.SettleObject.CashoutService.RestartFixChequeCashOut() +}