Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/migrate-curio/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func main() {
EnvVars: []string{"CURIO_DB_PORT", "CURIO_HARMONYDB_PORT"},
Value: "5433",
},
&cli.StringFlag{
Name: "api-lid",
Usage: "Boostd-data service API endpoint. Service must be running.",
Required: true,
},
},
Commands: []*cli.Command{
migrateCmd,
Expand Down
54 changes: 46 additions & 8 deletions cmd/migrate-curio/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/filecoin-project/boost/db"
bdclient "github.com/filecoin-project/boost/extern/boostd-data/client"
"github.com/filecoin-project/boost/lib/legacy"
"github.com/filecoin-project/boost/node/repo"
"github.com/filecoin-project/boost/storagemarket/types"
Expand Down Expand Up @@ -148,8 +149,20 @@ func migrate(cctx *cli.Context, repoDir string) error {
return fmt.Errorf("merging bitfields to generate all sealed sectors on miner %s: %w", maddr, err)
}

cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}

_, err = cl.ListPieces(ctx)
if err != nil {
return fmt.Errorf("listing pieces from local index directory service: %w", err)
}

// Migrate Boost deals
if err := migrateBoostDeals(ctx, activeSectors, maddr, hdb, sqldb, mdb); err != nil {
if err := migrateBoostDeals(ctx, activeSectors, maddr, hdb, sqldb, mdb, cl); err != nil {
return xerrors.Errorf("failed to migrate boost deals: %w", err)
}

Expand All @@ -166,7 +179,7 @@ func migrate(cctx *cli.Context, repoDir string) error {
return nil
}

func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, maddr address.Address, hdb *harmonydb.DB, sqldb, mdb *sql.DB) error {
func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, maddr address.Address, hdb *harmonydb.DB, sqldb, mdb *sql.DB, bdclient *bdclient.Store) error {
sdb := db.NewDealsDB(sqldb)

mid, err := address.IDFromAddress(maddr)
Expand Down Expand Up @@ -291,17 +304,38 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad

}

if deal.NBytesReceived == 0 {
if deal.Transfer.Size > 0 {
deal.NBytesReceived = int64(deal.Transfer.Size)
} else {
pds, err := bdclient.GetPieceDeals(ctx, prop.PieceCID)
if err != nil {
return fmt.Errorf("failed to get piece deals from LID: %w", err)
}
for _, pd := range pds {
if pd.CarLength > 0 {
deal.NBytesReceived = int64(pd.CarLength)
break
}
}
}
}

if deal.NBytesReceived == 0 {
return fmt.Errorf("deal: %s: failed to get raw size from SQL or LID", deal.DealUuid.String())
}

_, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
// Add deal to HarmonyDB
if !a {
_, err = tx.Exec(`INSERT INTO market_mk12_deals (uuid, sp_id, signed_proposal_cid,
proposal_signature, proposal, proposal_cid, piece_cid,
piece_size, offline, verified, start_epoch, end_epoch,
piece_size, raw_size, offline, verified, start_epoch, end_epoch,
client_peer_id, fast_retrieval, announce_to_ipni, url, url_headers, chain_deal_id, publish_cid, created_at, label)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
ON CONFLICT (uuid) DO NOTHING`,
deal.DealUuid.String(), mid, sProp.String(), sigByte, propJson, propCid, prop.PieceCID.String(),
prop.PieceSize, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(),
prop.PieceSize, deal.NBytesReceived, deal.IsOffline, prop.VerifiedDeal, prop.StartEpoch, prop.EndEpoch, deal.ClientPeerID.String(),
deal.FastRetrieval, deal.AnnounceToIPNI, tInfo.URL, headers, int64(deal.ChainDealID), deal.PublishCID.String(), deal.CreatedAt, buf.Bytes())

if err != nil {
Expand Down Expand Up @@ -554,15 +588,19 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
continue
}

if deal.InboundFileSize == 0 {
return fmt.Errorf("deal: %s: inbound file size is 0", deal.ID.String())
}

_, err = hdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (bool, error) {
if !a {
// Add DDO deal to harmonyDB
_, err = tx.Exec(`INSERT INTO market_direct_deals (uuid, sp_id, created_at, client, offline, verified,
start_epoch, end_epoch, allocation_id, piece_cid, piece_size, fast_retrieval, announce_to_ipni)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
start_epoch, end_epoch, allocation_id, piece_cid, piece_size, raw_size, fast_retrieval, announce_to_ipni)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT (uuid) DO NOTHING`,
deal.ID.String(), mid, deal.CreatedAt, deal.Client.String(), true, true, deal.StartEpoch, deal.EndEpoch, deal.AllocationID,
deal.PieceCID.String(), deal.PieceSize, true, true)
deal.PieceCID.String(), deal.PieceSize, deal.InboundFileSize, true, true)

if err != nil {
return false, fmt.Errorf("deal: %s: failed to add the DDO deal to harmonyDB: %w", deal.ID.String(), err)
Expand Down