Skip to content

Commit

Permalink
add io2 pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
pattyshack committed Jul 27, 2017
1 parent 08931d9 commit 46302a6
Show file tree
Hide file tree
Showing 19 changed files with 3,134 additions and 0 deletions.
11 changes: 11 additions & 0 deletions io2/all_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io2

import (
"testing"

. "gopkg.in/check.v1"
)

func Test(t *testing.T) {
TestingT(t)
}
1 change: 1 addition & 0 deletions io2/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package io2
28 changes: 28 additions & 0 deletions io2/ioutil2/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
Copyright 2012, Google Inc.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
51 changes: 51 additions & 0 deletions io2/ioutil2/ioutil2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//
// Copyright 2016, Dropbox Inc. All rights reserved.
// This is a modified version of https://github.com/youtube/vitess/go/ioutil2

// Package ioutil2 provides extra functionality along similar lines to io/ioutil.
package ioutil2

import (
"io/ioutil"
"os"
"path"
)

// Write file to temp and atomically move when everything else succeeds.
func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error {
dir, name := path.Split(filename)
fDir, dirErr := os.Open(dir)
if dirErr != nil {
return dirErr
}
f, err := ioutil.TempFile(dir, name)
if err != nil {
return err
}
_, err = f.Write(data)
if err == nil {
err = f.Sync()
}
if closeErr := f.Close(); err == nil {
err = closeErr
}
if permErr := os.Chmod(f.Name(), perm); err == nil {
err = permErr
}
// Any err should result in full cleanup.
if err != nil {
_ = os.Remove(f.Name())
return err
}
if err = os.Rename(f.Name(), filename); err != nil {
return err
}
err = fDir.Sync()
if closeErr := fDir.Close(); err == nil {
err = closeErr
}
return err
}
17 changes: 17 additions & 0 deletions io2/ioutil2/ioutil2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package ioutil2

import (
"os"
"testing"
)

func TestWrite(t *testing.T) {
fname := "/tmp/atomic-file-test.txt"
err := WriteFileAtomic(fname, []byte("test string\n"), 0664)
if err != nil {
t.Fatal(err)
}
if err := os.Remove(fname); err != nil {
t.Fatal(err)
}
}
239 changes: 239 additions & 0 deletions io2/pipelined_copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// +build go1.5

package io2

import (
"io"
"net/http"

"github.com/dropbox/godropbox/errors"
)

// This is similar to io.CopyBuffer, except this uses circular buffer and
// goroutines to pipeline copying (When numBuffers is 1, this simply uses
// io.CopyBuffer).
//
// PipelinedCopy copy will always fill a buffer until it has at least minRead
// bytes in buffer before forwarding the buffer.
// PipelinedCopy flushes dest after each write if it implements http.Flusher
// interface.
func PipelinedCopy(
dest io.Writer,
src io.Reader,
numBuffers int,
bufferSize int,
minRead int) (written int64, err error) {

if numBuffers < 1 {
return 0, errors.Newf("Invalid number of buffers: %d", numBuffers)
}

if bufferSize < 1 {
return 0, errors.Newf("Invalid buffer size: %d", bufferSize)
}

if minRead < 0 {
return 0, errors.Newf("Invalid min read size: %d", minRead)
}

if minRead > bufferSize {
return 0, errors.Newf(
"min read size cannot be bigger than buffer size: %d > %d",
minRead,
bufferSize)
}

if numBuffers == 1 {
// Nothing to pipeline. Just use basic copy. (Don't use io.CopyBuffer
// for streaming requests since it always attempt to fill the entire
// buffer before forwarding).
buf := make([]byte, bufferSize, bufferSize)
return io.CopyBuffer(dest, src, buf)
}

copier := newCircularBufferCopier(
dest,
src,
numBuffers,
bufferSize,
minRead)
return copier.execute()
}

type buffer struct {
array []byte // This slice is not resized
size int
eof bool
}

// A simple ring buffer copier.
type circularBufferCopier struct {
// The read/write loop will early exit when this channel is closed.
earlyExitChan chan struct{}

// Set by read / write loop on exit
errChan chan error

readyChan chan *buffer

recycleChan chan []byte

// Only used by read loop.
numBuffers int
bufferSize int
numCreated int
minChunkRead int
src io.Reader

// Only used by write loop.
numWritten int64
dest io.Writer
}

func newCircularBufferCopier(
dest io.Writer,
src io.Reader,
numBuffers int,
bufferSize int,
minRead int) *circularBufferCopier {

if minRead > bufferSize {
minRead = bufferSize
}

// need at least one byte for ReadAtLeast call, otherwise it will not make
// any Read().
if minRead == 0 {
minRead = 1
}

return &circularBufferCopier{
earlyExitChan: make(chan struct{}),
errChan: make(chan error, 2),
readyChan: make(chan *buffer, numBuffers),
recycleChan: make(chan []byte, numBuffers),
numBuffers: numBuffers,
bufferSize: bufferSize,
numCreated: 0,
minChunkRead: minRead,
src: src,
numWritten: 0,
dest: dest,
}
}

func (c *circularBufferCopier) execute() (int64, error) {
go c.readLoop()
go c.writeLoop()

var err error
closed := false
for i := 0; i < 2; i++ {
e := <-c.errChan
if e != nil && !closed {
err = e
closed = true
close(c.earlyExitChan)
}
}

return c.numWritten, err
}

func (c *circularBufferCopier) getWriteBuffer() []byte {
if c.numCreated < c.numBuffers {
select {
case b := <-c.recycleChan:
return b
case <-c.earlyExitChan:
return nil
default:
c.numCreated++
return make([]byte, c.bufferSize, c.bufferSize)
}
}

select {
case b := <-c.recycleChan:
return b
case <-c.earlyExitChan:
return nil
}
}

func (c *circularBufferCopier) readLoop() {
for {
buf := c.getWriteBuffer()
if buf == nil {
c.errChan <- nil
return
}

n, err := io.ReadAtLeast(c.src, buf, c.minChunkRead)

if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
c.readyChan <- &buffer{
array: buf,
size: n,
eof: true,
}

c.errChan <- nil
return
}

c.errChan <- errors.Wrap(err, "Failed to read from source: ")
return
}

c.readyChan <- &buffer{
array: buf,
size: n,
eof: false,
}
}
}

func (c *circularBufferCopier) writeLoop() {
for {
var buf *buffer
select {
case buf = <-c.readyChan:
// do nothing
case <-c.earlyExitChan:
c.errChan <- nil
return
}

if buf.size > 0 {
written, err := c.dest.Write(buf.array[0:buf.size])

c.numWritten += int64(written)

if err != nil {
c.errChan <- errors.Wrap(
err,
"Failed to write to destination: ")
return
}

if written != buf.size {
c.errChan <- errors.New(
"Failed to write to destination: short write")
return
}

if fl, ok := c.dest.(http.Flusher); ok {
fl.Flush()
}
}

if buf.eof {
c.errChan <- nil
return
}

c.recycleChan <- buf.array
}
}
Loading

0 comments on commit 46302a6

Please sign in to comment.