Skip to content

Commit

Permalink
fix: add miner address to yugabyte / leveldb impl
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Aug 4, 2023
1 parent 4279fed commit eb350d5
Show file tree
Hide file tree
Showing 22 changed files with 481 additions and 129 deletions.
10 changes: 8 additions & 2 deletions cmd/migrate-lid/migrate_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/boostd-data/yugabyte/migrations"
"os"
"path"
"sort"
Expand Down Expand Up @@ -142,7 +143,11 @@ var migrateYugabyteDBCmd = &cli.Command{
PayloadPiecesParallelism: cctx.Int("insert-parallelism"),
}

store := yugabyte.NewStore(settings)
// Note that it doesn't matter what address we pass here: because the
// table is newly created, it doesn't contain any rows when the
// migration is run.
migrator := yugabyte.NewMigrator(settings.ConnectString, address.TestAddress)
store := yugabyte.NewStore(settings, migrator)
return migrate(cctx, "yugabyte", store, migrateType)
},
}
Expand Down Expand Up @@ -605,7 +610,8 @@ func migrateReverse(cctx *cli.Context, dbType string) error {
Hosts: cctx.StringSlice("hosts"),
PayloadPiecesParallelism: cctx.Int("insert-parallelism"),
}
store = yugabyte.NewStore(settings)
migrator := yugabyte.NewMigrator(settings.ConnectString, migrations.DisabledMinerAddr)
store = yugabyte.NewStore(settings, migrator)
}

// Perform the reverse migration
Expand Down
61 changes: 59 additions & 2 deletions extern/boostd-data/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package main

import (
"context"
"errors"
"fmt"
"net/http"

"github.com/filecoin-project/boostd-data/shared/cliutil"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/yugabyte"
"github.com/filecoin-project/boostd-data/yugabyte/migrations"
"github.com/filecoin-project/go-address"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -88,15 +91,29 @@ var yugabyteCmd = &cli.Command{
}},
runFlags...,
),
Subcommands: []*cli.Command{yugabyteMigrateCmd},
Action: func(cctx *cli.Context) error {
// Create a yugabyte data service
settings := yugabyte.DBSettings{
Hosts: cctx.StringSlice("hosts"),
ConnectString: cctx.String("connect-string"),
}

bdsvc := svc.NewYugabyte(settings)
return runAction(cctx, "yugabyte", bdsvc)
// One of the migrations requires a miner address. But we don't want to
// add a miner-address parameter to this command just for the one time
// that the user needs to perform that specific migration. Instead, pass
// a disabled miner address, and if the migration is needed it will
// throw ErrMissingMinerAddr and we can inform the user they need to
// perform the migration.
migrator := yugabyte.NewMigrator(settings.ConnectString, migrations.DisabledMinerAddr)

// Create a connection to the yugabyte implementation of LID
bdsvc := svc.NewYugabyte(settings, migrator)
err := runAction(cctx, "yugabyte", bdsvc)
if err != nil && errors.Is(err, migrations.ErrMissingMinerAddr) {
return fmt.Errorf("The database needs to be migrated. Run `boost-data run yugabyte migrate`")
}
return err
},
}

Expand Down Expand Up @@ -151,3 +168,43 @@ func runAction(cctx *cli.Context, dbType string, store *svc.Service) error {

return nil
}

var yugabyteMigrateCmd = &cli.Command{
Name: "migrate",
Usage: "Migrate boostd-data yugabyte database",
Before: before,
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "hosts",
Usage: "yugabyte hosts to connect to over cassandra interface eg '127.0.0.1'",
Required: true,
},
&cli.StringFlag{
Name: "connect-string",
Usage: "postgres connect string eg 'postgresql://postgres:postgres@localhost'",
Required: true,
},
&cli.StringFlag{
Name: "miner-address",
Usage: "default miner address eg f1234",
Required: true,
},
},
Action: func(cctx *cli.Context) error {
// Create a yugabyte data service
settings := yugabyte.DBSettings{
Hosts: cctx.StringSlice("hosts"),
ConnectString: cctx.String("connect-string"),
}

maddr, err := address.NewFromString(cctx.String("miner-address"))
if err != nil {
return fmt.Errorf("parsing miner address '%s': %w", maddr, err)
}
migrator := yugabyte.NewMigrator(settings.ConnectString, maddr)
bdsvc := svc.NewYugabyte(settings, migrator)

// Create the database and run migrations
return bdsvc.Impl.(*yugabyte.Store).Create(cctx.Context)
},
}
2 changes: 2 additions & 0 deletions extern/boostd-data/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multicodec v0.6.0
github.com/multiformats/go-multihash v0.2.1
github.com/pressly/goose/v3 v3.5.3
github.com/stretchr/testify v1.8.1
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/urfave/cli/v2 v2.24.4
Expand Down Expand Up @@ -101,6 +102,7 @@ require (
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand Down
15 changes: 15 additions & 0 deletions extern/boostd-data/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,7 @@ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0=
github.com/kabukky/httpscerts v0.0.0-20150320125433-617593d7dcb3/go.mod h1:BYpt4ufZiIGv2nXn4gMxnfKV306n3mWXgNu/d2TqdTU=
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kilic/bls12-381 v0.0.0-20200607163746-32e1441c8a9f/go.mod h1:XXfR6YFCRSrkEXbNlIyDsgXVNJWVUV30m/ebkVy9n6s=
Expand Down Expand Up @@ -1546,6 +1547,7 @@ github.com/marten-seemann/qtls-go1-17 v0.1.1/go.mod h1:C2ekUKcDdz9SDWxec1N/MvcXB
github.com/marten-seemann/qtls-go1-18 v0.1.0-beta.1/go.mod h1:PUhIQk19LoFt2174H4+an8TYvWOGjb/hHwphBeaDHwI=
github.com/marten-seemann/qtls-go1-18 v0.1.1/go.mod h1:mJttiymBAByA49mhlNZZGrH5u1uXYZJ+RW28Py7f4m4=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE=
github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
Expand Down Expand Up @@ -1827,6 +1829,7 @@ github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e h1:ZOcivgkkFRnjfoTc
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e/go.mod h1:uIp+gprXxxrWSjjklXD+mN4wed/tMfjMMmN/9+JsA9o=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/pressly/goose/v3 v3.5.3 h1:lIQIIXVbdO2RuQtJBS1e7MZjKEk0demVWt6i0YPiOrg=
github.com/pressly/goose/v3 v3.5.3/go.mod h1:IL4NNMdXx9O6hHpGbNB5l1hkVe/Avoz4gBDE5g7rQNg=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand Down Expand Up @@ -1886,6 +1889,7 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
github.com/raulk/go-watchdog v1.2.0/go.mod h1:lzSbAl5sh4rtI8tYHU01BWIDzgzqaQLj6RcA1i4mlqI=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
Expand Down Expand Up @@ -2835,7 +2839,9 @@ lukechampine.com/blake3 v1.1.6/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R
lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0=
lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA=
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI=
lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
modernc.org/cc v1.0.0 h1:nPibNuDEx6tvYrUAtvDTTw98rx5juGsa5zuDnKwEEQQ=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/cc/v3 v3.33.6/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
modernc.org/cc/v3 v3.33.9/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
Expand All @@ -2852,6 +2858,7 @@ modernc.org/cc/v3 v3.35.16/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g
modernc.org/cc/v3 v3.35.17/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
modernc.org/cc/v3 v3.35.18/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
modernc.org/cc/v3 v3.35.20/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
modernc.org/cc/v3 v3.35.22 h1:BzShpwCAP7TWzFppM4k2t03RhXhgYqaibROWkrWq7lE=
modernc.org/cc/v3 v3.35.22/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
modernc.org/ccgo/v3 v3.9.5/go.mod h1:umuo2EP2oDSBnD3ckjaVUXMrmeAw8C8OSICVa0iFf60=
modernc.org/ccgo/v3 v3.10.0/go.mod h1:c0yBmkRFi7uW4J7fwx/JiijwOjeAeR2NoSaRVFPmjMw=
Expand Down Expand Up @@ -2895,6 +2902,7 @@ modernc.org/ccgo/v3 v3.15.1/go.mod h1:md59wBwDT2LznX/OTCPoVS6KIsdRgY8xqQwBV+hkTH
modernc.org/ccgo/v3 v3.15.9/go.mod h1:md59wBwDT2LznX/OTCPoVS6KIsdRgY8xqQwBV+hkTH0=
modernc.org/ccgo/v3 v3.15.10/go.mod h1:wQKxoFn0ynxMuCLfFD09c8XPUCc8obfchoVR9Cn0fI8=
modernc.org/ccgo/v3 v3.15.12/go.mod h1:VFePOWoCd8uDGRJpq/zfJ29D0EVzMSyID8LCMWYbX6I=
modernc.org/ccgo/v3 v3.15.13 h1:hqlCzNJTXLrhS70y1PqWckrF9x1btSQRC7JFuQcBg5c=
modernc.org/ccgo/v3 v3.15.13/go.mod h1:QHtvdpeODlXjdK3tsbpyK+7U9JV4PQsrPGIbtmc0KfY=
modernc.org/ccorpus v1.11.1/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ=
modernc.org/ccorpus v1.11.4/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ=
Expand Down Expand Up @@ -2945,18 +2953,25 @@ modernc.org/libc v1.12.0/go.mod h1:2MH3DaF/gCU8i/UBiVE1VFRos4o523M7zipmwH8SIgQ=
modernc.org/libc v1.14.1/go.mod h1:npFeGWjmZTjFeWALQLrvklVmAxv4m80jnG3+xI8FdJk=
modernc.org/libc v1.14.2/go.mod h1:MX1GBLnRLNdvmK9azU9LCxZ5lMyhrbEMK8rG3X/Fe34=
modernc.org/libc v1.14.3/go.mod h1:GPIvQVOVPizzlqyRX3l756/3ppsAgg1QgPxjr5Q4agQ=
modernc.org/libc v1.14.5 h1:DAHvwGoVRDZs5iJXnX9RJrgXSsorupCWmJ2ac964Owk=
modernc.org/libc v1.14.5/go.mod h1:2PJHINagVxO4QW/5OQdRrvMYo+bm5ClpUFfyXCYl9ak=
modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.4.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8=
modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.0.4/go.mod h1:nV2OApxradM3/OVbs2/0OsP6nPfakXpi50C7dcoHXlc=
modernc.org/memory v1.0.5 h1:XRch8trV7GgvTec2i7jc33YlUI0RKVDBvZ5eZ5m8y14=
modernc.org/memory v1.0.5/go.mod h1:B7OYswTRnfGg+4tDH1t1OeUNnsy2viGTdME4tzd+IjM=
modernc.org/opt v0.1.1 h1:/0RX92k9vwVeDXj+Xn23DKp2VJubL7k8qNffND6qn3A=
modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sqlite v1.14.6 h1:Jt5P3k80EtDBWaq1beAxnWW+5MdHXbZITujnRS7+zWg=
modernc.org/sqlite v1.14.6/go.mod h1:yiCvMv3HblGmzENNIaNtFhfaNIwcla4u2JQEwJPzfEc=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
modernc.org/strutil v1.1.1 h1:xv+J1BXY3Opl2ALrBwyfEikFAj8pmqcpnfmuwUwcozs=
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
modernc.org/tcl v1.11.0/go.mod h1:zsTUpbQ+NxQEjOjCUlImDLPv1sG8Ww0qp66ZvyOxCgw=
modernc.org/token v1.0.0 h1:a0jaWiNMDhDUtqOj09wvjWWAqd3q7WpBulmL9H2egsk=
modernc.org/token v1.0.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
modernc.org/xc v1.0.0/go.mod h1:mRNCo0bvLjGhHO9WsyuKVU4q0ceiDDDoEeWDJHrNx8I=
modernc.org/z v1.3.0/go.mod h1:+mvgLH814oDjtATDdT3rs84JnUIpkvAF5B8AVkNlE2g=
Expand Down
54 changes: 42 additions & 12 deletions extern/boostd-data/ldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"sort"
"strings"
"time"

"github.com/filecoin-project/boostd-data/model"
Expand Down Expand Up @@ -219,8 +220,12 @@ func (db *DB) SetMultihashesToPieceCid(ctx context.Context, recs []carindex.Reco
return nil
}

func pieceCidToFlaggedKey(maddr address.Address, pieceCid cid.Cid) ds.Key {
return datastore.NewKey(fmt.Sprintf("%s/%s/%s", sprefixPieceCidToFlagged, maddr.String(), pieceCid.String()))
}

// SetPieceCidToFlagged
func (db *DB) SetPieceCidToFlagged(ctx context.Context, pieceCid cid.Cid, fm LeveldbFlaggedMetadata) error {
func (db *DB) SetPieceCidToFlagged(ctx context.Context, pieceCid cid.Cid, maddr address.Address, fm LeveldbFlaggedMetadata) error {
ctx, span := tracing.Tracer.Start(ctx, "db.set_piece_cid_to_flagged")
defer span.End()

Expand All @@ -229,19 +234,17 @@ func (db *DB) SetPieceCidToFlagged(ctx context.Context, pieceCid cid.Cid, fm Lev
return err
}

key := datastore.NewKey(fmt.Sprintf("%s/%s", sprefixPieceCidToFlagged, pieceCid.String()))

key := pieceCidToFlaggedKey(maddr, pieceCid)
return db.Put(ctx, key, b)
}

// GetPieceCidToFlagged
func (db *DB) GetPieceCidToFlagged(ctx context.Context, pieceCid cid.Cid) (LeveldbFlaggedMetadata, error) {
func (db *DB) GetPieceCidToFlagged(ctx context.Context, pieceCid cid.Cid, maddr address.Address) (LeveldbFlaggedMetadata, error) {
ctx, span := tracing.Tracer.Start(ctx, "db.get_piece_cid_to_flagged")
defer span.End()

var metadata LeveldbFlaggedMetadata

key := datastore.NewKey(fmt.Sprintf("%s/%s", sprefixPieceCidToFlagged, pieceCid.String()))
key := pieceCidToFlaggedKey(maddr, pieceCid)

b, err := db.Get(ctx, key)
if err != nil {
Expand Down Expand Up @@ -409,6 +412,7 @@ func (db *DB) NextPiecesToCheck(ctx context.Context, maddr address.Address) ([]c

var pieceCids []cid.Cid

maddrStr := maddr.String()
now := time.Now()

var i int
Expand All @@ -420,21 +424,33 @@ func (db *DB) NextPiecesToCheck(ctx context.Context, maddr address.Address) ([]c
i++

k := r.Key[len(q.Prefix):]
if t, ok := checked[k]; ok {
minerPiece := maddrStr + k
if t, ok := checked[minerPiece]; ok {
alreadyChecked := t.After(now.Add(-MinPieceCheckPeriod))

if alreadyChecked {
continue
}
}
checked[k] = now

pieceCid, err := cid.Parse(k)
if err != nil {
return nil, fmt.Errorf("parsing piece cid '%s': %w", k, err)
}

pieceCids = append(pieceCids, pieceCid)
// Filter for pieces that match the miner address
md, err := db.GetPieceCidToMetadata(ctx, pieceCid)
if err != nil {
return nil, fmt.Errorf("getting piece metadata: %w", err)
}

for _, dl := range md.Deals {
if dl.MinerAddr == maddr {
checked[minerPiece] = now
pieceCids = append(pieceCids, pieceCid)
break
}
}
}
offset += i

Expand Down Expand Up @@ -652,6 +668,7 @@ func (db *DB) ListFlaggedPieces(ctx context.Context, filter *types.FlaggedPieces
Prefix: "/" + sprefixPieceCidToFlagged + "/",
KeysOnly: false,
}

results, err := db.Query(ctx, q)
if err != nil {
return nil, fmt.Errorf("listing flagged pieces in database: %w", err)
Expand All @@ -664,7 +681,11 @@ func (db *DB) ListFlaggedPieces(ctx context.Context, filter *types.FlaggedPieces
break
}

k := r.Key[len(q.Prefix):]
parts := strings.Split(r.Key, "/")
if len(parts) == 0 {
return nil, fmt.Errorf("unexpected key format '%s'", r.Key)
}
k := parts[len(parts)-1]
pieceCid, err := cid.Parse(k)
if err != nil {
return nil, fmt.Errorf("parsing piece cid '%s': %w", k, err)
Expand All @@ -680,6 +701,10 @@ func (db *DB) ListFlaggedPieces(ctx context.Context, filter *types.FlaggedPieces
continue
}

if filter != nil && !filter.MinerAddr.Empty() && filter.MinerAddr != v.MinerAddr {
continue
}

if cursor != nil && v.CreatedAt.Before(*cursor) {
continue
}
Expand Down Expand Up @@ -719,6 +744,7 @@ func (db *DB) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiece
Prefix: "/" + sprefixPieceCidToFlagged + "/",
KeysOnly: filter == nil,
}

results, err := db.Query(ctx, q)
if err != nil {
return -1, fmt.Errorf("listing flagged pieces in database: %w", err)
Expand All @@ -741,6 +767,10 @@ func (db *DB) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiece
if filter.HasUnsealedCopy != v.HasUnsealedCopy {
continue
}

if !filter.MinerAddr.Empty() && filter.MinerAddr != v.MinerAddr {
continue
}
}

i++
Expand All @@ -750,11 +780,11 @@ func (db *DB) FlaggedPiecesCount(ctx context.Context, filter *types.FlaggedPiece
}

// DeletePieceCidToFlagged
func (db *DB) DeletePieceCidToFlagged(ctx context.Context, pieceCid cid.Cid) error {
func (db *DB) DeletePieceCidToFlagged(ctx context.Context, pieceCid cid.Cid, maddr address.Address) error {
ctx, span := tracing.Tracer.Start(ctx, "db.delete_piece_flagged_metadata")
defer span.End()

key := datastore.NewKey(fmt.Sprintf("%s/%s", sprefixPieceCidToFlagged, pieceCid.String()))
key := pieceCidToFlaggedKey(maddr, pieceCid)

// TODO: Requires DB compaction for removing the key
return db.Delete(ctx, key)
Expand Down
Loading

0 comments on commit eb350d5

Please sign in to comment.