-
Notifications
You must be signed in to change notification settings - Fork 3
/
operation.go
102 lines (95 loc) · 3.35 KB
/
operation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package util
import (
"fmt"
"github.com/go-sif/sif"
)
// SafeMapOperation returns a MapOperation that delegates to mapOp and converts
// any failure into a descriptive error.
//
// If mapOp panics, the panic is recovered and reported as a "map Panic" error
// containing the panic value, the row and the output of GetTrace. When the
// panic value is itself an error it is wrapped with %w, so it remains reachable
// through errors.Is / errors.As. If mapOp returns a non-nil error without
// panicking, that error is wrapped as a "map Error" together with the row.
func SafeMapOperation(mapOp sif.MapOperation) (safeMapOp sif.MapOperation) {
	return func(row sif.Row) (err error) {
		// err is a named result so the deferred handler can rewrite it after
		// mapOp has returned or panicked.
		defer func() {
			switch p := recover().(type) {
			case nil:
				// No panic: only decorate a genuine error result.
				if err != nil {
					err = fmt.Errorf("map Error: %w\nRow: %s", err, row)
				}
			case error:
				err = fmt.Errorf("map Panic: %w\nRow: %s\n%s", p, row, GetTrace())
			default:
				// Non-error panic values cannot be wrapped, so format with %v.
				err = fmt.Errorf("map Panic: %v\nRow: %s\n%s", p, row, GetTrace())
			}
		}()
		return mapOp(row)
	}
}
// SafeFilterOperation returns a FilterOperation that delegates to filterOp and
// converts any failure into a descriptive error.
//
// If filterOp panics, the panic is recovered and reported as a "filter Panic"
// error containing the panic value, the row and the output of GetTrace; the
// boolean result is left at its zero value (false) in that case. A panic value
// that is itself an error is wrapped with %w. If filterOp returns a non-nil
// error without panicking, that error is wrapped as a "filter Error" together
// with the row, and the boolean returned by filterOp is passed through as-is.
func SafeFilterOperation(filterOp sif.FilterOperation) (safeFilterOp sif.FilterOperation) {
	return func(row sif.Row) (shouldFilter bool, err error) {
		// Named results let the deferred handler replace err after filterOp has
		// returned or panicked.
		defer func() {
			switch p := recover().(type) {
			case nil:
				// No panic: only decorate a genuine error result.
				if err != nil {
					err = fmt.Errorf("filter Error: %w\nRow: %s", err, row)
				}
			case error:
				err = fmt.Errorf("filter Panic: %w\nRow: %s\n%s", p, row, GetTrace())
			default:
				// Non-error panic values cannot be wrapped, so format with %v.
				err = fmt.Errorf("filter Panic: %v\nRow: %s\n%s", p, row, GetTrace())
			}
		}()
		return filterOp(row)
	}
}
// SafeFlatMapOperation returns a FlatMapOperation that delegates to flatMapOp
// and converts any failure into a descriptive error.
//
// If flatMapOp panics, the panic is recovered and reported as a "flatMap Panic"
// error containing the panic value, the input row and the output of GetTrace.
// A panic value that is itself an error is wrapped with %w. If flatMapOp
// returns a non-nil error without panicking, that error is wrapped as a
// "flatMap Error" together with the input row. The newRow factory is handed to
// flatMapOp unchanged.
func SafeFlatMapOperation(flatMapOp sif.FlatMapOperation) (safeFlatMapOp sif.FlatMapOperation) {
	return func(row sif.Row, newRow sif.RowFactory) (err error) {
		// err is a named result so the deferred handler can rewrite it after
		// flatMapOp has returned or panicked.
		defer func() {
			switch p := recover().(type) {
			case nil:
				// No panic: only decorate a genuine error result.
				if err != nil {
					err = fmt.Errorf("flatMap Error: %w\nRow: %s", err, row)
				}
			case error:
				err = fmt.Errorf("flatMap Panic: %w\nRow: %s\n%s", p, row, GetTrace())
			default:
				// Non-error panic values cannot be wrapped, so format with %v.
				err = fmt.Errorf("flatMap Panic: %v\nRow: %s\n%s", p, row, GetTrace())
			}
		}()
		return flatMapOp(row, newRow)
	}
}
// SafeKeyingOperation returns a KeyingOperation that delegates to keyingOp and
// converts any failure into a descriptive error.
//
// If keyingOp panics, the panic is recovered and reported as a "keying Panic"
// error containing the panic value, the row and the output of GetTrace; the
// returned key is nil in that case. A panic value that is itself an error is
// wrapped with %w. If keyingOp returns a non-nil error without panicking, that
// error is wrapped as a "keying Error" together with the row, and the key
// returned by keyingOp is passed through as-is.
func SafeKeyingOperation(keyingOp sif.KeyingOperation) (safeKeyingOp sif.KeyingOperation) {
	return func(row sif.Row) (key []byte, err error) {
		// Named results let the deferred handler replace err after keyingOp has
		// returned or panicked.
		defer func() {
			switch p := recover().(type) {
			case nil:
				// No panic: only decorate a genuine error result.
				if err != nil {
					err = fmt.Errorf("keying Error: %w\nRow: %s", err, row)
				}
			case error:
				err = fmt.Errorf("keying Panic: %w\nRow: %s\n%s", p, row, GetTrace())
			default:
				// Non-error panic values cannot be wrapped, so format with %v.
				err = fmt.Errorf("keying Panic: %v\nRow: %s\n%s", p, row, GetTrace())
			}
		}()
		return keyingOp(row)
	}
}
// SafeReductionOperation returns a ReductionOperation that delegates to
// reductionOp and converts any failure into a descriptive error.
//
// If reductionOp panics, the panic is recovered and reported as a
// "reduction Panic" error containing the panic value, both rows (labelled LRow
// and RRow) and the output of GetTrace. A panic value that is itself an error
// is wrapped with %w. If reductionOp returns a non-nil error without
// panicking, that error is wrapped as a "reduction Error" together with both
// rows.
func SafeReductionOperation(reductionOp sif.ReductionOperation) (safeReductionOp sif.ReductionOperation) {
	return func(lrow, rrow sif.Row) (err error) {
		// err is a named result so the deferred handler can rewrite it after
		// reductionOp has returned or panicked.
		defer func() {
			switch p := recover().(type) {
			case nil:
				// No panic: only decorate a genuine error result.
				if err != nil {
					err = fmt.Errorf("reduction Error: %w\nLRow: %s\nRRow: %s", err, lrow, rrow)
				}
			case error:
				err = fmt.Errorf("reduction Panic: %w\nLRow: %s\nRRow: %s\n%s", p, lrow, rrow, GetTrace())
			default:
				// Non-error panic values cannot be wrapped, so format with %v.
				err = fmt.Errorf("reduction Panic: %v\nLRow: %s\nRRow: %s\n%s", p, lrow, rrow, GetTrace())
			}
		}()
		return reductionOp(lrow, rrow)
	}
}