Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Lookahead buffer for ServeContent #1876

Merged
merged 43 commits into from
Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from 42 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
50ef884
api/http: initial lookahead implenmentation
acud Oct 10, 2019
0bd408a
api/http: add logging
acud Oct 10, 2019
45c680e
api/http: fix langos for current tests
janos Oct 10, 2019
56e47a3
api/http: langos data read validity test and goroutine leak prevention
janos Oct 10, 2019
9213953
api/http: implement langos Seek method
janos Oct 10, 2019
da3197f
api/http: add copyright to new files
janos Oct 10, 2019
d1d2be5
api/http/langos: new package
janos Oct 10, 2019
4745d85
api/http/langos: export required names and ensure blackbox testing
janos Oct 10, 2019
3d6effe
api/http/langos: improve Langos and add BenchmarkDelayedReaders
janos Oct 10, 2019
9ac7b5e
api/http: select download via http get query "_downloader"
janos Oct 10, 2019
d53ec20
api/http: configurable default downloader
janos Oct 10, 2019
70d435e
api/http/langos: add comments and optimize mutex lock in peek
janos Oct 11, 2019
839e167
api/http/langos: race free code and http test and benchmark
janos Oct 11, 2019
4264b84
api/http/langos: avoid buffer allocation
janos Oct 11, 2019
aa5b8ce
api/http/langos: add uncommitted file
janos Oct 11, 2019
17500a0
api/http/langos: reduce locking area
janos Oct 11, 2019
06c62e5
api/http/langos: add TestHTTPRangeResponse
janos Oct 11, 2019
f7971c8
api/http/langos: remove defer from for loop in test
janos Oct 11, 2019
33fcef1
api/http/langos: buffered peekDOne chan not to block peek goroutine
janos Oct 11, 2019
3e5b2b2
api/http/langos: add random delays benchmarks
janos Oct 13, 2019
2e14e65
api/http/langos: improve read count validation in tests
janos Oct 14, 2019
0ac192b
api/http/langos: add TestHTTPMultipleRangeResponse
janos Oct 14, 2019
21cbf56
api/http/langos: simplify/fix peek and enable TestHTTPMultipleRangeRe…
janos Oct 14, 2019
04dbdc6
api/http: langos by default
janos Oct 14, 2019
e757ef8
api/http/langos: http range tests with multiple size/buffer combinations
janos Oct 14, 2019
1148a8f
api/http/lanogs: add plain langs to benchmarks
janos Oct 14, 2019
82a1662
api/http/lanogs: update BenchmarkDelayedReaders comment
janos Oct 14, 2019
1dc82f8
api/http/langos: rename NewLangos max peek size argument
janos Oct 14, 2019
0f21272
api/http/langos: correct TestLangosCallsPeekOnlyTwice subtest
janos Oct 15, 2019
908048a
api/http/langos: add io.ReadSeeker tests for all implementations
janos Oct 17, 2019
d1197fb
api/http/langos: add more comments and extend newReadSeekerTester
janos Oct 17, 2019
210a32d
api/http/langos: fix comment for TestBufferedReadSeeker
janos Oct 22, 2019
9a737a8
api/http/langos: fix some comments
janos Oct 23, 2019
a43b969
api/http/langos: new langos with multiple peaks
janos Oct 24, 2019
9d3933e
Merge branch 'master' into lookahead-buffer
janos Oct 25, 2019
6d01ddd
api/http/langos: preserve peek tail
janos Oct 25, 2019
ebbe46c
Merge branch 'master' into lookahead-buffer
janos Oct 29, 2019
85d8a0d
api/http/langos: fix TestLangosCallsPeekOnlyTwice
janos Oct 29, 2019
912be25
api/http/langos: update test comments and change a test name
janos Oct 29, 2019
d178167
api/http/langos: add comments to BufferedReadSeeker methods
janos Oct 29, 2019
b8ea632
api/http: remove readseeker tests (now in langos)
janos Oct 29, 2019
fb1694f
Merge branch 'master' into lookahead-buffer
janos Nov 4, 2019
9556a1e
api/http: remove temporary _downloader get param
janos Nov 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions api/http/langos/buffered_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package langos

import (
"bufio"
"io"
)

// BufferedReadSeeker wraps bufio.Reader to expose Seek method
// from the provided io.ReadSeeker in NewBufferedReadSeeker.
//
// Methods use value receivers; copies of a BufferedReadSeeker share the same
// bufio.Reader and underlying reader.
type BufferedReadSeeker struct {
	r  *bufio.Reader // buffered view over s, used by Read
	s  io.ReadSeeker // underlying reader, used by Seek
	ra io.ReaderAt   // s as io.ReaderAt, or nil if s does not implement it
}

// NewBufferedReadSeeker creates a new instance of BufferedReadSeeker,
// out of io.ReadSeeker. Argument `size` is the size of the read buffer
// and it is passed to bufio.NewReaderSize.
func NewBufferedReadSeeker(readSeeker io.ReadSeeker, size int) BufferedReadSeeker {
	// A failed assertion leaves ra nil, which ReadAt checks for.
	ra, _ := readSeeker.(io.ReaderAt)
	return BufferedReadSeeker{
		r:  bufio.NewReaderSize(readSeeker, size),
		s:  readSeeker,
		ra: ra,
	}
}

// Read reads to the byte slice from the buffered reader.
func (b BufferedReadSeeker) Read(p []byte) (n int, err error) {
	return b.r.Read(p)
}

// Seek moves the read position of the underlying ReadSeeker and resets the buffer.
//
// For io.SeekCurrent the offset is interpreted relative to the position of the
// data already returned by Read, not relative to the position of the
// underlying reader, which may be further ahead because of buffering.
// If the underlying Seek fails, its result is returned and the buffer is
// left untouched, so no unread buffered data is discarded.
func (b BufferedReadSeeker) Seek(offset int64, whence int) (int64, error) {
	if whence == io.SeekCurrent {
		// The underlying reader is ahead of the logical read position by the
		// number of bytes that are buffered but not yet consumed.
		offset -= int64(b.r.Buffered())
	}
	n, err := b.s.Seek(offset, whence)
	if err != nil {
		return n, err
	}
	// Buffered bytes belong to the old position and must not be served anymore.
	b.r.Reset(b.s)
	return n, nil
}

// ReadAt implements io.ReaderAt if the provided ReadSeeker also implements it,
// otherwise it returns no error and no bytes read.
//
// The call is passed directly to the underlying reader, bypassing the buffer.
//
// NOTE(review): returning (0, nil) for a non-empty p departs from the
// io.ReaderAt contract, which requires a non-nil error when n < len(p).
// The behaviour is kept as documented; callers must not retry in a loop
// on (0, nil).
func (b BufferedReadSeeker) ReadAt(p []byte, off int64) (n int, err error) {
	if b.ra == nil {
		return 0, nil
	}
	return b.ra.ReadAt(p, off)
}
319 changes: 319 additions & 0 deletions api/http/langos/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package langos_test

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"math/rand"
	"mime"
	"mime/multipart"
	"net/http"
	"net/http/httptest"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/ethersphere/swarm/api/http/langos"
)

// TestHTTPResponse checks that a complete, non-range GET response served by
// http.ServeContent from a buffered langos is byte-for-byte equal to the
// source data, for every data/buffer size pair provided by multiSizeTester.
func TestHTTPResponse(t *testing.T) {
	multiSizeTester(t, func(t *testing.T, dataSize, bufferSize int) {
		want := randomData(t, dataSize)

		// Every request is served from a fresh langos over the same data.
		handler := func(w http.ResponseWriter, r *http.Request) {
			http.ServeContent(w, r, "test", time.Now(), langos.NewBufferedLangos(bytes.NewReader(want), bufferSize))
		}
		server := httptest.NewServer(http.HandlerFunc(handler))
		defer server.Close()

		resp, err := http.Get(server.URL)
		if err != nil {
			t.Fatal(err)
		}
		defer resp.Body.Close()

		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			t.Fatal(err)
		}

		if bytes.Equal(body, want) {
			return
		}
		t.Fatalf("got invalid data (lengths: got %v, want %v)", len(body), len(want))
	})
}

// TestHTTPRangeResponse checks that single-range GET requests served by
// http.ServeContent from a buffered langos return exactly the requested
// slice of the source data. For every size pair from multiSizeTester it
// issues 12 requests with random ranges; the first one is open ended
// ("bytes=N-"), the rest are closed ("bytes=N-M").
func TestHTTPRangeResponse(t *testing.T) {
	multiSizeTester(t, func(t *testing.T, dataSize, bufferSize int) {
		data := randomData(t, dataSize)

		handler := func(w http.ResponseWriter, r *http.Request) {
			http.ServeContent(w, r, "test", time.Now(), langos.NewBufferedLangos(bytes.NewReader(data), bufferSize))
		}
		server := httptest.NewServer(http.HandlerFunc(handler))
		defer server.Close()

		for attempt := 0; attempt < 12; attempt++ {
			first := rand.Intn(dataSize)
			// Default to the last byte; pick a random end only if there
			// is room after the first byte.
			last := dataSize - 1
			if room := dataSize - 1 - first; room > 0 {
				last = rand.Intn(room) + first
			}
			header := fmt.Sprintf("bytes=%v-%v", first, last)
			if attempt == 0 {
				// test open ended range
				last = dataSize - 1
				header = fmt.Sprintf("bytes=%v-", first)
			}

			// A single range is expected to come back as the first part.
			got := httpRangeRequest(t, server.URL, header)[0]
			want := data[first : last+1]
			if bytes.Equal(got, want) {
				continue
			}
			t.Fatalf("got invalid data for range %s (lengths: got %v, want %v)", header, len(got), len(want))
		}
	})
}

// TestHTTPMultipleRangeResponse checks that GET requests carrying several
// byte ranges, served by http.ServeContent from a buffered langos, return
// one part per requested range with exactly the requested bytes.
// For every size pair from multiSizeTester it issues 12 requests, each with
// up to five random ranges in ascending order.
func TestHTTPMultipleRangeResponse(t *testing.T) {
	multiSizeTester(t, func(t *testing.T, dataSize, bufferSize int) {
		data := randomData(t, dataSize)

		handler := func(w http.ResponseWriter, r *http.Request) {
			http.ServeContent(w, r, "test", time.Now(), langos.NewBufferedLangos(bytes.NewReader(data), bufferSize))
		}
		server := httptest.NewServer(http.HandlerFunc(handler))
		defer server.Close()

		for attempt := 0; attempt < 12; attempt++ {
			// wanted holds inclusive [start, end] pairs; the same list is
			// used for building the request and for validating the response.
			var wanted [][2]int
			for left := rand.Intn(5); left >= 0; left-- {
				// Every next range starts no earlier than the previous end.
				// NOTE(review): the next start may equal the previous end, so
				// neighbouring ranges can share one byte - confirm that the
				// server is expected to accept that for all tested sizes.
				floor := 0
				if n := len(wanted); n > 0 {
					floor = wanted[n-1][1]
				}
				if floor >= dataSize {
					break
				}
				first := rand.Intn(dataSize-floor) + floor
				last := dataSize - 1
				if room := dataSize - 1 - first; room > 0 {
					last = rand.Intn(room) + first
				}
				candidate := [2]int{first, last}
				// Do not request the same range twice in a row.
				if n := len(wanted); n > 0 && wanted[n-1] == candidate {
					continue
				}
				wanted = append(wanted, candidate)
			}

			specs := make([]string, len(wanted))
			for j, w := range wanted {
				specs[j] = fmt.Sprintf("%v-%v", w[0], w[1])
			}
			header := "bytes=" + strings.Join(specs, ", ")

			gotParts := httpRangeRequest(t, server.URL, header)

			if len(gotParts) != len(wanted) {
				t.Fatalf("got %v parts for range %q, want %v", len(gotParts), header, len(wanted))
			}

			for j, w := range wanted {
				got, want := gotParts[j], data[w[0]:w[1]+1]
				if bytes.Equal(got, want) {
					continue
				}
				t.Fatalf("got invalid data for range #%v %s (lengths: got %v, want %v)", j+1, header, len(got), len(want))
			}
		}
	})
}

// parseDataSize converts a textual size into a number of bytes.
// The value is a decimal integer with an optional unit suffix:
// "k" multiplies it by 1024 and "M" by 1024*1024.
// The test is failed with t.Fatal if the numeric part cannot be parsed.
func parseDataSize(t *testing.T, v string) (s int) {
	t.Helper()

	unit := 1
	switch {
	case strings.HasSuffix(v, "k"):
		unit = 1024
		v = strings.TrimSuffix(v, "k")
	case strings.HasSuffix(v, "M"):
		unit = 1024 * 1024
		v = strings.TrimSuffix(v, "M")
	}

	count, err := strconv.Atoi(v)
	if err != nil {
		t.Fatal(err)
	}
	return count * unit
}

// httpRangeRequest performs a GET request to url with the Range header set
// to rangeHeader, using http.DefaultClient, and returns the response payload
// split into parts.
//
// A "multipart/byteranges" response yields one element per multipart part,
// in the order they are received. Any other response yields a single element
// holding the complete response body.
//
// The test is failed with t.Fatal on any request or read error, including
// an error in the multipart stream other than its regular end.
func httpRangeRequest(t *testing.T, url, rangeHeader string) (parts [][]byte) {
	t.Helper()

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		t.Fatal(err)
	}

	req.Header.Add("Range", rangeHeader)

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer res.Body.Close()

	// The parse error is deliberately ignored: a missing or malformed
	// Content-Type is handled as a regular, non-multipart response.
	mimetype, params, _ := mime.ParseMediaType(res.Header.Get("Content-Type"))
	if mimetype != "multipart/byteranges" {
		value, err := ioutil.ReadAll(res.Body)
		if err != nil {
			t.Fatal(err)
		}
		return append(parts, value)
	}

	mr := multipart.NewReader(res.Body, params["boundary"])
	for {
		part, err := mr.NextPart()
		if err == io.EOF {
			// Regular end of the multipart body.
			return parts
		}
		if err != nil {
			// Do not report a truncated or malformed response as a
			// valid one with fewer parts.
			t.Fatal(err)
		}
		value, err := ioutil.ReadAll(part)
		if err != nil {
			t.Fatal(err)
		}
		parts = append(parts, value)
	}
}

// BenchmarkHTTPDelayedReaders measures time needed by test http server to serve the body
// using different readers.
//
// Only reading of the response body is timed; starting the server, making
// the request and validating the received data are excluded from the measurement.
//
// goos: darwin
// goarch: amd64
// pkg: github.com/ethersphere/swarm/api/http/langos
// BenchmarkHTTPDelayedReaders/static_direct-8 8 128278515 ns/op 8389878 B/op 24 allocs/op
// BenchmarkHTTPDelayedReaders/static_buffered-8 43 27465687 ns/op 8389144 B/op 22 allocs/op
// BenchmarkHTTPDelayedReaders/static_langos-8 441 2578510 ns/op 10264076 B/op 63 allocs/op
// BenchmarkHTTPDelayedReaders/static_buffered_langos-8 493 2591692 ns/op 10120822 B/op 57 allocs/op
// BenchmarkHTTPDelayedReaders/random_direct-8 3 351496566 ns/op 8389416 B/op 23 allocs/op
// BenchmarkHTTPDelayedReaders/random_buffered-8 14 90407289 ns/op 8389294 B/op 22 allocs/op
// BenchmarkHTTPDelayedReaders/random_langos-8 430 2771827 ns/op 10256494 B/op 62 allocs/op
// BenchmarkHTTPDelayedReaders/random_buffered_langos-8 420 2817784 ns/op 10115937 B/op 57 allocs/op
func BenchmarkHTTPDelayedReaders(b *testing.B) {
	const (
		dataSize   = 2 * 1024 * 1024
		bufferSize = 4 * 32 * 1024
	)

	data := randomData(b, dataSize)

	// measure serves data over a test http server using readers created by
	// newReader and times how long it takes to read the response body.
	measure := func(b *testing.B, newReader func() langos.Reader) {
		// Keep the timer stopped except while the body is being read.
		b.StopTimer()

		handler := func(w http.ResponseWriter, r *http.Request) {
			http.ServeContent(w, r, "test", time.Now(), newReader())
		}
		server := httptest.NewServer(http.HandlerFunc(handler))
		defer server.Close()

		for i := 0; i < b.N; i++ {
			resp, err := http.Get(server.URL)
			if err != nil {
				b.Fatal(err)
			}

			b.StartTimer()
			body, readErr := ioutil.ReadAll(resp.Body)
			b.StopTimer()

			// Close the body before reporting a possible read error.
			resp.Body.Close()
			if readErr != nil {
				b.Fatal(readErr)
			}
			if !bytes.Equal(body, data) {
				b.Fatalf("%v got invalid data (lengths: got %v, want %v)", i, len(body), len(data))
			}
		}
	}

	type benchCase struct {
		name      string
		newReader func() langos.Reader
	}

	cases := []benchCase{
		{
			name: "static direct",
			newReader: func() langos.Reader {
				return newDelayedReaderStatic(bytes.NewReader(data), defaultStaticDelays)
			},
		},
		{
			name: "static buffered",
			newReader: func() langos.Reader {
				return langos.NewBufferedReadSeeker(newDelayedReaderStatic(bytes.NewReader(data), defaultStaticDelays), bufferSize)
			},
		},
		{
			name: "static langos",
			newReader: func() langos.Reader {
				return langos.NewLangos(newDelayedReaderStatic(bytes.NewReader(data), defaultStaticDelays), bufferSize)
			},
		},
		{
			name: "static buffered langos",
			newReader: func() langos.Reader {
				return langos.NewBufferedLangos(newDelayedReaderStatic(bytes.NewReader(data), defaultStaticDelays), bufferSize)
			},
		},
		{
			name: "random direct",
			newReader: func() langos.Reader {
				return newDelayedReader(bytes.NewReader(data), randomDelaysFunc)
			},
		},
		{
			name: "random buffered",
			newReader: func() langos.Reader {
				return langos.NewBufferedReadSeeker(newDelayedReader(bytes.NewReader(data), randomDelaysFunc), bufferSize)
			},
		},
		{
			name: "random langos",
			newReader: func() langos.Reader {
				return langos.NewLangos(newDelayedReader(bytes.NewReader(data), randomDelaysFunc), bufferSize)
			},
		},
		{
			name: "random buffered langos",
			newReader: func() langos.Reader {
				return langos.NewBufferedLangos(newDelayedReader(bytes.NewReader(data), randomDelaysFunc), bufferSize)
			},
		},
	}

	for _, tc := range cases {
		tc := tc
		b.Run(tc.name, func(b *testing.B) {
			measure(b, tc.newReader)
		})
	}
}
Loading