Use shared piece reader for blockstore get (fixed conflicts) (#1584)
* feat: add local index directory

Internally this is still referred to as the piece directory

Co-authored-by: dirkmc <dirkmdev@gmail.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* refactor: merge from main into lid branch (#1339)

* merge: main to lid (#1370)

* add free check (#1315)

* chore: bump version to 1.6.1 (#1317)

* fix legacy deal verified status (#1324)

* fix: update go-unixfsnode enough to make sure unixfs-preload is available (#1323)

* release v1.6.2-rc1 (#1328)

* use full path (#1330)

* fix bug (#1332)

* use forks of graphsync, go-data-transfer and go-fil-markets (#1333)

* refactor: use forks of graphsync, go-data-transfer and go-fil-markets

* refactor: convert from data transfer v1 to v2 voucher type

* fix: index provider validation voucher type

* fix: pass index provider engine link system through to graphsync's transport configurer

* feat: use tagged version of boost-gfm

* fix: retrieval client imports

* feat: tagged version of lotus

* feat: require go 1.19

* lint: fix lint errors

* fix: itests

* fix: cbor-gen, docsgen

* fix: update CI lint version

* fix: lint

* fix: docgen

* fix: go mod tidy

* fix: protocol proxy TestOutboundForwarding

* fix: docsgen

* fix: update filecoin-ffi submodule

* fix: prometheus duplicate register panic

* fix: cleanup imports

* fix: legs voucher processing

* chore: release v1.6.2-rc2 (#1340)

* release v1.6.2-rc2

* fix test

* fix: flaky TestLibp2pCarServerNewTransferCancelsPreviousTransfer (#1350)

* fix: flaky TestDealCompletionOnProcessResumption (#1351)

* fix: occasional panic on shutdown (#1353)

* feat: query UI (#1352)

* log insert

* fix display error

* refactor code

* shorten status strings

* remove comment

* apply suggestion

* feat: add download block link to inspect page (#1312)

* fix(devnet): update golang and lotus default versions (#1354)

* fix(devnet): bump golang to 1.19

* chore(devnet): bump lotus default version

* chore(devnet): remove unused stable env

* booster-http: implement IPFS HTTP gateway (#1225)

* feat: implement http api gateway

* feat: use go-libipfs lib (instead of copying to extern)

* feat: bump booster-bitswap info minor version

* feat: http gateway metrics

* fix: TestHttpInfo

* feat: by default only serve blocks and CARs, with option to serve original files (jpg, mov etc)

* fix: correct link for download root block (#1355)

* feat: option to cleanup data for offline deals after add piece (#1341)

* chore: add support for multiple node.js versions in makefile (#1356)

* chore: release v.1.7.0-rc1 (#1357)

* release v.1.7.0-rc1

* fix version

* fix: dagstore initialize-all parameter (#1363)

* fix: show verifying commp state for offline deals (#1364)

* fix: boost run missing staging-area dir (#1368)

* merge(wip): main to lid

TODO: remoteblockstore needs to handle nil metrics

* fix: flaky TestNewHttpServer (#1372)

* feat: group agent version by binary name (#1369)

* fix: wrap stats in nil checks for now

we should probably revisit how stats are handled now that we have all 3 transports being tracked

* test(fix): incorrect test urls

---------

Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>
Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: dirkmc <dirkmdev@gmail.com>

* fix: make devnet work for lid (#1375)

* feat: support full addr config in boostd-data

* chore: fix linting for boostd-data

* feat: use addr instead of port for lid

chore: update devnet to work with lid setup

* chore: resolve feedback on lint changes

* feat: fail deal if start epoch passed (#1319)

* fail deal if start epoch passed

* add suggestion

* test: add deal expiry on startup test

---------

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

* fix: makefile

* fix: db migration ordering

* fix: correct rootcid formatting

* fix: prevent accidental removal of valid sector index announcements

fix: add cache tests and dont announce cache state
fix: add unique index to sector state db
fix: sealed and unsealed sector state conflict
fix: ensure index provider wrapper starts after db migration has completed

* chore: go mod tidy

* fix: download block (#1440)

* LID yugabyte db impl (#1391)

* feat: yugabyte db impl

* feat: run yugabyte tests against a dockerized yugabyte

* fix: use our own yugabyte docker image

* fix: use yugabyte 2.17.2.0 docker image

* feat: piece doctor yugabyte impl

* fix: go mod tidy

* refactor: remove SetCarSize as it's no longer being used

* refactor: remove functionality to mark index as errored (not being used)

* feat: implement delete commands

* refactor: consolidate test params

* feat: add lid yugabyte config

* fix: port map yugabyte postgres to standard port

* Fix yugabyte CI (#1433)

* fix: yugabyte tests in CI

* docker-compose.yml ; Dockerfile.test ; connect to `yugabyte` and not localhost

* add tag

* test lid

* make gen

* fixup

* move couchbase settings under build tag

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* script to migrate from couchbase to yugabyte (#1445)

* feat: script to migrate from couchbase to yugabyte

* fix: reduce batch size for yugabyte inserts

* Change service GetIndex / AddIndex to return channel instead of array (#1444)

* feat: yugabyte db impl

* feat: run yugabyte tests against a dockerized yugabyte

* fix: use our own yugabyte docker image

* fix: use yugabyte 2.17.2.0 docker image

* feat: piece doctor yugabyte impl

* fix: go mod tidy

* refactor: remove SetCarSize as it's no longer being used

* refactor: remove functionality to mark index as errored (not being used)

* feat: implement delete commands

* refactor: consolidate test params

* feat: add lid yugabyte config

* fix: port map yugabyte postgres to standard port

* Fix yugabyte CI (#1433)

* fix: yugabyte tests in CI

* docker-compose.yml ; Dockerfile.test ; connect to `yugabyte` and not localhost

* add tag

* test lid

* make gen

* fixup

* move couchbase settings under build tag

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* wip: service GetIndex returns channel of records instead of array

* feat: return channel from AddIndex and GetIndex

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* local index directory: recover tool (#1410)

* initial disaster recovery tool for LID

* wip

* do not block on individual error

* instantiate lid

* report

* catch signal

* fixup

* comment out sector already in progress

* fixup

* start containers with init: true

* record that we dont have an unsealed copy

* match deals with boost sqlite db and piece store

* fixup

* fixup

* use logger

* fixup

* disable stacktrace

* fixup

* extract piece store away from disaster recovery struct

* add more sanity checks

* compare IsUnsealed vs storage find

* improve safeIsUnseal

* fixup

* better logs

* expand repodir

* calc properly next offset

* fixup

* add sector id to logs

* incr offset

* break after finding expired deal

* more logs

* fewer logs

* better logs

* better error

* refactor

* refactor minerApi

* better logs

* add time around add index

* pd.Start

* LID benchmarking tool (#1276)

* feat: LID benchmarking tool

* fix: bench thread safety

* refactor: structured logging

* refactor: postgres bulk insert

* lid bench: Add foundationdb impl

* lid fdb: Fix Tx sizing, parallel chunk puts

* lid fdb: More efficient sample generation

* feat: array of piece count / blocks per piece (#1314)

* lid bench: print add rate

* lid bench: Add retry to postgres put (#1316)

* lid bench: Make cassandra put much more robust (#1318)

* instrumentation for bench tool (#1337)

* instrument postgres

* more instrumentation

* check for err getoffsetsize

* emit metrics every 10sec

* ignore errors

* add postgres-drop

* use directly tables

* fix: go mod tidy

* use INSERT INTO instead of tmp tables

* try to catch sig

* remove transaction commit

* fixup

* add postgres-init

* fixup

* split create and init

* fixup

* remove if not exist

---------

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

* feat: batch insert queries for postgres

* feat: add flag to insert into postgres using tmp table

* refactor: merge changes from nonsense/lid-bench

* refactor: just use one database (dont create bench database)

* refactor: remove unused params

* refactor: command structure

* fix: cassandra - dont use batch insert for PayloadToPieces

* fix: create tables CQL

* fix: increase payload to pieces insert parallelism

* fix: use simple replication strategy

* feat: use yugabyte cassandra driver

* fix: remove bench binary

* update metrics endpoint

* fix random generated piece cid

* fixup

* fix: cassandra bitswap benchmark

* remove foundationdb

---------

Co-authored-by: Łukasz Magiera <magik6k@gmail.com>
Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com>
Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* fix: failing tests due to bad merge

* fix: flaky TestMultipleDealsConcurrent

* more logs

* piece doctor and sector state manager refactor (#1463)

* fix timer.Reset and improve logs

* revert randomization

* piece doc: handle errors

* adjust piece check

* refactor unsealsectormanager

* refactor piece doctor

* add random ports

* ignore tests

* add version to boostd-data

* fix ctx in Start

* fix: add reader mock to fix tests

* fix: pass new piece directory to provider on test restart

* fix synchronisation

* note that panics are not propagated in tests

* carv1 panics piece directory

* print panics

* fix: use reader that supports Seek in piece reader mock

* fix: reset mock car reader on each invocation

* fix: TestOfflineDealDataCleanup

* add check for nil cancel func

* bump min check period for LevelDB to 5 minutes

* check if sector state mgr is initialised

* debug line for unflagging

* commenting out TestMultipleDealsConcurrent -- flaky test -- works locally

* add SectorStateUpdates pubsub

* add close for pubsub

* add mock sectorstatemgr

* add wrapper tests

* fixup

* cleanup

* cleanup

* better names

* t.Skip for test

* remove TODO above println for panic

* add unit tests for refreshState

* rename tests

* more cases

* more tests

* update description

* better comment

* better names and comments

---------

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

* Merge from main to lid branch (#1483)

* fix statx output string (#1451)

* fix: flaky TestMultipleDealsConcurrent (#1458)

* Add option to serve index provider ads over http (#1452)

* feat: option to serve index provider ads over http

* fix: config naming, hostname parsing

* fix: update docsgen

* fix: log announce address

* feat: add config for indexer direct announce urls

* refactor: always announce over pubsub

* fix: docsgen

* test: add test case for empty announce address hostname

* Add `boostd index announce-latest` command (#1456)

* feat: boostd index announce-latest

* feat: add announce-latest-http command

* fix: default direct announce url

* feat: update to index-provider v0.11.2

* Signal to index provider to skip announcements (#1457)

* fix: signal to index provider to skip announcements

* fix: ensure multihash lister skip error is of type ipld.ErrNotExists

---------

Co-authored-by: LexLuthr <lexluthr@protocol.ai>

* release v1.7.3-rc2 (#1460)

* fix: improve stalled retrieval cancellation (#1449)

* refactor stalled retrieval cancel

* add ctx with timeout

* implement suggestions

* update err wrapping

* fix: set short cancel timeout for unpaid retrievals only

---------

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

* feat: enable listen address for booster-http (#1461)

* enable listen address

* modify tests

* fix nil ptr (#1470)

* fix: incorrect check when import offline deal data using proposal CID (#1473)

* fix incorrect early check

* update error msg

* fix(server): properly cancel graphsync requests (#1475)

* set UI default listen address to localhost (#1476)

* feat: display msg params in the mpool UI (#1471)

* show msg params

* fix: mpool nil pointer

* fix width

---------

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

* Reset read deadline after reading deal proposal message (#1479)

* fix: reset read deadline after reading deal proposal message

* fix: increase client request deadline

* feat: Show elapsed epoch and PSD wait epochs in UI (#1480)

* show epochs

* fix devnet UI, use BlockdDelaySecs

* fix lint err

* Update gql/resolver.go

Co-authored-by: dirkmc <dirkmdev@gmail.com>

---------

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* release v1.7.3-rc3 (#1481)

---------

Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>
Co-authored-by: LexLuthr <lexluthr@protocol.ai>
Co-authored-by: Hannah Howard <hannah@hannahhoward.net>

* update local index directory ui (#1477)

* feat: update local index directory ui

* comment out wrench as docker doesnt build

* rearrange menu

* refactor: remove sectors list

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* feat: surface indexing errors (#1490)

* feat: log panic (instead of just printing to stdout) (#1491)

* split flagged pieces into unsealed/sealed tables (#1493)

* refactor: remove couchbase tests (#1496)

* refactor: remove piece directory couchbase tests (#1497)

* GraphQL resolvers for LID (#1494)

* wip

* rename

* sectorUnsealedCopies and SectorProvingState

* fix: piece directory tests (#1498)

* log line for only sealed sectors

* more logs

* feat: flagged pieces (#1501)

* check that sector has deals for unsealed sectors (#1502)

* check that sector has deals for unsealed sectors

* simplify

* rename heading

* piece doctor to ignore expired/slashed deals (#1503)

* ignore expired/slashed deals

* fix mocks

* add timer for checkPiece

* move ChainHead away from checkPiece

* add nil check for fullnodeApi

* add debug line

* fix pagination

* LID landing page: add stats around Flagged and non-Flagged pieces (#1508)

* wip

* fixup

* add debug line

* fixup

* feat: split flagged pieces page into flagged / flagged because unsealed (#1509)

* fix: display of no flagged pieces (#1511)

* disable dummy panels - block stats; deal data (#1510)

* fix unsealed field in flagged piece (#1515)

* update ffi

* fix main merge issue

* fix go mod

* Add info boxes on LID UI page (#1516)

* feat: add info boxes on LID UI page

* Update react/src/LID.js

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* Update react/src/LID.js

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

---------

Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>

* feat: replace migrate couchbase command with migrate yugabyte (#1518)

* remove redundant makefile (#1519)

* remove redundant makefile

* add migrate-lid to Makefile

* update gitignore

* move booster-bitswap and booster-http to make and make install

* fix: inspect page - dont try to fetch root cid (#1525)

* feat: add send epoch, time, elapsed epoch and elapsed time for each message in mpool to UI (#1523)

* add message epoch/time details

* implement suggestion

* use moment lib

* fix alerting bug

* update polling interval

* add logs

* fix devnet: use ws instead of http to connect to boostd-data

* feat: make legacy deals optional (#1524)

* make legacy deals optional

* fix gen

* modify itests, create new

* handle legacy stream explicitly

* separate out the protocols

* fix lint error

* enable itest in CI

* fix ci

* apply suggestions

* fix error after conflict resolution

* refactor: simplify legacy deal response code

---------

Co-authored-by: Dirk McCormick <dirkmdev@gmail.com>

* refactor: remove couchbase implementation (#1535)

* Update lotus and boxo versions (#1466) (#1537)

* Update to use packages in go-libipni

* feat: update lotus version

* update boxo (#1492)

* feat: update boxo

* refactor: depend on repo:Jorropo/lotus branch:boxo2

* chore: temporarily update go-fil-markets with replace directive

* feat: switch itests framework ExtractFileFromCAR to use non-global IPLD registry

* feat: switch booster-bitswap client fetch to use the go-ipld-prime globals via go-ipld-legacy

* go fmt

* chore: update dependencies and migrate to boxo

* fix: update boost-gfm

* fix: stop itests framework from prematurely setting listenaddrs via go-libp2p defaults that conflict with lotus

* fix: docs gen

* chore(deps): update deps for boxo v0.10.0

* chore(deps): update boost-gfm

* fix(booster-http): update for boxo v0.10.0

* chore(deps): update to remove kubo dependency

* fix(gen): update docs gen

* feat: update boost-gfm to v1.26.6

* chore(deps): update lotus to master

---------




---------

Co-authored-by: gammazero <gammazero@users.noreply.github.com>
Co-authored-by: Adin Schmahmann <adin.schmahmann@gmail.com>
Co-authored-by: hannahhoward <hannah@hannahhoward.net>

* feat: update boost-gfm to v1.26.7 (#1538)

* fix: piece doctor tests (#1540)

* refactor: build indexes for legacy deals (#1539)

* feat: http index announcements (#1418)

* feat(indexprovider): announce http transport

refactor: isolate extended provider logic

feat: announce http indexes

refactor(indexprovider): use metadata.Default

fix(wrapper): fix compile error

* fix http ep signing bug

* update comment

---------

Co-authored-by: LexLuthr <lexluthr@protocol.ai>

* feat: check unseal status of piece through both apis (#1548)

* fix: metrics and Grafana (#1546)

* fix grafana, metrics

* remove dagstore from name

* fix: add missing PieceDeal (PieceCid) index (#1551)

* fix: iterate all deals to index piece (#1549)

* fix: iterate all deals to index piece

* add test, use multierror

* add and update comments

* refactor: separate yugabyte / leveldb tests for easier local testing (#1553)

* feat: refactor mpool page in UI (#1530)

* modify GQL

* fix count type

* fix locks

* fix js

* migrate config to v5 (#1560)

* migrate config to v5

* change default version

* chore: release v2.0.0-rc1 (#1561)

* Upgrade to index-provider v0.13.4 (#1559)

Upgrade to the latest index-provider library.

* feat: add IPNI itest (#1563)

* ipni itest

* refactor test

* add to circleCI

* add indexer topic

* Print protocol IDs exposed by f.Boost

* generate topic name dynamically

---------

Co-authored-by: Masih H. Derkani <m@derkani.org>

* IPNI UX (#1562)

* feat: IPNI UX

* Update react/src/Ipni.js

Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>

* feat: server side config

---------

Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>

* feat: use shared piece reader for blockstore get

* fix: booster-bitswap client - extract blocks from identity cids

* go mod tidy

* fixup

* remove refs and use ttlcache

* ignore err on cache

* add lock around ttl cache

* use a new readerCtx

* add expire again

* fix logger

* check for refs=0 when closing individual shared reader

* use cancel instead of direct close

* add expired field

* downgrade warnw to debugw

---------

Co-authored-by: Jacob Heun <jacob.heun@gmail.com>
Co-authored-by: dirkmc <dirkmdev@gmail.com>
Co-authored-by: Jacob Heun <jacobheun@gmail.com>
Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>
Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: Łukasz Magiera <magik6k@gmail.com>
Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com>
Co-authored-by: LexLuthr <lexluthr@protocol.ai>
Co-authored-by: Hannah Howard <hannah@hannahhoward.net>
Co-authored-by: gammazero <gammazero@users.noreply.github.com>
Co-authored-by: Adin Schmahmann <adin.schmahmann@gmail.com>
Co-authored-by: Masih H. Derkani <m@derkani.org>
13 people committed Jul 25, 2023
1 parent 53fdf99 commit 12e1a75
Showing 6 changed files with 350 additions and 39 deletions.
103 changes: 77 additions & 26 deletions cmd/booster-bitswap/client.go
@@ -10,7 +10,13 @@ import (
"sync/atomic"
"time"

"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"
mh "github.com/multiformats/go-multihash"

"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
lotus_blockstore "github.com/filecoin-project/lotus/blockstore"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/boxo/bitswap/client"
bsnetwork "github.com/ipfs/boxo/bitswap/network"
@@ -109,7 +115,8 @@ var fetchCmd = &cli.Command{

ctx, cancel := context.WithCancel(ctx)
defer cancel()
brn := &blockReceiver{bs: bs, ctx: ctx, cancel: cancel}
idbs := lotus_blockstore.WrapIDStore(bs)
brn := &blockReceiver{bs: idbs, ctx: ctx, cancel: cancel}
bsClient := client.New(ctx, net, bs, client.WithBlockReceivedNotifier(brn))
defer bsClient.Close()
net.Start(bsClient)
@@ -163,37 +170,51 @@ var fetchCmd = &cli.Command{
}

func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) {
if throttle != nil {
throttle <- struct{}{}
}
// Get the block
start := time.Now()
blk, err := bsClient.GetBlock(ctx, c)
if throttle != nil {
<-throttle
}
if err != nil {
return 0, 0, err
}
var size uint64
var links []cid.Cid
if c.Prefix().MhType == mh.IDENTITY {
var err error
size, links, err = getIDBlock(c)
if err != nil {
return 0, 0, err
}
} else {
if throttle != nil {
throttle <- struct{}{}
}
// Get the block
start := time.Now()
blk, err := bsClient.GetBlock(ctx, c)
if throttle != nil {
<-throttle
}
if err != nil {
return 0, 0, err
}

var size = uint64(len(blk.RawData()))
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String())
size = uint64(len(blk.RawData()))
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String())

// Read the links from the block to child nodes in the DAG
var count = uint64(1)
ipldDecoder := ipldlegacy.NewDecoder()
nd, err := ipldDecoder.DecodeNode(ctx, blk)
if err != nil {
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err)
// Read the links from the block to child nodes in the DAG
ipldDecoder := ipldlegacy.NewDecoder()
nd, err := ipldDecoder.DecodeNode(ctx, blk)
if err != nil {
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err)
}

ndLinks := nd.Links()
for _, l := range ndLinks {
links = append(links, l.Cid)
}
}

var count = uint64(1)
var eg errgroup.Group
lnks := nd.Links()
for _, l := range lnks {
l := l
for _, link := range links {
link := link
// Launch a go routine to fetch the blocks underneath each link
eg.Go(func() error {
cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle)
cnt, sz, err := getBlocks(ctx, bsClient, link, throttle)
if err != nil {
return err
}
@@ -206,8 +227,38 @@ func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle
return count, size, eg.Wait()
}

func getIDBlock(c cid.Cid) (uint64, []cid.Cid, error) {
dmh, err := mh.Decode(c.Hash())
if err != nil {
return 0, nil, err
}

if dmh.Code != mh.IDENTITY {
return 0, nil, fmt.Errorf("bad cid: multihash type identity but decoded mh is not identity")
}

decoder, err := cidlink.DefaultLinkSystem().DecoderChooser(cidlink.Link{Cid: c})
if err != nil {
return 0, nil, fmt.Errorf("choosing decoder for identity CID %s: %w", c, err)
}
node, err := ipld.Decode(dmh.Digest, decoder)
if err != nil {
return 0, nil, fmt.Errorf("decoding identity CID %s: %w", c, err)
}
links, err := traversal.SelectLinks(node)
if err != nil {
return 0, nil, fmt.Errorf("collecting links from identity CID %s: %w", c, err)
}
// convert from Link to Cid
resultCids := make([]cid.Cid, 0)
for _, link_ := range links {
resultCids = append(resultCids, link_.(cidlink.Link).Cid)
}
return uint64(len(dmh.Digest)), resultCids, nil
}

type blockReceiver struct {
bs *blockstore.ReadWrite
bs lotus_blockstore.Blockstore
ctx context.Context
cancel context.CancelFunc
}
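
Note on the identity-CID branch in `getBlocks` above: an identity multihash carries its payload inline, so there is nothing to fetch over bitswap and the block can be read straight out of the CID. The following is a minimal standard-library sketch of that idea, not the code from this commit (which uses `mh.Decode` from go-multihash). `decodeIdentityDigest` is a hypothetical helper, and it assumes the usual multihash layout of varint code, varint length, then digest, with code `0x00` meaning identity.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// identityCode is the multicodec code of the identity "hash" function.
const identityCode = 0x00

// decodeIdentityDigest (hypothetical helper) parses a raw multihash and
// returns its digest when the hash function is identity. ok is false when
// the multihash uses any other hash function.
func decodeIdentityDigest(mhBytes []byte) (digest []byte, ok bool, err error) {
	code, n := binary.Uvarint(mhBytes)
	if n <= 0 {
		return nil, false, errors.New("multihash: bad code varint")
	}
	length, m := binary.Uvarint(mhBytes[n:])
	if m <= 0 {
		return nil, false, errors.New("multihash: bad length varint")
	}
	rest := mhBytes[n+m:]
	if uint64(len(rest)) != length {
		return nil, false, fmt.Errorf("multihash: declared length %d, got %d digest bytes", length, len(rest))
	}
	if code != identityCode {
		// A real hash: the data has to be fetched from a blockstore.
		return nil, false, nil
	}
	// Identity: the "digest" is the block data itself.
	return rest, true, nil
}

func main() {
	// Identity multihash wrapping the 5 bytes "hello": code 0x00, length 0x05.
	mhBytes := append([]byte{0x00, 0x05}, []byte("hello")...)
	digest, ok, err := decodeIdentityDigest(mhBytes)
	fmt.Println(string(digest), ok, err)
}
```

In the commit, the inline bytes are additionally decoded as an IPLD node so that any links inside the identity block can still be traversed; this sketch stops at extracting the bytes.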
1 change: 1 addition & 0 deletions cmd/booster-http/main.go
@@ -46,6 +46,7 @@ func before(cctx *cli.Context) error {
if cliutil.IsVeryVerbose {
_ = logging.SetLogLevel("booster", "DEBUG")
_ = logging.SetLogLevel("remote-blockstore", "DEBUG")
_ = logging.SetLogLevel("piecedirectory", "DEBUG")
}

return nil
10 changes: 9 additions & 1 deletion piecedirectory/piece_directory_test.go
@@ -2,13 +2,21 @@ package piecedirectory

import (
"context"
"testing"

"github.com/filecoin-project/boostd-data/svc"
"github.com/stretchr/testify/require"
"testing"
)

func TestPieceDirectoryLevelDB(t *testing.T) {
bdsvc, err := svc.NewLevelDB("")
require.NoError(t, err)
testPieceDirectory(context.Background(), t, bdsvc)
}

func TestPieceDirectoryLevelDBFuzz(t *testing.T) {
//_ = logging.SetLogLevel("piecedirectory", "debug")
bdsvc, err := svc.NewLevelDB("")
require.NoError(t, err)
testPieceDirectoryFuzz(context.Background(), t, bdsvc)
}
142 changes: 132 additions & 10 deletions piecedirectory/piecedirectory.go
@@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boostd-data/shared/tracing"
bdtypes "github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/readerutil"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/hashicorp/go-multierror"
bstore "github.com/ipfs/boxo/blockstore"
@@ -25,17 +26,23 @@ import (
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
carindex "github.com/ipld/go-car/v2/index"
"github.com/jellydator/ttlcache/v2"
"github.com/multiformats/go-multihash"
mh "github.com/multiformats/go-multihash"
"go.opentelemetry.io/otel/attribute"
)

var log = logging.Logger("piecedirectory")

var MaxCachedReaders = 128

type PieceDirectory struct {
store *bdclient.Store
pieceReader types.PieceReader

pieceReaderCacheMu sync.Mutex
pieceReaderCache *ttlcache.Cache

ctx context.Context

addIdxThrottleSize int
@@ -44,12 +51,39 @@ type PieceDirectory struct {
}

func NewPieceDirectory(store *bdclient.Store, pr types.PieceReader, addIndexThrottleSize int) *PieceDirectory {
return &PieceDirectory{
prCache := ttlcache.NewCache()
_ = prCache.SetTTL(30 * time.Second)
prCache.SetCacheSizeLimit(MaxCachedReaders)

pd := &PieceDirectory{
store: store,
pieceReader: pr,
pieceReaderCache: prCache,
addIdxThrottleSize: addIndexThrottleSize,
addIdxThrottle: make(chan struct{}, addIndexThrottleSize),
}

expireCallback := func(key string, reason ttlcache.EvictionReason, value interface{}) {
log.Debugw("expire callback", "piececid", key, "reason", reason)

r := value.(*cachedSectionReader)

pd.pieceReaderCacheMu.Lock()
defer pd.pieceReaderCacheMu.Unlock()

r.expired = true

if r.refs <= 0 {
r.cancel()
return
}

log.Debugw("expire callback with refs > 0", "refs", r.refs, "piececid", key, "reason", reason)
}

prCache.SetExpirationReasonCallback(expireCallback)

return pd
}

func (ps *PieceDirectory) Start(ctx context.Context) {
@@ -334,6 +368,98 @@ func (ps *PieceDirectory) GetPieceReader(ctx context.Context, pieceCid cid.Cid)
return nil, merr
}

type cachedSectionReader struct {
types.SectionReader
ps *PieceDirectory
pieceCid cid.Cid
// Signals when the underlying piece reader is ready
ready chan struct{}
// err is non-nil if there's an error getting the underlying piece reader
err error
// cancel for underlying GetPieceReader call
cancel func()
refs int
expired bool
}

func (r *cachedSectionReader) Close() error {
r.ps.pieceReaderCacheMu.Lock()
defer r.ps.pieceReaderCacheMu.Unlock()

r.refs--

if r.refs == 0 && r.expired {
log.Debugw("canceling underlying section reader context as cache entry doesn't exist", "piececid", r.pieceCid)

r.cancel()
}

return nil
}

// Get a piece reader that is shared between callers. These readers are most
// performant for random access (eg bitswap reads).
// If there is no error, the caller must call Close() on the section reader.
func (ps *PieceDirectory) GetSharedPieceReader(ctx context.Context, pieceCid cid.Cid) (types.SectionReader, error) {
ctx, span := tracing.Tracer.Start(ctx, "pm.get_shared_piece_reader")
defer span.End()
span.SetAttributes(attribute.String("piececid", pieceCid.String()))

var r *cachedSectionReader

// Check if there is already a piece reader in the cache
ps.pieceReaderCacheMu.Lock()
rr, err := ps.pieceReaderCache.Get(pieceCid.String())
if err != nil {
// There is not yet a cached piece reader, create a new one and add it
// to the cache
r = &cachedSectionReader{
ps: ps,
pieceCid: pieceCid,
ready: make(chan struct{}),
refs: 1,
}
_ = ps.pieceReaderCache.Set(pieceCid.String(), r)
ps.pieceReaderCacheMu.Unlock()

// We just added a cached reader, so get its underlying piece reader
readerCtx, readerCtxCancel := context.WithCancel(context.Background())
sr, err := ps.GetPieceReader(readerCtx, pieceCid)

r.SectionReader = sr
r.err = err
r.cancel = readerCtxCancel

// Inform any waiting threads that the cached reader is ready
close(r.ready)
} else {

r = rr.(*cachedSectionReader)
r.refs++

ps.pieceReaderCacheMu.Unlock()

// We already had a cached reader, wait for it to be ready
select {
case <-ctx.Done():
// The context timed out. Dereference the cached piece reader and
// return an error.
_ = r.Close()
return nil, ctx.Err()
case <-r.ready:
}
}

// If there was an error getting the underlying piece reader, make sure
// that the cached reader gets cleaned up
if r.err != nil {
_ = r.Close()
return nil, r.err
}

return r, nil
}
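
Note on the lifecycle of the shared reader above: the underlying reader is released only when both conditions hold, namely that the cache entry has expired and the reference count has dropped to zero. Whichever of the two happens last triggers the release. The sketch below isolates that rule using only the standard library. All names in it (`sharedResource`, `cache`, `Acquire`, `Expire`) are hypothetical stand-ins, and the explicit `Expire` call takes the place of the ttlcache expiration callback used in the commit.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedResource is a hypothetical stand-in for cachedSectionReader.
type sharedResource struct {
	mu       *sync.Mutex // the owning cache's mutex
	refs     int
	expired  bool
	released bool
	release  func() // frees the underlying resource; must not take mu
}

// maybeRelease must be called with mu held.
func (r *sharedResource) maybeRelease() {
	if r.refs <= 0 && r.expired && !r.released {
		r.released = true
		r.release()
	}
}

// Close drops one reference. The resource is freed only if the cache has
// already expired the entry and no other caller still holds it.
func (r *sharedResource) Close() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.refs--
	r.maybeRelease()
}

type cache struct {
	mu      sync.Mutex
	entries map[string]*sharedResource
}

func newCache() *cache {
	return &cache{entries: make(map[string]*sharedResource)}
}

// Acquire returns the shared entry for key, creating it on first use.
// The release func is only recorded when the entry is created.
func (c *cache) Acquire(key string, release func()) *sharedResource {
	c.mu.Lock()
	defer c.mu.Unlock()
	if r, ok := c.entries[key]; ok {
		r.refs++
		return r
	}
	r := &sharedResource{mu: &c.mu, refs: 1, release: release}
	c.entries[key] = r
	return r
}

// Expire drops the entry from the cache, as a TTL eviction would, and
// frees the resource immediately if nobody is using it.
func (c *cache) Expire(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	r, ok := c.entries[key]
	if !ok {
		return
	}
	delete(c.entries, key)
	r.expired = true
	r.maybeRelease()
}

func main() {
	c := newCache()
	release := func() { fmt.Println("underlying reader released") }
	a := c.Acquire("piece-1", release)
	b := c.Acquire("piece-1", release)
	c.Expire("piece-1") // still referenced twice: nothing is released yet
	a.Close()
	b.Close() // last reference on an expired entry: release runs here
}
```

One design point this makes visible: a caller that still holds an expired entry keeps a working reader, while the next `Acquire` for the same key creates a fresh entry instead of reviving the expired one.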

// Get all pieces that contain a multihash (used when retrieving by payload CID)
func (ps *PieceDirectory) PiecesContainingMultihash(ctx context.Context, m mh.Multihash) ([]cid.Cid, error) {
ctx, span := tracing.Tracer.Start(ctx, "pm.pieces_containing_multihash")
@@ -361,7 +487,6 @@ func (ps *PieceDirectory) GetIterableIndex(ctx context.Context, pieceCid cid.Cid

// Get a block (used by Bitswap retrieval)
func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error) {
// TODO: use caching to make this efficient for repeated Gets against the same piece
ctx, span := tracing.Tracer.Start(ctx, "pm.get_block")
defer span.End()

@@ -370,8 +495,8 @@ func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte,

// Check if it's an identity cid, if it is, return its digest
if err != nil {
digest, ok, err := isIdentity(c)
if err == nil && ok {
digest, ok, iderr := isIdentity(c)
if iderr == nil && ok {
return digest, nil
}
return nil, fmt.Errorf("getting pieces containing cid %s: %w", c, err)
@@ -385,7 +510,7 @@ func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte,
for i, pieceCid := range pieces {
data, err := func() ([]byte, error) {
// Get a reader over the piece data
reader, err := ps.GetPieceReader(ctx, pieceCid)
reader, err := ps.GetSharedPieceReader(ctx, pieceCid)
if err != nil {
return nil, fmt.Errorf("getting piece reader: %w", err)
}
@@ -398,13 +523,10 @@ func (ps *PieceDirectory) BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte,
}

// Seek to the block offset
_, err = reader.Seek(int64(offsetSize.Offset), io.SeekStart)
if err != nil {
return nil, fmt.Errorf("seeking to offset %d in piece reader: %w", int64(offsetSize.Offset), err)
}
readerAt := readerutil.NewReadSeekerFromReaderAt(reader, int64(offsetSize.Offset))

// Read the block data
_, data, err := util.ReadNode(bufio.NewReader(reader))
_, data, err := util.ReadNode(bufio.NewReader(readerAt))
if err != nil {
return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, pieceCid, err)
}
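
Note on replacing `reader.Seek` with `readerutil.NewReadSeekerFromReaderAt` in `BlockstoreGet`: once a reader is shared between concurrent callers, a single seek position can no longer be used safely, so each caller presumably needs its own cursor over the shared `io.ReaderAt`. The sketch below shows the same pattern with the standard library's `io.NewSectionReader`, which is an assumption made for illustration and not the lotus helper itself. `readAt` and `concurrentReads` are hypothetical names.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// readAt reads n bytes starting at off from a shared io.ReaderAt. Each call
// wraps the source in its own io.SectionReader, so the read cursor is
// private to the caller and the shared source keeps no seek state.
func readAt(shared io.ReaderAt, off, n int64) (string, error) {
	sr := io.NewSectionReader(shared, off, n)
	buf, err := io.ReadAll(sr)
	return string(buf), err
}

// concurrentReads reads one n-byte section per offset from the same
// in-memory source, using one goroutine per offset.
func concurrentReads(data string, offsets []int64, n int64) ([]string, error) {
	shared := strings.NewReader(data) // *strings.Reader implements io.ReaderAt
	out := make([]string, len(offsets))
	errs := make([]error, len(offsets))

	var wg sync.WaitGroup
	for i, off := range offsets {
		wg.Add(1)
		go func(i int, off int64) {
			defer wg.Done()
			out[i], errs[i] = readAt(shared, off, n)
		}(i, off)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return out, nil
}

func main() {
	parts, err := concurrentReads("0123456789", []int64{0, 4, 7}, 3)
	fmt.Println(parts, err)
}
```

With a seek-based reader the three goroutines would race on one position; here each result depends only on its own offset.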
