/
csvql.go
133 lines (110 loc) · 2.8 KB
/
csvql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Package csvql offers an adaptor from CSV files to SQL databases.
package csvql
import (
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"gopkg.in/src-d/go-mysql-server.v0/sql"
)
// database is a set of tables built from the CSV files of one directory.
type database struct {
// path is the directory handed to NewDatabase; Name reports it as the
// database name.
path string
// tables holds the loaded tables, keyed by the name each table reports.
tables map[string]sql.Table
}
// NewDatabase returns a database containing a table per CSV file in the given folder.
//
// Only entries directly inside dir whose name ends in ".csv" are loaded.
// Directories are skipped even when their name carries that extension, so a
// folder called "archive.csv" does not prevent the database from opening.
// Each table is stored under the name reported by the table itself, and the
// database name is dir.
//
// An error is returned when dir cannot be listed or when any CSV file fails
// to load as a table.
func NewDatabase(dir string) (sql.Database, error) {
	fis, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, fmt.Errorf("could not read directory %s: %v", dir, err)
	}

	tables := make(map[string]sql.Table)
	for _, fi := range fis {
		// A directory is never a CSV file, whatever it is called; handing
		// it to NewTable would fail and abort the whole load.
		if fi.IsDir() {
			continue
		}

		name := fi.Name()
		if filepath.Ext(name) != ".csv" {
			continue
		}

		t, err := NewTable(filepath.Join(dir, name))
		if err != nil {
			return nil, err
		}
		tables[t.Name()] = t
	}

	return &database{path: dir, tables: tables}, nil
}
// Name returns the database name, which is the directory path the database
// was created with.
func (db *database) Name() string {
	return db.path
}
// Tables returns the database's tables keyed by table name. The returned map
// is the database's own map, not a copy.
func (db *database) Tables() map[string]sql.Table {
	return db.tables
}
// NewTable returns a table containing the rows in the given CSV file.
//
// The table is named after the base name of path with a trailing ".csv"
// removed. Only the first record of the file is read here: each of its
// fields, trimmed of surrounding whitespace and lower-cased, becomes a text
// column whose source is the table name. The file is closed before
// NewTable returns.
//
// An error is returned when the file cannot be opened or when its header
// record cannot be read, for instance because the file is empty.
func NewTable(path string) (sql.Table, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("could not open %s: %v", path, err)
	}
	defer f.Close()

	cols, err := csv.NewReader(f).Read()
	if err != nil {
		// A bare io.EOF or parse error gives the caller no hint about which
		// file is at fault, so attach the path as the open error does.
		return nil, fmt.Errorf("could not read header of %s: %v", path, err)
	}

	name := strings.TrimSuffix(filepath.Base(path), ".csv")
	t := &table{
		name:   name,
		path:   path,
		schema: make([]*sql.Column, 0, len(cols)),
	}
	for _, col := range cols {
		t.schema = append(t.schema, &sql.Column{
			Name:   strings.ToLower(strings.TrimSpace(col)),
			Type:   sql.Text,
			Source: name,
		})
	}
	return t, nil
}
// table is a table backed by a single CSV file on disk.
type table struct {
	// name is the table name.
	name string
	// path is the location of the CSV file the rows are read from.
	path string
	// schema lists the table's columns.
	schema []*sql.Column
}

// Name returns the table name.
func (t *table) Name() string {
	return t.name
}

// String returns the path of the CSV file behind the table.
func (t *table) String() string {
	return t.path
}

// Schema returns the columns of the table.
func (t *table) Schema() sql.Schema {
	return t.schema
}

// Partitions returns a new, unconsumed partition iterator. ctx is not used.
func (t *table) Partitions(ctx *sql.Context) (sql.PartitionIter, error) {
	return new(partitionIter), nil
}

// PartitionRows returns an iterator over the rows of the table's CSV file.
// Neither ctx nor p is consulted: every call reads the file at t.path.
func (t *table) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.RowIter, error) {
	return newRowIter(t.path)
}
// partitionIter yields exactly one partition and then reports io.EOF.
type partitionIter struct {
	// done is set once the single partition has been handed out.
	done bool
}

// Close does nothing and always returns nil.
func (p *partitionIter) Close() error {
	return nil
}

// Next returns the single partition on the first call and io.EOF on every
// call after that.
func (p *partitionIter) Next() (sql.Partition, error) {
	if !p.done {
		p.done = true
		return partition{}, nil
	}
	return nil, io.EOF
}
// partition is an empty partition value; it carries no data of its own.
type partition struct{}

// Key returns the fixed key "key" as a freshly allocated byte slice.
func (p partition) Key() []byte {
	key := []byte("key")
	return key
}
// newRowIter opens the CSV file at path and returns an iterator over its
// data records, that is, every record after the first (header) one.
//
// The returned iterator holds the open file; closing the iterator closes it.
// An error is returned when the file cannot be opened or when the header
// record cannot be parsed, and in both cases no file is left open. A file
// that contains no records at all is not an error here: reading the header
// hits io.EOF, and the iterator is returned as usual.
func newRowIter(path string) (sql.RowIter, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}

	r := csv.NewReader(f)
	// Skip the header record. io.EOF only means there is nothing to skip;
	// any other failure is reported rather than silently discarded.
	if _, err := r.Read(); err != nil && err != io.EOF {
		// Best effort: the read error is the one worth reporting.
		_ = f.Close()
		return nil, err
	}

	return &rowIter{f, r}, nil
}
// rowIter turns the records of a CSV reader into rows. Close comes from the
// embedded io.Closer and Read from the embedded *csv.Reader.
type rowIter struct {
	io.Closer
	*csv.Reader
}

// Next reads the next CSV record and returns it as a row in which every
// field is a string with surrounding whitespace removed. Any error from the
// reader, including io.EOF at the end of the input, is returned unchanged
// together with a nil row.
func (r *rowIter) Next() (sql.Row, error) {
	record, err := r.Read()
	if err != nil {
		return nil, err
	}

	values := make([]interface{}, 0, len(record))
	for _, field := range record {
		values = append(values, strings.TrimSpace(field))
	}
	return sql.NewRow(values...), nil
}