-
Notifications
You must be signed in to change notification settings - Fork 64
/
api.go
144 lines (124 loc) · 5.03 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package lib
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/filecoin-project/boost/api"
cliutil "github.com/filecoin-project/boost/cli/util"
"github.com/filecoin-project/boost/markets/sectoraccessor"
"github.com/filecoin-project/go-jsonrpc"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/config"
lotus_modules "github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
logging "github.com/ipfs/go-log/v2"
)
// GetFullNodeApi opens a JSON-RPC client to the full node v1 API described
// by ai and verifies that the remote API version is compatible.
//
// ai is trimmed of surrounding whitespace and an optional leading
// "FULLNODE_API_INFO=" prefix before it is parsed. The dial address that is
// used is written to log at info level.
//
// On success it returns the API handle together with a closer that the caller
// must invoke to release the connection. On failure both are nil; if the
// connection had already been established it is closed before returning.
func GetFullNodeApi(ctx context.Context, ai string, log *logging.ZapEventLogger) (v1api.FullNode, jsonrpc.ClientCloser, error) {
	ai = strings.TrimPrefix(strings.TrimSpace(ai), "FULLNODE_API_INFO=")
	info := cliutil.ParseApiInfo(ai)
	addr, err := info.DialArgs("v1")
	if err != nil {
		return nil, nil, fmt.Errorf("could not get DialArgs: %w", err)
	}

	log.Infof("Using full node API at %s", addr)
	fnapi, closer, err := client.NewFullNodeRPCV1(ctx, addr, info.AuthHeader())
	if err != nil {
		return nil, nil, fmt.Errorf("creating full node service API: %w", err)
	}

	v, err := fnapi.Version(ctx)
	if err != nil {
		// The client is connected at this point; release it so the
		// connection does not leak when we bail out.
		closer()
		return nil, nil, fmt.Errorf("checking full node service API version: %w", err)
	}

	if !v.APIVersion.EqMajorMinor(lapi.FullAPIVersion1) {
		closer()
		// NOTE(review): the check above compares against lapi.FullAPIVersion1
		// but the message prints api.FullAPIVersion1 (a different package) —
		// confirm the two constants are meant to be the same value.
		return nil, nil, fmt.Errorf("full node service API version didn't match (expected %s, remote %s)", api.FullAPIVersion1, v.APIVersion)
	}

	return fnapi, closer, nil
}
// GetMinerApi opens a JSON-RPC client to the storage miner v0 API described
// by ai and verifies that the remote API version is compatible.
//
// ai is trimmed of surrounding whitespace and an optional leading
// "MINER_API_INFO=" prefix before it is parsed. The dial address that is used
// is written to log at info level.
//
// On success it returns the API handle together with a closer that the caller
// must invoke to release the connection. On failure both are nil; if the
// connection had already been established it is closed before returning.
func GetMinerApi(ctx context.Context, ai string, log *logging.ZapEventLogger) (v0api.StorageMiner, jsonrpc.ClientCloser, error) {
	ai = strings.TrimPrefix(strings.TrimSpace(ai), "MINER_API_INFO=")
	info := cliutil.ParseApiInfo(ai)
	addr, err := info.DialArgs("v0")
	if err != nil {
		return nil, nil, fmt.Errorf("could not get DialArgs: %w", err)
	}

	log.Infof("Using storage API at %s", addr)
	// Named minerAPI rather than api so the imported api package is not
	// shadowed inside this function.
	minerAPI, closer, err := client.NewStorageMinerRPCV0(ctx, addr, info.AuthHeader())
	if err != nil {
		return nil, nil, fmt.Errorf("creating miner service API: %w", err)
	}

	v, err := minerAPI.Version(ctx)
	if err != nil {
		// The client is connected at this point; release it so the
		// connection does not leak when we bail out.
		closer()
		return nil, nil, fmt.Errorf("checking miner service API version: %w", err)
	}

	if !v.APIVersion.EqMajorMinor(lapi.MinerAPIVersion0) {
		closer()
		return nil, nil, fmt.Errorf("miner service API version didn't match (expected %s, remote %s)", lapi.MinerAPIVersion0, v.APIVersion)
	}

	return minerAPI, closer, nil
}
// StorageAuthWithURL builds the HTTP authorization headers for the storage
// API from an API info string of the form "<token>:<address>".
//
// Surrounding whitespace and an optional leading "MINER_API_INFO=" prefix are
// removed first, matching what GetMinerApi accepts for the same string.
// Everything before the first ':' is used as the bearer token; the remainder
// is ignored here and may itself contain ':' characters (for example a URL
// with a scheme or port).
//
// It returns an error when apiInfo contains no ':' separator.
func StorageAuthWithURL(apiInfo string) (sealer.StorageAuth, error) {
	apiInfo = strings.TrimPrefix(strings.TrimSpace(apiInfo), "MINER_API_INFO=")

	// Split only on the first ':' so that addresses containing further
	// colons are not rejected.
	s := strings.SplitN(apiInfo, ":", 2)
	if len(s) != 2 {
		return nil, errors.New("unexpected format of `apiInfo`")
	}

	headers := http.Header{}
	headers.Add("Authorization", "Bearer "+s[0])
	return sealer.StorageAuth(headers), nil
}
// CreateSectorAccessor connects to the storage miner API described by
// storageApiInfo and builds a caching sector accessor on top of it.
//
// storageApiInfo is used twice: to derive the storage auth headers and to
// open the miner API connection. fullnodeApi is handed through to the sector
// accessor constructor. log receives informational messages.
//
// On success it returns the sector accessor and the closer for the miner API
// connection, which the caller must invoke when done. On failure both are
// nil, and a miner API connection that was already opened is closed before
// returning.
func CreateSectorAccessor(ctx context.Context, storageApiInfo string, fullnodeApi v1api.FullNode, log *logging.ZapEventLogger) (dagstore.SectorAccessor, jsonrpc.ClientCloser, error) {
	sauth, err := StorageAuthWithURL(storageApiInfo)
	if err != nil {
		return nil, nil, fmt.Errorf("parsing storage API endpoint: %w", err)
	}

	storageService, storageCloser, err := GetMinerApi(ctx, storageApiInfo, log)
	if err != nil {
		return nil, nil, fmt.Errorf("getting miner API: %w", err)
	}

	// From here on the miner API connection is open. Make sure it is released
	// on every error path; ownership passes to the caller only on success.
	succeeded := false
	defer func() {
		if !succeeded {
			storageCloser()
		}
	}()

	maddr, err := storageService.ActorAddress(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("getting miner actor address: %w", err)
	}
	log.Infof("Miner address: %s", maddr)

	// Use an in-memory repo because we don't need any functions
	// of a real repo, we just need to supply something that satisfies
	// the LocalStorage interface to the store
	memRepo := repo.NewMemory(nil)

	// passing FullNode, so that we don't pass StorageMiner or Worker and
	// skip initializing of sectorstore.json with random local storage ID
	lr, err := memRepo.Lock(repo.FullNode)
	if err != nil {
		return nil, nil, fmt.Errorf("locking mem repo: %w", err)
	}
	// The close error is not checked here (same as before this change).
	defer lr.Close()

	// Start with an explicitly empty list of local storage paths.
	if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
		sc.StoragePaths = []storiface.LocalPath{}
	}); err != nil {
		return nil, nil, fmt.Errorf("set storage config: %w", err)
	}

	// Create the store interface
	var urls []string
	lstor, err := paths.NewLocal(ctx, lr, storageService, urls)
	if err != nil {
		return nil, nil, fmt.Errorf("creating new local store: %w", err)
	}
	storage := lotus_modules.RemoteStorage(lstor, storageService, sauth, config.SealerConfig{
		// TODO: Not sure if I need this, or any of the other fields in this struct
		ParallelFetchLimit: 1,
	})

	// Create the piece provider
	pp := sealer.NewPieceProvider(storage, storageService, storageService)

	const maxCacheSize = 4096
	newSectorAccessor := sectoraccessor.NewCachingSectorAccessor(maxCacheSize, 5*time.Minute)
	sa := newSectorAccessor(dtypes.MinerAddress(maddr), storageService, pp, fullnodeApi)

	succeeded = true
	return sa, storageCloser, nil
}