Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi-miner / single LID - piece doctor #1600

Merged
merged 5 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/migrate-lid/migrate_lid.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/boostd-data/yugabyte/migrations"
"os"
"path"
"sort"
Expand Down Expand Up @@ -142,7 +143,11 @@ var migrateYugabyteDBCmd = &cli.Command{
PayloadPiecesParallelism: cctx.Int("insert-parallelism"),
}

store := yugabyte.NewStore(settings)
// Note that it doesn't matter what address we pass here: because the
// table is newly created, it doesn't contain any rows when the
// migration is run.
migrator := yugabyte.NewMigrator(settings.ConnectString, address.TestAddress)
store := yugabyte.NewStore(settings, migrator)
return migrate(cctx, "yugabyte", store, migrateType)
},
}
Expand Down Expand Up @@ -605,7 +610,8 @@ func migrateReverse(cctx *cli.Context, dbType string) error {
Hosts: cctx.StringSlice("hosts"),
PayloadPiecesParallelism: cctx.Int("insert-parallelism"),
}
store = yugabyte.NewStore(settings)
migrator := yugabyte.NewMigrator(settings.ConnectString, migrations.DisabledMinerAddr)
store = yugabyte.NewStore(settings, migrator)
}

// Perform the reverse migration
Expand Down
9 changes: 5 additions & 4 deletions db/migrations/20220420124936_setcheckpointattounix0.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
package migrations

import (
"context"
"database/sql"

"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upSetcheckpointattounix0, downSetcheckpointattounix0)
goose.AddMigrationContext(upSetcheckpointattounix0, downSetcheckpointattounix0)
}

func upSetcheckpointattounix0(tx *sql.Tx) error {
_, err := tx.Exec("UPDATE Deals SET CheckpointAt=datetime(0, 'unixepoch');")
func upSetcheckpointattounix0(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "UPDATE Deals SET CheckpointAt=datetime(0, 'unixepoch');")
if err != nil {
return err
}
return nil
}

func downSetcheckpointattounix0(tx *sql.Tx) error {
func downSetcheckpointattounix0(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
11 changes: 6 additions & 5 deletions db/migrations/20220512094027_deals-set-retry.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
package migrations

import (
"context"
"database/sql"

"github.com/filecoin-project/boost/storagemarket/types"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upSetdealsretry, downSetdealsretry)
goose.AddMigrationContext(upSetdealsretry, downSetdealsretry)
}

func upSetdealsretry(tx *sql.Tx) error {
_, err := tx.Exec("UPDATE Deals SET Retry=?;", types.DealRetryAuto)
func upSetdealsretry(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "UPDATE Deals SET Retry=?;", types.DealRetryAuto)
if err != nil {
return err
}
_, err = tx.Exec("UPDATE Deals SET Retry=? WHERE Checkpoint='Complete' AND Error != '';", types.DealRetryFatal)
_, err = tx.ExecContext(ctx, "UPDATE Deals SET Retry=? WHERE Checkpoint='Complete' AND Error != '';", types.DealRetryFatal)
if err != nil {
return err
}
return nil
}

func downSetdealsretry(tx *sql.Tx) error {
func downSetdealsretry(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
11 changes: 6 additions & 5 deletions db/migrations/20220518162704_deals_addr_binary_to_string.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package migrations

import (
"context"
"database/sql"

"github.com/filecoin-project/go-address"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upDealsAddrBinaryToString, downDealsAddrBinaryToString)
goose.AddMigrationContext(upDealsAddrBinaryToString, downDealsAddrBinaryToString)
}

// Convert format of deals in the database from binary to string
func upDealsAddrBinaryToString(tx *sql.Tx) error {
rows, err := tx.Query("SELECT ID, ClientAddress, ProviderAddress FROM Deals")
func upDealsAddrBinaryToString(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(ctx, "SELECT ID, ClientAddress, ProviderAddress FROM Deals")
if err != nil {
return err
}
Expand All @@ -38,7 +39,7 @@ func upDealsAddrBinaryToString(tx *sql.Tx) error {
continue
}

_, err = tx.Exec("UPDATE Deals SET ClientAddress=?, ProviderAddress=? WHERE ID=?", updatedClientAddr, updatedProviderAddr, id)
_, err = tx.ExecContext(ctx, "UPDATE Deals SET ClientAddress=?, ProviderAddress=? WHERE ID=?", updatedClientAddr, updatedProviderAddr, id)
if err != nil {
log.Warnf("could not migrate row with id %s: could not save row: %w", id, err)
continue
Expand Down Expand Up @@ -67,7 +68,7 @@ func addrToString(input []byte) (*string, error) {
return &updated, nil
}

func downDealsAddrBinaryToString(tx *sql.Tx) error {
func downDealsAddrBinaryToString(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
11 changes: 6 additions & 5 deletions db/migrations/20220520121441_deals_peerid_binary_to_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package migrations

import (
"bytes"
"context"
"database/sql"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upDealsPeeridBinaryToString, downDealsPeeridBinaryToString)
goose.AddMigrationContext(upDealsPeeridBinaryToString, downDealsPeeridBinaryToString)
}

func upDealsPeeridBinaryToString(tx *sql.Tx) error {
rows, err := tx.Query("SELECT ID, ClientPeerID FROM Deals")
func upDealsPeeridBinaryToString(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(ctx, "SELECT ID, ClientPeerID FROM Deals")
if err != nil {
return err
}
Expand All @@ -31,7 +32,7 @@ func upDealsPeeridBinaryToString(tx *sql.Tx) error {
continue
}

_, err = tx.Exec("UPDATE Deals SET ClientPeerID=? WHERE ID=?", updated, id)
_, err = tx.ExecContext(ctx, "UPDATE Deals SET ClientPeerID=? WHERE ID=?", updated, id)
if err != nil {
log.Warnf("could not migrate row with id %s: could not save row: %w", id, err)
continue
Expand Down Expand Up @@ -72,7 +73,7 @@ func pidToString(input []byte) (*string, error) {
return &updated, nil
}

func downDealsPeeridBinaryToString(tx *sql.Tx) error {
func downDealsPeeridBinaryToString(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
15 changes: 8 additions & 7 deletions db/migrations/20220608160748_deals_label_v8.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package migrations

import (
"context"
"database/sql"
"fmt"
"unicode/utf8"
Expand All @@ -11,13 +12,13 @@ import (
)

func init() {
goose.AddMigration(UpDealsLabelV8, DownDealsLabelV8)
goose.AddMigrationContext(UpDealsLabelV8, DownDealsLabelV8)
}

// Change the deal label format from a string to a marshalled DealLabel
func UpDealsLabelV8(tx *sql.Tx) error {
func UpDealsLabelV8(ctx context.Context, tx *sql.Tx) error {
errPrefix := "migrating deal label up from string to DealLabel: "
rows, err := tx.Query("SELECT ID, Label from Deals WHERE Label IS NOT NULL")
rows, err := tx.QueryContext(ctx, "SELECT ID, Label from Deals WHERE Label IS NOT NULL")
if err != nil {
return fmt.Errorf(errPrefix+"getting deals from DB: %w", err)
}
Expand Down Expand Up @@ -54,7 +55,7 @@ func UpDealsLabelV8(tx *sql.Tx) error {
return fmt.Errorf(dealErrPrefix+"marshalling DealLabel (bytes): %w", err)
}
}
_, err = tx.Exec("UPDATE Deals SET Label = ? WHERE ID = ?", marshalled, id)
_, err = tx.ExecContext(ctx, "UPDATE Deals SET Label = ? WHERE ID = ?", marshalled, id)
if err != nil {
return fmt.Errorf(dealErrPrefix+"saving DealLabel to DB: %w", err)
}
Expand All @@ -63,9 +64,9 @@ func UpDealsLabelV8(tx *sql.Tx) error {
}

// Change the deal label format from a marshalled DealLabel to a string
func DownDealsLabelV8(tx *sql.Tx) error {
func DownDealsLabelV8(ctx context.Context, tx *sql.Tx) error {
errPrefix := "migrating deal label down from DealLabel to string: "
rows, err := tx.Query("SELECT ID, Label from Deals WHERE Label IS NOT NULL")
rows, err := tx.QueryContext(ctx, "SELECT ID, Label from Deals WHERE Label IS NOT NULL")
if err != nil {
return fmt.Errorf(errPrefix+"getting deals from DB: %w", err)
}
Expand Down Expand Up @@ -100,7 +101,7 @@ func DownDealsLabelV8(tx *sql.Tx) error {
asString = string(asBytes)
}

_, err = tx.Exec("UPDATE Deals SET Label = ? WHERE ID = ?", asString, id)
_, err = tx.ExecContext(ctx, "UPDATE Deals SET Label = ? WHERE ID = ?", asString, id)
if err != nil {
return fmt.Errorf(dealErrPrefix+"saving deal label string to DB: %w", err)
}
Expand Down
11 changes: 6 additions & 5 deletions db/migrations/20220908122516_storagetagged_set_host.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
package migrations

import (
"context"
"database/sql"
"fmt"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(UpSetStorageTaggedTransferHost, DownSetStorageTaggedTransferHost)
goose.AddMigrationContext(UpSetStorageTaggedTransferHost, DownSetStorageTaggedTransferHost)
}

func UpSetStorageTaggedTransferHost(tx *sql.Tx) error {
func UpSetStorageTaggedTransferHost(ctx context.Context, tx *sql.Tx) error {
errPrefix := "setting StorageTagged.TransferHost: "
qry := "SELECT Deals.ID, Deals.TransferType, Deals.TransferParams " +
"FROM Deals INNER JOIN StorageTagged " +
"ON Deals.ID = StorageTagged.DealUUID"
rows, err := tx.Query(qry)
rows, err := tx.QueryContext(ctx, qry)
if err != nil {
return fmt.Errorf(errPrefix+"getting deals from DB: %w", err)
}
Expand All @@ -43,15 +44,15 @@ func UpSetStorageTaggedTransferHost(tx *sql.Tx) error {
continue
}

_, err = tx.Exec("UPDATE StorageTagged SET TransferHost = ? WHERE DealUUID = ?", host, id)
_, err = tx.ExecContext(ctx, "UPDATE StorageTagged SET TransferHost = ? WHERE DealUUID = ?", host, id)
if err != nil {
return fmt.Errorf(dealErrPrefix+"saving TransferHost to DB: %w", err)
}
}
return rows.Err()
}

func DownSetStorageTaggedTransferHost(tx *sql.Tx) error {
func DownSetStorageTaggedTransferHost(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
// Do nothing because sqlite doesn't support removing a column.
return nil
Expand Down
9 changes: 5 additions & 4 deletions db/migrations/20221124191256_deals_set_fast_retrieval.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
package migrations

import (
"context"
"database/sql"

"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upSetdealsfastretrieval, downSetdealsfastretrieval)
goose.AddMigrationContext(upSetdealsfastretrieval, downSetdealsfastretrieval)
}

func upSetdealsfastretrieval(tx *sql.Tx) error {
_, err := tx.Exec("UPDATE Deals SET FastRetrieval=?;", true)
func upSetdealsfastretrieval(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "UPDATE Deals SET FastRetrieval=?;", true)
if err != nil {
return err
}
return nil
}

func downSetdealsfastretrieval(tx *sql.Tx) error {
func downSetdealsfastretrieval(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
9 changes: 5 additions & 4 deletions db/migrations/20230104230411_deal_announce_to_ipni.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
package migrations

import (
"context"
"database/sql"

"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upSetdealsAnnounceToIPNI, downSetdealsAnnounceToIPNI)
goose.AddMigrationContext(upSetdealsAnnounceToIPNI, downSetdealsAnnounceToIPNI)
}

func upSetdealsAnnounceToIPNI(tx *sql.Tx) error {
_, err := tx.Exec("UPDATE Deals SET AnnounceToIPNI=?;", true)
func upSetdealsAnnounceToIPNI(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "UPDATE Deals SET AnnounceToIPNI=?;", true)
if err != nil {
return err
}
return nil
}

func downSetdealsAnnounceToIPNI(tx *sql.Tx) error {
func downSetdealsAnnounceToIPNI(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
9 changes: 5 additions & 4 deletions db/migrations/20230330111524_deals_cleanup_data.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package migrations

import (
"context"
"database/sql"
"github.com/pressly/goose/v3"
)

func init() {
goose.AddMigration(upDealsCleanupData, downDealsCleanupData)
goose.AddMigrationContext(upDealsCleanupData, downDealsCleanupData)
}

func upDealsCleanupData(tx *sql.Tx) error {
_, err := tx.Exec("UPDATE Deals SET CleanupData = NOT IsOffline")
func upDealsCleanupData(ctx context.Context, tx *sql.Tx) error {
_, err := tx.ExecContext(ctx, "UPDATE Deals SET CleanupData = NOT IsOffline")
if err != nil {
return err
}
return nil
}

func downDealsCleanupData(tx *sql.Tx) error {
func downDealsCleanupData(ctx context.Context, tx *sql.Tx) error {
// This code is executed when the migration is rolled back.
return nil
}
2 changes: 1 addition & 1 deletion db/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

var log = logging.Logger("migrations")

//go:embed *.sql
//go:embed *.sql *.go
var EmbedMigrations embed.FS

func Migrate(sqldb *sql.DB) error {
Expand Down
4 changes: 2 additions & 2 deletions db/migrations_tests/deals_label_v8_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestDealsLabelv8(t *testing.T) {
// Migrate down
tx, err := sqldb.BeginTx(ctx, nil)
req.NoError(err)
err = migrations.DownDealsLabelV8(tx)
err = migrations.DownDealsLabelV8(ctx, tx)
req.NoError(err)
err = tx.Commit()
req.NoError(err)
Expand All @@ -85,7 +85,7 @@ func TestDealsLabelv8(t *testing.T) {
// Migrate up
tx, err = sqldb.BeginTx(ctx, nil)
req.NoError(err)
err = migrations.UpDealsLabelV8(tx)
err = migrations.UpDealsLabelV8(ctx, tx)
req.NoError(err)
err = tx.Commit()
req.NoError(err)
Expand Down
Loading