Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
api, chunk, storage: anonymous uploads (#1828)
Browse files Browse the repository at this point in the history
* pushsync, storage, api, chunk, network: anonymous uploads - first iteration with test vectors and cli integration to allow uploads to use only pullsync (using `--anonymous` flag) or both pull/push sync for an upload. This also adds an `Anonymous` field in the `Tag` struct to maintain the state of anonymity of the upload
  • Loading branch information
acud authored Oct 8, 2019
1 parent a2c765f commit 675b761
Show file tree
Hide file tree
Showing 36 changed files with 385 additions and 120 deletions.
7 changes: 5 additions & 2 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestApiTagLarge(t *testing.T) {
const contentLength = 4096 * 4095
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
tag, err := api.Tags.Create("unnamed-tag", 0)
tag, err := api.Tags.Create("unnamed-tag", 0, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -549,7 +549,10 @@ func TestDetectContentType(t *testing.T) {
// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
tag, err := a.Tags.Create("unnamed-tag", 0)
tag, err := a.Tags.Create("unnamed-tag", 0, false)
if err != nil {
return nil, nil, err
}

log.Trace("created new tag", "id", tag.Uid)

Expand Down
18 changes: 9 additions & 9 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Client struct {

// UploadRaw uploads raw data to swarm and returns the resulting hash. If toEncrypt is true it
// uploads encrypted data
func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt bool, toPin bool) (string, error) {
func (c *Client) UploadRaw(r io.Reader, size int64, toEncrypt, toPin, anonymous bool) (string, error) {
if size <= 0 {
return "", errors.New("data size must be greater than zero")
}
Expand Down Expand Up @@ -163,11 +163,11 @@ func Open(path string) (*File, error) {
// (if the manifest argument is non-empty) or creates a new manifest containing
// the file, returning the resulting manifest hash (the file will then be
// available at bzz:/<hash>/<path>)
func (c *Client) Upload(file *File, manifest string, toEncrypt bool, toPin bool) (string, error) {
func (c *Client) Upload(file *File, manifest string, toEncrypt, toPin, anonymous bool) (string, error) {
if file.Size <= 0 {
return "", errors.New("file size must be greater than zero")
}
return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin)
return c.TarUpload(manifest, &FileUploader{file}, "", toEncrypt, toPin, anonymous)
}

// Download downloads a file with the given path from the swarm manifest with
Expand Down Expand Up @@ -197,7 +197,7 @@ func (c *Client) Download(hash, path string) (*File, error) {
// directory will then be available at bzz:/<hash>/path/to/file), with
// the file specified in defaultPath being uploaded to the root of the manifest
// (i.e. bzz:/<hash>/)
func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt bool, toPin bool) (string, error) {
func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt, toPin, anonymous bool) (string, error) {
stat, err := os.Stat(dir)
if err != nil {
return "", err
Expand All @@ -212,7 +212,7 @@ func (c *Client) UploadDirectory(dir, defaultPath, manifest string, toEncrypt bo
return "", fmt.Errorf("default path: %v", err)
}
}
return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin)
return c.TarUpload(manifest, &DirectoryUploader{dir}, defaultPath, toEncrypt, toPin, anonymous)
}

// DownloadDirectory downloads the files contained in a swarm manifest under
Expand Down Expand Up @@ -370,13 +370,13 @@ func (c *Client) DownloadFile(hash, path, dest, credentials string) error {
}

// UploadManifest uploads the given manifest to swarm
func (c *Client) UploadManifest(m *api.Manifest, toEncrypt bool, toPin bool) (string, error) {
func (c *Client) UploadManifest(m *api.Manifest, toEncrypt, toPin, anonymous bool) (string, error) {
data, err := json.Marshal(m)
if err != nil {
return "", err
}

return c.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt, toPin)
return c.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt, toPin, anonymous)
}

// DownloadManifest downloads a swarm manifest
Expand Down Expand Up @@ -511,7 +511,7 @@ type UploadFn func(file *File) error

// TarUpload uses the given Uploader to upload files to swarm as a tar stream,
// returning the resulting manifest hash
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool, toPin bool) (string, error) {
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt, toPin, anonymous bool) (string, error) {
ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload")
defer sp.Finish()

Expand Down Expand Up @@ -609,7 +609,7 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t

// MultipartUpload uses the given Uploader to upload files to swarm as a
// multipart form, returning the resulting manifest hash
func (c *Client) MultipartUpload(hash string, uploader Uploader, toPin bool) (string, error) {
func (c *Client) MultipartUpload(hash string, uploader Uploader, toPin, anonymous bool) (string, error) {
reqR, reqW := io.Pipe()
defer reqR.Close()
req, err := http.NewRequest("POST", c.Gateway+"/bzz:/"+hash, reqR)
Expand Down
14 changes: 7 additions & 7 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestClientUploadDownloadRawEncrypted(t *testing.T) {
func testClientUploadDownloadRaw(srv *swarmhttp.TestSwarmServer, toEncrypt bool, t *testing.T, data []byte, toPin bool) string {
client := NewClient(srv.URL)

hash, err := client.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt, toPin)
hash, err := client.UploadRaw(bytes.NewReader(data), int64(len(data)), toEncrypt, toPin, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func testClientUploadDownloadFiles(toEncrypt bool, t *testing.T) string {
Size: int64(len(data)),
},
}
hash, err := client.Upload(file, manifest, toEncrypt, toPin)
hash, err := client.Upload(file, manifest, toEncrypt, toPin, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {
// upload the directory
client := NewClient(srv.URL)
defaultPath := testDirFiles[0]
hash, err := client.UploadDirectory(dir, defaultPath, "", false, false)
hash, err := client.UploadDirectory(dir, defaultPath, "", false, false, true)
if err != nil {
t.Fatalf("error uploading directory: %s", err)
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func testClientFileList(toEncrypt bool, t *testing.T) {
defer os.RemoveAll(dir)

client := NewClient(srv.URL)
hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false)
hash, err := client.UploadDirectory(dir, "", "", toEncrypt, false, true)
if err != nil {
t.Fatalf("error uploading directory: %s", err)
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestClientMultipartUpload(t *testing.T) {

// upload the files as a multipart upload
client := NewClient(srv.URL)
hash, err := client.MultipartUpload("", uploader, false)
hash, err := client.MultipartUpload("", uploader, false, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestClientQueryTagByHash(t *testing.T) {
data := []byte("foo123")
client := NewClient(srv.URL)

hash, err := client.UploadRaw(bytes.NewReader(data), int64(len(data)), false, false)
hash, err := client.UploadRaw(bytes.NewReader(data), int64(len(data)), false, false, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -475,7 +475,7 @@ func TestClientBzzWithFeed(t *testing.T) {
}

// upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded.
manifestAddressHex, err := swarmClient.Upload(f, "", false, false)
manifestAddressHex, err := swarmClient.Upload(f, "", false, false, true)
if err != nil {
t.Fatalf("Error creating manifest: %s", err)
}
Expand Down
8 changes: 5 additions & 3 deletions api/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"runtime/debug"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -109,6 +110,7 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
estimatedTotal int64 = 0
contentType = r.Header.Get("Content-Type")
headerTag = r.Header.Get(TagHeaderName)
anonTag = r.Header.Get(AnonymousHeaderName)
)
if headerTag != "" {
tagName = headerTag
Expand All @@ -131,8 +133,8 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
}

log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)

t, err := tags.Create(tagName, estimatedTotal)
anon, _ := strconv.ParseBool(anonTag)
t, err := tags.Create(tagName, estimatedTotal, anon)
if err != nil {
log.Error("error creating tag", "err", err, "tagName", tagName)
}
Expand Down Expand Up @@ -168,7 +170,7 @@ func PinningEnabledPassthrough(h http.Handler, api *pin.API, checkHeader bool) h
// if checkHeader is true, it means that the passthrough should happen if the header is set and the pinAPI is not nil
if checkHeader {
headerPin := r.Header.Get(PinHeaderName)
if strings.ToLower(headerPin) == "true" && api == nil {
if shouldPin, _ := strconv.ParseBool(headerPin); shouldPin && api == nil {
respondError(w, r, "Pinning disabled on this node", http.StatusForbidden)
return
}
Expand Down
6 changes: 4 additions & 2 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ var (
)

const (
TagHeaderName = "x-swarm-tag" // Presence of this in header indicates the tag
PinHeaderName = "x-swarm-pin" // Presence of this in header indicates pinning required
TagHeaderName = "x-swarm-tag" // Presence of this in header indicates the tag
AnonymousHeaderName = "x-swarm-anonymous" // Presence of this in header indicates only pull sync should be used for upload
PinHeaderName = "x-swarm-pin" // Presence of this in header indicates pinning required

encryptAddr = "encrypt"
tarContentType = "application/x-tar"
)
Expand Down
19 changes: 15 additions & 4 deletions api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,18 @@ func TestGetTagUsingHash(t *testing.T) {

// upload a file
data := testutil.RandomBytes(1, 10000)
resp, err := http.Post(fmt.Sprintf("%s/bzz-raw:/", srv.URL), "text/plain", bytes.NewReader(data))

req, err := http.NewRequest("POST", srv.URL+"/bzz-raw:/", bytes.NewReader(data))
if err != nil {
t.Fatal(err)
}
req.Header.Add(AnonymousHeaderName, "true")
req.Header.Add("Content-Type", "text/plain")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("err %s", resp.Status)
Expand Down Expand Up @@ -120,6 +128,10 @@ func TestGetTagUsingHash(t *testing.T) {
t.Fatalf("retrieved total tag count mismatch, expected %x, got %x", 4, tag.TotalCounter())
}

if tag.Anonymous != true {
t.Fatalf("expected tag anonymous field to be %t but got %t", true, tag.Anonymous)
}

if !strings.HasPrefix(tag.Name, "unnamed_tag_") {
t.Fatalf("retrieved name prefix mismatch, expected %x, got %x", "unnamed_tag_", tag.Name)
}
Expand Down Expand Up @@ -1454,8 +1466,7 @@ func TestBzzGetFileWithResolver(t *testing.T) {
t.Fatal(err)
}
req.Header.Add("Content-Type", "application/x-tar")
client := &http.Client{}
serverResponse, err := client.Do(req)
serverResponse, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1496,7 +1507,7 @@ func TestBzzGetFileWithResolver(t *testing.T) {
if err != nil {
t.Fatal(err)
}
serverResponse, err := client.Do(req)
serverResponse, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
Expand Down
12 changes: 8 additions & 4 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ func (m ModeSet) String() string {
switch m {
case ModeSetAccess:
return "Access"
case ModeSetSync:
return "Sync"
case ModeSetSyncPush:
return "SyncPush"
case ModeSetSyncPull:
return "SyncPull"
case ModeSetRemove:
return "Remove"
case ModeSetPin:
Expand All @@ -229,8 +231,10 @@ func (m ModeSet) String() string {
const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
ModeSetAccess ModeSet = iota
// ModeSetSync: when a chunk is added to a pull sync batch or when a push sync receipt is received
ModeSetSync
// ModeSetSyncPush: when a push sync receipt is received for a chunk
ModeSetSyncPush
// ModeSetSyncPull: when a chunk is added to a pull sync batch
ModeSetSyncPull
// ModeSetRemove: when a chunk is removed
ModeSetRemove
// ModeSetPin: when a chunk is pinned during upload or separately
Expand Down
4 changes: 3 additions & 1 deletion chunk/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
// Tag represents info on the status of new chunks
type Tag struct {
Uid uint32 // a unique identifier for this tag
Anonymous bool // indicates if the tag is anonymous (i.e. if only pull sync should be used)
Name string // a name tag for this tag
Address Address // the associated swarm hash for this tag
Total int64 // total chunks belonging to a tag
Expand All @@ -64,9 +65,10 @@ type Tag struct {
}

// NewTag creates a new tag, and returns it
func NewTag(uid uint32, s string, total int64) *Tag {
func NewTag(uid uint32, s string, total int64, anon bool) *Tag {
t := &Tag{
Uid: uid,
Anonymous: anon,
Name: s,
StartedAt: time.Now(),
Total: total,
Expand Down
9 changes: 6 additions & 3 deletions chunk/tag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
wg.Add(10 * 5 * n)
for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)})
tag, err := ts.Create(s, int64(n))
tag, err := ts.Create(s, int64(n), false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the
// tag Address (byte slice) contains some arbitrary value
func TestMarshallingWithAddr(t *testing.T) {
tg := NewTag(111, "test/tag", 10)
tg := NewTag(111, "test/tag", 10, false)
tg.Address = []byte{0, 1, 2, 3, 4, 5, 6}

for _, f := range allStates {
Expand All @@ -208,6 +208,9 @@ func TestMarshallingWithAddr(t *testing.T) {
if unmarshalledTag.Name != tg.Name {
t.Fatalf("tag names not equal. want %s got %s", tg.Name, unmarshalledTag.Name)
}
if unmarshalledTag.Anonymous != tg.Anonymous {
t.Fatalf("tag anon field not equal. want %t got %t", tg.Anonymous, unmarshalledTag.Anonymous)
}

for _, state := range allStates {
uv, tv := unmarshalledTag.Get(state), tg.Get(state)
Expand All @@ -231,7 +234,7 @@ func TestMarshallingWithAddr(t *testing.T) {

// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly
func TestMarshallingNoAddr(t *testing.T) {
tg := NewTag(111, "test/tag", 10)
tg := NewTag(111, "test/tag", 10, false)
for _, f := range allStates {
tg.Inc(f)
}
Expand Down
6 changes: 4 additions & 2 deletions chunk/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/ethersphere/swarm/sctx"
)

var TagUidFunc = rand.Uint32

// Tags hold tag information indexed by a unique random uint32
type Tags struct {
tags *sync.Map
Expand All @@ -41,8 +43,8 @@ func NewTags() *Tags {

// Create creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
func (ts *Tags) Create(s string, total int64) (*Tag, error) {
t := NewTag(rand.Uint32(), s, total)
func (ts *Tags) Create(s string, total int64, anon bool) (*Tag, error) {
t := NewTag(TagUidFunc(), s, total, anon)

if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded {
return nil, errExists
Expand Down
6 changes: 3 additions & 3 deletions chunk/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

func TestAll(t *testing.T) {
ts := NewTags()
ts.Create("1", 1)
ts.Create("2", 1)
ts.Create("1", 1, false)
ts.Create("2", 1, false)

all := ts.All()

Expand All @@ -39,7 +39,7 @@ func TestAll(t *testing.T) {
t.Fatalf("expected tag 1 Total to be 1 got %d", n)
}

ts.Create("3", 1)
ts.Create("3", 1, false)
all = ts.All()

if len(all) != 3 {
Expand Down
2 changes: 1 addition & 1 deletion cmd/swarm-smoke/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func uploadWithTag(data []byte, endpoint string, tag string) (string, error) {
Tag: tag,
}

return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false)
return swarm.TarUpload("", &client.FileUploader{File: f}, "", false, false, true)
}

func digest(r io.Reader) ([]byte, error) {
Expand Down
Loading

0 comments on commit 675b761

Please sign in to comment.