/
gzip.go
60 lines (51 loc) · 1.28 KB
/
gzip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Copyright (c) 2017 Opsidian Ltd.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package blocks
import (
"compress/gzip"
"context"
"io"
"github.com/conflowio/conflow/src/conflow"
"github.com/conflowio/conflow/src/conflow/block"
)
// Gzip is a task block that gzip-compresses the data read from its input
// stream and exposes the compressed bytes through a generated output stream.
// @block "task"
type Gzip struct {
// id is the identifier returned by ID.
// @id
id conflow.ID
// in is the uncompressed source data; Run copies from it until EOF or error.
// @required
in io.ReadCloser
// out is the block published by Run; its Stream is set to the read end of
// the pipe that receives the compressed data.
// @generated
out *Stream
// blockPublisher is used by Run to publish out.
// @dependency
blockPublisher conflow.BlockPublisher
}
// ID returns the identifier stored in the block's id field.
func (g *Gzip) ID() conflow.ID {
return g.id
}
// Run gzip-compresses everything read from the input stream and publishes the
// compressed data as the out block.
//
// An in-memory pipe connects the compressor to the output: the read end is
// stored in g.out.Stream and the write end is fed by a gzip writer inside the
// callback passed to PublishBlock. Both pipe ends are closed when Run returns.
//
// The returned result is always nil. The returned error is whatever
// PublishBlock returns; the callback reports the first error encountered while
// copying the input or while finalising the gzip stream. ctx is currently not
// consulted.
func (g *Gzip) Run(ctx context.Context) (conflow.Result, error) {
	pipeReader, pipeWriter := io.Pipe()
	g.out.Stream = pipeReader
	defer pipeReader.Close()
	// Safety net for paths where the callback never runs; closing a pipe
	// writer again after the callback already closed it does not replace the
	// error recorded by the first close.
	defer pipeWriter.Close()

	published, err := g.blockPublisher.PublishBlock(g.out, func() error {
		gzipWriter := gzip.NewWriter(pipeWriter)

		_, err := io.Copy(gzipWriter, g.in)

		// Close flushes the buffered compressed data and writes the gzip
		// footer, so a failure here means the output is incomplete and must
		// not be reported as success. A copy error takes precedence.
		if closeErr := gzipWriter.Close(); err == nil {
			err = closeErr
		}

		// Hand the failure (if any) to the reading side so it does not mistake
		// a truncated stream for a clean EOF. With a nil error this behaves
		// exactly like Close.
		_ = pipeWriter.CloseWithError(err)

		return err
	})

	if !published {
		// The block was reported as not published, so the input is drained
		// and discarded instead (presumably to avoid leaving the producer of
		// g.in blocked — confirm against PublishBlock's contract).
		_, _ = io.Copy(io.Discard, g.in)
	}

	return nil, err
}
// ParseContextOverride returns a parse context override whose block
// transformer registry maps the "out" block name to a StreamInterpreter.
func (g *Gzip) ParseContextOverride() conflow.ParseContextOverride {
	transformers := block.InterpreterRegistry{
		"out": StreamInterpreter{},
	}

	return conflow.ParseContextOverride{BlockTransformerRegistry: transformers}
}