Write Mapper Reducer in Go

Chris Lu edited this page Aug 10, 2017 · 1 revision

A common request is to be able to write mappers and reducers in Go. Many users like the syntax from Glow, which sends the whole binary executable to the agents and executes the functions via reflection.

However, reflection is not very performant.
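To make the contrast concrete, here is a minimal sketch comparing a reflective call with a call through a static function type. The names `double`, `callReflect`, and `callDirect` are made up for illustration and are not part of Glow or Gleamold. The reflective path has to wrap the argument and result in `reflect.Value`, which is where the extra cost typically comes from.

```go
package main

import (
	"fmt"
	"reflect"
)

// double is a hypothetical function standing in for a user-defined mapper.
func double(n int) int { return 2 * n }

// callReflect invokes fn through reflection: the argument and the result
// are boxed into reflect.Value before and after the call.
func callReflect(fn interface{}, n int) int {
	out := reflect.ValueOf(fn).Call([]reflect.Value{reflect.ValueOf(n)})
	return int(out[0].Int())
}

// callDirect invokes fn through its static type, an ordinary function call.
func callDirect(fn func(int) int, n int) int {
	return fn(n)
}

func main() {
	// Both paths compute the same result; only the call mechanism differs.
	fmt.Println(callReflect(double, 21), callDirect(double, 21))
}
```

Both calls return the same value; the difference shows up only when the call is repeated for every row of a large dataset.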

How does Gleamold work?

Gleamold will still send the driver binary to agents, which will invoke the registered mapper or reducer functions in the binary.
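The idea of registering functions and invoking them inside the same binary can be sketched as follows. This is not Gleamold's actual implementation: the `registry` map, the `register` and `dispatch` helpers, and the convention of passing a function ID as the first command-line argument are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"os"
)

// Mapper has the same shape as a row-processing function.
type Mapper func(row []interface{}) error

// registry maps a generated ID to a registered function (hypothetical).
var registry = map[string]Mapper{}

// register stores fn under a sequential ID and returns that ID.
func register(fn Mapper) string {
	id := fmt.Sprintf("m%d", len(registry)+1)
	registry[id] = fn
	return id
}

// dispatch looks up an ID and calls the function directly, without reflection.
func dispatch(id string, row []interface{}) error {
	fn, ok := registry[id]
	if !ok {
		return fmt.Errorf("no mapper registered as %q", id)
	}
	return fn(row)
}

// seen collects the fields passed to the example mapper.
var seen []interface{}

// collectID is assigned at package initialization, so the driver and the
// agent-side copy of the binary agree on the same ID.
var collectID = register(func(row []interface{}) error {
	seen = append(seen, row...)
	return nil
})

func main() {
	// Assumed convention: an agent starts the binary with a function ID,
	// while the driver starts it without one.
	if len(os.Args) > 1 {
		if err := dispatch(os.Args[1], []interface{}{"hello"}); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		fmt.Println("ran", os.Args[1], "and saw", seen)
		return
	}
	fmt.Println("driver mode: the flow would be built and submitted here")
}
```

Because registration happens in package-level variables, every copy of the binary builds the same registry before `main` starts, which is what lets a plain ID identify a function across machines.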

Let us write the Driver program

The driver program stays mostly the same. See the full working example here: https://github.com/chrislusf/gleamold/blob/master/examples/word_count_in_go/word_count_in_go.go

var (
	MapperTokenizer = gio.RegisterMapper(tokenize)
	MapperAddOne    = gio.RegisterMapper(addOne)
	ReducerSum      = gio.RegisterReducer(sum)
)

func main() {

	// Init() will determine whether the driver program will execute the mapper/reducer or not.
	gio.Init()

	flow.New().
		TextFile("/etc/passwd").
		Pipe("tr 'A-Z' 'a-z'").
		Mapper(MapperTokenizer). // invoke the registered "tokenize" mapper function.
		Mapper(MapperAddOne).    // invoke the registered "addOne" mapper function.
		ReducerBy(ReducerSum).   // invoke the registered "sum" reducer function.
		Sort(flow.OrderBy(2, true)).
		Fprintf(os.Stdout, "%s %d\n").
		Run()
}

Write the Mapper Reducer

Here are the function types that define a mapper and a reducer. See https://godoc.org/github.com/chrislusf/gleamold/gio#Mapper

type Mapper func([]interface{}) error
type Reducer func(x, y interface{}) (interface{}, error)

We are using interface{} here, so the compiler can no longer check the types of the row fields; a wrong type assertion fails at run time instead. It's a small price to pay for much higher performance.
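Since the field types are only known at run time, a mapper can guard its type assertions instead of letting a mismatch panic. The sketch below uses a type switch for that; `fieldAsString` and `firstField` are hypothetical helpers written for this page, not functions from the gio package.

```go
package main

import (
	"errors"
	"fmt"
)

// fieldAsString converts one row field to a string. It accepts []byte
// or string, and reports any other type as an error instead of panicking.
func fieldAsString(v interface{}) (string, error) {
	switch t := v.(type) {
	case []byte:
		return string(t), nil
	case string:
		return t, nil
	default:
		return "", fmt.Errorf("unexpected field type %T", v)
	}
}

// firstField returns row[0] as a string, or an error for an empty row
// or an unsupported field type.
func firstField(row []interface{}) (string, error) {
	if len(row) == 0 {
		return "", errors.New("empty row")
	}
	return fieldAsString(row[0])
}

func main() {
	word, err := firstField([]interface{}{[]byte("gleam")})
	fmt.Println(word, err)

	_, err = firstField([]interface{}{42})
	fmt.Println(err)
}
```

A mapper that returns such an error surfaces a clear message, whereas an unchecked assertion like `row[0].([]byte)` panics on a mismatch.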

func tokenize(row []interface{}) error {
	line := string(row[0].([]byte))
	for _, s := range strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	}) {
		gio.Emit(s)
	}
	return nil
}

func addOne(row []interface{}) error {
	word := string(row[0].([]byte))
	gio.Emit(word, 1)
	return nil
}

func sum(x, y interface{}) (interface{}, error) {
	return x.(uint64) + y.(uint64), nil
}
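To see what a reducer of this shape does to the (word, count) pairs, here is a small in-memory stand-in that folds values sharing the same key. It runs without Gleamold; `pair` and `reduceByKey` are hypothetical names, and the real framework distributes and groups the data differently.

```go
package main

import (
	"fmt"
	"sort"
)

// Reducer has the same shape as the reducer type shown above.
type Reducer func(x, y interface{}) (interface{}, error)

// sum adds two uint64 counts, like the reducer in the word count example.
func sum(x, y interface{}) (interface{}, error) {
	return x.(uint64) + y.(uint64), nil
}

// pair is a hypothetical (key, value) row.
type pair struct {
	key   string
	value interface{}
}

// reduceByKey folds all values that share a key with r, two at a time.
// It is an in-memory illustration, not Gleamold's implementation.
func reduceByKey(pairs []pair, r Reducer) (map[string]interface{}, error) {
	acc := make(map[string]interface{})
	for _, p := range pairs {
		prev, ok := acc[p.key]
		if !ok {
			acc[p.key] = p.value // first value for this key
			continue
		}
		next, err := r(prev, p.value)
		if err != nil {
			return nil, err
		}
		acc[p.key] = next
	}
	return acc, nil
}

func main() {
	pairs := []pair{
		{"root", uint64(1)},
		{"bin", uint64(1)},
		{"root", uint64(1)},
	}
	counts, err := reduceByKey(pairs, sum)
	if err != nil {
		panic(err)
	}
	// Sort the keys so the output order is stable.
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, counts[k])
	}
}
```

The reducer only ever sees two values at a time, so it should be associative: the result must not depend on the order in which pairs for a key are combined.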

Mapper Reducer Common Mishaps

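Forgot to call gio.Init()?

Without it, the copy of the binary launched on an agent cannot tell that it should run a registered mapper or reducer instead of the driver flow, so it may get into a fork loop.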

Why is my string becoming []byte?

Because LuaJIT only knows []byte, and the Go mappers and reducers stay compatible with LuaJIT. Convert the field back with string(row[0].([]byte)), as tokenize does above.