Skip to content

Commit

Permalink
Add inmemory implementations for provider, uploader, downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
hinshun committed Aug 28, 2019
1 parent ee342d3 commit 7a10bae
Show file tree
Hide file tree
Showing 20 changed files with 351 additions and 68 deletions.
8 changes: 7 additions & 1 deletion cmd/labapp/command/root.go
Expand Up @@ -42,6 +42,12 @@ func App(ctx context.Context) *cli.App {
Value: ":7003",
EnvVar: "LABAPP_ADDRESS",
},
cli.StringFlag{
Name: "libp2p-address",
Usage: "address for libp2p",
Value: "/ip4/0.0.0.0/tcp/0",
EnvVar: "LABAPP_LIBP2P_ADDRESS",
},
cli.StringFlag{
Name: "log-level,l",
Usage: "set the logging level [debug, info, warn, error, fatal, panic, none]",
Expand All @@ -65,7 +71,7 @@ func appAction(c *cli.Context) error {
}

ctx := cliutil.CommandContext(c)
app, err := labapp.New(root, c.GlobalString("address"), zerolog.Ctx(ctx))
app, err := labapp.New(root, c.GlobalString("address"), c.GlobalString("libp2p-address"), zerolog.Ctx(ctx))
if err != nil {
return err
}
Expand Down
37 changes: 27 additions & 10 deletions cmd/labd/command/root.go
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/Netflix/p2plab/labd"
"github.com/Netflix/p2plab/pkg/cliutil"
"github.com/Netflix/p2plab/uploaders"
"github.com/Netflix/p2plab/uploaders/fileuploader"
"github.com/Netflix/p2plab/uploaders/s3uploader"
"github.com/Netflix/p2plab/version"
"github.com/rs/zerolog"
Expand All @@ -44,6 +45,12 @@ func App(ctx context.Context) *cli.App {
Value: ":7001",
EnvVar: "LABD_ADDRESS",
},
cli.StringFlag{
Name: "libp2p-address",
Usage: "address for libp2p",
Value: "/ip4/0.0.0.0/tcp/0",
EnvVar: "LABD_LIBP2P_ADDRESS",
},
cli.StringFlag{
Name: "log-level,l",
Usage: "set the logging level [debug, info, warn, error, fatal, panic]",
Expand All @@ -52,14 +59,14 @@ func App(ctx context.Context) *cli.App {
},
cli.StringFlag{
Name: "provider,p",
Usage: "set the provider to create node groups [terraform]",
Value: "terraform",
Usage: "set the provider to create node groups [inmemory, terraform]",
Value: "inmemory",
EnvVar: "LABD_PROVIDER",
},
cli.StringFlag{
Name: "uploader,u",
Usage: "set the uploader to use to distribute p2p app binaries [s3]",
Value: "s3",
Usage: "set the uploader to use to distribute p2p app binaries [file, s3]",
Value: "file",
EnvVar: "LABD_UPLOADER",
},
cli.StringFlag{
Expand All @@ -77,6 +84,12 @@ func App(ctx context.Context) *cli.App {
Usage: "region for s3 uploader",
EnvVar: "LABD_UPLOADER_S3_REGION",
},
cli.StringFlag{
Name: "uploader.file.address",
Usage: "address for file uploader",
Value: ":7000",
EnvVar: "LABD_UPLOADER_FILE_ADDRESS",
},
}
app.Action = daemonAction

Expand All @@ -94,14 +107,18 @@ func daemonAction(c *cli.Context) error {
}

ctx := cliutil.CommandContext(c)
daemon, err := labd.New(root, c.String("address"), zerolog.Ctx(ctx),
labd.WithProvider(c.String("provider")),
labd.WithUploader(c.String("uploader")),
daemon, err := labd.New(root, c.GlobalString("address"), zerolog.Ctx(ctx),
labd.WithLibp2pAddress(c.GlobalString("libp2p-address")),
labd.WithProvider(c.GlobalString("provider")),
labd.WithUploader(c.GlobalString("uploader")),
labd.WithUploaderSettings(uploaders.UploaderSettings{
S3: s3uploader.S3UploaderSettings{
Bucket: c.String("uploader.s3.bucket"),
Prefix: c.String("uploader.s3.prefix"),
Region: c.String("uploader.s3.region"),
Bucket: c.GlobalString("uploader.s3.bucket"),
Prefix: c.GlobalString("uploader.s3.prefix"),
Region: c.GlobalString("uploader.s3.region"),
},
File: fileuploader.FileUploaderSettings{
Address: c.GlobalString("uploader.file.address"),
},
}),
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/ociadd/main.go
Expand Up @@ -54,7 +54,7 @@ func run(ref string) error {
defer cancel()

root := "./tmp/ociadd"
p, err := peer.New(ctx, filepath.Join(root, "peer"))
p, err := peer.New(ctx, filepath.Join(root, "peer"), "/ip4/0.0.0.0/tcp/0")
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ociget/main.go
Expand Up @@ -58,7 +58,7 @@ func run(addr, ref string) error {
defer cancel()

root := "./tmp/ociget"
p, err := peer.New(ctx, filepath.Join(root, "peer"))
p, err := peer.New(ctx, filepath.Join(root, "peer"), "/ip4/0.0.0.0/tcp/0")
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions distribution.go
Expand Up @@ -29,6 +29,8 @@ type Builder interface {

type Uploader interface {
Upload(ctx context.Context, r io.Reader) (link string, err error)

Close() error
}

type Downloader interface {
Expand Down
7 changes: 5 additions & 2 deletions downloaders/downloaders.go
Expand Up @@ -18,6 +18,7 @@ import (
"sync"

"github.com/Netflix/p2plab"
"github.com/Netflix/p2plab/downloaders/filedownloader"
"github.com/Netflix/p2plab/downloaders/httpdownloader"
"github.com/Netflix/p2plab/downloaders/s3downloader"
"github.com/Netflix/p2plab/pkg/httputil"
Expand Down Expand Up @@ -64,10 +65,12 @@ func (f *Downloaders) Get(downloaderType string) (p2plab.Downloader, error) {
func (f *Downloaders) newDownloader(downloaderType string) (p2plab.Downloader, error) {
// root := filepath.Join(f.root, downloaderType)
switch downloaderType {
case "s3":
return s3downloader.New(f.settings.Client.HTTPClient, f.settings.S3)
case "file":
return filedownloader.New(), nil
case "http", "https":
return httpdownloader.New(f.settings.Client), nil
case "s3":
return s3downloader.New(f.settings.Client.HTTPClient, f.settings.S3)
default:
return nil, errors.Errorf("unrecognized downloader type: %q", downloaderType)
}
Expand Down
44 changes: 44 additions & 0 deletions downloaders/filedownloader/downloader.go
@@ -0,0 +1,44 @@
// Copyright 2019 Netflix, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filedownloader

import (
"context"
"io"
"net/url"
"os"
"path/filepath"

"github.com/Netflix/p2plab"
"github.com/Netflix/p2plab/errdefs"
"github.com/pkg/errors"
)

type downloader struct {
}

func New() p2plab.Downloader {
return &downloader{}
}

func (f *downloader) Download(ctx context.Context, link string) (io.ReadCloser, error) {
u, err := url.Parse(link)
if err != nil {
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "invalid url %q", link)
}

downloadPath := filepath.Join(u.Host, u.Path)
return os.Open(downloadPath)
}
4 changes: 2 additions & 2 deletions downloaders/httpdownloader/downloader.go
Expand Up @@ -30,8 +30,8 @@ func New(client *httputil.Client) p2plab.Downloader {
return &downloader{client}
}

func (f *downloader) Download(ctx context.Context, ref string) (io.ReadCloser, error) {
req := f.client.NewRequest("GET", ref)
func (f *downloader) Download(ctx context.Context, link string) (io.ReadCloser, error) {
req := f.client.NewRequest("GET", link)
resp, err := req.Send(ctx)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -56,8 +56,9 @@ require (
github.com/opencontainers/runc v1.0.0-rc8 // indirect
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9
github.com/opentracing/opentracing-go v1.1.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.8.1
github.com/rs/xid v1.2.1
github.com/rs/zerolog v1.14.4-0.20190719171043-b806a5ecbe53
github.com/sirupsen/logrus v1.4.0 // indirect
github.com/stretchr/testify v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions labapp/labapp.go
Expand Up @@ -30,10 +30,10 @@ type LabApp struct {
closers []io.Closer
}

func New(root, addr string, logger *zerolog.Logger) (*LabApp, error) {
func New(root, addr, libp2pAddr string, logger *zerolog.Logger) (*LabApp, error) {
var closers []io.Closer
pctx, cancel := context.WithCancel(context.Background())
p, err := peer.New(pctx, root)
p, err := peer.New(pctx, root, libp2pAddr)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion labd/labd.go
Expand Up @@ -66,24 +66,28 @@ func New(root, addr string, logger *zerolog.Logger, opts ...LabdOption) (*Labd,
return nil, err
}

settings.ProviderSettings.DB = db
settings.ProviderSettings.Logger = logger
provider, err := providers.GetNodeProvider(filepath.Join(root, "providers"), settings.Provider, settings.ProviderSettings)
if err != nil {
return nil, err
}

settings.UploaderSettings.Client = client
settings.UploaderSettings.Logger = logger
uploader, err := uploaders.GetUploader(filepath.Join(root, "uploaders"), settings.Uploader, settings.UploaderSettings)
if err != nil {
return nil, err
}
closers = append(closers, uploader)

builder, err := builder.New(filepath.Join(root, "builder"), db, uploader)
if err != nil {
return nil, err
}

sctx, cancel := context.WithCancel(context.Background())
seeder, err := peer.New(sctx, filepath.Join(root, "seeder"))
seeder, err := peer.New(sctx, filepath.Join(root, "seeder"), settings.Libp2pAddress)
if err != nil {
return nil, errors.Wrap(err, "failed to create seeder peer")
}
Expand Down
8 changes: 8 additions & 0 deletions labd/settings.go
Expand Up @@ -22,12 +22,20 @@ import (
type LabdOption func(*LabdSettings) error

type LabdSettings struct {
Libp2pAddress string
Provider string
ProviderSettings providers.ProviderSettings
Uploader string
UploaderSettings uploaders.UploaderSettings
}

func WithLibp2pAddress(addr string) LabdOption {
return func(s *LabdSettings) error {
s.Libp2pAddress = addr
return nil
}
}

func WithProvider(provider string) LabdOption {
return func(s *LabdSettings) error {
s.Provider = provider
Expand Down
18 changes: 10 additions & 8 deletions metadata/buckets.go
Expand Up @@ -39,7 +39,9 @@ var (
bucketKeySource = []byte("source")

// Node buckets.
bucketKeyAddress = []byte("address")
bucketKeyAddress = []byte("address")
bucketKeyAgentPort = []byte("agentPort")
bucketKeyAppPort = []byte("appPort")

// Build buckets
bucketKeyLink = []byte("link")
Expand All @@ -49,15 +51,15 @@ var (
bucketKeyScenario = []byte("scenario")
bucketKeyPlan = []byte("plan")
bucketKeySubject = []byte("subject")
bucketKeyReport = []byte("report")
bucketKeyReport = []byte("report")

// Common buckets.
bucketKeyID = []byte("id")
bucketKeyStatus = []byte("status")
bucketKeyLabels = []byte("labels")
bucketKeyCreatedAt = []byte("createdAt")
bucketKeyUpdatedAt = []byte("updatedAt")
bucketKeyDefinition = []byte("definition")
bucketKeyID = []byte("id")
bucketKeyStatus = []byte("status")
bucketKeyLabels = []byte("labels")
bucketKeyCreatedAt = []byte("createdAt")
bucketKeyUpdatedAt = []byte("updatedAt")
bucketKeyDefinition = []byte("definition")
bucketKeyGitReference = []byte("gitReference")
)

Expand Down
13 changes: 9 additions & 4 deletions nodes/connect.go
Expand Up @@ -36,11 +36,12 @@ func Connect(ctx context.Context, ns []p2plab.Node) error {

collectPeerAddrs, gctx := errgroup.WithContext(ctx)

peerAddrs := make([]string, len(ns))
zerolog.Ctx(ctx).Info().Msg("Retrieving peer infos")
go logutil.Elapsed(gctx, 20*time.Second, "Retrieving peer infos")
for i, n := range ns {
i, n := i, n

var peerAddrs []string
for _, n := range ns {
n := n
collectPeerAddrs.Go(func() error {
peerInfo, err := n.PeerInfo(gctx)
if err != nil {
Expand All @@ -51,7 +52,11 @@ func Connect(ctx context.Context, ns []p2plab.Node) error {
return errors.Errorf("peer %q has zero addresses", n.Metadata().Address)
}

peerAddrs[i] = fmt.Sprintf("/ip4/%s/tcp/4001/p2p/%s", n.Metadata().Address, peerInfo.ID)
for _, ma := range peerInfo.Addrs {
peerAddrs = append(peerAddrs, fmt.Sprintf("%s/p2p/%s", ma, peerInfo.ID))
zerolog.Ctx(gctx).Debug().Str("addr", ma.String()).Msg("Retrieved peer address")
}

return nil
})
}
Expand Down
10 changes: 4 additions & 6 deletions peer/peer.go
Expand Up @@ -85,14 +85,14 @@ type Peer struct {
reporter metrics.Reporter
}

func New(ctx context.Context, root string) (*Peer, error) {
func New(ctx context.Context, root, addr string) (*Peer, error) {
ds, err := NewDatastore(root)
if err != nil {
return nil, errors.Wrap(err, "failed to create datastore")
}

reporter := metrics.NewBandwidthCounter()
h, r, err := NewLibp2pPeer(ctx, reporter)
h, r, err := NewLibp2pPeer(ctx, addr, reporter)
if err != nil {
return nil, errors.Wrap(err, "failed to create libp2p peer")
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func NewDatastore(path string) (datastore.Batching, error) {
return badger.NewDatastore(path, &badger.DefaultOptions)
}

func NewLibp2pPeer(ctx context.Context, reporter metrics.Reporter) (host.Host, routing.ContentRouting, error) {
func NewLibp2pPeer(ctx context.Context, addr string, reporter metrics.Reporter) (host.Host, routing.ContentRouting, error) {
transports := libp2p.ChainOptions(
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(ws.New),
Expand All @@ -319,9 +319,7 @@ func NewLibp2pPeer(ctx context.Context, reporter metrics.Reporter) (host.Host, r

security := libp2p.Security(secio.ID, secio.New)

listenAddrs := libp2p.ListenAddrStrings(
"/ip4/0.0.0.0/tcp/4001",
)
listenAddrs := libp2p.ListenAddrStrings(addr)

bwReporter := libp2p.BandwidthReporter(reporter)

Expand Down

0 comments on commit 7a10bae

Please sign in to comment.