/
gzip.go
90 lines (75 loc) · 2.07 KB
/
gzip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// Package gzip provides a streaming object that takes an io.ReadCloser, which may still be
// receiving data, and exposes an io.ReadCloser that yields the same content gzip-compressed.
package gzip
import (
"compress/gzip"
"io"
"sync"
"sync/atomic"
)
// compressPool hands out reusable *gzip.Writer values. A writer obtained from the pool has no
// usable destination until the caller calls Reset on it with a real io.Writer.
var compressPool = &sync.Pool{
	New: func() interface{} {
		// Built with a nil destination on purpose; the borrower attaches one via Reset.
		zw := gzip.NewWriter(nil)
		return zw
	},
}
// Streamer is an io.ReadCloser that reads uncompressed data from an input stream and serves the
// gzip-compressed form of that data to its own readers.
type Streamer struct {
	// userInput is the uncompressed source handed to Reset.
	userInput io.ReadCloser

	// outputRead and outputWrite are the two ends of the pipe created in Reset: compressed
	// bytes are written to outputWrite and served to callers from outputRead.
	outputRead  *io.PipeReader
	outputWrite *io.PipeWriter

	// size is the number of uncompressed bytes copied from userInput.
	size int64

	// err holds the error, if any, recorded while copying the input.
	err atomic.Value
}
// New returns an empty Streamer. It is not usable until Reset() has been called with the input
// to compress.
func New() *Streamer {
	return new(Streamer)
}
// Reset re-initializes the streamer for a new input: it installs reader as the source, creates a
// fresh pipe, clears the recorded size and error, and starts compressing.
// Reset may only be called once the previous reader has been closed.
func (s *Streamer) Reset(reader io.ReadCloser) {
	pipeReader, pipeWriter := io.Pipe()

	s.userInput = reader
	s.outputRead = pipeReader
	s.outputWrite = pipeWriter
	s.size = 0
	s.err = atomic.Value{}

	// Everything is in place; kick off the background compression.
	s.run()
}
// InputSize reports how many bytes of input the Streamer has streamed. The value only covers the
// whole stream once Read() has returned io.EOF; before that it is not accurate.
func (s *Streamer) InputSize() int64 {
	streamed := s.size
	return streamed
}
// Compress returns a reader that yields the gzip-compressed form of payload.
//
// A payload that does not already implement io.ReadCloser is wrapped with io.NopCloser so it can
// be handed to a Streamer. The returned value is a newly created Streamer that has been Reset
// onto the payload.
func Compress(payload io.Reader) io.Reader {
	source, isCloser := payload.(io.ReadCloser)
	if !isCloser {
		source = io.NopCloser(payload)
	}

	streamer := New()
	streamer.Reset(source)
	return streamer
}
// run starts a goroutine that reads everything from s.userInput, gzip-compresses it and writes
// the compressed bytes into the pipe that Read() drains.
//
// The number of uncompressed bytes copied is recorded in s.size. If copying the input or
// finalizing the gzip stream fails, the error is stored in s.err and the pipe is closed with that
// error, so Read() reports the failure instead of a clean io.EOF on a truncated stream.
//
// NOTE(review): nothing here closes the input even though it is an io.ReadCloser; confirm who
// owns it.
func (s *Streamer) run() {
	// Capture the endpoints up front so the goroutine keeps working on this stream's reader
	// and pipe.
	input, output := s.userInput, s.outputWrite

	zw := compressPool.Get().(*gzip.Writer)
	zw.Reset(output)

	go func() {
		// Runs last, after the writer has been closed below.
		defer compressPool.Put(zw)

		amount, err := io.Copy(zw, input)
		s.size = amount

		// Close flushes any pending data and writes the gzip footer, so it has to happen
		// before the pipe is closed. The first error encountered wins.
		if closeErr := zw.Close(); err == nil {
			err = closeErr
		}
		if err != nil {
			s.err.Store(err)
		}

		// CloseWithError(nil) behaves like Close(), so a successful stream still ends with
		// io.EOF on the read side. It always returns nil.
		_ = output.CloseWithError(err)
	}()
}
// Read implements io.Reader. It fills b from the read end of the pipe and returns whatever the
// pipe reports.
func (s *Streamer) Read(b []byte) (int, error) {
	return s.outputRead.Read(b)
}
// Close implements io.Closer by closing the read end of the pipe.
func (s *Streamer) Close() error {
	reader := s.outputRead
	return reader.Close()
}