diff --git a/api/client/client.go b/api/client/client.go new file mode 100644 index 000000000..1abbeb3b9 --- /dev/null +++ b/api/client/client.go @@ -0,0 +1,47 @@ +package client + +import ( + "context" + "net/http" + "net/url" + "path" + + "github.com/filecoin-project/go-jsonrpc" + + "github.com/filecoin-project/boost/api" + "github.com/filecoin-project/boost/lib/rpcenc" +) + +func getPushUrl(addr string) (string, error) { + pushUrl, err := url.Parse(addr) + if err != nil { + return "", err + } + switch pushUrl.Scheme { + case "ws": + pushUrl.Scheme = "http" + case "wss": + pushUrl.Scheme = "https" + } + ///rpc/v0 -> /rpc/streams/v0/push + + pushUrl.Path = path.Join(pushUrl.Path, "../streams/v0/push") + return pushUrl.String(), nil +} + +// NewBoostRPCV0 creates a new http jsonrpc client for miner +func NewBoostRPCV0(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.Boost, jsonrpc.ClientCloser, error) { + pushUrl, err := getPushUrl(addr) + if err != nil { + return nil, nil, err + } + + var res api.BoostStruct + closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin", + api.GetInternalStructs(&res), requestHeader, + append([]jsonrpc.Option{ + rpcenc.ReaderParamEncoder(pushUrl), + }, opts...)...) + + return &res, closer, err +} diff --git a/cli/cmd.go b/cli/cmd.go new file mode 100644 index 000000000..b2b149233 --- /dev/null +++ b/cli/cmd.go @@ -0,0 +1,9 @@ +package cli + +import ( + cliutil "github.com/filecoin-project/boost/cli/util" +) + +var GetBoostAPI = cliutil.GetBoostAPI + +var DaemonContext = cliutil.DaemonContext diff --git a/cli/util/api.go b/cli/util/api.go new file mode 100644 index 000000000..28943dde4 --- /dev/null +++ b/cli/util/api.go @@ -0,0 +1,250 @@ +package cliutil + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/url" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/mitchellh/go-homedir" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-jsonrpc" + + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/boost/api" + "github.com/filecoin-project/boost/api/client" + "github.com/filecoin-project/boost/node/repo" +) + +const ( + metadataTraceContext = "traceContext" +) + +var log = logging.Logger("cli") + +// flagsForAPI returns flags passed on the command line with the listen address +// of the API server (only used by the tests), in the order of precedence they +// should be applied for the requested kind of node. +func flagsForAPI(t repo.RepoType) []string { + switch t { + case repo.Boost: + return []string{"boost-api-url"} + default: + panic(fmt.Sprintf("Unknown repo type: %v", t)) + } +} + +func flagsForRepo(t repo.RepoType) []string { + switch t { + case repo.Boost: + return []string{"boost-repo"} + default: + panic(fmt.Sprintf("Unknown repo type: %v", t)) + } +} + +// EnvsForAPIInfos returns the environment variables to use in order of precedence +// to determine the API endpoint of the specified node type. +// +// It returns the current variables and deprecated ones separately, so that +// the user can log a warning when deprecated ones are found to be in use. +func EnvsForAPIInfos(t repo.RepoType) (primary string, fallbacks []string, deprecated []string) { + switch t { + case repo.Boost: + return "BOOST_API_INFO", []string{"BOOST_API_INFO"}, nil + default: + panic(fmt.Sprintf("Unknown repo type: %v", t)) + } +} + +// GetAPIInfo returns the API endpoint to use for the specified kind of repo. +// +// The order of precedence is as follows: +// +// 1. *-api-url command line flags. +// 2. *_API_INFO environment variables +// 3. deprecated *_API_INFO environment variables +// 4. *-repo command line flags. +func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { + // Check if there was a flag passed with the listen address of the API + // server (only used by the tests) + apiFlags := flagsForAPI(t) + for _, f := range apiFlags { + if !ctx.IsSet(f) { + continue + } + strma := ctx.String(f) + strma = strings.TrimSpace(strma) + + return APIInfo{Addr: strma}, nil + } + + // + // Note: it is not correct/intuitive to prefer environment variables over + // CLI flags (repo flags below). + // + primaryEnv, fallbacksEnvs, deprecatedEnvs := EnvsForAPIInfos(t) + env, ok := os.LookupEnv(primaryEnv) + if ok { + return ParseApiInfo(env), nil + } + + for _, env := range deprecatedEnvs { + env, ok := os.LookupEnv(env) + if ok { + log.Warnf("Using deprecated env(%s) value, please use env(%s) instead.", env, primaryEnv) + return ParseApiInfo(env), nil + } + } + + repoFlags := flagsForRepo(t) + for _, f := range repoFlags { + // cannot use ctx.IsSet because it ignores default values + path := ctx.String(f) + if path == "" { + continue + } + + p, err := homedir.Expand(path) + if err != nil { + return APIInfo{}, xerrors.Errorf("could not expand home dir (%s): %w", f, err) + } + + r, err := repo.NewFS(p) + if err != nil { + return APIInfo{}, xerrors.Errorf("could not open repo at path: %s; %w", p, err) + } + + exists, err := r.Exists() + if err != nil { + return APIInfo{}, xerrors.Errorf("repo.Exists returned an error: %w", err) + } + + if !exists { + return APIInfo{}, errors.New("repo directory does not exist. Make sure your configuration is correct") + } + + ma, err := r.APIEndpoint() + if err != nil { + return APIInfo{}, xerrors.Errorf("could not get api endpoint: %w", err) + } + + token, err := r.APIToken() + if err != nil { + log.Warnf("Couldn't load CLI token, capabilities may be limited: %v", err) + } + + return APIInfo{ + Addr: ma.String(), + Token: token, + }, nil + } + + for _, env := range fallbacksEnvs { + env, ok := os.LookupEnv(env) + if ok { + return ParseApiInfo(env), nil + } + } + + return APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t) +} + +type GetBoostOptions struct { + PreferHttp bool +} + +type GetBoostOption func(*GetBoostOptions) + +func BoostUseHttp(opts *GetBoostOptions) { + opts.PreferHttp = true +} + +func GetBoostAPI(ctx *cli.Context, opts ...GetBoostOption) (api.Boost, jsonrpc.ClientCloser, error) { + var options GetBoostOptions + for _, opt := range opts { + opt(&options) + } + + if tn, ok := ctx.App.Metadata["testnode-boost"]; ok { + return tn.(api.Boost), func() {}, nil + } + + addr, headers, err := GetRawAPI(ctx, repo.Boost, "v0") + if err != nil { + return nil, nil, err + } + + if options.PreferHttp { + u, err := url.Parse(addr) + if err != nil { + return nil, nil, xerrors.Errorf("parsing miner api URL: %w", err) + } + + switch u.Scheme { + case "ws": + u.Scheme = "http" + case "wss": + u.Scheme = "https" + } + + addr = u.String() + } + + if IsVeryVerbose { + _, _ = fmt.Fprintln(ctx.App.Writer, "using miner API v0 endpoint:", addr) + } + + return client.NewBoostRPCV0(ctx.Context, addr, headers) +} + +func GetRawAPI(ctx *cli.Context, t repo.RepoType, version string) (string, http.Header, error) { + ainfo, err := GetAPIInfo(ctx, t) + if err != nil { + return "", nil, xerrors.Errorf("could not get API info for %s: %w", t, err) + } + + addr, err := ainfo.DialArgs(version) + if err != nil { + return "", nil, xerrors.Errorf("could not get DialArgs: %w", err) + } + + if IsVeryVerbose { + _, _ = fmt.Fprintf(ctx.App.Writer, "using raw API %s endpoint: %s\n", version, addr) + } + + return addr, ainfo.AuthHeader(), nil +} + +func DaemonContext(cctx *cli.Context) context.Context { + if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok { + return mtCtx.(context.Context) + } + + return context.Background() +} + +// ReqContext returns context for cli execution. Calling it for the first time +// installs SIGTERM handler that will close returned context. +// Not safe for concurrent execution. +func ReqContext(cctx *cli.Context) context.Context { + tCtx := DaemonContext(cctx) + + ctx, done := context.WithCancel(tCtx) + sigChan := make(chan os.Signal, 2) + go func() { + <-sigChan + done() + }() + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) + + return ctx +} diff --git a/cli/util/apiinfo.go b/cli/util/apiinfo.go new file mode 100644 index 000000000..a91f37b2b --- /dev/null +++ b/cli/util/apiinfo.go @@ -0,0 +1,80 @@ +package cliutil + +import ( + "net/http" + "net/url" + "regexp" + "strings" + + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var ( + infoWithToken = regexp.MustCompile(`^[a-zA-Z0-9\-_]+?\.[a-zA-Z0-9\-_]+?\.([a-zA-Z0-9\-_]+)?:.+$`) +) + +type APIInfo struct { + Addr string + Token []byte +} + +func ParseApiInfo(s string) APIInfo { + var tok []byte + if infoWithToken.Match([]byte(s)) { + sp := strings.SplitN(s, ":", 2) + tok = []byte(sp[0]) + s = sp[1] + } + + return APIInfo{ + Addr: s, + Token: tok, + } +} + +func (a APIInfo) DialArgs(version string) (string, error) { + ma, err := multiaddr.NewMultiaddr(a.Addr) + if err == nil { + _, addr, err := manet.DialArgs(ma) + if err != nil { + return "", err + } + + return "ws://" + addr + "/rpc/" + version, nil + } + + _, err = url.Parse(a.Addr) + if err != nil { + return "", err + } + return a.Addr + "/rpc/" + version, nil +} + +func (a APIInfo) Host() (string, error) { + ma, err := multiaddr.NewMultiaddr(a.Addr) + if err == nil { + _, addr, err := manet.DialArgs(ma) + if err != nil { + return "", err + } + + return addr, nil + } + + spec, err := url.Parse(a.Addr) + if err != nil { + return "", err + } + return spec.Host, nil +} + +func (a APIInfo) AuthHeader() http.Header { + if len(a.Token) != 0 { + headers := http.Header{} + headers.Add("Authorization", "Bearer "+string(a.Token)) + return headers + } + log.Warn("API Token not set and requested, capabilities might be limited.") + return nil +} diff --git a/cmd/boost/dummydeal.go b/cmd/boost/dummydeal.go new file mode 100644 index 000000000..84deda3d1 --- /dev/null +++ b/cmd/boost/dummydeal.go @@ -0,0 +1,35 @@ +package main + +import ( + "github.com/davecgh/go-spew/spew" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + lcli "github.com/filecoin-project/boost/cli" +) + +var dummydealCmd = &cli.Command{ + Name: "dummydeal", + Usage: "Trigger a sample deal", + Before: before, + Action: func(cctx *cli.Context) error { + boostApi, ncloser, err := lcli.GetBoostAPI(cctx) + if err != nil { + return xerrors.Errorf("getting boost api: %w", err) + } + defer ncloser() + + ctx := lcli.DaemonContext(cctx) + + log.Debug("Get boost identity") + + res, err := boostApi.ID(ctx) + if err != nil { + return xerrors.Errorf("couldnt get boost identity: %w", err) + } + + spew.Dump(res) + + return nil + }, +} diff --git a/cmd/boost/main.go b/cmd/boost/main.go index 2f97ad02c..482034f5f 100644 --- a/cmd/boost/main.go +++ b/cmd/boost/main.go @@ -36,6 +36,7 @@ func main() { Commands: []*cli.Command{ runCmd, initCmd, + dummydealCmd, }, } app.Setup() diff --git a/go.mod b/go.mod index 74dae6a8a..b28adb1cd 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi require ( contrib.go.opencensus.io/exporter/prometheus v0.4.0 github.com/BurntSushi/toml v0.3.1 + github.com/davecgh/go-spew v1.1.1 github.com/dgraph-io/badger/v2 v2.2007.2 github.com/filecoin-project/dagstore v0.4.3 github.com/filecoin-project/go-address v0.0.5 @@ -108,7 +109,6 @@ require ( github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect github.com/daaku/go.zipexe v1.0.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e // indirect github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de // indirect diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go new file mode 100644 index 000000000..6693dc83d --- /dev/null +++ b/lib/rpcenc/reader.go @@ -0,0 +1,445 @@ +package rpcenc + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "reflect" + "strconv" + "sync" + "time" + + "github.com/google/uuid" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/go-state-types/abi" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" +) + +var log = logging.Logger("rpcenc") + +var Timeout = 30 * time.Second + +type StreamType string + +const ( + Null StreamType = "null" + PushStream StreamType = "push" + // TODO: Data transfer handoff to workers? +) + +type ReaderStream struct { + Type StreamType + Info string +} + +var client = func() *http.Client { + c := *http.DefaultClient + c.CheckRedirect = func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + } + return &c +}() + +/* + + Example rpc function: + Push(context.Context, io.Reader) error + + Request flow: + 1. Client invokes a method with an io.Reader param + 2. go-jsonrpc invokes `ReaderParamEncoder` for the client-provided io.Reader + 3. `ReaderParamEncoder` transforms the reader into a `ReaderStream` which can + be serialized as JSON, and sent as jsonrpc request parameter + 3.1. If the reader is of type `*sealing.NullReader`, the resulting object + is `ReaderStream{ Type: "null", Info: "[base 10 number of bytes]" }` + 3.2. If the reader is of type `*RpcReader`, and it wasn't read from, we + notify that RpcReader to go a different push endpoint, and return + a `ReaderStream` object like in 3.4. + 3.3. In remaining cases we start a goroutine which: + 3.3.1. Makes a HEAD request to the server push endpoint + 3.3.2. If the HEAD request is redirected, it follows the redirect + 3.3.3. If the request succeeds, it starts a POST request to the + endpoint to which the last HEAD request was sent with the + reader set as request body. + 3.4. We return a `ReaderStream` indicating the uuid of push request, ex: + `ReaderStream{ Type: "push", Info: "[UUID string]" }` + 4. If the reader wasn't a NullReader, the server will receive a HEAD (or + POST in case of older clients) request to the push endpoint. + 4.1. The server gets or registers an `*RpcReader` in the `readers` map. + 4.2. It waits for a request to a matching push endpoint to be opened + 4.3. After the request is opened, it returns the `*RpcReader` to + go-jsonrpc, which will pass it as the io.Reader parameter to the + rpc method implementation + 4.4. If the first request made to the push endpoint was a POST, the + returned `*RpcReader` acts as a simple reader reading the POST + request body + 4.5. If the first request made to the push endpoint was a HEAD + 4.5.1. On the first call to Read or Close the server responds with + a 200 OK header, the client starts a POST request to the same + push URL, and the reader starts passing through the POST request + body + 4.5.2. If the reader is passed to another (now client) RPC method as a + reader parameter, the server for the first request responds to the + HEAD request with http 302 Found, instructing the first client to + go to the push endpoint of the second RPC server + 5. If the reader was a NullReader (ReaderStream.Type=="null"), we instantiate + it, and provide to the method implementation + +*/ + +func ReaderParamEncoder(addr string) jsonrpc.Option { + // Client side parameter encoder. Runs on the rpc client side. io.Reader -> ReaderStream{} + return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) { + r := value.Interface().(io.Reader) + + if r, ok := r.(*sealing.NullReader); ok { + return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil + } + + reqID := uuid.New() + u, err := url.Parse(addr) + if err != nil { + return reflect.Value{}, xerrors.Errorf("parsing push address: %w", err) + } + u.Path = path.Join(u.Path, reqID.String()) + + rpcReader, redir := r.(*RpcReader) + if redir { + // if we have an rpc stream, redirect instead of proxying all the data + redir = rpcReader.redirect(u.String()) + } + + if !redir { + go func() { + // TODO: figure out errors here + for { + req, err := http.NewRequest("HEAD", u.String(), nil) + if err != nil { + log.Errorf("sending HEAD request for the reder param: %+v", err) + return + } + req.Header.Set("Content-Type", "application/octet-stream") + resp, err := client.Do(req) + if err != nil { + log.Errorf("sending reader param: %+v", err) + return + } + // todo do we need to close the body for a head request? + + if resp.StatusCode == http.StatusFound { + nextStr := resp.Header.Get("Location") + u, err = url.Parse(nextStr) + if err != nil { + log.Errorf("sending HEAD request for the reder param, parsing next url (%s): %+v", nextStr, err) + return + } + + continue + } + + if resp.StatusCode == http.StatusNoContent { // reader closed before reading anything + // todo just return?? + return + } + + if resp.StatusCode != http.StatusOK { + b, _ := ioutil.ReadAll(resp.Body) + log.Errorf("sending reader param (%s): non-200 status: %s, msg: '%s'", u.String(), resp.Status, string(b)) + return + } + + break + } + + // now actually send the data + req, err := http.NewRequest("POST", u.String(), r) + if err != nil { + log.Errorf("sending reader param: %+v", err) + return + } + req.Header.Set("Content-Type", "application/octet-stream") + resp, err := client.Do(req) + if err != nil { + log.Errorf("sending reader param: %+v", err) + return + } + + defer resp.Body.Close() //nolint + + if resp.StatusCode != http.StatusOK { + b, _ := ioutil.ReadAll(resp.Body) + log.Errorf("sending reader param (%s): non-200 status: %s, msg: '%s'", u.String(), resp.Status, string(b)) + return + } + }() + } + + return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil + }) +} + +type resType int + +const ( + resStart resType = iota // send on first read after HEAD + resRedirect // send on redirect before first read after HEAD + resError + // done/closed = close res channel +) + +type readRes struct { + rt resType + meta string +} + +// RpcReader watches the ReadCloser and closes the res channel when +// either: (1) the ReaderCloser fails on Read (including with a benign error +// like EOF), or (2) when Close is called. +// +// Use it be notified of terminal states, in situations where a Read failure (or +// EOF) is considered a terminal state too (besides Close). +type RpcReader struct { + postBody io.ReadCloser // nil on initial head request + next chan *RpcReader // on head will get us the postBody after sending resStart + mustRedirect bool + + res chan readRes + beginOnce *sync.Once + closeOnce sync.Once +} + +var ErrHasBody = errors.New("RPCReader has body, either already read from or from a client with no redirect support") +var ErrMustRedirect = errors.New("reader can't be read directly; marked as MustRedirect") + +// MustRedirect marks the reader as required to be redirected. Will make local +// calls Read fail. MUST be called before this reader is used in any goroutine. +// If the reader can't be redirected will return ErrHasBody +func (w *RpcReader) MustRedirect() error { + if w.postBody != nil { + w.closeOnce.Do(func() { + w.res <- readRes{ + rt: resError, + } + close(w.res) + }) + + return ErrHasBody + } + + w.mustRedirect = true + return nil +} + +func (w *RpcReader) beginPost() { + if w.mustRedirect { + w.res <- readRes{ + rt: resError, + } + close(w.res) + return + } + + if w.postBody == nil { + w.res <- readRes{ + rt: resStart, + } + + nr := <-w.next + + w.postBody = nr.postBody + w.res = nr.res + w.beginOnce = nr.beginOnce + } +} + +func (w *RpcReader) Read(p []byte) (int, error) { + w.beginOnce.Do(func() { + w.beginPost() + }) + + if w.mustRedirect { + return 0, ErrMustRedirect + } + + if w.postBody == nil { + return 0, xerrors.Errorf("reader already closed or redirected") + } + + n, err := w.postBody.Read(p) + if err != nil { + w.closeOnce.Do(func() { + close(w.res) + }) + } + return n, err +} + +func (w *RpcReader) Close() error { + w.beginOnce.Do(func() {}) + w.closeOnce.Do(func() { + close(w.res) + }) + if w.postBody == nil { + return nil + } + return w.postBody.Close() +} + +func (w *RpcReader) redirect(to string) bool { + if w.postBody != nil { + return false + } + + done := false + + w.beginOnce.Do(func() { + w.closeOnce.Do(func() { + w.res <- readRes{ + rt: resRedirect, + meta: to, + } + + done = true + close(w.res) + }) + }) + + return done +} + +func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { + var readersLk sync.Mutex + readers := map[uuid.UUID]chan *RpcReader{} + + // runs on the rpc server side, called by the client before making the jsonrpc request + hnd := func(resp http.ResponseWriter, req *http.Request) { + strId := path.Base(req.URL.Path) + u, err := uuid.Parse(strId) + if err != nil { + http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400) + return + } + + readersLk.Lock() + ch, found := readers[u] + if !found { + ch = make(chan *RpcReader) + readers[u] = ch + } + readersLk.Unlock() + + wr := &RpcReader{ + res: make(chan readRes), + next: ch, + beginOnce: &sync.Once{}, + } + + switch req.Method { + case http.MethodHead: + // leave body nil + case http.MethodPost: + wr.postBody = req.Body + default: + http.Error(resp, "unsupported method", http.StatusMethodNotAllowed) + } + + tctx, cancel := context.WithTimeout(req.Context(), Timeout) + defer cancel() + + select { + case ch <- wr: + case <-tctx.Done(): + close(ch) + log.Errorf("context error in reader stream handler (1): %v", tctx.Err()) + resp.WriteHeader(500) + return + } + + select { + case res, ok := <-wr.res: + if !ok { + if req.Method == http.MethodHead { + resp.WriteHeader(http.StatusNoContent) + } else { + resp.WriteHeader(http.StatusOK) + } + return + } + // TODO should we check if we failed the Read, and if so + // return an HTTP 500? i.e. turn res into a chan error? + + switch res.rt { + case resRedirect: + http.Redirect(resp, req, res.meta, http.StatusFound) + case resStart: // responding to HEAD, request POST with reader data + resp.WriteHeader(http.StatusOK) + case resError: + resp.WriteHeader(500) + default: + log.Errorf("unknown res.rt") + resp.WriteHeader(500) + } + + return + case <-req.Context().Done(): + log.Errorf("context error in reader stream handler (2): %v", req.Context().Err()) + resp.WriteHeader(500) + return + } + } + + // Server side reader decoder. runs on the rpc server side, invoked when decoding client request parameters. json(ReaderStream{}) -> io.Reader + dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) { + var rs ReaderStream + if err := json.Unmarshal(b, &rs); err != nil { + return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err) + } + + if rs.Type == Null { + n, err := strconv.ParseInt(rs.Info, 10, 64) + if err != nil { + return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err) + } + + return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil + } + + u, err := uuid.Parse(rs.Info) + if err != nil { + return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err) + } + + readersLk.Lock() + ch, found := readers[u] + if !found { + ch = make(chan *RpcReader) + readers[u] = ch + } + readersLk.Unlock() + + ctx, cancel := context.WithTimeout(ctx, Timeout) + defer cancel() + + select { + case wr, ok := <-ch: + if !ok { + return reflect.Value{}, xerrors.Errorf("handler timed out") + } + + return reflect.ValueOf(wr), nil + case <-ctx.Done(): + return reflect.Value{}, ctx.Err() + } + }) + + return hnd, dec +} diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 150898f1a..d870ef4d3 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -42,8 +42,8 @@ const ( type RepoType int const ( - //_ = iota // Default is invalid - Boost RepoType = iota + 6 + _ = iota // Default is invalid + Boost RepoType = iota ) func (t RepoType) String() string {