/
dataset_output.go
151 lines (139 loc) · 3.56 KB
/
dataset_output.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package flow
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"reflect"
"github.com/chrislusf/gleam/gio"
"github.com/chrislusf/gleam/pb"
"github.com/chrislusf/gleam/util"
)
// Output adds a driver-side step named "Output" that passes every input
// reader of the step to f, running one goroutine per reader.
//
// The step function returns the first non-nil error it receives from f,
// after printing it to os.Stderr, without waiting for the remaining calls.
// It returns nil once every call to f has reported success.
// Output returns the receiver d.
func (d *Dataset) Output(f func(io.Reader) error) *Dataset {
	outputStep := d.Flow.AddAllToOneStep(d, nil)
	outputStep.Name = "Output"
	outputStep.IsOnDriverSide = true
	outputStep.Function = func(readers []io.Reader, writers []io.Writer, stat *pb.InstructionStat) error {
		// One buffer slot per reader, so a goroutine can always deliver its
		// result even after this function has returned early on an error.
		results := make(chan error, len(readers))
		for _, reader := range readers {
			go func(r io.Reader) {
				results <- f(r)
			}(reader)
		}

		for pending := len(readers); pending > 0; pending-- {
			if err := <-results; err != nil {
				fmt.Fprintf(os.Stderr, "Failed to process output: %v\n", err)
				return err
			}
		}
		return nil
	}
	return d
}
// Fprintf formats each row using format and writes the result to writer.
//
// For every input reader handed over by Output, the rows are formatted by
// util.Fprintf into a bufio.Writer wrapped around writer. The buffer is
// flushed when util.Fprintf returns, whether or not it succeeded. If
// formatting succeeded but the flush fails, the flush error is returned so
// that lost output is not reported as success; a formatting error always
// takes precedence over a flush error.
func (d *Dataset) Fprintf(writer io.Writer, format string) *Dataset {
	fn := func(r io.Reader) (err error) {
		w := bufio.NewWriter(writer)
		defer func() {
			// Keep the formatting error if there is one; otherwise surface
			// a failure to write the buffered data.
			if flushErr := w.Flush(); err == nil {
				err = flushErr
			}
		}()
		return util.Fprintf(w, r, format)
	}
	return d.Output(fn)
}
// Fprintlnf works like Fprintf, but appends "\n" to format before use.
func (d *Dataset) Fprintlnf(writer io.Writer, format string) *Dataset {
	lineFormat := format + "\n"
	return d.Fprintf(writer, lineFormat)
}
// Printf is Fprintf with os.Stdout as the destination writer.
func (d *Dataset) Printf(format string) *Dataset {
	stdout := os.Stdout
	return d.Fprintf(stdout, format)
}
// Printlnf is Fprintlnf with os.Stdout as the destination writer: it prints
// in the given format with "\n" appended to it.
func (d *Dataset) Printlnf(format string) *Dataset {
	return d.Fprintlnf(os.Stdout, format)
}
// SaveFirstRowTo saves the first row's values into the operands.
//
// Each element of decodedObjects must be a pointer of a type supported by
// setValueTo. The row's key fields (row.K) are stored first, followed by its
// value fields (row.V), in order, one field per operand.
//
// The step fails with an error if the row cannot be decoded, if the row
// holds more fields than there are operands (previously this caused an
// index-out-of-range panic), or if a field cannot be stored into its
// operand. Operands beyond the number of fields are left untouched.
func (d *Dataset) SaveFirstRowTo(decodedObjects ...interface{}) *Dataset {
	fn := func(reader io.Reader) error {
		return util.TakeMessage(reader, 1, func(encodedBytes []byte) error {
			row, err := util.DecodeRow(encodedBytes)
			if err != nil {
				fmt.Fprintf(os.Stderr, "Failed to decode byte: %v\n", err)
				return err
			}

			// Check the field count up front so that nothing is written
			// when the operands cannot hold the whole row.
			if total := len(row.K) + len(row.V); total > len(decodedObjects) {
				return fmt.Errorf("SaveFirstRowTo: row has %d fields but only %d operands were supplied",
					total, len(decodedObjects))
			}

			var counter int
			for _, v := range row.K {
				if err := setValueTo(v, decodedObjects[counter]); err != nil {
					return err
				}
				counter++
			}
			for _, v := range row.V {
				if err := setValueTo(v, decodedObjects[counter]); err != nil {
					return err
				}
				counter++
			}
			return nil
		})
	}
	return d.Output(fn)
}
// OutputRow decodes every message that util.TakeMessage delivers from the
// input readers into a *util.Row and passes it to f.
//
// A decoding failure is printed to os.Stderr and returned as the error for
// that message; otherwise the error returned by f is passed through.
func (d *Dataset) OutputRow(f func(*util.Row) error) *Dataset {
	handleMessage := func(encodedBytes []byte) error {
		row, decodeErr := util.DecodeRow(encodedBytes)
		if decodeErr == nil {
			return f(row)
		}
		fmt.Fprintf(os.Stderr, "Failed to decode byte: %v\n", decodeErr)
		return decodeErr
	}

	return d.Output(func(reader io.Reader) error {
		return util.TakeMessage(reader, -1, handleMessage)
	})
}
// setValueTo stores src into the variable that dst points to.
//
// dst must be a non-nil pointer to one of: string, []byte, bool, any of the
// built-in signed or unsigned integer types, float32 or float64. Strings and
// numbers are converted with gio.ToString, gio.ToInt64 and gio.ToFloat64;
// integers narrower than int64 and float32 are produced by a plain Go
// conversion, so out-of-range values are truncated rather than rejected.
// A *[]byte destination accepts a []byte or string source, and a *bool
// destination accepts only a bool source.
//
// An error is returned, and nothing is stored, when dst is nil, is not a
// pointer, is a nil pointer, points to an unsupported type, or when src has
// the wrong type for a *[]byte or *bool destination.
func setValueTo(src, dst interface{}) error {
	// Validate the destination before touching it so that a bad operand
	// yields an error instead of a nil-pointer panic or a silent no-op.
	target := reflect.ValueOf(dst)
	if !target.IsValid() {
		return errors.New("setValueTo nil")
	}
	if target.Kind() != reflect.Ptr {
		return fmt.Errorf("setValueTo to nonsettable %T", dst)
	}
	if target.IsNil() {
		return fmt.Errorf("setValueTo to nil %T", dst)
	}

	switch v := dst.(type) {
	case *string:
		*v = gio.ToString(src)
	case *[]byte:
		switch b := src.(type) {
		case []byte:
			*v = b
		case string:
			*v = []byte(b)
		default:
			return fmt.Errorf("setValueTo cannot store %T into %T", src, dst)
		}
	case *int:
		*v = int(gio.ToInt64(src))
	case *int8:
		*v = int8(gio.ToInt64(src))
	case *int16:
		*v = int16(gio.ToInt64(src))
	case *int32:
		*v = int32(gio.ToInt64(src))
	case *int64:
		*v = gio.ToInt64(src)
	case *uint:
		*v = uint(gio.ToInt64(src))
	case *uint8:
		*v = uint8(gio.ToInt64(src))
	case *uint16:
		*v = uint16(gio.ToInt64(src))
	case *uint32:
		*v = uint32(gio.ToInt64(src))
	case *uint64:
		*v = uint64(gio.ToInt64(src))
	case *bool:
		b, ok := src.(bool)
		if !ok {
			return fmt.Errorf("setValueTo cannot store %T into %T", src, dst)
		}
		*v = b
	case *float32:
		*v = float32(gio.ToFloat64(src))
	case *float64:
		*v = gio.ToFloat64(src)
	default:
		// Previously an unsupported pointer type was accepted and left
		// unchanged, so the caller could not tell the value was never saved.
		return fmt.Errorf("setValueTo to unsupported %T", dst)
	}
	return nil
}