Skip to content

Commit

Permalink
added catch for sig verif error, retry DoBroadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
Aven Dauz authored and Aven Dauz committed Aug 7, 2021
1 parent d483557 commit 3f383b6
Showing 1 changed file with 125 additions and 122 deletions.
247 changes: 125 additions & 122 deletions x/curium/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import (

type (
Keeper struct {
cdc *codec.Codec
storeKey sdk.StoreKey
memKey sdk.StoreKey
rpcPort uint64
accKeeper *keeper.AccountKeeper
cdc *codec.Codec
storeKey sdk.StoreKey
memKey sdk.StoreKey
rpcPort uint64
accKeeper *keeper.AccountKeeper
GasMeterKeeper *gasmeter.GasMeterKeeper
}
)
Expand All @@ -65,23 +65,22 @@ func NewKeeper(cdc *codec.Codec, storeKey, memKey sdk.StoreKey, laddr string, ac
regex, _ := regexp.Compile(".*:")
port, _ := math.ParseUint64(regex.ReplaceAllString(laddr, ""))
return &Keeper{
cdc: cdc,
storeKey: storeKey,
memKey: memKey,
rpcPort: port,
accKeeper: &accKeeper,
cdc: cdc,
storeKey: storeKey,
memKey: memKey,
rpcPort: port,
accKeeper: &accKeeper,
GasMeterKeeper: gasMeterKeeper,
}
}

type AccountState struct {
seqNum uint64
accntNum uint64
seqNum uint64
accntNum uint64
requested bool
}


func updateAccountState (accnt exported.Account, state AccountState) (AccountState, error) {
func updateAccountState(accnt exported.Account, state AccountState) (AccountState, error) {
if state.requested {
state.seqNum = state.seqNum + 1
return state, nil
Expand All @@ -93,12 +92,18 @@ func updateAccountState (accnt exported.Account, state AccountState) (AccountSta
}
}

func getKeyring (keyringDir string) (cryptoKeys.Keybase, error) {
return cryptoKeys.NewKeyring("BluzelleService", cryptoKeys.BackendTest, keyringDir, nil)
func resetAccountState(accnt exported.Account, state AccountState) (AccountState, error) {
state.accntNum = accnt.GetAccountNumber()
state.seqNum = accnt.GetSequence()
state.requested = true
return state, nil
}

func getKeyring(keyringDir string) (cryptoKeys.Keybase, error) {
return cryptoKeys.NewKeyring("BluzelleService", cryptoKeys.BackendTest, keyringDir, nil)
}

func getAccountAddress (keyring cryptoKeys.Keybase, from string) (sdk.AccAddress, error) {
func getAccountAddress(keyring cryptoKeys.Keybase, from string) (sdk.AccAddress, error) {

keyringKeys, err := keyring.Get(from)

Expand All @@ -120,12 +125,12 @@ func (reader KeyringReader) GetAddress(from string) (sdk.AccAddress, error) {

}

func getGasPriceUbnt () (sdk.DecCoins, error) {
func getGasPriceUbnt() (sdk.DecCoins, error) {
minGasPriceString := viper.GetString("minimum-gas-prices")
return sdk.ParseDecCoins(minGasPriceString)
}

func pollForTransaction (ctx rpctypes.Context, hash []byte) (*coretypes.ResultTx, error) {
func pollForTransaction(ctx rpctypes.Context, hash []byte) (*coretypes.ResultTx, error) {
result, err := core.Tx(&ctx, hash, false)
deadline, _ := ctx.Context().Deadline()
if time.Now().Before(deadline) {
Expand All @@ -138,7 +143,6 @@ func pollForTransaction (ctx rpctypes.Context, hash []byte) (*coretypes.ResultTx
return result, nil
}


func (k Keeper) NewMsgBroadcaster(keyringDir string, cdc *codec.Codec) MsgBroadcaster {
accKeeper := k.accKeeper

Expand All @@ -149,8 +153,8 @@ func (k Keeper) NewMsgBroadcaster(keyringDir string, cdc *codec.Codec) MsgBroadc
return func(ctx sdk.Context, msgs []sdk.Msg, from string) chan *MsgBroadcasterResponse {
resp := make(chan *MsgBroadcasterResponse)

go func () {
defer func () {
go func() {
defer func() {
close(resp)
}()

Expand All @@ -162,145 +166,146 @@ func (k Keeper) NewMsgBroadcaster(keyringDir string, cdc *codec.Codec) MsgBroadc

}

func DoBroadcast (resp chan *MsgBroadcasterResponse, keyringDir string, cdc *codec.Codec, curiumKeeper Keeper, accKeeper *keeper.AccountKeeper, ctx sdk.Context, msgs []sdk.Msg, from string, state AccountState) {

returnError := func(err error) {
curiumKeeper.Logger(ctx).Error("DoBroadcast, returnError(), ~~~", "error", err)
resp <- &MsgBroadcasterResponse{
Error: err,
}
}

defer func () {
r:= recover()
curiumKeeper.Logger(ctx).Error("DoBroadcast", "error", r)

if r != nil {
returnError(r.(error))
} else {
returnError(errors.New("nil error returned"))
}
}()

kr, err := getKeyring(keyringDir)


if err != nil {
returnError(err)
return
}


addr, err := getAccountAddress(kr, from)
func DoBroadcast(resp chan *MsgBroadcasterResponse, keyringDir string, cdc *codec.Codec, curiumKeeper Keeper, accKeeper *keeper.AccountKeeper, ctx sdk.Context, msgs []sdk.Msg, from string, state AccountState) {



if addr == nil {
returnError(errors.New("Nft address is nil"))
returnError := func(err error) {
curiumKeeper.Logger(ctx).Error("DoBroadcast, returnError(), ~~~", "error", err)
resp <- &MsgBroadcasterResponse{
Error: err,
}
}

defer func() {
r := recover()
curiumKeeper.Logger(ctx).Error("DoBroadcast", "error", r)


if err != nil {
returnError(err)
return
if r != nil {
returnError(r.(error))
} else {
returnError(errors.New("nil error returned"))
}
}()

kr, err := getKeyring(keyringDir)

if err != nil {
returnError(err)
return
}

accnt := accKeeper.GetAccount(ctx, addr)



if accnt == nil {
returnError(errors.New("from account does not exist"))
return
}
addr, err := getAccountAddress(kr, from)

if addr == nil {
returnError(errors.New("Nft address is nil"))
}

if err != nil {
returnError(err)
return
}

accnt := accKeeper.GetAccount(ctx, addr)

if accnt == nil {
returnError(errors.New("from account does not exist"))
return
}

gasPrice, err := getGasPriceUbnt()
gasPrice, err := getGasPriceUbnt()

if err != nil {
returnError(err)
return
}

fmt.Println("***** Seq num before update", state.seqNum)

if err != nil {
returnError(err)
return
}
state, err = updateAccountState(accnt, state)

fmt.Println("***** Seq num before update", state.seqNum)
if err != nil {
returnError(err)
return
}

state, err = updateAccountState(accnt, state)
fmt.Println("***** Seq num after update", state.seqNum)
// Create a new TxBuilder.
txBuilder := auth.NewTxBuilder(
utils.GetTxEncoder(cdc),
state.accntNum,
state.seqNum,
10000000,
1,
false,
ctx.ChainID(),
"memo", nil,
gasPrice,
).WithKeybase(kr)

if err != nil {
returnError(err)
return
}
fmt.Println("Before building and signing")

fmt.Println("***** Seq num after update", state.seqNum)
// Create a new TxBuilder.
txBuilder := auth.NewTxBuilder(
utils.GetTxEncoder(cdc),
state.accntNum,
state.seqNum,
10000000,
1,
false,
ctx.ChainID(),
"memo", nil,
gasPrice,
).WithKeybase(kr)
signedMsgs, err := txBuilder.BuildAndSign(from, clientkeys.DefaultKeyPass, msgs)

fmt.Println("After building and signing")

fmt.Println("Before building and signing")
if err != nil {
fmt.Println("******** ERROR FROM BUILDING AND SIGNING", err)
returnError(err)
return
}

signedMsgs, err := txBuilder.BuildAndSign(from, clientkeys.DefaultKeyPass, msgs)
if accnt == nil {
returnError(sdkerrors.New("curium", 2, "Cannot broadcast message, accnt does not exist"))
return
}

fmt.Println("After building and signing")
rpcCtx := rpctypes.Context{}

if err != nil {
fmt.Println("******** ERROR FROM BUILDING AND SIGNING", err)
returnError(err)
return
}
fmt.Println("Before broadcasting")

if accnt == nil {
returnError(sdkerrors.New("curium", 2, "Cannot broadcast message, accnt does not exist"))
return
}
broadcastResult, err := core.BroadcastTxSync(&rpcCtx, signedMsgs)

rpcCtx := rpctypes.Context{}
fmt.Println("After broadcasting", broadcastResult, msgs)

fmt.Println("Before broadcasting")
if err != nil {
returnError(err)
return
}

broadcastResult, err := core.BroadcastTxSync(&rpcCtx, signedMsgs)
accntSeqString, err := regexp.Compile("verify correct account sequence")

fmt.Println("After broadcasting", broadcastResult, msgs)
if err != nil {
returnError(err)
return
}

fmt.Println(accntSeqString.MatchString(broadcastResult.Log))

if accntSeqString.MatchString(broadcastResult.Log) {
returnError(errors.New("Sequencing error, retrying broadcast"))
newAccntState, err := resetAccountState(accnt, state)
if err != nil {
returnError(err)
return
}

result, err := pollForTransaction(rpcCtx, broadcastResult.Hash)
DoBroadcast(resp, keyringDir, cdc, curiumKeeper, accKeeper, ctx, msgs, from, newAccntState)
return
}

fmt.Println("Response after polling for transaction", result)
fmt.Println("Error from polling for transaction", err)
result, err := pollForTransaction(rpcCtx, broadcastResult.Hash)

if err != nil {
returnError(err)
return
}

resp <- &MsgBroadcasterResponse{
Response: &result.TxResult,
Data: &result.TxResult.Data,
}
fmt.Println("Response after polling for transaction", result)
fmt.Println("Error from polling for transaction", err)

if err != nil {
returnError(err)
return
}

resp <- &MsgBroadcasterResponse{
Response: &result.TxResult,
Data: &result.TxResult.Data,
}

}

Expand Down Expand Up @@ -381,5 +386,3 @@ func httpGet(url string) ([]byte, error) {
body, err := ioutil.ReadAll(resp.Body)
return body, err
}


0 comments on commit 3f383b6

Please sign in to comment.