Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
new write_timeout option available in pipeline configuration (#383)
Browse files Browse the repository at this point in the history
nightly integration tests caught the issue of having a hard coded write timeout of 1s, this makes the write timeout configurable for the pipeline (currently applies to all sinks)
  • Loading branch information
jipperinbham committed Jun 24, 2017
1 parent d20abda commit 41ffa26
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 5 deletions.
3 changes: 3 additions & 0 deletions cmd/transporter/goja_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type config struct {
LogDir string `json:"log_dir"`
MaxSegmentBytes int `json:"max_segment_bytes"`
CompactionInterval string `json:"compaction_interval"`
WriteTimeout string `json:"write_timeout"`
}

// Node encapsulates a sink/source node in the pipeline.
Expand Down Expand Up @@ -247,6 +248,7 @@ func (n *Node) Save(call goja.FunctionCall) goja.Value {
pipeline.WithParent(n.parent),
pipeline.WithClient(a.a),
pipeline.WithWriter(a.a),
pipeline.WithWriteTimeout(n.config.WriteTimeout),
}

if n.config.LogDir != "" {
Expand All @@ -272,6 +274,7 @@ func (tf *Transformer) Save(call goja.FunctionCall) goja.Value {
pipeline.WithClient(a.a),
pipeline.WithWriter(a.a),
pipeline.WithTransforms(tf.transforms),
pipeline.WithWriteTimeout(tf.config.WriteTimeout),
}

if tf.config.LogDir != "" {
Expand Down
11 changes: 8 additions & 3 deletions integration_tests/mongo_to_es/app.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
enron_source_mongo = mongodb({
var enron_source_mongo = mongodb({
"uri": "mongodb://${MONGODB_ENRON_SOURCE_USER}:${MONGODB_ENRON_SOURCE_PASSWORD}@${MONGODB_ENRON_SOURCE_URI}/enron",
"tail": false
})

enron_sink_es = elasticsearch({
var enron_sink_es = elasticsearch({
"uri": "https://${ES_ENRON_SINK_USER}:${ES_ENRON_SINK_PASSWORD}@${ES_ENRON_SINK_URI}/enron"
})

t.Source("enron_source_mongo", enron_source_mongo, "emails")
var config = {
"write_timeout": "30s",
}

t.Config(config)
.Source("enron_source_mongo", enron_source_mongo, "emails")
.Save("enron_sink_es", enron_sink_es, "emails");
7 changes: 6 additions & 1 deletion integration_tests/mongo_to_rethink/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ enron_sink_rethink = rethinkdb({
"ssl": true
})

t.Source("enron_source_mongo", enron_source_mongo, "emails")
var config = {
"write_timeout": "30s",
}

t.Config(config)
.Source("enron_source_mongo", enron_source_mongo, "emails")
.Save("enron_sink_rethink", enron_sink_rethink, "emails");
21 changes: 20 additions & 1 deletion pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

const (
defaultCompactionInterval = 1 * time.Hour
defaultWriteTimeout = 5 * time.Second
)

var (
Expand Down Expand Up @@ -78,6 +79,7 @@ type Node struct {
pendingOffsets []offset.Offset
offsetLock sync.Mutex
resumeTimeout time.Duration
writeTimeout time.Duration

compactionInterval time.Duration
}
Expand Down Expand Up @@ -110,6 +112,7 @@ func NewNodeWithOptions(name, kind, ns string, options ...OptionFunc) (*Node, er
reader: &client.MockReader{},
writer: &client.MockWriter{},
resumeTimeout: 60 * time.Second,
writeTimeout: defaultWriteTimeout,
compactionInterval: defaultCompactionInterval,
}
// Run the options on it
Expand Down Expand Up @@ -188,6 +191,22 @@ func WithResumeTimeout(timeout time.Duration) OptionFunc {
}
}

// WithWriteTimeout configures the timeout duration for a writer to return.
func WithWriteTimeout(timeout string) OptionFunc {
return func(n *Node) error {
if timeout == "" {
n.writeTimeout = defaultWriteTimeout
return nil
}
wt, err := time.ParseDuration(timeout)
if err != nil {
return err
}
n.writeTimeout = wt
return nil
}
}

// WithOffsetManager configures an offset.Manager to track message offsets.
func WithOffsetManager(om offset.Manager) OptionFunc {
return func(n *Node) error {
Expand Down Expand Up @@ -511,7 +530,7 @@ func (n *Node) write(msg message.Msg, off offset.Offset) (message.Msg, error) {
n.offsetLock.Unlock()
}

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), n.writeTimeout)
defer cancel()
c := make(chan writeResult)
go func() {
Expand Down
27 changes: 27 additions & 0 deletions pipeline/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,33 @@ var (
},
10, 0, nil,
},
{
"with_write_timeout",
func() (*Node, *StopWriter, func()) {
dataDir := scratchDataDir("write_timeout")
a := &StopWriter{SendCount: 10}
n, _ := NewNodeWithOptions(
"write_timeout_source", "stopWriter", defaultNsString,
WithClient(a),
WithReader(a),
WithCommitLog([]commitlog.OptionFunc{
commitlog.WithPath(dataDir),
commitlog.WithMaxSegmentBytes(1024),
}...),
)
om, _ := offset.NewLogManager(dataDir, "stopper")
NewNodeWithOptions(
"write_timeout_sink", "stopWriter", defaultNsString,
WithClient(a),
WithWriter(a),
WithParent(n),
WithOffsetManager(om),
WithWriteTimeout("1m"),
)
return n, a, func() { os.RemoveAll(dataDir) }
},
10, 0, nil,
},
{
"with_transform",
func() (*Node, *StopWriter, func()) {
Expand Down

0 comments on commit 41ffa26

Please sign in to comment.