Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: baseline spadev0 integration #28

Merged
merged 16 commits into from
Sep 11, 2023
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ linters-settings:
disabled: true
lll:
line-length: 120
tagliatelle:
case:
rules:
json: "snake"
jcace marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 3 additions & 2 deletions integration/filplus/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package util

import (
"context"
"strconv"
"time"

"github.com/data-preservation-programs/RetrievalBot/pkg/convert"
"github.com/data-preservation-programs/RetrievalBot/pkg/env"
"github.com/data-preservation-programs/RetrievalBot/pkg/model"
Expand All @@ -13,8 +16,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"strconv"
"time"
)

var logger = logging.Logger("addTasks")
Expand Down
206 changes: 206 additions & 0 deletions integration/spadev0/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package main

import (
"context"
"encoding/json"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"time"

logging "github.com/ipfs/go-log/v2"
_ "github.com/joho/godotenv/autoload"
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
)

var logger = logging.Logger("spade-v0-tasks")

func main() {
app := &cli.App{
Name: "spadev0",
Usage: "run spade v0 replica task generation",
Flags: []cli.Flag{
&cli.StringSliceFlag{
Name: "sources",
DefaultText: "http://src-1/replicas.json.zst,http://src-2/replicas.json.zst",
Usage: "comma-separated list of sources to fetch replica list from",
Required: true,
},
},
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
// logging.SetLogLevel("spade-v0-tasks", "DEBUG")

// Extract the sources from the flag
sources := cctx.StringSlice("sources")

for _, source := range sources {
res, err := fetchActiveReplicas(ctx, source)

if err != nil {
return err
}

var perProvider = make(map[int]ProviderReplicas)

for _, replica := range res.ActiveReplicas {
for _, contract := range replica.Contracts {
providerID := contract.ProviderID
size := (1 << replica.PieceLog2Size) >> 30 // Convert to GiB
perProvider[providerID] = ProviderReplicas{
size: perProvider[providerID].size + size,
replicas: append(perProvider[providerID].replicas, Replica{
PieceCID: replica.PieceCID,
PieceLog2Size: replica.PieceLog2Size,
OptionalDagRoot: replica.OptionalDagRoot,
}),
}
}
}

replicasToTest := selectReplicasToTest(perProvider)

// Debug output - no functional purposes
totalCids := 0
totalSize := 0
for prov, rps := range replicasToTest {
provider := perProvider[prov]
logger.Debugf("provider %d is storing %d GiB will have %d tests\n", prov, provider.size, len(rps))
totalCids += len(rps)
totalSize += provider.size
}
logger.Debugf("total %d CIDs will be tested for %d providers\n", totalCids, len(replicasToTest))

err = AddSpadeTasks(ctx, "spadev0", replicasToTest)
if err != nil {
logger.Errorf("failed to add tasks: %s", err)
}
}
return nil
},
}

if err := app.Run(os.Args); err != nil {
panic(err)
}
}

func fetchActiveReplicas(ctx context.Context, url string) (*ActiveReplicas, error) {
logger.Debugf("fetching CIDs from %s", url)

req, err := http.NewRequestWithContext(ctx,
http.MethodGet,
url,
nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create request")
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, errors.Wrap(err, "failed to make request")
}

defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, errors.Errorf("failed to get spade CID list: %s", resp.Status)
}

decompressor, err := zstd.NewReader(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "failed to create decompressor")
}

defer decompressor.Close()

data, err := ioutil.ReadAll(decompressor)
if err != nil {
return nil, errors.Wrap(err, "failed to read decompressed data")
}

var activeReplicas ActiveReplicas
err = json.Unmarshal(data, &activeReplicas)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal JSON data")
}

return &activeReplicas, nil
}

// Compute a number of CIDs to test, based on the total size of data (assuming in GiB)
// Minimum 1, then log2 of the size in TiB
// ex:
// < 4TiB = 1 cid
// 4 TiB - 16TiB = 2 cids
// 16 TiB - 32 TiB = 3 cids
// 32 TiB - 64 TiB = 4 cids
// 64 TiB - 128 TiB = 5 cids
// 128 TiB - 256 TiB = 6 cids
// etc...
func numCidsToTest(sizeGiB int) int {
return int(math.Max(math.Log2(float64(sizeGiB/1024)), 1))
}

func selectReplicasToTest(perProvider map[int]ProviderReplicas) map[int][]Replica {
var toTest = make(map[int][]Replica)

for providerID, provider := range perProvider {
toTest[providerID] = make([]Replica, 0)

maxReplicas := len(provider.replicas)
numCidsToTest := numCidsToTest(provider.size)

// This condition should not happen, but just in case there's a situation
// where a massive amount of bytes are being stored in relatively few CIDs
if numCidsToTest > maxReplicas {
logger.Warnf("provider %d only has %d replicas but we are trying to test %d",
providerID,
maxReplicas,
numCidsToTest,
)
numCidsToTest = maxReplicas
jcace marked this conversation as resolved.
Show resolved Hide resolved
}

// Randomize which CIDs are selected
rand.Seed(time.Now().UnixNano())
indices := rand.Perm(maxReplicas)[:numCidsToTest]

for _, index := range indices {
toTest[providerID] = append(toTest[providerID], provider.replicas[index])
}
}

return toTest
}

type ProviderReplicas struct {
size int
replicas []Replica
}

type ActiveReplicas struct {
StateEpoch uint `json:"state_epoch"`
ActiveReplicas []ActiveReplica `json:"active_replicas"`
}

type ActiveReplica struct {
Contracts []Contract `json:"contracts"`
Replica
}

type Replica struct {
PieceCID string `json:"piece_cid"`
PieceLog2Size int `json:"piece_log2_size"`
OptionalDagRoot string `json:"optional_dag_root"`
}

type Contract struct {
ProviderID int `json:"provider_id"`
LegacyMarketID int `json:"legacy_market_id"`
LegacyMarketEndEpoch int `json:"legacy_market_end_epoch"`
}
33 changes: 33 additions & 0 deletions integration/spadev0/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestSelectCidsToTest(t *testing.T) {
// Sample data for testing
sampleData := map[int]ProviderReplicas{
123: {size: 128, replicas: []Replica{
{OptionalDagRoot: "root1", PieceCID: "cid1"},
}},
456: {size: 4096, replicas: []Replica{
{OptionalDagRoot: "root3", PieceCID: "cid3"},
{OptionalDagRoot: "root4", PieceCID: "cid4"},
{OptionalDagRoot: "root5", PieceCID: "cid5"},
}},
}

toTest := selectReplicasToTest(sampleData)

// Ensure at least one replica is selected for each provider
for providerID, replicas := range toTest {
if len(replicas) == 0 {
t.Errorf("No replicas selected for Provider %d", providerID)
}
}

assert.Equal(t, 1, len(toTest[123]))
assert.Equal(t, 2, len(toTest[456]))
}
Loading
Loading