MsgPack instruction to bridge pipe with msgpack #77
Codecov Report
@@           Coverage Diff           @@
##           master      #77   +/-   ##
=======================================
  Coverage   55.37%   55.37%
=======================================
  Files           9        9
  Lines         186      186
=======================================
  Hits          103      103
  Misses         81       81
  Partials        2        2
AllShardTOAllShard -> AllShardToAllShard ?
This Pipe-to-msgpack conversion should already have been done transparently. Did it not work?
Test case:
f := flow.New("top5 words in passwd").
$ go run word_count_in_go.go
It's better to avoid one more API. I fixed the issue with this diff.
Hi, I also get this error when running util.NewRow within genShardInfo in a plugin I'm writing.
The encoded data is type []uint8 and looks like this:
The following debug shows me:
Could you please let me know what's wrong here?
The git page stripped out the nil from the last comment:
panic: runtime error: invalid memory address or nil pointer dereference
goroutine 9 [running]:
@chrislusf Thanks a lot. I'm just starting out with Go and am not very familiar with its error reporting.
@chrislusf So this error appears to be coming from the shard_info file for my plugin. Is it possible to print debug output from the shard_info file? I've surrounded the function with Printlns but nothing is being emitted.
Could you also comment on my understanding of the plugin structure?
plugin_source.go - genShardInfos - determines the split for the shards. For example, a database iteration over 1000 days with 2 partitions would mean that shard info is created with a db connection and a loop range:
shard 1 (days=1 to 500)
shard 2 (days=501 to 1000)
plugin_shard_info.go - ReadSplit - uses the shard info generated in plugin_source.go to return the data. It is called by Gleam Flow when you invoke a function like printlnf after read.
It would be great to know if this is correct or not.
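To make the day-range split described above concrete, here is a minimal sketch of how 1000 days could be divided across 2 partitions. The names `DayRange` and `splitDays` are hypothetical and are not part of Gleam's plugin API; the sketch only illustrates the arithmetic a `genShardInfos`-style function might perform.

```go
package main

import "fmt"

// DayRange is a hypothetical shard description (not a Gleam type).
// Both bounds are inclusive.
type DayRange struct {
	Start int
	End   int
}

// splitDays divides days 1..total into n contiguous ranges whose sizes
// differ by at most one day. It returns nil for non-positive inputs.
func splitDays(total, n int) []DayRange {
	if total <= 0 || n <= 0 {
		return nil
	}
	if n > total {
		n = total // never produce empty shards
	}
	base, extra := total/n, total%n
	ranges := make([]DayRange, 0, n)
	start := 1
	for i := 0; i < n; i++ {
		size := base
		if i < extra {
			size++ // the first `extra` shards take one additional day
		}
		ranges = append(ranges, DayRange{Start: start, End: start + size - 1})
		start += size
	}
	return ranges
}

func main() {
	for i, r := range splitDays(1000, 2) {
		fmt.Printf("shard %d (days=%d to %d)\n", i+1, r.Start, r.End)
	}
}
```

With 1000 days and 2 partitions this yields the same ranges as in the comment: days 1 to 500 and days 501 to 1000.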
I mostly use println().
Correct.
For anyone else reading this, I was able to get the subprocesses to print by writing to stderr.
My shard info was being set to nil because the struct I was gob-encoding had fields beginning with lowercase letters (unexported fields).
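The unexported-field pitfall is easy to reproduce outside Gleam. The sketch below uses only the standard `encoding/gob` package; the struct names and the `roundTrip` helper are hypothetical and are not taken from the plugin in question. It writes its debug output to stderr, as in the comment above.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"os"
)

// shardInfoMixed is a hypothetical shard description with unexported fields.
type shardInfoMixed struct {
	Name     string
	startDay int // unexported: gob skips this field without reporting an error
	endDay   int // unexported: skipped as well
}

// shardInfoExported carries the same data with exported fields only.
type shardInfoExported struct {
	Name     string
	StartDay int
	EndDay   int
}

// roundTrip gob-encodes in and decodes the bytes into out (a pointer).
func roundTrip(in, out interface{}) error {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return err
	}
	return gob.NewDecoder(&buf).Decode(out)
}

func main() {
	var mixed shardInfoMixed
	if err := roundTrip(shardInfoMixed{Name: "s1", startDay: 1, endDay: 500}, &mixed); err != nil {
		fmt.Fprintln(os.Stderr, "mixed:", err)
	}
	// The day range comes back as zero values because it was never encoded.
	fmt.Fprintf(os.Stderr, "mixed:    %+v\n", mixed)

	var exported shardInfoExported
	if err := roundTrip(shardInfoExported{Name: "s1", StartDay: 1, EndDay: 500}, &exported); err != nil {
		fmt.Fprintln(os.Stderr, "exported:", err)
	}
	fmt.Fprintf(os.Stderr, "exported: %+v\n", exported)
}
```

Renaming the fields so they start with an uppercase letter is enough for gob to carry them through the round trip.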
example:
f := flow.New("top5 words in passwd").
	Read(file.Txt("/etc/passwd", 1)).
	Map("tokenize", mapper.Tokenize). // invoke the registered "tokenize" mapper function.
	Pipe("debugWithPipe", "tee debug.txt").MsgPack("fromPipeToMsgpack").
	Map("addOne", mapper.AppendOne). // invoke the registered "addOne" mapper function.
	ReduceByKey("sum", reducer.SumInt64). // invoke the registered "sum" reducer function.
	Sort("sortBySum", flow.OrderBy(2, true)).
	Top("top5", 5, flow.OrderBy(2, false)).
	Printlnf("%s\t%d")