-
Notifications
You must be signed in to change notification settings - Fork 42
/
module.go
226 lines (196 loc) · 8.01 KB
/
module.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package module
import (
"context"
"encoding/json"
"fmt"
"sync"
"cosmossdk.io/core/appmodule"
cosmosMath "cosmossdk.io/math"
"github.com/allora-network/allora-chain/app/params"
state "github.com/allora-network/allora-chain/x/emissions"
keeper "github.com/allora-network/allora-chain/x/emissions/keeper"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
gwruntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
)
// Compile-time assertions: each line fails to build if the AppModule value
// type stops satisfying the named interface. The blank identifier means no
// variable is actually kept.
var (
_ module.AppModuleBasic = AppModule{}
_ module.HasGenesis = AppModule{}
_ appmodule.AppModule = AppModule{}
_ appmodule.HasBeginBlocker = AppModule{}
_ appmodule.HasEndBlocker = AppModule{}
)
// ConsensusVersion defines the current module consensus version.
// It is the value reported by AppModule.ConsensusVersion.
const ConsensusVersion = 1
// AppModule is the application module for the emissions package: its Name
// method reports state.ModuleName and its genesis and block hooks delegate to
// the embedded keeper.
type AppModule struct {
// cdc is the codec supplied to NewAppModule.
cdc codec.Codec
// keeper gives the module access to emissions state and to the account and
// bank keepers used by the block hooks.
keeper keeper.Keeper
}
// NewAppModule builds an AppModule that holds the given codec and keeper.
func NewAppModule(cdc codec.Codec, keeper keeper.Keeper) AppModule {
	var am AppModule
	am.cdc = cdc
	am.keeper = keeper
	return am
}
// Name reports the module name declared by the emissions package
// (state.ModuleName).
func (AppModule) Name() string {
	return state.ModuleName
}
// RegisterLegacyAminoCodec is the hook for registering the module's types on
// the LegacyAmino codec. This module registers nothing.
func (AppModule) RegisterLegacyAminoCodec(cdc *codec.LegacyAmino) {
	// Intentionally empty: new modules do not need to support Amino.
}
// RegisterGRPCGatewayRoutes wires the module's query handlers into the given
// gRPC Gateway mux, using a query client built from clientCtx.
// It panics if registration fails.
func (AppModule) RegisterGRPCGatewayRoutes(clientCtx client.Context, mux *gwruntime.ServeMux) {
	queryClient := state.NewQueryClient(clientCtx)
	err := state.RegisterQueryHandlerClient(context.Background(), mux, queryClient)
	if err != nil {
		panic(err)
	}
}
// RegisterInterfaces forwards the interface registry to the emissions
// package so it can register its interfaces and their implementations.
func (AppModule) RegisterInterfaces(registry codectypes.InterfaceRegistry) {
	state.RegisterInterfaces(registry)
}
// ConsensusVersion reports the module's consensus version, i.e. the
// package-level ConsensusVersion constant as a uint64.
func (AppModule) ConsensusVersion() uint64 {
	return ConsensusVersion
}
// RegisterServices registers the module's Msg server and Query server,
// both backed by the module keeper, with the given configurator.
func (am AppModule) RegisterServices(cfg module.Configurator) {
	msgImpl := keeper.NewMsgServerImpl(am.keeper)
	state.RegisterMsgServer(cfg.MsgServer(), msgImpl)

	queryImpl := keeper.NewQueryServerImpl(am.keeper)
	state.RegisterQueryServer(cfg.QueryServer(), queryImpl)

	// In-place module state migrations are not registered yet; the template
	// below is kept for when a version 1 -> 2 migration is introduced.
	// m := keeper.NewMigrator(am.keeper)
	// if err := cfg.RegisterMigration(state.ModuleName, 1, m.Migrate1to2); err != nil {
	// 	panic(fmt.Sprintf("failed to migrate x/%s from version 1 to 2: %v", state.ModuleName, err))
	// }
}
// DefaultGenesis returns the JSON encoding of the genesis state produced by
// state.NewGenesisState. Marshalling failures panic (MustMarshalJSON).
func (AppModule) DefaultGenesis(cdc codec.JSONCodec) json.RawMessage {
	defaultState := state.NewGenesisState()
	return cdc.MustMarshalJSON(defaultState)
}
// ValidateGenesis decodes bz into the module's genesis state and returns the
// result of validating it. A decoding failure is returned wrapped with the
// module name. The TxEncodingConfig argument is unused.
func (AppModule) ValidateGenesis(cdc codec.JSONCodec, _ client.TxEncodingConfig, bz json.RawMessage) error {
	var genesis state.GenesisState
	err := cdc.UnmarshalJSON(bz, &genesis)
	if err != nil {
		return fmt.Errorf("failed to unmarshal %s genesis state: %w", state.ModuleName, err)
	}
	return genesis.Validate()
}
// InitGenesis decodes the raw genesis data and hands it to the keeper to
// initialize module state. It returns nothing; it panics if the data cannot
// be decoded or if the keeper reports an error.
func (am AppModule) InitGenesis(ctx sdk.Context, cdc codec.JSONCodec, data json.RawMessage) {
	var gs state.GenesisState
	cdc.MustUnmarshalJSON(data, &gs)

	err := am.keeper.InitGenesis(ctx, &gs)
	if err == nil {
		return
	}
	panic(fmt.Sprintf("failed to initialize %s genesis state: %v", state.ModuleName, err))
}
// ExportGenesis asks the keeper for the module's current genesis state and
// returns it JSON-encoded. It panics if the keeper export fails or the state
// cannot be marshalled.
func (am AppModule) ExportGenesis(ctx sdk.Context, cdc codec.JSONCodec) json.RawMessage {
	exported, err := am.keeper.ExportGenesis(ctx)
	if err == nil {
		return cdc.MustMarshalJSON(exported)
	}
	panic(fmt.Sprintf("failed to export %s genesis state: %v", state.ModuleName, err))
}
// BeginBlock moves the reputer/worker share of the fee collector's balance
// into the Allora rewards account.
//
// The share is the configured reputer/worker percentage multiplied by the fee
// collector's current balance in the default bond denom, truncated to an
// integer amount.
//
// It returns an error if the percentage parameter cannot be read or if the
// module-to-module transfer fails.
func (am AppModule) BeginBlock(ctx context.Context) error {
	fmt.Printf("\n ---------------- Emissions BeginBlock ------------------- \n")
	percentRewardsToReputersAndWorkers, err := am.keeper.GetParamsPercentRewardsReputersWorkers(ctx)
	if err != nil {
		return err
	}

	feeCollectorName := am.keeper.GetFeeCollectorName()
	feeCollectorAddress := am.keeper.AccountKeeper().GetModuleAddress(feeCollectorName)
	feesCollectedAndEmissionsMintedLastBlock := am.keeper.BankKeeper().GetBalance(ctx, feeCollectorAddress, params.DefaultBondDenom)
	reputerWorkerCut := percentRewardsToReputersAndWorkers.MulInt(feesCollectedAndEmissionsMintedLastBlock.Amount).TruncateInt()

	// The transfer result must be checked: dropping it would report a
	// successful block even though the rewards account was never funded.
	err = am.keeper.BankKeeper().SendCoinsFromModuleToModule(
		ctx,
		feeCollectorName,
		state.AlloraRewardsAccountName,
		sdk.NewCoins(sdk.NewCoin(params.DefaultBondDenom, reputerWorkerCut)),
	)
	if err != nil {
		return fmt.Errorf("sending reputer and worker cut to rewards account: %w", err)
	}
	return nil
}
// EndBlock runs the emissions module's end-of-block logic.
//
// In order, it:
//  1. reads the last rewards update height and the active topics together
//     with their met demand,
//  2. transfers the met demand from the requests account to the fee
//     collector account,
//  3. calls emitRewards once at least one epoch length of blocks has passed
//     since the last rewards update, and
//  4. for every returned topic, updates the inference and weight "last ran"
//     timestamps when the respective cadence has elapsed.
//
// It returns an error when a keeper read or the coin transfer fails. It
// panics when the block height is behind the last rewards update height or
// when emitRewards fails. Failures while updating per-topic timestamps are
// only logged.
func (am AppModule) EndBlock(ctx context.Context) error {
	fmt.Printf("\n ---------------- Emissions EndBlock ------------------- \n")
	sdkCtx := sdk.UnwrapSDKContext(ctx)
	blockNumber := sdkCtx.BlockHeight()
	currentTime := uint64(sdkCtx.BlockTime().Unix())

	lastRewardsUpdate, err := am.keeper.GetLastRewardsUpdate(sdkCtx)
	if err != nil {
		return err
	}

	topTopicsActiveWithDemand, metDemand, err := ChurnRequestsGetActiveTopicsAndDemand(sdkCtx, am.keeper, currentTime)
	if err != nil {
		fmt.Println("Error getting active topics and met demand: ", err)
		return err
	}

	// Send collected inference request fees to the fee collector account.
	// They will be paid out to reputers, workers, and cosmos validators
	// in the following BeginBlock of the next block.
	//
	// The amount is converted from the big integer directly: going through
	// Int64() would silently corrupt any demand that does not fit in int64.
	err = am.keeper.BankKeeper().SendCoinsFromModuleToModule(
		ctx,
		state.AlloraRequestsAccountName,
		am.keeper.GetFeeCollectorName(),
		sdk.NewCoins(sdk.NewCoin(params.DefaultBondDenom, cosmosMath.NewIntFromBigInt(metDemand.BigInt()))),
	)
	if err != nil {
		fmt.Println("Error sending coins from module to module: ", err)
		return err
	}

	// Ensure that enough blocks have passed to hit an epoch.
	// If not, skip rewards calculation.
	blocksSinceLastUpdate := blockNumber - lastRewardsUpdate
	if blocksSinceLastUpdate < 0 {
		panic("Block number is less than last rewards update block number")
	}
	epochLength, err := am.keeper.GetParamsEpochLength(ctx)
	if err != nil {
		return err
	}
	if blocksSinceLastUpdate >= epochLength {
		// A failure in emitRewards panics, and the panic propagates out of
		// EndBlock.
		// NOTE(review): the intent may have been to log and continue so that
		// unpaid rewards accumulate and can be paid out retroactively;
		// confirm before changing this behavior.
		if err := emitRewards(sdkCtx, am); err != nil {
			fmt.Println("Error calculating global emission per topic: ", err)
			panic(err)
		}
	}

	var wg sync.WaitGroup
	// Loop over and run epochs on topics whose inferences are demanded enough to be served.
	// Within each loop, execute the inference and weight cadence checks.
	for _, topic := range topTopicsActiveWithDemand {
		// Parallelize the inference and weight cadence checks.
		wg.Add(1)
		go func(topic state.Topic) {
			defer wg.Done()

			// Errors below are declared per goroutine. Assigning to the
			// enclosing function's err from several goroutines at once
			// would be a data race.

			// Check the cadence of inferences.
			if currentTime-topic.InferenceLastRan >= topic.InferenceCadence {
				fmt.Printf("Inference cadence met for topic: %v metadata: %s default arg: %s. \n",
					topic.Id,
					topic.Metadata,
					topic.DefaultArg)
				// Update the last inference ran.
				if err := am.keeper.UpdateTopicInferenceLastRan(sdkCtx, topic.Id, currentTime); err != nil {
					fmt.Println("Error updating last inference ran: ", err)
				}
			}

			// Check the cadence of weight calculations.
			if currentTime-topic.WeightLastRan >= topic.WeightCadence {
				fmt.Printf("Weight cadence met for topic: %v metadata: %s default arg: %s \n",
					topic.Id,
					topic.Metadata,
					topic.DefaultArg)
				// Update the last weight ran.
				if err := am.keeper.UpdateTopicWeightLastRan(sdkCtx, topic.Id, currentTime); err != nil {
					fmt.Println("Error updating last weight ran: ", err)
				}
			}
		}(topic)
	}
	wg.Wait()
	return nil
}