Merge pull request #8343 from filecoin-project/feat/more-snap-config
feat: sealing: More SnapDeals config knobs
magik6k committed Mar 25, 2022
2 parents 11db419 + e767981 commit 7c4d3a4
Showing 16 changed files with 431 additions and 169 deletions.
15 changes: 15 additions & 0 deletions .circleci/config.yml
@@ -917,6 +917,11 @@ workflows:
suite: itest-sector_finalize_early
target: "./itests/sector_finalize_early_test.go"

+ - test:
+ name: test-itest-sector_make_cc_avail
+ suite: itest-sector_make_cc_avail
+ target: "./itests/sector_make_cc_avail_test.go"
+
- test:
name: test-itest-sector_miner_collateral
suite: itest-sector_miner_collateral
@@ -927,6 +932,16 @@
suite: itest-sector_pledge
target: "./itests/sector_pledge_test.go"

+ - test:
+ name: test-itest-sector_prefer_no_upgrade
+ suite: itest-sector_prefer_no_upgrade
+ target: "./itests/sector_prefer_no_upgrade_test.go"
+
+ - test:
+ name: test-itest-sector_revert_available
+ suite: itest-sector_revert_available
+ target: "./itests/sector_revert_available_test.go"
+
- test:
name: test-itest-sector_terminate
suite: itest-sector_terminate
19 changes: 17 additions & 2 deletions documentation/en/default-lotus-miner-config.toml
@@ -325,18 +325,33 @@
# env var: LOTUS_SEALING_MAXWAITDEALSSECTORS
#MaxWaitDealsSectors = 2

- # Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited)
+ # Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited)
#
# type: uint64
# env var: LOTUS_SEALING_MAXSEALINGSECTORS
#MaxSealingSectors = 0

- # Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited)
+ # Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited)
#
# type: uint64
# env var: LOTUS_SEALING_MAXSEALINGSECTORSFORDEALS
#MaxSealingSectorsForDeals = 0

+ # Prefer creating new sectors even if there are sectors Available for upgrading.
+ # This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it
+ # possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing
+ # flow when the volume of storage deals is lower.
+ #
+ # type: bool
+ # env var: LOTUS_SEALING_PREFERNEWSECTORSFORDEALS
+ #PreferNewSectorsForDeals = false
+
+ # Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)
+ #
+ # type: uint64
+ # env var: LOTUS_SEALING_MAXUPGRADINGSECTORS
+ #MaxUpgradingSectors = 0
+
# CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will
# live before it must be extended or converted into sector containing deals before it is
# terminated. Value must be between 180-540 days inclusive
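The new knobs above (PreferNewSectorsForDeals and MaxUpgradingSectors, together with the existing MaxSealingSectorsForDeals limit) are designed to be combined. A minimal illustrative combination, with hypothetical values rather than the defaults shipped in this file:

PreferNewSectorsForDeals = true
MaxSealingSectorsForDeals = 20
MaxUpgradingSectors = 30

With fewer than 20 sectors sealing+upgrading, incoming deals go into newly sealed sectors; once that limit is saturated, deals are steered into SnapDeals upgrades of Available CC sectors, up to 30 sectors in flight.
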
2 changes: 0 additions & 2 deletions extern/sector-storage/piece_provider.go
@@ -166,8 +166,6 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,

r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)

log.Debugf("result of first tryReadUnsealedPiece: r=%s, err=%s", r, err)

if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
err = nil
101 changes: 60 additions & 41 deletions extern/storage-sealing/input.go
@@ -554,7 +554,7 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize
return curEpoch + minDur, curEpoch + maxDur, nil
}

- func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
+ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}
@@ -623,6 +623,24 @@ func (m *Sealing) tryGetUpgradeSeal
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}

+ // call with m.inputLk
+ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
+ sid, err := m.sc.Next()
+ if err != nil {
+ return 0, xerrors.Errorf("getting sector number: %w", err)
+ }
+
+ err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
+ if err != nil {
+ return 0, xerrors.Errorf("initializing sector: %w", err)
+ }
+
+ // update stats early, fsm planner would do that async
+ m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)
+
+ return sid, err
+ }

func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()

@@ -635,58 +653,59 @@ func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealPro
return xerrors.Errorf("getting storage config: %w", err)
}

- if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
- return nil
- }
-
// if we're above WaitDeals limit, we don't want to add more staging sectors
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}

- got, err := m.tryGetUpgradeSector(ctx, sp, ef)
- if err != nil {
- return err
- }
- if got {
- return nil
- }
-
- if !cfg.MakeNewSectorForDeals {
- return nil
- }
-
- sid, err := m.createSector(ctx, cfg, sp)
- if err != nil {
- return err
- }
-
- m.nextDealSector = &sid
-
- log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
- return m.sectors.Send(uint64(sid), SectorStart{
- ID: sid,
- SectorType: sp,
- })
- }
-
- // call with m.inputLk
- func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
- // Now actually create a new sector
-
- sid, err := m.sc.Next()
- if err != nil {
- return 0, xerrors.Errorf("getting sector number: %w", err)
- }
-
- err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
- if err != nil {
- return 0, xerrors.Errorf("initializing sector: %w", err)
- }
-
- // update stats early, fsm planner would do that async
- m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)
-
- return sid, nil
+ maxUpgrading := cfg.MaxSealingSectorsForDeals
+ if cfg.MaxUpgradingSectors > 0 {
+ maxUpgrading = cfg.MaxUpgradingSectors
+ }
+
+ canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals)
+ canUpgrade := !(maxUpgrading > 0 && m.stats.curSealing() >= maxUpgrading)
+
+ // we want to try to upgrade when:
+ // - we can upgrade and prefer upgrades
+ // - we don't prefer upgrades, but can't create a new sector
+ shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate)
+
+ log.Infow("new deal sector decision",
+ "sealing", m.stats.curSealing(),
+ "maxSeal", cfg.MaxSealingSectorsForDeals,
+ "maxUpgrade", maxUpgrading,
+ "preferNew", cfg.PreferNewSectorsForDeals,
+ "canCreate", canCreate,
+ "canUpgrade", canUpgrade,
+ "shouldUpgrade", shouldUpgrade)
+
+ if shouldUpgrade {
+ got, err := m.maybeUpgradeSector(ctx, sp, ef)
+ if err != nil {
+ return err
+ }
+ if got {
+ return nil
+ }
+ }
+
+ if canCreate {
+ sid, err := m.createSector(ctx, cfg, sp)
+ if err != nil {
+ return err
+ }
+ m.nextDealSector = &sid
+
+ log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
+ if err := m.sectors.Send(uint64(sid), SectorStart{
+ ID: sid,
+ SectorType: sp,
+ }); err != nil {
+ return err
+ }
+ }
+
+ return nil
}

func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
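For readers who want to follow the new selection logic outside the diff, the sketch below reproduces just the canCreate / canUpgrade / shouldUpgrade decision from tryGetDealSector. The dealSectorConfig type and pickAction function are illustrative stand-ins, not Lotus APIs; only the boolean logic mirrors the code above, and the real flow additionally falls back to sealing a new sector when an upgrade is chosen but no Available candidate sector is found.

package main

import "fmt"

// dealSectorConfig stands in for the relevant sealiface.Config fields (hypothetical helper type).
type dealSectorConfig struct {
	MakeNewSectorForDeals     bool
	PreferNewSectorsForDeals  bool
	MaxSealingSectorsForDeals uint64
	MaxUpgradingSectors       uint64
}

// pickAction mirrors the decision in tryGetDealSector: given how many sectors are
// currently sealing+upgrading, route a new deal to a CC-sector upgrade, a brand
// new sector, or nowhere.
func pickAction(cfg dealSectorConfig, curSealing uint64) string {
	maxUpgrading := cfg.MaxSealingSectorsForDeals
	if cfg.MaxUpgradingSectors > 0 {
		maxUpgrading = cfg.MaxUpgradingSectors
	}

	canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && curSealing >= cfg.MaxSealingSectorsForDeals)
	canUpgrade := !(maxUpgrading > 0 && curSealing >= maxUpgrading)

	// upgrade when upgrading is allowed and we either prefer upgrades or cannot create a new sector
	shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate)

	switch {
	case shouldUpgrade:
		return "upgrade an Available CC sector"
	case canCreate:
		return "seal a new sector"
	default:
		return "do nothing (limits reached)"
	}
}

func main() {
	cfg := dealSectorConfig{
		MakeNewSectorForDeals:     true,
		PreferNewSectorsForDeals:  true,
		MaxSealingSectorsForDeals: 20,
		MaxUpgradingSectors:       30,
	}
	// with these values: a new sector at 5 in-flight, an upgrade at 20, nothing at 30
	for _, cur := range []uint64{5, 20, 30} {
		fmt.Printf("curSealing=%d -> %s\n", cur, pickAction(cfg, cur))
	}
}

Setting PreferNewSectorsForDeals = false instead makes the upgrade path the first choice whenever it is below its own limit, regardless of whether a new sector could still be created.
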
4 changes: 4 additions & 0 deletions extern/storage-sealing/sealiface/config.go
@@ -18,6 +18,10 @@ type Config struct {
// includes failed, 0 = no limit
MaxSealingSectorsForDeals uint64

+ PreferNewSectorsForDeals bool
+
+ MaxUpgradingSectors uint64
+
MakeNewSectorForDeals bool

MakeCCSectorsAvailable bool
124 changes: 18 additions & 106 deletions itests/ccupgrade_test.go
@@ -7,16 +7,13 @@ import (
"testing"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/itests/kit"
)

@@ -32,7 +29,23 @@ func TestCCUpgrade(t *testing.T) {
//stm: @MINER_SECTOR_LIST_001
kit.QuietMiningLogs()

- runTestCCUpgrade(t)
+ n := runTestCCUpgrade(t)
+
+ t.Run("post", func(t *testing.T) {
+ ctx := context.Background()
+ ts, err := n.ChainHead(ctx)
+ require.NoError(t, err)
+ start := ts.Height()
+ // wait for a full proving period
+ t.Log("waiting for chain")
+
+ n.WaitTillChain(ctx, func(ts *types.TipSet) bool {
+ if ts.Height() > start+abi.ChainEpoch(2880) {
+ return true
+ }
+ return false
+ })
+ })
}

func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
@@ -60,7 +73,7 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
- waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
+ client.WaitForSectorActive(ctx, t, CCUpgrade, maddr)

//stm: @SECTOR_CC_UPGRADE_001
err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
@@ -88,104 +101,3 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {

return client
}

- func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) {
- for {
- active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK)
- require.NoError(t, err)
- for _, si := range active {
- if si.SectorNumber == sn {
- fmt.Printf("ACTIVE\n")
- return
- }
- }
-
- time.Sleep(time.Second)
- }
- }
-
- func TestCCUpgradeAndPoSt(t *testing.T) {
- kit.QuietMiningLogs()
- t.Run("upgrade and then post", func(t *testing.T) {
- ctx := context.Background()
- n := runTestCCUpgrade(t)
- ts, err := n.ChainHead(ctx)
- require.NoError(t, err)
- start := ts.Height()
- // wait for a full proving period
- t.Log("waiting for chain")
-
- n.WaitTillChain(ctx, func(ts *types.TipSet) bool {
- if ts.Height() > start+abi.ChainEpoch(2880) {
- return true
- }
- return false
- })
- })
- }
-
- func TestAbortUpgradeAvailable(t *testing.T) {
- kit.QuietMiningLogs()
-
- ctx := context.Background()
- blockTime := 1 * time.Millisecond
-
- client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
- ens.InterconnectAll().BeginMiningMustPost(blockTime)
-
- maddr, err := miner.ActorAddress(ctx)
- if err != nil {
- t.Fatal(err)
- }
-
- CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
- fmt.Printf("CCUpgrade: %d\n", CCUpgrade)
-
- miner.PledgeSectors(ctx, 1, 0, nil)
- sl, err := miner.SectorsList(ctx)
- require.NoError(t, err)
- require.Len(t, sl, 1, "expected 1 sector")
- require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")
- {
- si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
- require.NoError(t, err)
- require.Less(t, 50000, int(si.Expiration))
- }
- waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
-
- err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
- require.NoError(t, err)
-
- sl, err = miner.SectorsList(ctx)
- require.NoError(t, err)
- require.Len(t, sl, 1, "expected 1 sector")
-
- ss, err := miner.SectorsStatus(ctx, sl[0], false)
- require.NoError(t, err)
-
- for i := 0; i < 100; i++ {
- ss, err = miner.SectorsStatus(ctx, sl[0], false)
- require.NoError(t, err)
- if ss.State == api.SectorState(sealing.Proving) {
- time.Sleep(50 * time.Millisecond)
- continue
- }
-
- require.Equal(t, api.SectorState(sealing.Available), ss.State)
- break
- }
-
- require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0]))
-
- for i := 0; i < 100; i++ {
- ss, err = miner.SectorsStatus(ctx, sl[0], false)
- require.NoError(t, err)
- if ss.State == api.SectorState(sealing.Available) {
- time.Sleep(50 * time.Millisecond)
- continue
- }
-
- require.Equal(t, api.SectorState(sealing.Proving), ss.State)
- break
- }
- }
