-
Notifications
You must be signed in to change notification settings - Fork 0
/
tabler.go
122 lines (106 loc) · 2.21 KB
/
tabler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package tabler
import (
	"database/sql"
	"errors"
	"log"
	"os"
	"sync"

	"tabler/input"
	"tabler/rowmessage"
	"tabler/tabledef"
	"tabler/tableshaper"
)
// MessageInput is the source of row messages that a Tabler consumes.
// Methods are listed in the order Tabler uses them: Init, then repeated
// ReadMsg calls, then Close.
type MessageInput interface {
	// Init prepares the input for reading. Tabler.Init calls it right after
	// the input has been constructed.
	Init() error

	// ReadMsg returns the next row message. Tabler treats an error equal to
	// rowmessage.EndOfInput as the normal end of the stream and any other
	// error as a failure.
	ReadMsg() (rowmessage.RowMessage, error)

	// Close releases the input. Tabler.Close calls it when an input is set.
	Close() error
}
// Tabler ties a message input to a database handle and provides the
// operations that consume the input: GenerateTables and WriteRows.
type Tabler struct {
	// mutex is held while messageInput and db are assigned (Init, WriteRows)
	// and while they are closed (Close). The read loops in GenerateTables and
	// WriteRows do not take it.
	mutex *sync.Mutex
	// messageInput is the row source; it is assigned by Init.
	messageInput MessageInput
	// db is the database handle; it is assigned by WriteRows.
	db *sql.DB
}
// NewTabler returns a Tabler whose mutex is allocated and ready to use.
// The message input and database handle are left unset; Init and WriteRows
// assign them later.
func NewTabler() *Tabler {
	tabler := new(Tabler)
	tabler.mutex = new(sync.Mutex)
	return tabler
}
// Init selects and initializes the message input for this Tabler.
//
// When listenSocket is empty the input is created with input.NewFileInput
// over os.Stdin; otherwise it is created with input.NewTCPInput for
// listenSocket. inputFormat is passed unchanged to whichever constructor is
// used. The input's own Init method is called only if construction succeeded.
//
// The first error from construction or initialization is returned unchanged;
// nil is returned on success.
func (t *Tabler) Init(listenSocket string, inputFormat string) error {
	t.mutex.Lock()
	// Deferred so the mutex is released on every exit path, including a panic
	// raised by a constructor or by the input's Init.
	defer t.mutex.Unlock()

	var err error
	if listenSocket == "" {
		t.messageInput, err = input.NewFileInput(os.Stdin, inputFormat)
	} else {
		t.messageInput, err = input.NewTCPInput(listenSocket, inputFormat)
	}
	if err != nil {
		return err
	}
	return t.messageInput.Init()
}
// GenerateTables reads every row from the configured message input, adds each
// one to a new table shaper, and returns the result of the shaper's
// GetTablesJSON.
//
// Reading stops normally when ReadMsg reports rowmessage.EndOfInput (directly
// or wrapped). Any other read error is logged and returned. A row that the
// shaper rejects is logged and skipped; it does not abort the run.
//
// An error is returned, rather than panicking on a nil interface, when no
// message input has been assigned yet.
func (t *Tabler) GenerateTables() ([]byte, error) {
	if t.messageInput == nil {
		return nil, errors.New("tabler: message input not initialized")
	}

	tableShaper := tableshaper.NewTableShaper()
	for {
		row, err := t.messageInput.ReadMsg()
		if err != nil {
			// errors.Is also matches a wrapped sentinel, which == would miss.
			if errors.Is(err, rowmessage.EndOfInput) {
				break
			}
			log.Printf("Tabler.GenerateTables: error=%s", err)
			return nil, err
		}
		// Best effort: a row that cannot be added is reported and skipped.
		if err := tableShaper.Add(row); err != nil {
			log.Printf("Tabler.GenerateTables: error=%v row=%v", err, row)
		}
	}
	return tableShaper.GetTablesJSON()
}
// WriteRows loads table definitions from tablesPath, connects to the database
// identified by dsn, creates the tables, and then inserts every row read from
// the configured message input.
//
// Errors from reading the definitions, connecting, or creating the tables are
// returned. In the read loop, rowmessage.EndOfInput (directly or wrapped)
// ends the loop normally, and any other read error is logged and returned.
// A row that fails to insert is logged and skipped; it does not abort the run.
//
// An error is returned before any database work is done when no message input
// has been assigned yet, instead of panicking on a nil interface after the
// tables have already been created.
func (t *Tabler) WriteRows(tablesPath string, dsn string) error {
	if t.messageInput == nil {
		return errors.New("tabler: message input not initialized")
	}

	tables, err := tabledef.ReadTablesJSON(tablesPath)
	if err != nil {
		log.Printf("Tabler.WriteRows: path=%s error=%s", tablesPath, err)
		return err
	}
	log.Printf("Tabler.WriteRows: tables=%v\n", tables)

	var driverName string
	// The handle is stored even when ConnectDB reports an error, exactly as
	// before, so that Close can still release whatever was returned.
	t.mutex.Lock()
	t.db, driverName, err = ConnectDB(dsn)
	t.mutex.Unlock()
	if err != nil {
		return err
	}

	tabledef.SetSQL(tables, driverName)
	if err = CreateTables(t.db, tables); err != nil {
		return err
	}

	for {
		row, err := t.messageInput.ReadMsg()
		if err != nil {
			// errors.Is also matches a wrapped sentinel, which == would miss.
			if errors.Is(err, rowmessage.EndOfInput) {
				break
			}
			log.Printf("Tabler.WriteRows: error=%s", err)
			return err
		}
		// Best effort: a row that cannot be inserted is reported and skipped.
		if err := Insert(t.db, tables, row); err != nil {
			log.Printf("Tabler.WriteRows: error=%v row=%v", err, row)
		}
	}
	return nil
}
// Close closes the message input and the database handle, skipping whichever
// of the two has not been set.
//
// Close has no return value, so an error from either Close call is logged
// rather than silently dropped. A failure closing the input does not prevent
// the database handle from being closed.
func (t *Tabler) Close() {
	t.mutex.Lock()
	// Deferred so the mutex is released even if one of the Close calls panics.
	defer t.mutex.Unlock()

	if t.messageInput != nil {
		if err := t.messageInput.Close(); err != nil {
			log.Printf("Tabler.Close: input error=%s", err)
		}
	}
	if t.db != nil {
		if err := t.db.Close(); err != nil {
			log.Printf("Tabler.Close: db error=%s", err)
		}
	}
}