-
Notifications
You must be signed in to change notification settings - Fork 0
/
keeper.go
161 lines (131 loc) · 5.08 KB
/
keeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package keeper
import (
"fmt"
sdkmath "cosmossdk.io/math"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/prefix"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
ibckeeper "github.com/cosmos/ibc-go/v6/modules/core/keeper"
"github.com/tendermint/tendermint/libs/log"
"github.com/incubus-network/fanfury-sdk/v2/x/interchainquery/types"
)
// Keeper bundles the dependencies of the interchainquery module: the codec,
// the module's store key, the per-module callback registry and the IBC keeper.
type Keeper struct {
	// cdc encodes and decodes the values this keeper persists.
	cdc codec.Codec
	// storeKey identifies the module's KVStore within the context's multistore.
	storeKey storetypes.StoreKey
	// callbacks maps a module name to the callbacks registered for that module.
	callbacks map[string]types.QueryCallbacks
	// IBCKeeper is the IBC core keeper handed in at construction time.
	IBCKeeper *ibckeeper.Keeper
}
// NewKeeper builds a Keeper from the given codec, store key and IBC keeper.
// The callback registry starts out as an empty, non-nil map so handlers can be
// registered immediately.
func NewKeeper(cdc codec.Codec, storeKey storetypes.StoreKey, ibckeeper *ibckeeper.Keeper) Keeper {
	k := Keeper{
		callbacks: make(map[string]types.QueryCallbacks),
	}
	k.cdc = cdc
	k.storeKey = storeKey
	k.IBCKeeper = ibckeeper
	return k
}
// SetCallbackHandler registers callbacks for the named module.
//
// The value stored is whatever handler.RegisterCallbacks() returns. If the
// module already has an entry, the existing entry is left untouched and an
// error is returned.
func (k *Keeper) SetCallbackHandler(module string, handler types.QueryCallbacks) error {
	if _, registered := k.callbacks[module]; registered {
		return fmt.Errorf("callback handler already set for %s", module)
	}

	k.callbacks[module] = handler.RegisterCallbacks()
	return nil
}
// Logger returns the context's logger with a "module" field set to
// "x/<ModuleName>".
func (k Keeper) Logger(ctx sdk.Context) log.Logger {
	moduleTag := fmt.Sprintf("x/%s", types.ModuleName)
	return ctx.Logger().With("module", moduleTag)
}
// SetDatapointForID records result as the datapoint for id in the
// data-prefixed store.
//
// The stored DataPoint carries height as its remote height and the current
// block height of ctx as its local height. The returned error is always nil;
// encoding goes through MustMarshal, so an encoding failure is not reported
// via the error return.
func (k *Keeper) SetDatapointForID(ctx sdk.Context, id string, result []byte, height sdkmath.Int) error {
	dp := types.DataPoint{
		Id:           id,
		RemoteHeight: height,
		LocalHeight:  sdk.NewInt(ctx.BlockHeight()),
		Value:        result,
	}

	dataStore := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData)
	dataStore.Set([]byte(id), k.cdc.MustMarshal(&dp))
	return nil
}
// GetDatapointForID loads the datapoint stored under id from the
// data-prefixed store.
//
// It returns an error and an empty DataPoint when nothing (or an empty value)
// is stored for id.
func (k *Keeper) GetDatapointForID(ctx sdk.Context, id string) (types.DataPoint, error) {
	dataStore := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData)

	raw := dataStore.Get([]byte(id))
	if len(raw) == 0 {
		return types.DataPoint{}, fmt.Errorf("unable to find data for id %s", id)
	}

	var dp types.DataPoint
	k.cdc.MustUnmarshal(raw, &dp)
	return dp, nil
}
// IterateDatapoints walks every entry in the data-prefixed store, decoding
// each value into a DataPoint and passing it to fn together with a zero-based
// index. Iteration ends early as soon as fn returns true.
func (k Keeper) IterateDatapoints(ctx sdk.Context, fn func(index int64, dp types.DataPoint) (stop bool)) {
	dataStore := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData)

	iter := sdk.KVStorePrefixIterator(dataStore, nil)
	defer iter.Close()

	for idx := int64(0); iter.Valid(); idx++ {
		var dp types.DataPoint
		k.cdc.MustUnmarshal(iter.Value(), &dp)

		if fn(idx, dp) {
			return
		}
		iter.Next()
	}
}
// DeleteDatapoint removes the entry stored under id from the data-prefixed
// store.
func (k Keeper) DeleteDatapoint(ctx sdk.Context, id string) {
	prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefixData).Delete([]byte(id))
}
// GetDatapoint looks up the datapoint whose ID is the query hash of
// (connectionID, chainID, queryType, request, module), delegating the actual
// read to GetDatapointForID.
func (k *Keeper) GetDatapoint(ctx sdk.Context, module string, connectionID string, chainID string, queryType string, request []byte) (types.DataPoint, error) {
	return k.GetDatapointForID(ctx, GenerateQueryHash(connectionID, chainID, queryType, request, module))
}
// GetDatapointOrRequest returns the stored datapoint for the given query when
// one exists and its local height is no more than maxAge blocks behind the
// current block height of ctx.
//
// When no datapoint is found, or the stored one is older than that, a request
// is submitted through MakeRequest (period -1, no module, no callback, ttl set
// to maxAge) and an error is returned together with an empty DataPoint.
//
// NOTE(review): the lookup hashes the ID with module, while the request is
// submitted with an empty module — confirm the resulting datapoint ends up
// under the ID this function reads.
func (k *Keeper) GetDatapointOrRequest(ctx sdk.Context, module string, connectionID string, chainID string, queryType string, request []byte, maxAge uint64) (types.DataPoint, error) {
	val, err := k.GetDatapoint(ctx, module, connectionID, chainID, queryType, request)
	if err != nil {
		// no datapoint
		k.MakeRequest(ctx, connectionID, chainID, queryType, request, sdk.NewInt(-1), "", "", maxAge)
		return types.DataPoint{}, fmt.Errorf("no data; query submitted")
	}

	// Compute the threshold in arbitrary-precision arithmetic: converting
	// maxAge to int64 would wrap negative for values above math.MaxInt64 and
	// push the threshold past the current height.
	oldestAcceptable := sdk.NewInt(ctx.BlockHeight()).Sub(sdkmath.NewIntFromUint64(maxAge))
	if val.LocalHeight.LT(oldestAcceptable) { // this is somewhat arbitrary; TODO: make this better
		k.MakeRequest(ctx, connectionID, chainID, queryType, request, sdk.NewInt(-1), "", "", maxAge)
		return types.DataPoint{}, fmt.Errorf("stale data; query submitted")
	}

	return val, nil
}
// MakeRequest registers a query, or re-arms one that already exists.
//
// The query is identified by the hash of (connectionID, chainID, queryType,
// request, module). If a query with that ID is already stored, its LastHeight
// is reset to zero so that the re-request triggers immediately. Otherwise a
// new query is created and stored.
//
// When creating a new query with a non-empty module, the function panics
// (after logging the error) if no callback handler is registered for the
// module, or if the handler does not have callbackID.
func (k *Keeper) MakeRequest(ctx sdk.Context, connectionID string, chainID string, queryType string, request []byte, period sdkmath.Int, module string, callbackID string, ttl uint64) {
	k.Logger(ctx).Info(
		"MakeRequest",
		"connection_id", connectionID,
		"chain_id", chainID,
		"query_type", queryType,
		"request", request,
		"period", period,
		"module", module,
		"callback", callbackID,
		"ttl", ttl,
	)

	queryID := GenerateQueryHash(connectionID, chainID, queryType, request, module)

	// Existing query: only reset its height and store it back.
	if query, found := k.GetQuery(ctx, queryID); found {
		query.LastHeight = sdk.ZeroInt()
		k.SetQuery(ctx, query)
		return
	}

	// New query: a named module must have a handler that knows callbackID.
	if module != "" {
		handler, registered := k.callbacks[module]
		if !registered {
			err := fmt.Errorf("no callback handler registered for module %s", module)
			k.Logger(ctx).Error(err.Error())
			panic(err)
		}
		if !handler.Has(callbackID) {
			err := fmt.Errorf("no callback %s registered for module %s", callbackID, module)
			k.Logger(ctx).Error(err.Error())
			panic(err)
		}
	}

	k.SetQuery(ctx, *k.NewQuery(ctx, module, connectionID, chainID, queryType, request, period, callbackID, ttl))
}