-
Notifications
You must be signed in to change notification settings - Fork 10
/
table.go
296 lines (262 loc) · 6.92 KB
/
table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
package csv
import (
"bytes"
"compress/gzip"
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/frictionlessdata/tableschema-go/table"
)
// Table represents a Table backed by a CSV physical representation.
type Table struct {
	// headers holds the column names; populated by LoadHeaders (first CSV
	// row) or SetHeaders (caller-supplied), otherwise nil.
	headers []string
	// source produces a fresh ReadCloser over the physical CSV contents;
	// each Iter call re-reads from the beginning via a new source reader.
	source Source
	// skipHeaders, when true, makes iterators discard the first row
	// (the header line) before yielding data rows.
	skipHeaders bool
	// dialect carries the CSV parsing options (delimiter, initial-space
	// handling) applied to every reader created for this table.
	dialect dialect
}
// dialect represents CSV dialect configuration options.
// http://frictionlessdata.io/specs/csv-dialect/
type dialect struct {
	// Delimiter specifies the character sequence which should separate fields (aka columns).
	delimiter rune
	// Specifies how to interpret whitespace which immediately follows a delimiter;
	// if false, it means that whitespace immediately after a delimiter should be treated as part of the following field.
	// Maps to csv.Reader.TrimLeadingSpace in newIterator.
	skipInitialSpace bool
}
// defaultDialect is used by NewTable when no dialect option is supplied:
// comma-delimited, with whitespace right after a delimiter ignored.
var defaultDialect = dialect{
	delimiter:        ',',
	skipInitialSpace: true,
}
// NewTable creates a table.Table from the CSV table physical representation.
// CreationOpts are executed in the order they are declared.
// If a dialect is not configured via SetDialect, defaultDialect is used.
func NewTable(source Source, opts ...CreationOpts) (*Table, error) {
	t := &Table{source: source, dialect: defaultDialect}
	for _, apply := range opts {
		if err := apply(t); err != nil {
			return nil, err
		}
	}
	return t, nil
}
// Iter returns an Iterator to read the table. Iter returns an error
// if the table physical source can not be iterated.
// The iteration process always start at the beginning of the CSV and
// is backed by a new reading.
func (t *Table) Iter() (table.Iterator, error) {
	// Receiver renamed from "table" to "t": the old name shadowed the
	// imported table package inside the method body.
	src, err := t.source()
	if err != nil {
		return nil, err
	}
	return newIterator(src, t.dialect, t.skipHeaders), nil
}
// ReadAll reads all rows from the table and return it as strings.
// It returns an error if the source cannot be iterated or if a read
// error occurs mid-iteration.
func (t *Table) ReadAll() ([][]string, error) {
	iter, err := t.Iter()
	if err != nil {
		return nil, err
	}
	defer iter.Close()
	var rows [][]string
	for iter.Next() {
		rows = append(rows, iter.Row())
	}
	// Fix: iteration errors recorded by the iterator were previously
	// swallowed, silently returning a truncated result.
	if err := iter.Err(); err != nil {
		return nil, err
	}
	return rows, nil
}
// Headers returns the headers of the tabular data.
func (t *Table) Headers() []string {
	return t.headers
}
// ReadColumn reads a specific column from the table and return it as strings.
// The column is located by exact name match against the configured headers.
func (t *Table) ReadColumn(name string) ([]string, error) {
	// Resolve the column index from the configured headers.
	index := -1
	for i, h := range t.headers {
		if name == h {
			index = i
			break
		}
	}
	if index == -1 {
		return nil, fmt.Errorf("column name \"%s\" not found in headers", name)
	}
	iter, err := t.Iter()
	if err != nil {
		return nil, fmt.Errorf("error creating iterator:%q", err)
	}
	// Fix: the iterator was previously leaked (never closed).
	defer iter.Close()
	var col []string
	for iter.Next() {
		col = append(col, iter.Row()[index])
	}
	// Fix: surface mid-iteration read errors instead of silently
	// returning a partial column.
	if err := iter.Err(); err != nil {
		return nil, err
	}
	return col, nil
}
// String returns a string version of the table, encoded as CSV.
// It returns the empty string if the table cannot be read or encoded.
func (t *Table) String() string {
	rows, err := t.ReadAll()
	if err != nil {
		return ""
	}
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	// Fix: the WriteAll error was previously ignored, which could return
	// a silently truncated CSV string on a write failure.
	if err := w.WriteAll(rows); err != nil {
		return ""
	}
	return buf.String()
}
// newIterator builds a csvIterator around source, with the csv.Reader
// configured according to the given dialect.
func newIterator(source io.ReadCloser, d dialect, skipHeaders bool) *csvIterator {
	reader := csv.NewReader(source)
	reader.Comma = d.delimiter
	reader.TrimLeadingSpace = d.skipInitialSpace
	return &csvIterator{
		reader:      reader,
		source:      source,
		skipHeaders: skipHeaders,
	}
}
// csvIterator implements table.Iterator over a CSV stream.
type csvIterator struct {
	// reader decodes CSV records from source.
	reader *csv.Reader
	// source is the underlying stream; Close releases it.
	source io.ReadCloser
	// current is the row most recently read by Next.
	current []string
	// err is the first fatal read error; once set, Next returns false.
	err error
	// skipHeaders, when true, makes the next call to Next discard one
	// row (the header line) before yielding data.
	skipHeaders bool
}
// Next advances the iterator to the next row, returning true if a row
// is available via Row. A record with a field-count mismatch is still
// yielded (see the linked issue); any other read error stops iteration
// and is exposed via Err. io.EOF terminates iteration without error.
func (i *csvIterator) Next() bool {
	if i.err != nil {
		return false
	}
	var err error
	i.current, err = i.reader.Read()
	if err != nil && err != io.EOF {
		// For reference: https://github.com/frictionlessdata/tableschema-go/issues/73
		if parseErr, ok := err.(*csv.ParseError); ok && parseErr.Err == csv.ErrFieldCount {
			return true
		}
		i.err = err
	}
	if i.skipHeaders {
		i.skipHeaders = false
		// Fix: the recursive call's result was previously discarded and
		// the function returned the *header* read's status, so a CSV
		// containing only a header line reported one spurious row
		// (Next() == true with a stale/nil Row).
		return i.Next()
	}
	return err == nil
}
// Row returns the row most recently read by Next.
func (i *csvIterator) Row() []string {
	return i.current
}

// Err returns the first fatal error encountered during iteration, if any.
func (i *csvIterator) Err() error {
	return i.err
}

// Close releases the underlying physical source.
func (i *csvIterator) Close() error {
	return i.source.Close()
}
// CreationOpts defines functional options for creating Tables.
// Options are applied by NewTable in declaration order; returning a
// non-nil error aborts table creation.
type CreationOpts func(t *Table) error
// Source defines a table physical data source.
type Source func() (io.ReadCloser, error)
// FromFile defines a file-based Source from a CSV or GZIP compressed CSV path.
func FromFile(path string) Source {
return func() (io.ReadCloser, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
}
ext := strings.ToLower(filepath.Ext(path))
if ext == ".gz" || ext == ".gzip" {
return gzip.NewReader(f)
}
return f, nil
}
}
var (
	// httpClient is shared by all Remote sources; initialized lazily,
	// exactly once, on the first remote fetch.
	httpClient *http.Client
	once       sync.Once
)

// remoteFetchTimeoutSecs bounds each remote fetch, in seconds.
const remoteFetchTimeoutSecs = 15
// Remote fetches the source schema from a remote URL.
// The whole response body is buffered in memory before being returned,
// so the HTTP connection is released as soon as the Source is invoked.
func Remote(url string) Source {
	return func() (io.ReadCloser, error) {
		once.Do(func() {
			httpClient = &http.Client{
				Timeout: remoteFetchTimeoutSecs * time.Second,
			}
		})
		resp, err := httpClient.Get(url)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		// NOTE(review): non-2xx status codes are not treated as errors;
		// the (error) body is returned as if it were the schema — confirm
		// whether callers rely on this before adding a status check.
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			// Fix: previously a reader over a partial body was returned
			// alongside the error.
			return nil, err
		}
		// Fix: avoid the needless []byte -> string -> Reader round-trip.
		return ioutil.NopCloser(bytes.NewReader(body)), nil
	}
}
// FromString defines a string-based source.
func FromString(str string) Source {
	src := func() (io.ReadCloser, error) {
		return ioutil.NopCloser(strings.NewReader(str)), nil
	}
	return src
}
func stringReadCloser(s string) io.ReadCloser {
return ioutil.NopCloser(strings.NewReader(s))
}
// errorSource returns a Source that always fails; used to exercise
// error paths.
func errorSource() Source {
	src := func() (io.ReadCloser, error) {
		return nil, fmt.Errorf("error source")
	}
	return src
}
// LoadHeaders uses the first line of the CSV as table headers.
// The header line will be skipped during iteration.
// It temporarily disables skipHeaders so the header row itself can be
// read, then re-enables it for subsequent iterations.
func LoadHeaders() CreationOpts {
	return func(reader *Table) error {
		reader.skipHeaders = false
		iter, err := reader.Iter()
		if err != nil {
			return err
		}
		// Fix: the iterator was previously leaked (never closed).
		defer iter.Close()
		if iter.Next() {
			reader.headers = iter.Row()
		}
		reader.skipHeaders = true
		return nil
	}
}
// SetHeaders sets the table headers.
func SetHeaders(headers ...string) CreationOpts {
	return func(t *Table) error {
		t.headers = headers
		return nil
	}
}
// Delimiter specifies the character sequence which should separate fields (aka columns).
func Delimiter(d rune) CreationOpts {
	opt := func(tbl *Table) error {
		tbl.dialect.delimiter = d
		return nil
	}
	return opt
}
// ConsiderInitialSpace configures the CSV parser to treat the whitespace immediately after a delimiter as part of the following field.
func ConsiderInitialSpace() CreationOpts {
	opt := func(tbl *Table) error {
		tbl.dialect.skipInitialSpace = false
		return nil
	}
	return opt
}
// errorOpts returns a CreationOpts that always fails; used to exercise
// error paths. The variadic argument is accepted but unused.
func errorOpts(_ ...string) CreationOpts {
	return func(*Table) error {
		return fmt.Errorf("error opts")
	}
}
// NewWriter creates a writer which appends records to a CSV raw file.
//
// As returned by NewWriter, a csv.Writer writes records terminated by a
// newline and uses ',' as the field delimiter. The exported fields can be
// changed to customize the details before the first call to Write or WriteAll.
//
// Comma is the field delimiter.
//
// If UseCRLF is true, the csv.Writer ends each record with \r\n instead of \n.
func NewWriter(w io.Writer) *csv.Writer {
return csv.NewWriter(w)
}