Skip to content

Commit

Permalink
Merge pull request #11467 from filecoin-project/feat/correct-lp-messa…
Browse files Browse the repository at this point in the history
…ge-send

feat: lpmessage: Correct message sending
  • Loading branch information
magik6k committed Nov 30, 2023
2 parents 8883ce0 + 76c0b05 commit 20d3b26
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 83 deletions.
2 changes: 1 addition & 1 deletion cmd/lotus-provider/proving.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o
return err
}

wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw,
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, nil,
deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
Expand Down
9 changes: 7 additions & 2 deletions cmd/lotus-provider/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/provider"
"github.com/filecoin-project/lotus/provider/lpmessage"
"github.com/filecoin-project/lotus/provider/lpwinning"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths"
Expand Down Expand Up @@ -146,14 +147,18 @@ var runCmd = &cli.Command{
}
cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore

var activeTasks []harmonytask.TaskInterface

sender, sendTask := lpmessage.NewSender(full, full, db)
activeTasks = append(activeTasks, sendTask)

///////////////////////////////////////////////////////////////////////
///// Task Selection
///////////////////////////////////////////////////////////////////////
var activeTasks []harmonytask.TaskInterface
{

if cfg.Subsystems.EnableWindowPost {
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
Expand Down
61 changes: 47 additions & 14 deletions lib/harmony/harmonydb/sql/20231103.sql
Original file line number Diff line number Diff line change
@@ -1,22 +1,55 @@
create table message_sends
(
from_key text not null,
nonce bigint not null,
to_addr text not null,
signed_data bytea not null,
signed_json jsonb not null,
signed_cid text not null,
send_time timestamp default CURRENT_TIMESTAMP,
send_reason text,
send_success bool default false not null,
from_key text not null,
to_addr text not null,
send_reason text not null,
send_task_id bigint not null,

unsigned_data bytea not null,
unsigned_cid text not null,

nonce bigint,
signed_data bytea,
signed_json jsonb,
signed_cid text,

send_time timestamp default null,
send_success boolean default null,
send_error text,

constraint message_sends_pk
primary key (from_key, nonce)
primary key (send_task_id, from_key)
);

comment on column message_sends.from_key is 'text f[1/3/4]... address';
comment on column message_sends.nonce is 'assigned message nonce';
comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address';
comment on column message_sends.signed_data is 'signed message data';
comment on column message_sends.signed_cid is 'signed message cid';
comment on column message_sends.send_reason is 'optional description of send reason';
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already';
comment on column message_sends.send_task_id is 'harmony task id of the send task';

comment on column message_sends.unsigned_data is 'unsigned message data';
comment on column message_sends.unsigned_cid is 'unsigned message cid';

comment on column message_sends.nonce is 'assigned message nonce, set while the send task is executing';
comment on column message_sends.signed_data is 'signed message data, set while the send task is executing';
comment on column message_sends.signed_cid is 'signed message cid, set while the send task is executing';

comment on column message_sends.send_time is 'time when the send task was executed, set after pushing the message to the network';
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already, null if not yet attempted, true if successful, false if failed';
comment on column message_sends.send_error is 'error message if send_success is false';

create unique index message_sends_success_index
on message_sends (from_key, nonce)
where send_success is not false;

comment on index message_sends_success_index is
'message_sends_success_index enforces sender/nonce uniqueness, it is a conditional index that only indexes rows where send_success is not false. This allows us to have multiple rows with the same sender/nonce, as long as only one of them was successfully broadcasted (true) to the network or is in the process of being broadcasted (null).';

create table message_send_locks
(
from_key text not null,
task_id bigint not null,
claimed_at timestamp not null,

constraint message_send_locks_pk
primary key (from_key)
);
4 changes: 1 addition & 3 deletions provider/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
//var log = logging.Logger("provider")

func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *lpmessage.Sender,
as *ctladdr.AddressSelector, addresses []dtypes.MinerAddress, db *harmonydb.DB,
stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) {

Expand All @@ -29,8 +29,6 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co
// todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)

sender := lpmessage.NewSender(api, api, db)

computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max)
if err != nil {
return nil, nil, nil, err
Expand Down
Loading

0 comments on commit 20d3b26

Please sign in to comment.