/
common.go
229 lines (210 loc) · 6.89 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package csv reads CSV files and presents the extracted data as records, also
// writes data as record into CSV files
package csv
import (
"errors"
"fmt"
"github.com/apache/arrow/go/v11/arrow"
"github.com/apache/arrow/go/v11/arrow/memory"
)
var (
	// ErrMismatchFields is the sentinel error reported when the number of
	// records/fields encountered does not match what was expected.
	ErrMismatchFields = errors.New("arrow/csv: number of records mismatch")
)
// Option configures a CSV reader/writer.
type Option func(config)

// config is the value an Option mutates. The concrete type is either
// *Reader or *Writer; each With* option type-switches (or asserts) on it
// and panics for any other type.
type config interface{}
// WithComma specifies the fields separation character used while parsing CSV files.
func WithComma(c rune) Option {
	return func(cfg config) {
		if rd, ok := cfg.(*Reader); ok {
			rd.r.Comma = c
			return
		}
		if wr, ok := cfg.(*Writer); ok {
			wr.w.Comma = c
			return
		}
		panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
	}
}
// WithComment specifies the comment character used while parsing CSV files.
func WithComment(c rune) Option {
	return func(cfg config) {
		rd, ok := cfg.(*Reader)
		if !ok {
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
		rd.r.Comment = c
	}
}
// WithAllocator specifies the Arrow memory allocator used while building records.
func WithAllocator(mem memory.Allocator) Option {
	return func(cfg config) {
		rd, ok := cfg.(*Reader)
		if !ok {
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
		rd.mem = mem
	}
}
// WithChunk specifies the chunk size used while parsing CSV files.
//
// If n is zero or 1, no chunking will take place and the reader will create
// one record per row.
// If n is greater than 1, chunks of n rows will be read.
// If n is negative, the reader will load the whole CSV file into memory and
// create one big record with all the rows.
func WithChunk(n int) Option {
	return func(cfg config) {
		rd, ok := cfg.(*Reader)
		if !ok {
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
		rd.chunk = n
	}
}
// WithCRLF specifies the line terminator used while writing CSV files.
// If useCRLF is true, \r\n is used as the line terminator, otherwise \n is used.
// The default value is false.
func WithCRLF(useCRLF bool) Option {
	return func(cfg config) {
		wr, ok := cfg.(*Writer)
		if !ok {
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
		wr.w.UseCRLF = useCRLF
	}
}
// WithHeader enables or disables CSV-header handling.
func WithHeader(useHeader bool) Option {
	return func(cfg config) {
		if rd, ok := cfg.(*Reader); ok {
			rd.header = useHeader
			return
		}
		if wr, ok := cfg.(*Writer); ok {
			wr.header = useHeader
			return
		}
		panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
	}
}
// DefaultNullValues is the set of values considered as NULL values by default
// when Reader is configured to handle NULL values.
// It is used by WithNullReader when no explicit null values are supplied.
var DefaultNullValues = []string{"", "NULL", "null"}
// WithNullReader sets options for a CSV Reader pertaining to NULL value
// handling. If stringsCanBeNull is true, then a string that matches one of the
// nullValues set will be interpreted as NULL. Numeric columns will be checked
// for nulls in all cases. If no nullValues arguments are passed in, the
// defaults set in NewReader() will be kept.
//
// When no NULL values is given, the default set is taken from DefaultNullValues.
func WithNullReader(stringsCanBeNull bool, nullValues ...string) Option {
	return func(cfg config) {
		rd, ok := cfg.(*Reader)
		if !ok {
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
		rd.stringsCanBeNull = stringsCanBeNull

		vals := nullValues
		if len(vals) == 0 {
			vals = DefaultNullValues
		}
		// Store a private copy so later mutation of the caller's slice
		// cannot affect the reader.
		rd.nulls = append([]string(nil), vals...)
	}
}
// WithNullWriter sets the null string written for NULL values. The default is
// set in NewWriter().
func WithNullWriter(null string) Option {
	return func(cfg config) {
		wr, ok := cfg.(*Writer)
		if !ok {
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
		wr.nullValue = null
	}
}
// WithBoolWriter overrides the default bool formatter with a function that
// returns a string representation of bool states, e.g. True, False, 1, 0.
// A nil formatter is ignored and the default is kept.
func WithBoolWriter(fmtr func(bool) string) Option {
	return func(cfg config) {
		wr, ok := cfg.(*Writer)
		if !ok {
			panic(fmt.Errorf("arrow/csv: WithBoolWriter unknown config type %T", cfg))
		}
		if fmtr != nil {
			wr.boolFormatter = fmtr
		}
	}
}
// WithColumnTypes allows specifying optional per-column types (disabling
// type inference on those columns).
//
// Will panic if used in conjunction with an explicit schema.
func WithColumnTypes(types map[string]arrow.DataType) Option {
	return func(cfg config) {
		rd, ok := cfg.(*Reader)
		if !ok {
			panic(fmt.Errorf("%w: WithColumnTypes only allowed for csv reader", arrow.ErrInvalid))
		}
		if rd.schema != nil {
			panic(fmt.Errorf("%w: cannot use WithColumnTypes with explicit schema", arrow.ErrInvalid))
		}
		rd.columnTypes = types
	}
}
// WithIncludeColumns indicates the names of the columns from the CSV file
// that should actually be read and converted (in the slice's order).
// If set and non-empty, columns not in this slice will be ignored.
//
// Will panic if used in conjunction with an explicit schema.
func WithIncludeColumns(cols []string) Option {
	return func(cfg config) {
		rd, ok := cfg.(*Reader)
		if !ok {
			panic(fmt.Errorf("%w: WithIncludeColumns only allowed on csv Reader", arrow.ErrInvalid))
		}
		if rd.schema != nil {
			panic(fmt.Errorf("%w: cannot use WithIncludeColumns with explicit schema", arrow.ErrInvalid))
		}
		rd.columnFilter = cols
	}
}
// validate panics if any field of schema has a data type this CSV codec
// does not handle.
func validate(schema *arrow.Schema) {
	for i, f := range schema.Fields() {
		switch f.Type.(type) {
		case *arrow.BooleanType,
			*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
			*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
			*arrow.Float32Type, *arrow.Float64Type,
			*arrow.StringType,
			*arrow.TimestampType,
			*arrow.Date32Type, *arrow.Date64Type,
			*arrow.Decimal128Type, *arrow.Decimal256Type:
			// supported; nothing to do
		default:
			panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, f.Type))
		}
	}
}