Permalink
Browse files

cleaned up openBlob API

  • Loading branch information...
dustin committed Jan 18, 2014
1 parent 001a78c commit d4a8f57e6238df361aa54c514483dec1bb350a7b
Showing with 111 additions and 90 deletions.
  1. +64 −1 blobs.go
  2. +2 −2 files.go
  3. +15 −0 hash.go
  4. +30 −87 http.go
View
@@ -4,9 +4,11 @@ import (
"encoding/json"
"errors"
"flag"
+ "fmt"
"io"
"io/ioutil"
"log"
+ "math/rand"
"net/http"
"os"
"sort"
@@ -150,7 +152,7 @@ func blobReader(oid string) io.ReadCloser {
}
func copyBlob(w io.Writer, oid string) error {
- f, err := openBlob(oid)
+ f, err := openLocalBlob(oid)
if err == nil {
// Doing it locally
defer f.Close()
@@ -670,3 +672,64 @@ func maybeQueueBlobFetch(oid, prev string) bool {
return false
}
}
+
+type errNotLocal struct {
+ urls []string
+}
+
+func (e errNotLocal) Error() string {
+ return fmt.Sprintf("non-local, try one of these: %v", e.urls)
+}
+
+func openBlob(oid string, localOnly bool) (io.ReadCloser, error) {
+ f, err := openLocalBlob(oid)
+ if err == nil {
+ return f, err
+ }
+
+ // Special case, just describe where things are.
+ bo, err := getBlobOwnership(oid)
+ if err != nil {
+ return nil, err
+ }
+ nl := bo.ResolveNodes()
+ if len(nl) == 0 {
+ return nil, errors.New("no copies found")
+ }
+
+ if localOnly {
+ return nil, errNotLocal{nl.BlobURLs(oid)}
+ }
+
+ return openRemote(oid, bo.Length, *cachePercentage, nl)
+}
+
+func openRemote(oid string, l int64, cachePerc int, nl NodeList) (io.ReadCloser, error) {
+ for _, sid := range nl {
+ resp, err := sid.ClientForTransfer(l).Get(sid.BlobURL(oid))
+ if err != nil {
+ log.Printf("Error reading %s from node %v: %v",
+ oid, sid, err)
+ continue
+ }
+
+ if resp.StatusCode != 200 {
+ log.Printf("Error response %v from node %v getting %v",
+ resp.Status, sid, oid)
+ resp.Body.Close()
+ continue
+ }
+
+ shouldCache := cachePerc == 100 || (cachePerc > rand.Intn(100) &&
+ availableSpace() > l)
+
+ if !shouldCache {
+ return resp.Body, nil
+ }
+
+ hw, err := NewHashRecord(*root, oid)
+ r := io.TeeReader(resp.Body, hw)
+ return &hwFinisher{r, hw, oid, l}, nil
+ }
+ return nil, fmt.Errorf("couldn't get ob from any of %v", nl)
+}
View
@@ -25,7 +25,7 @@ type ReadSeekCloser interface {
io.Closer
}
-func openBlob(hstr string) (ReadSeekCloser, error) {
+func openLocalBlob(hstr string) (ReadSeekCloser, error) {
return os.Open(hashFilename(*root, hstr))
}
@@ -45,7 +45,7 @@ func forceRemoveObject(h string) error {
}
func verifyObjectHash(h string) error {
- f, err := openBlob(h)
+ f, err := openLocalBlob(h)
if err != nil {
return err
}
View
15 hash.go
@@ -157,3 +157,18 @@ func cleanTmpFiles() error {
}
return nil
}
+
+type hwFinisher struct {
+ io.Reader
+ h *hashRecord
+ oid string
+ l int64
+}
+
+func (h *hwFinisher) Close() error {
+ _, err := h.h.Finish()
+ if err == nil {
+ err = recordBlobOwnership(h.oid, h.l, true)
+ }
+ return err
+}
View
117 http.go
@@ -8,7 +8,6 @@ import (
"io"
"io/ioutil"
"log"
- "math/rand"
"net/http"
"os"
"strconv"
@@ -397,7 +396,7 @@ func doHeadUserFile(w http.ResponseWriter, req *http.Request) {
}
func doHeadRawBlob(w http.ResponseWriter, req *http.Request, oid string) {
- f, err := openBlob(oid)
+ f, err := openLocalBlob(oid)
if err != nil {
http.Error(w,
fmt.Sprintf("Error opening blob: %v", err), 404)
@@ -498,30 +497,16 @@ func doGetUserDoc(w http.ResponseWriter, req *http.Request) {
}
}
- f, err := openBlob(oid)
- switch {
- case err == nil:
+ f, err := openBlob(oid, req.Header.Get("X-CBFS-LocalOnly") != "")
+ if err == nil {
// normal path
- case req.Header.Get("X-CBFS-LocalOnly") != "":
- // Special case, just describe where things are.
- bo, err := getBlobOwnership(oid)
- if err != nil {
- http.Error(w, err.Error(), 500)
- return
- }
- urls := bo.ResolveNodes().BlobURLs(oid)
- if len(urls) == 0 {
- http.Error(w, "No alt URLs found", 500)
- return
- }
- w.Header().Set("Location", urls[0])
+ } else if notloc, ok := err.(errNotLocal); ok {
+ w.Header().Set("Location", notloc.urls[0])
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(300)
- e := json.NewEncoder(w)
- e.Encode(urls)
- return
- default:
- getBlobFromRemote(w, oid, respHeaders, *cachePercentage)
+ json.NewEncoder(w).Encode(notloc.urls)
+ } else {
+ http.Error(w, err.Error(), 500)
return
}
defer f.Close()
@@ -535,11 +520,16 @@ func doGetUserDoc(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Etag", `"`+oid+`"`)
go recordBlobAccess(oid)
- http.ServeContent(w, req, path, modified, f)
+ if r, ok := f.(io.ReadSeeker); ok {
+ http.ServeContent(w, req, path, modified, r)
+ } else {
+ w.WriteHeader(200)
+ io.Copy(w, r)
+ }
}
func doServeRawBlob(w http.ResponseWriter, req *http.Request, oid string) {
- f, err := openBlob(oid)
+ f, err := openLocalBlob(oid)
if err != nil {
http.Error(w, "Error opening blob: "+err.Error(), 404)
removeBlobOwnershipRecord(oid, serverId)
@@ -557,78 +547,31 @@ func getBlobFromRemote(w http.ResponseWriter, oid string,
respHeader http.Header, cachePerc int) error {
// Find the owners of this blob
- ownership := BlobOwnership{}
- oidkey := "/" + oid
- err := couchbase.Get(oidkey, &ownership)
+ ownership, err := getBlobOwnership(oid)
if err != nil {
log.Printf("Missing ownership record for %v", oid)
// Not sure 404 is the right response here
http.Error(w, "Can't find info for blob "+oid, 404)
return err
}
- nl := ownership.ResolveRemoteNodes()
-
- // Loop through the nodes that claim to own this blob
- // If we encounter any errors along the way, try the next node
- for _, sid := range nl {
- resp, err := sid.ClientForTransfer(ownership.Length).Get(sid.BlobURL(oid))
- if err != nil {
- log.Printf("Error reading %s from node %v: %v",
- oid, sid, err)
- continue
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != 200 {
- log.Printf("Error response %v from node %v getting %v",
- resp.Status, sid, oid)
- continue
- }
-
- // Found one, set the headers and send it. Keep a
- // local copy for good luck.
-
- for k, v := range respHeader {
- if isResponseHeader(k) {
- w.Header()[k] = v
- }
- }
- w.WriteHeader(200)
- writeTo := io.Writer(w)
- var hw *hashRecord
-
- if cachePerc == 100 || (cachePerc > rand.Intn(100) &&
- availableSpace() > ownership.Length) {
- hw, err = NewHashRecord(*root, oid)
- if err == nil {
- writeTo = io.MultiWriter(hw, w)
- } else {
- hw = nil
- }
- }
-
- length, err := io.Copy(writeTo, resp.Body)
+ f, err := openRemote(oid, ownership.Length, cachePerc, ownership.ResolveNodes())
+ if err != nil {
+ return err
+ }
+ defer f.Close()
- if err != nil {
- log.Printf("Failed to write %v from remote stream %v",
- oid, err)
- return err
- } else {
- // A successful copy with a working hash
- // record means we should link in and record
- // our copy of this file.
- if hw != nil {
- _, err = hw.Finish()
- if err == nil {
- err = recordBlobOwnership(oid, length,
- true)
- }
- log.Printf("Retrieved %v from %v: result=%v",
- oid, sid, errorOrSuccess(err))
- }
+ for k, v := range respHeader {
+ if isResponseHeader(k) {
+ w.Header()[k] = v
}
+ }
+ w.WriteHeader(200)
+ _, err = io.Copy(w, f)
+ if err != nil {
+ log.Printf("Failed to write %v from remote stream %v",
+ oid, err)
return err
}

0 comments on commit d4a8f57

Please sign in to comment.