Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sealing: More SnapDeals config knobs #8343

Merged
merged 7 commits into from Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions .circleci/config.yml
Expand Up @@ -917,6 +917,11 @@ workflows:
suite: itest-sector_finalize_early
target: "./itests/sector_finalize_early_test.go"

- test:
name: test-itest-sector_make_cc_avail
suite: itest-sector_make_cc_avail
target: "./itests/sector_make_cc_avail_test.go"

- test:
name: test-itest-sector_miner_collateral
suite: itest-sector_miner_collateral
Expand All @@ -927,6 +932,16 @@ workflows:
suite: itest-sector_pledge
target: "./itests/sector_pledge_test.go"

- test:
name: test-itest-sector_prefer_no_upgrade
suite: itest-sector_prefer_no_upgrade
target: "./itests/sector_prefer_no_upgrade_test.go"

- test:
name: test-itest-sector_revert_available
suite: itest-sector_revert_available
target: "./itests/sector_revert_available_test.go"

- test:
name: test-itest-sector_terminate
suite: itest-sector_terminate
Expand Down
19 changes: 17 additions & 2 deletions documentation/en/default-lotus-miner-config.toml
Expand Up @@ -325,18 +325,33 @@
# env var: LOTUS_SEALING_MAXWAITDEALSSECTORS
#MaxWaitDealsSectors = 2

# Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited)
# Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited)
#
# type: uint64
# env var: LOTUS_SEALING_MAXSEALINGSECTORS
#MaxSealingSectors = 0

# Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited)
# Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited)
#
# type: uint64
# env var: LOTUS_SEALING_MAXSEALINGSECTORSFORDEALS
#MaxSealingSectorsForDeals = 0

# Prefer creating new sectors even if there are sectors Available for upgrading.
# This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it
# possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing
# flow when the volume of storage deals is lower.
#
# type: bool
# env var: LOTUS_SEALING_PREFERNEWSECTORSFORDEALS
#PreferNewSectorsForDeals = false

# Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)
#
# type: uint64
# env var: LOTUS_SEALING_MAXUPGRADINGSECTORS
#MaxUpgradingSectors = 0

# CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will
# live before it must be extended or converted into sector containing deals before it is
# terminated. Value must be between 180-540 days inclusive
Expand Down
2 changes: 0 additions & 2 deletions extern/sector-storage/piece_provider.go
Expand Up @@ -166,8 +166,6 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,

r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)

log.Debugf("result of first tryReadUnsealedPiece: r=%s, err=%s", r, err)

if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
err = nil
Expand Down
101 changes: 60 additions & 41 deletions extern/storage-sealing/input.go
Expand Up @@ -550,7 +550,7 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize
return curEpoch + minDur, curEpoch + maxDur, nil
}

func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}
Expand Down Expand Up @@ -619,6 +619,24 @@ func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSeal
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}

// call with m.inputLk
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
sid, err := m.sc.Next()
if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err)
}

err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
if err != nil {
return 0, xerrors.Errorf("initializing sector: %w", err)
}

// update stats early, fsm planner would do that async
m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)

return sid, err
}

func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()

Expand All @@ -631,58 +649,59 @@ func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealPro
return xerrors.Errorf("getting storage config: %w", err)
}

if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}

// if we're above WaitDeals limit, we don't want to add more staging sectors
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}

got, err := m.tryGetUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
maxUpgrading := cfg.MaxSealingSectorsForDeals
if cfg.MaxUpgradingSectors > 0 {
maxUpgrading = cfg.MaxUpgradingSectors
}

if !cfg.MakeNewSectorForDeals {
return nil
}
canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals)
canUpgrade := !(maxUpgrading > 0 && m.stats.curSealing() >= maxUpgrading)

sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err
}
// we want to try to upgrade when:
// - we can upgrade and prefer upgrades
// - we don't prefer upgrades, but can't create a new sector
shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate)

m.nextDealSector = &sid

log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
})
}

// call with m.inputLk
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
// Now actually create a new sector

sid, err := m.sc.Next()
if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err)
}
log.Infow("new deal sector decision",
"sealing", m.stats.curSealing(),
"maxSeal", cfg.MaxSealingSectorsForDeals,
"maxUpgrade", maxUpgrading,
"preferNew", cfg.PreferNewSectorsForDeals,
"canCreate", canCreate,
"canUpgrade", canUpgrade,
"shouldUpgrade", shouldUpgrade)

err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
if err != nil {
return 0, xerrors.Errorf("initializing sector: %w", err)
if shouldUpgrade {
got, err := m.maybeUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}
}

// update stats early, fsm planner would do that async
m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)
if canCreate {
sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err
}
m.nextDealSector = &sid

return sid, nil
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
if err := m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
}); err != nil {
return err
}
}
return nil
}

func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
Expand Down
4 changes: 4 additions & 0 deletions extern/storage-sealing/sealiface/config.go
Expand Up @@ -18,6 +18,10 @@ type Config struct {
// includes failed, 0 = no limit
MaxSealingSectorsForDeals uint64

PreferNewSectorsForDeals bool

MaxUpgradingSectors uint64

MakeNewSectorForDeals bool

MakeCCSectorsAvailable bool
Expand Down
124 changes: 18 additions & 106 deletions itests/ccupgrade_test.go
Expand Up @@ -7,16 +7,13 @@ import (
"testing"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/itests/kit"
)

Expand All @@ -32,7 +29,23 @@ func TestCCUpgrade(t *testing.T) {
//stm: @MINER_SECTOR_LIST_001
kit.QuietMiningLogs()

runTestCCUpgrade(t)
n := runTestCCUpgrade(t)

t.Run("post", func(t *testing.T) {
ctx := context.Background()
ts, err := n.ChainHead(ctx)
require.NoError(t, err)
start := ts.Height()
// wait for a full proving period
t.Log("waiting for chain")

n.WaitTillChain(ctx, func(ts *types.TipSet) bool {
if ts.Height() > start+abi.ChainEpoch(2880) {
return true
}
return false
})
})
}

func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
Expand Down Expand Up @@ -60,7 +73,7 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
client.WaitForSectorActive(ctx, t, CCUpgrade, maddr)

//stm: @SECTOR_CC_UPGRADE_001
err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
Expand Down Expand Up @@ -88,104 +101,3 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {

return client
}

func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) {
for {
active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
for _, si := range active {
if si.SectorNumber == sn {
fmt.Printf("ACTIVE\n")
return
}
}

time.Sleep(time.Second)
}
}

func TestCCUpgradeAndPoSt(t *testing.T) {
kit.QuietMiningLogs()
t.Run("upgrade and then post", func(t *testing.T) {
ctx := context.Background()
n := runTestCCUpgrade(t)
ts, err := n.ChainHead(ctx)
require.NoError(t, err)
start := ts.Height()
// wait for a full proving period
t.Log("waiting for chain")

n.WaitTillChain(ctx, func(ts *types.TipSet) bool {
if ts.Height() > start+abi.ChainEpoch(2880) {
return true
}
return false
})
})
}

func TestAbortUpgradeAvailable(t *testing.T) {
kit.QuietMiningLogs()

ctx := context.Background()
blockTime := 1 * time.Millisecond

client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
ens.InterconnectAll().BeginMiningMustPost(blockTime)

maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
}

CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
fmt.Printf("CCUpgrade: %d\n", CCUpgrade)

miner.PledgeSectors(ctx, 1, 0, nil)
sl, err := miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")
{
si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
waitForSectorActive(ctx, t, CCUpgrade, client, maddr)

err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
require.NoError(t, err)

sl, err = miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")

ss, err := miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)

for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Proving) {
time.Sleep(50 * time.Millisecond)
continue
}

require.Equal(t, api.SectorState(sealing.Available), ss.State)
break
}

require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0]))

for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Available) {
time.Sleep(50 * time.Millisecond)
continue
}

require.Equal(t, api.SectorState(sealing.Proving), ss.State)
break
}
}