diff --git a/cmd/labapp/command/root.go b/cmd/labapp/command/root.go index ddacb1ea..7eca6238 100644 --- a/cmd/labapp/command/root.go +++ b/cmd/labapp/command/root.go @@ -42,6 +42,12 @@ func App(ctx context.Context) *cli.App { Value: ":7003", EnvVar: "LABAPP_ADDRESS", }, + cli.StringFlag{ + Name: "libp2p-address", + Usage: "address for libp2p", + Value: "/ip4/0.0.0.0/tcp/0", + EnvVar: "LABAPP_LIBP2P_ADDRESS", + }, cli.StringFlag{ Name: "log-level,l", Usage: "set the logging level [debug, info, warn, error, fatal, panic, none]", @@ -65,7 +71,7 @@ func appAction(c *cli.Context) error { } ctx := cliutil.CommandContext(c) - app, err := labapp.New(root, c.GlobalString("address"), zerolog.Ctx(ctx)) + app, err := labapp.New(root, c.GlobalString("address"), c.GlobalString("libp2p-address"), zerolog.Ctx(ctx)) if err != nil { return err } diff --git a/cmd/labd/command/root.go b/cmd/labd/command/root.go index c01dddb2..c662c76f 100644 --- a/cmd/labd/command/root.go +++ b/cmd/labd/command/root.go @@ -21,6 +21,7 @@ import ( "github.com/Netflix/p2plab/labd" "github.com/Netflix/p2plab/pkg/cliutil" "github.com/Netflix/p2plab/uploaders" + "github.com/Netflix/p2plab/uploaders/fileuploader" "github.com/Netflix/p2plab/uploaders/s3uploader" "github.com/Netflix/p2plab/version" "github.com/rs/zerolog" @@ -44,6 +45,12 @@ func App(ctx context.Context) *cli.App { Value: ":7001", EnvVar: "LABD_ADDRESS", }, + cli.StringFlag{ + Name: "libp2p-address", + Usage: "address for libp2p", + Value: "/ip4/0.0.0.0/tcp/0", + EnvVar: "LABD_LIBP2P_ADDRESS", + }, cli.StringFlag{ Name: "log-level,l", Usage: "set the logging level [debug, info, warn, error, fatal, panic]", @@ -52,14 +59,14 @@ func App(ctx context.Context) *cli.App { }, cli.StringFlag{ Name: "provider,p", - Usage: "set the provider to create node groups [terraform]", - Value: "terraform", + Usage: "set the provider to create node groups [inmemory, terraform]", + Value: "inmemory", EnvVar: "LABD_PROVIDER", }, cli.StringFlag{ Name: "uploader,u", - Usage: "set the uploader to use to distribute p2p app binaries [s3]", - Value: "s3", + Usage: "set the uploader to use to distribute p2p app binaries [file, s3]", + Value: "file", EnvVar: "LABD_UPLOADER", }, cli.StringFlag{ @@ -77,6 +84,12 @@ func App(ctx context.Context) *cli.App { Usage: "region for s3 uploader", EnvVar: "LABD_UPLOADER_S3_REGION", }, + cli.StringFlag{ + Name: "uploader.file.address", + Usage: "address for file uploader", + Value: ":7000", + EnvVar: "LABD_UPLOADER_FILE_ADDRESS", + }, } app.Action = daemonAction @@ -94,14 +107,18 @@ func daemonAction(c *cli.Context) error { } ctx := cliutil.CommandContext(c) - daemon, err := labd.New(root, c.String("address"), zerolog.Ctx(ctx), - labd.WithProvider(c.String("provider")), - labd.WithUploader(c.String("uploader")), + daemon, err := labd.New(root, c.GlobalString("address"), zerolog.Ctx(ctx), + labd.WithLibp2pAddress(c.GlobalString("libp2p-address")), + labd.WithProvider(c.GlobalString("provider")), + labd.WithUploader(c.GlobalString("uploader")), labd.WithUploaderSettings(uploaders.UploaderSettings{ S3: s3uploader.S3UploaderSettings{ - Bucket: c.String("uploader.s3.bucket"), - Prefix: c.String("uploader.s3.prefix"), - Region: c.String("uploader.s3.region"), + Bucket: c.GlobalString("uploader.s3.bucket"), + Prefix: c.GlobalString("uploader.s3.prefix"), + Region: c.GlobalString("uploader.s3.region"), + }, + File: fileuploader.FileUploaderSettings{ + Address: c.GlobalString("uploader.file.address"), }, }), ) diff --git a/cmd/ociadd/main.go b/cmd/ociadd/main.go index cb192f23..0f96e186 100644 --- a/cmd/ociadd/main.go +++ b/cmd/ociadd/main.go @@ -54,7 +54,7 @@ func run(ref string) error { defer cancel() root := "./tmp/ociadd" - p, err := peer.New(ctx, filepath.Join(root, "peer")) + p, err := peer.New(ctx, filepath.Join(root, "peer"), "/ip4/0.0.0.0/tcp/0") if err != nil { return err } diff --git a/cmd/ociget/main.go b/cmd/ociget/main.go index 531ab35d..63e77a89 100644 --- a/cmd/ociget/main.go +++ b/cmd/ociget/main.go @@ -58,7 +58,7 @@ func run(addr, ref string) error { defer cancel() root := "./tmp/ociget" - p, err := peer.New(ctx, filepath.Join(root, "peer")) + p, err := peer.New(ctx, filepath.Join(root, "peer"), "/ip4/0.0.0.0/tcp/0") if err != nil { return err } diff --git a/distribution.go b/distribution.go index 168a0a03..6953f4d0 100644 --- a/distribution.go +++ b/distribution.go @@ -29,6 +29,8 @@ type Builder interface { type Uploader interface { Upload(ctx context.Context, r io.Reader) (link string, err error) + + Close() error } type Downloader interface { diff --git a/downloaders/downloaders.go b/downloaders/downloaders.go index 3daaba58..f07e823d 100644 --- a/downloaders/downloaders.go +++ b/downloaders/downloaders.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/Netflix/p2plab" + "github.com/Netflix/p2plab/downloaders/filedownloader" "github.com/Netflix/p2plab/downloaders/httpdownloader" "github.com/Netflix/p2plab/downloaders/s3downloader" "github.com/Netflix/p2plab/pkg/httputil" @@ -64,10 +65,12 @@ func (f *Downloaders) Get(downloaderType string) (p2plab.Downloader, error) { func (f *Downloaders) newDownloader(downloaderType string) (p2plab.Downloader, error) { // root := filepath.Join(f.root, downloaderType) switch downloaderType { - case "s3": - return s3downloader.New(f.settings.Client.HTTPClient, f.settings.S3) + case "file": + return filedownloader.New(), nil case "http", "https": return httpdownloader.New(f.settings.Client), nil + case "s3": + return s3downloader.New(f.settings.Client.HTTPClient, f.settings.S3) default: return nil, errors.Errorf("unrecognized downloader type: %q", downloaderType) } diff --git a/downloaders/filedownloader/downloader.go b/downloaders/filedownloader/downloader.go new file mode 100644 index 00000000..4b31aa7b --- /dev/null +++ b/downloaders/filedownloader/downloader.go @@ -0,0 +1,44 @@ +// Copyright 2019 Netflix, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filedownloader + +import ( + "context" + "io" + "net/url" + "os" + "path/filepath" + + "github.com/Netflix/p2plab" + "github.com/Netflix/p2plab/errdefs" + "github.com/pkg/errors" +) + +type downloader struct { +} + +func New() p2plab.Downloader { + return &downloader{} +} + +func (f *downloader) Download(ctx context.Context, link string) (io.ReadCloser, error) { + u, err := url.Parse(link) + if err != nil { + return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "invalid url %q", link) + } + + downloadPath := filepath.Join(u.Host, u.Path) + return os.Open(downloadPath) +} diff --git a/downloaders/httpdownloader/downloader.go b/downloaders/httpdownloader/downloader.go index f0dc760f..e0b1b4d1 100644 --- a/downloaders/httpdownloader/downloader.go +++ b/downloaders/httpdownloader/downloader.go @@ -30,8 +30,8 @@ func New(client *httputil.Client) p2plab.Downloader { return &downloader{client} } -func (f *downloader) Download(ctx context.Context, ref string) (io.ReadCloser, error) { - req := f.client.NewRequest("GET", ref) +func (f *downloader) Download(ctx context.Context, link string) (io.ReadCloser, error) { + req := f.client.NewRequest("GET", link) resp, err := req.Send(ctx) if err != nil { return nil, err diff --git a/go.mod b/go.mod index b4bdf36b..8a82a43d 100644 --- a/go.mod +++ b/go.mod @@ -56,8 +56,9 @@ require ( github.com/opencontainers/runc v1.0.0-rc8 // indirect github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9 github.com/opentracing/opentracing-go v1.1.0 - github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect + github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.8.1 + github.com/rs/xid v1.2.1 github.com/rs/zerolog v1.14.4-0.20190719171043-b806a5ecbe53 github.com/sirupsen/logrus v1.4.0 // indirect github.com/stretchr/testify v1.3.0 diff --git a/labapp/labapp.go b/labapp/labapp.go index 7f7c4b7d..1d94dd1c 100644 --- a/labapp/labapp.go +++ b/labapp/labapp.go @@ -30,10 +30,10 @@ type LabApp struct { closers []io.Closer } -func New(root, addr string, logger *zerolog.Logger) (*LabApp, error) { +func New(root, addr, libp2pAddr string, logger *zerolog.Logger) (*LabApp, error) { var closers []io.Closer pctx, cancel := context.WithCancel(context.Background()) - p, err := peer.New(pctx, root) + p, err := peer.New(pctx, root, libp2pAddr) if err != nil { return nil, err } diff --git a/labd/labd.go b/labd/labd.go index 4b3dbe01..af54d9ab 100644 --- a/labd/labd.go +++ b/labd/labd.go @@ -66,16 +66,20 @@ func New(root, addr string, logger *zerolog.Logger, opts ...LabdOption) (*Labd, return nil, err } + settings.ProviderSettings.DB = db + settings.ProviderSettings.Logger = logger provider, err := providers.GetNodeProvider(filepath.Join(root, "providers"), settings.Provider, settings.ProviderSettings) if err != nil { return nil, err } settings.UploaderSettings.Client = client + settings.UploaderSettings.Logger = logger uploader, err := uploaders.GetUploader(filepath.Join(root, "uploaders"), settings.Uploader, settings.UploaderSettings) if err != nil { return nil, err } + closers = append(closers, uploader) builder, err := builder.New(filepath.Join(root, "builder"), db, uploader) if err != nil { @@ -83,7 +87,7 @@ func New(root, addr string, logger *zerolog.Logger, opts ...LabdOption) (*Labd, } sctx, cancel := context.WithCancel(context.Background()) - seeder, err := peer.New(sctx, filepath.Join(root, "seeder")) + seeder, err := peer.New(sctx, filepath.Join(root, "seeder"), settings.Libp2pAddress) if err != nil { return nil, errors.Wrap(err, "failed to create seeder peer") } diff --git a/labd/settings.go b/labd/settings.go index daa4e550..d969e726 100644 --- a/labd/settings.go +++ b/labd/settings.go @@ -22,12 +22,20 @@ import ( type LabdOption func(*LabdSettings) error type LabdSettings struct { + Libp2pAddress string Provider string ProviderSettings providers.ProviderSettings Uploader string UploaderSettings uploaders.UploaderSettings } +func WithLibp2pAddress(addr string) LabdOption { + return func(s *LabdSettings) error { + s.Libp2pAddress = addr + return nil + } +} + func WithProvider(provider string) LabdOption { return func(s *LabdSettings) error { s.Provider = provider diff --git a/metadata/buckets.go b/metadata/buckets.go index 73d17b6d..51d5b5ad 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -39,7 +39,9 @@ var ( bucketKeySource = []byte("source") // Node buckets. - bucketKeyAddress = []byte("address") + bucketKeyAddress = []byte("address") + bucketKeyAgentPort = []byte("agentPort") + bucketKeyAppPort = []byte("appPort") // Build buckets bucketKeyLink = []byte("link") @@ -49,15 +51,15 @@ var ( bucketKeyScenario = []byte("scenario") bucketKeyPlan = []byte("plan") bucketKeySubject = []byte("subject") - bucketKeyReport = []byte("report") + bucketKeyReport = []byte("report") // Common buckets. - bucketKeyID = []byte("id") - bucketKeyStatus = []byte("status") - bucketKeyLabels = []byte("labels") - bucketKeyCreatedAt = []byte("createdAt") - bucketKeyUpdatedAt = []byte("updatedAt") - bucketKeyDefinition = []byte("definition") + bucketKeyID = []byte("id") + bucketKeyStatus = []byte("status") + bucketKeyLabels = []byte("labels") + bucketKeyCreatedAt = []byte("createdAt") + bucketKeyUpdatedAt = []byte("updatedAt") + bucketKeyDefinition = []byte("definition") bucketKeyGitReference = []byte("gitReference") ) diff --git a/nodes/connect.go b/nodes/connect.go index b21e3af6..db253e12 100644 --- a/nodes/connect.go +++ b/nodes/connect.go @@ -36,11 +36,12 @@ func Connect(ctx context.Context, ns []p2plab.Node) error { collectPeerAddrs, gctx := errgroup.WithContext(ctx) - peerAddrs := make([]string, len(ns)) zerolog.Ctx(ctx).Info().Msg("Retrieving peer infos") go logutil.Elapsed(gctx, 20*time.Second, "Retrieving peer infos") - for i, n := range ns { - i, n := i, n + + var peerAddrs []string + for _, n := range ns { + n := n collectPeerAddrs.Go(func() error { peerInfo, err := n.PeerInfo(gctx) if err != nil { @@ -51,7 +52,11 @@ func Connect(ctx context.Context, ns []p2plab.Node) error { return errors.Errorf("peer %q has zero addresses", n.Metadata().Address) } - peerAddrs[i] = fmt.Sprintf("/ip4/%s/tcp/4001/p2p/%s", n.Metadata().Address, peerInfo.ID) + for _, ma := range peerInfo.Addrs { + peerAddrs = append(peerAddrs, fmt.Sprintf("%s/p2p/%s", ma, peerInfo.ID)) + zerolog.Ctx(gctx).Debug().Str("addr", ma.String()).Msg("Retrieved peer address") + } + return nil }) } diff --git a/peer/peer.go b/peer/peer.go index e62237b6..b1893d54 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -85,14 +85,14 @@ type Peer struct { reporter metrics.Reporter } -func New(ctx context.Context, root string) (*Peer, error) { +func New(ctx context.Context, root, addr string) (*Peer, error) { ds, err := NewDatastore(root) if err != nil { return nil, errors.Wrap(err, "failed to create datastore") } reporter := metrics.NewBandwidthCounter() - h, r, err := NewLibp2pPeer(ctx, reporter) + h, r, err := NewLibp2pPeer(ctx, addr, reporter) if err != nil { return nil, errors.Wrap(err, "failed to create libp2p peer") } @@ -307,7 +307,7 @@ func NewDatastore(path string) (datastore.Batching, error) { return badger.NewDatastore(path, &badger.DefaultOptions) } -func NewLibp2pPeer(ctx context.Context, reporter metrics.Reporter) (host.Host, routing.ContentRouting, error) { +func NewLibp2pPeer(ctx context.Context, addr string, reporter metrics.Reporter) (host.Host, routing.ContentRouting, error) { transports := libp2p.ChainOptions( libp2p.Transport(tcp.NewTCPTransport), libp2p.Transport(ws.New), @@ -319,9 +319,7 @@ func NewLibp2pPeer(ctx context.Context, reporter metrics.Reporter) (host.Host, r security := libp2p.Security(secio.ID, secio.New) - listenAddrs := libp2p.ListenAddrStrings( - "/ip4/0.0.0.0/tcp/4001", - ) + listenAddrs := libp2p.ListenAddrStrings(addr) bwReporter := libp2p.BandwidthReporter(reporter) diff --git a/providers/inmemory/provider.go b/providers/inmemory/provider.go index d8ad1aed..4679d593 100644 --- a/providers/inmemory/provider.go +++ b/providers/inmemory/provider.go @@ -23,6 +23,8 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/labagent" "github.com/Netflix/p2plab/metadata" + "github.com/phayes/freeport" + "github.com/rs/xid" "github.com/rs/zerolog" ) @@ -33,41 +35,73 @@ type provider struct { agentOpts []labagent.LabagentOption } -func New(root string, logger *zerolog.Logger, agentOpts ...labagent.LabagentOption) (p2plab.NodeProvider, error) { +func New(root string, db metadata.DB, logger *zerolog.Logger, agentOpts ...labagent.LabagentOption) (p2plab.NodeProvider, error) { err := os.MkdirAll(root, 0711) if err != nil { return nil, err } - return &provider{ + p := &provider{ root: root, nodes: make(map[string][]*node), logger: logger, agentOpts: agentOpts, - }, nil + } + + ctx := context.Background() + clusters, err := db.ListClusters(ctx) + if err != nil { + return nil, err + } + + for _, cluster := range clusters { + nodes, err := db.ListNodes(ctx, cluster.ID) + if err != nil { + return nil, err + } + + for _, node := range nodes { + n, err := p.newNode(node.ID, node.AgentPort, node.AppPort) + if err != nil { + return nil, err + } + p.nodes[node.ID] = append(p.nodes[node.ID], n) + } + } + + return p, nil } func (p *provider) CreateNodeGroup(ctx context.Context, id string, cdef metadata.ClusterDefinition) (*p2plab.NodeGroup, error) { var ns []metadata.Node for _, group := range cdef.Groups { - n, err := newNode() - if err != nil { - return nil, err + for i := 0; i < group.Size; i++ { + freePorts, err := freeport.GetFreePorts(2) + if err != nil { + return nil, err + } + agentPort, appPort := freePorts[0], freePorts[1] + + id := xid.New().String() + n, err := p.newNode(id, agentPort, appPort) + if err != nil { + return nil, err + } + p.nodes[id] = append(p.nodes[id], n) + + ns = append(ns, metadata.Node{ + ID: n.ID, + Address: "127.0.0.1", + AgentPort: n.AgentPort, + AppPort: n.AppPort, + GitReference: group.GitReference, + Labels: []string{ + n.ID, + group.InstanceType, + group.Region, + }, + }) } - p.nodes[id] = append(p.nodes[id], n) - - ns = append(ns, metadata.Node{ - ID: n.ID, - Address: "127.0.0.1", - AgentPort: n.AgentPort, - AppPort: n.AppPort, - GitReference: group.GitReference, - Labels: []string{ - n.ID, - group.InstanceType, - group.Region, - }, - }) } return &p2plab.NodeGroup{ @@ -77,6 +111,14 @@ func (p *provider) CreateNodeGroup(ctx context.Context, id string, cdef metadata } func (p *provider) DestroyNodeGroup(ctx context.Context, ng *p2plab.NodeGroup) error { + for _, n := range p.nodes[ng.ID] { + err := n.Close() + if err != nil { + return err + } + } + + delete(p.nodes, ng.ID) return nil } @@ -85,17 +127,10 @@ type node struct { AgentPort int AppPort int LabAgent *labagent.LabAgent + cancel context.CancelFunc } -func (p *provider) newNode() (*node, error) { - freePorts, err := freeport.GetFreePorts(2) - if err != nil { - return nil, err - } - agentPort, appPort := freePorts[0], freePorts[1] - - id := xid.New() - +func (p *provider) newNode(id string, agentPort, appPort int) (*node, error) { agentRoot := filepath.Join(p.root, id, "labagent") agentAddr := fmt.Sprintf(":%d", agentPort) @@ -107,14 +142,24 @@ func (p *provider) newNode() (*node, error) { return nil, err } + ctx, cancel := context.WithCancel(context.Background()) + go func() { + err := la.Serve(ctx) + if err != nil { + p.logger.Error().Err(err).Str("id", id).Msg("serve exited with error") + } + }() + return &node{ ID: id, AgentPort: agentPort, AppPort: appPort, LabAgent: la, + cancel: cancel, }, nil } func (n *node) Close() error { + n.cancel() return n.LabAgent.Close() } diff --git a/providers/providers.go b/providers/providers.go index 4c959398..d46f0ebe 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -18,17 +18,32 @@ import ( "path/filepath" "github.com/Netflix/p2plab" + "github.com/Netflix/p2plab/downloaders" + "github.com/Netflix/p2plab/downloaders/s3downloader" "github.com/Netflix/p2plab/errdefs" + "github.com/Netflix/p2plab/labagent" + "github.com/Netflix/p2plab/metadata" + "github.com/Netflix/p2plab/providers/inmemory" "github.com/Netflix/p2plab/providers/terraform" "github.com/pkg/errors" + "github.com/rs/zerolog" ) type ProviderSettings struct { + DB metadata.DB + Logger *zerolog.Logger } func GetNodeProvider(root, providerType string, settings ProviderSettings) (p2plab.NodeProvider, error) { root = filepath.Join(root, providerType) switch providerType { + case "inmemory": + return inmemory.New(root, settings.DB, settings.Logger, labagent.WithDownloaderSettings(downloaders.DownloaderSettings{ + S3: s3downloader.S3DownloaderSettings{ + Region: "us-west-2", + }, + }), + ) case "terraform": return terraform.New(root) default: diff --git a/uploaders/fileuploader/uploader.go b/uploaders/fileuploader/uploader.go new file mode 100644 index 00000000..aa19edd7 --- /dev/null +++ b/uploaders/fileuploader/uploader.go @@ -0,0 +1,123 @@ +// Copyright 2019 Netflix, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileuploader + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "time" + + "github.com/Netflix/p2plab" + cid "github.com/ipfs/go-cid" + multihash "github.com/multiformats/go-multihash" + "github.com/rs/zerolog" +) + +type FileUploaderSettings struct { + Address string +} + +type uploader struct { + root string + cancel context.CancelFunc + cidBuilder cid.Builder +} + +func New(root string, logger *zerolog.Logger, settings FileUploaderSettings) (p2plab.Uploader, error) { + root, err := filepath.Abs(root) + if err != nil { + return nil, err + } + + err = os.MkdirAll(root, 0711) + if err != nil { + return nil, err + } + + s := &http.Server{ + Handler: http.FileServer(http.Dir(root)), + Addr: settings.Address, + ReadHeaderTimeout: 20 * time.Second, + ReadTimeout: 1 * time.Minute, + WriteTimeout: 30 * time.Minute, + } + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-ctx.Done() + err := s.Shutdown(ctx) + if err != nil { + logger.Error().Err(err).Msg("failed to shutdown fsuploader") + } + }() + + go func() { + err := s.ListenAndServe() + if err != nil { + logger.Error().Err(err).Msg("failed to serve fsuploader") + } + }() + + return &uploader{ + root: root, + cancel: cancel, + cidBuilder: cid.V1Builder{MhType: multihash.SHA2_256}, + }, nil +} + +func (u *uploader) Close() error { + u.cancel() + return nil +} + +func (u *uploader) Upload(ctx context.Context, r io.Reader) (link string, err error) { + content, err := ioutil.ReadAll(r) + if err != nil { + return "", err + } + + c, err := u.cidBuilder.Sum(content) + if err != nil { + return "", err + } + + link = fmt.Sprintf("file://%s/%s", u.root, c) + uploadPath := filepath.Join(u.root, c.String()) + _, err = os.Stat(uploadPath) + if err != nil && !os.IsNotExist(err) { + return "", err + } else if err == nil { + return link, nil + } + + f, err := os.Create(uploadPath) + if err != nil { + return "", err + } + defer f.Close() + + _, err = io.Copy(f, bytes.NewReader(content)) + if err != nil { + return "", err + } + + return link, nil +} diff --git a/uploaders/s3uploader/uploader.go b/uploaders/s3uploader/uploader.go index 8d48526f..045a38ab 100644 --- a/uploaders/s3uploader/uploader.go +++ b/uploaders/s3uploader/uploader.go @@ -65,6 +65,10 @@ func New(client *http.Client, settings S3UploaderSettings) (p2plab.Uploader, err }, nil } +func (u *uploader) Close() error { + return nil +} + func (u *uploader) Upload(ctx context.Context, r io.Reader) (link string, err error) { content, err := ioutil.ReadAll(r) if err != nil { diff --git a/uploaders/uploaders.go b/uploaders/uploaders.go index 0d486ee0..a573cf9b 100644 --- a/uploaders/uploaders.go +++ b/uploaders/uploaders.go @@ -20,18 +20,24 @@ import ( "github.com/Netflix/p2plab" "github.com/Netflix/p2plab/errdefs" "github.com/Netflix/p2plab/pkg/httputil" + "github.com/Netflix/p2plab/uploaders/fileuploader" "github.com/Netflix/p2plab/uploaders/s3uploader" "github.com/pkg/errors" + "github.com/rs/zerolog" ) type UploaderSettings struct { Client *httputil.Client + Logger *zerolog.Logger S3 s3uploader.S3UploaderSettings + File fileuploader.FileUploaderSettings } func GetUploader(root, uploaderType string, settings UploaderSettings) (p2plab.Uploader, error) { root = filepath.Join(root, uploaderType) switch uploaderType { + case "file": + return fileuploader.New(root, settings.Logger, settings.File) case "s3": return s3uploader.New(settings.Client.HTTPClient, settings.S3) default: