Merge pull request #2 from dbyington/chunk_hashing
support hashing in chunks
dbyington committed May 5, 2021
2 parents 806100b + fbb65a7 commit 190bfc9
Showing 3 changed files with 298 additions and 47 deletions.
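
The headline change is a chunked HashURL on ReadAtCloser: instead of one hash.Hash for the whole body, callers pass a chunk size and get back one hash per chunk, computed with parallel range reads. A minimal caller-side sketch, not part of the commit; the import path is assumed from the repository name, and how the ReadAtCloser is constructed is outside this diff:

	package example

	import (
		"crypto/sha256"

		"github.com/dbyington/httpio" // assumed import path for this repo
	)

	// chunkSums hashes the body behind rac in 8 MiB chunks, returning one
	// sha256 sum per chunk. Passing a chunkSize <= 0 would instead hash the
	// entire body as a single chunk.
	func chunkSums(rac *httpio.ReadAtCloser) ([][]byte, error) {
		hashes, err := rac.HashURL(sha256.Size, 8<<20)
		if err != nil {
			return nil, err
		}
		sums := make([][]byte, len(hashes))
		for i, h := range hashes {
			sums[i] = h.Sum(nil)
		}
		return sums, nil
	}
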
101 changes: 78 additions & 23 deletions httpio_suite_test.go
@@ -3,18 +3,27 @@ package httpio
import (
"net/http"
"net/url"
"reflect"
"sync"
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/ghttp"
)

func TestHttpio(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Httpio Suite")
}

-type handler struct {
+type httpMock struct {
+	mutex    *sync.Mutex
+	expected []*request
+	server   *ghttp.Server
+}
+
+type request struct {
url *url.URL
header http.Header
method string
@@ -24,42 +33,88 @@ type handler struct {
responseHeaders map[string][]string
}

-func newMockHandler() *handler {
-	return &handler{responseHeaders: make(map[string][]string)}
+func newHTTPMock(s *ghttp.Server) *httpMock {
+	return &httpMock{
+		mutex:    new(sync.Mutex),
+		expected: []*request{},
+		server:   s,
+	}
}

-func (h *handler) expect(method string, expectUrl *url.URL, header http.Header) {
-	h.url = expectUrl
-	h.header = header
-	h.method = method
+func (h *httpMock) finish() {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if len(h.expected) > 0 {
+		Fail("unmatched expectations")
+	}
+	h.expected = []*request{}
}

-func (h *handler) response(statusCode int, body []byte, headers map[string][]string) {
+func (h *httpMock) expect(method string, expectUrl *url.URL, header http.Header) *httpMock {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	h.expected = append(h.expected, &request{
+		url:             expectUrl,
+		method:          method,
+		header:          header,
+		responseHeaders: make(map[string][]string),
+	})
+	return h
+}
+
+func (h *httpMock) response(statusCode int, body []byte, headers map[string][]string) *httpMock {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+
+	req := h.expected[len(h.expected)-1]
+
for k, v := range headers {
-		h.responseHeaders[k] = v
+		req.responseHeaders[k] = v
}

-	h.responseBody = body
-	h.responseCode = statusCode
+	req.responseBody = body
+	req.responseCode = statusCode
+
+	h.server.AppendHandlers(h.ServeHTTP)
+	return h
}

-func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-	Expect(req.Method).To(Equal(h.method))
-	Expect(req.URL.String()).To(Equal(h.url.Path))
+func (h *httpMock) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+
+	if len(h.expected) == 0 {
+		Fail("no expected requests")
+	}

-	// Don't care about the UA header.
+	// We don't care about the UA
req.Header.Del("User-Agent")
-	Expect(req.Header).To(Equal(h.header))
+	var (
+		matched bool
+		idx     int
+	)
+	for i, r := range h.expected {
+		if req.Method == r.method && req.URL.String() == r.url.Path && reflect.DeepEqual(req.Header, r.header) {
+			for k, v := range r.responseHeaders {
+				for _, s := range v {
+					w.Header().Set(k, s)
+				}
+			}
+
+			if r.responseCode != 0 {
+				w.WriteHeader(r.responseCode)
+			}

-	for k, v := range h.responseHeaders {
-		for _, s := range v {
-			w.Header().Set(k, s)
+			w.Write(r.responseBody)
+			matched = true
+			idx = i
+			break
}
}

-	if h.responseCode != 0 {
-		w.WriteHeader(h.responseCode)
+	if matched {
+		h.expected = append(h.expected[:idx], h.expected[idx+1:]...)
+		return
}

-	w.Write(h.responseBody)
+	Fail("request did not match any expected request")
}
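
For orientation, the reworked mock queues expectations and verifies them at the end of a spec. A sketch of the intended flow inside a Ginkgo spec (illustrative only; the real specs live in the elided test changes, and the URL, header, and body values here are placeholders):

	server := ghttp.NewServer()
	defer server.Close()
	mock := newHTTPMock(server)

	u, _ := url.Parse(server.URL() + "/file")

	// Queue one expected GET plus the canned response; response() also
	// registers mock.ServeHTTP with the ghttp server.
	mock.expect(http.MethodGet, u, http.Header{"Range": {"bytes=0-9"}}).
		response(http.StatusPartialContent, []byte("0123456789"),
			map[string][]string{"Content-Range": {"bytes 0-9/100"}})

	// ... drive the code under test against server.URL() ...

	// finish fails the spec if any queued expectation was never matched.
	mock.finish()
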
91 changes: 85 additions & 6 deletions io.go
@@ -1,6 +1,7 @@
package httpio

import (
"bytes"
"context"
"crypto/md5"
"crypto/sha256"
@@ -9,9 +10,11 @@ import (
"hash"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"strings"
"sync"
)

// Possible errors
@@ -63,6 +66,7 @@ type ReadCloser struct {
}

// ReadAtCloser contains the required options and metadata to implement io.ReadAtCloser on a URL.
+// Use the Options to configure the ReadAtCloser with an http.Client, URL, and any additional http.Header values that should be sent with the request.
type ReadAtCloser struct {
options *Options
contentLength int64
@@ -175,15 +179,90 @@ func (o *Options) hashURL(hashSize uint) (hash.Hash, error) {

switch hashSize {
case sha256.Size:
-		return Sha256SumReader(res.Body)
+		return sha256SumReader(res.Body)
default:
return md5SumReader(res.Body)
}
}

-// HashURL takes the hash scheme size (sha256.Size or md5.Size) and returns the hashed URL body in the supplied scheme as a hash.Hash interface.
-func (r *ReadAtCloser) HashURL(size uint) (hash.Hash, error) {
-	return r.options.hashURL(size)
+// HashURL takes the hash scheme as a uint (either sha256.Size or md5.Size) and a chunk size, and returns the hashed URL body in the supplied scheme as a slice of hash.Hash.
+// When the chunk size is less than the content length, the URL is read with multiple, parallel range reads to build the slice of hash.Hash.
+// A chunkSize <= 0 means "do not chunk": the entire content is hashed as a single chunk.
+// The length and capacity of the returned slice equal the number of chunks: the content length divided by chunkSize, rounded up, or 1 when chunkSize <= 0.
+func (r *ReadAtCloser) HashURL(scheme uint, chunkSize int64) ([]hash.Hash, error) {
+	if chunkSize <= 0 {
+		chunkSize = r.contentLength
+	}
+	var chunks int64
+
+	// If chunkSize is greater than the content length, reset it to the available length and use a single chunk.
+	// Otherwise divide the content length by the chunk size and round up; the final chunk holds the remainder.
+	if chunkSize > r.contentLength {
+		chunkSize = r.contentLength
+		chunks = 1
+	} else {
+		chunks = int64(math.Ceil(float64(r.contentLength) / float64(chunkSize)))
+	}

+	var hasher func(reader io.Reader) (hash.Hash, error)
+
+	switch scheme {
+	case sha256.Size:
+		hasher = sha256SumReader
+	default:
+		hasher = md5SumReader
+	}
+
+	hashes := make([]hash.Hash, chunks)
+	hashErrs := make([]error, chunks)
+	wg := sync.WaitGroup{}

+	for i := int64(0); i < chunks; i++ {
+		// Create a copy of the ReadAtCloser to operate on, so the parallel chunk reads don't share state.
+		rOpt := *r.options
+		rac := *r
+		rac.options = &rOpt
+
+		wg.Add(1)
+		go func(w *sync.WaitGroup, idx int64, size int64, rac *ReadAtCloser) {
+			defer w.Done()
+			b := make([]byte, size)
+			// A short read on the final chunk surfaces as io.ErrUnexpectedEOF;
+			// that is expected when the content length is not an even multiple
+			// of the chunk size, so only other errors are fatal. Hash just the
+			// n bytes actually read, never the zeroed tail of the buffer.
+			n, err := rac.ReadAt(b, size*idx)
+			if err != nil && err != io.ErrUnexpectedEOF {
+				hashErrs[idx] = err
+				return
+			}
+
+			h, err := hasher(bytes.NewReader(b[:n]))
+			if err != nil {
+				hashErrs[idx] = err
+				return
+			}
+
+			hashes[idx] = h
+		}(&wg, i, chunkSize, &rac)
+	}

+	wg.Wait()
+	if err := checkErrSlice(hashErrs); err != nil {
+		return hashes, err
+	}
+	return hashes, nil
+}
+
+func checkErrSlice(es []error) (err error) {
+	for _, e := range es {
+		if e != nil {
+			if err != nil {
+				err = fmt.Errorf("%s: %s", err, e)
+				continue
+			}
+			err = e
+		}
+	}
+	return err
+}

// Length returns the reported ContentLength of the URL body.
@@ -292,8 +371,8 @@ func (r *ReadCloser) Close() error {
return nil
}

-// Sha256SumReader reads from r until and calculates the sha256 sum, until r is exhausted.
-func Sha256SumReader(r io.Reader) (hash.Hash, error) {
+// sha256SumReader reads from r and calculates the sha256 sum until r is exhausted.
+func sha256SumReader(r io.Reader) (hash.Hash, error) {
shaSum := sha256.New()

buf := make([]byte, ReadSizeLimit)
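
A quick worked example of the chunk arithmetic in HashURL (an illustration, not part of the commit): a 100-byte body with chunkSize 40 yields ceil(100/40) = 3 hashes; chunks 0 and 1 each read 40 bytes, and chunk 2's ReadAt returns io.ErrUnexpectedEOF after the 20-byte tail, which is hashed as-is. The helper below just mirrors that math:

	import "math"

	// chunkCount mirrors the chunk calculation in HashURL.
	func chunkCount(contentLength, chunkSize int64) int64 {
		if chunkSize <= 0 || chunkSize > contentLength {
			return 1
		}
		return int64(math.Ceil(float64(contentLength) / float64(chunkSize)))
	}

	// chunkCount(100, 40) == 3: range reads start at offsets 0, 40, and 80.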
