This repository has been archived by the owner on Dec 29, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 34
/
pgutils.go
98 lines (85 loc) · 1.85 KB
/
pgutils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package pgutils
import (
"bytes"
"database/sql"
"sync"
"github.com/jackc/pgx"
"github.com/mkabilov/pg2ch/pkg/config"
"github.com/mkabilov/pg2ch/pkg/utils"
)
// OutputPlugin is the name of the logical decoding plugin.
const OutputPlugin = "pgoutput"
// bytesBufPool hands out reusable *bytes.Buffer scratch buffers; a fresh,
// empty buffer is created whenever the pool has none to give.
var bytesBufPool = sync.Pool{
	New: func() interface{} {
		empty := make([]byte, 0)
		return bytes.NewBuffer(empty)
	},
}
// IstoreToArrays converts the textual form of a postgres istore into two
// ClickHouse arrays separated by a tab and writes them to w: first the array
// of keys, then the array of values.
//
// The input is scanned byte by byte. Double quotes delimit tokens and
// alternate between key and value; the bytes '=', '>', ' ' and ',' are always
// dropped. Assuming the input looks like `"1"=>"2", "3"=>"4"`, the output is
// `[1,3]<TAB>[2,4]`.
//
// Keys are streamed to w as they are read, while values are collected in a
// pooled buffer and flushed after the key array has been closed. The first
// error returned by w aborts the conversion and is returned as is.
func IstoreToArrays(w utils.Writer, str []byte) error {
	values := bytesBufPool.Get().(*bytes.Buffer)
	defer func() {
		// Hand the scratch buffer back empty so the next user starts clean.
		values.Reset()
		bytesBufPool.Put(values)
	}()

	if err := w.WriteByte('['); err != nil {
		return err
	}
	values.WriteByte('[')

	var (
		quotes   int  // double quotes consumed so far
		inKey    bool // whether plain bytes currently belong to a key
		pastPair bool // set once the first key/value pair has been closed
	)
	for i := 0; i < len(str); i++ {
		b := str[i]

		switch b {
		case '=', '>', ' ', ',':
			// Pair syntax and separators carry no data.
			continue

		case '"':
			// Only opening quotes (even count) switch between key and value.
			if quotes%2 == 0 {
				inKey = !inKey
				// Every key after the first pair starts a new element in
				// both arrays.
				if inKey && pastPair {
					if err := w.WriteByte(','); err != nil {
						return err
					}
					values.WriteByte(',')
				}
			}
			quotes++
			// The fourth quote closes the first value.
			pastPair = pastPair || quotes == 4

		default:
			if !inKey {
				values.WriteByte(b)
				continue
			}
			if err := w.WriteByte(b); err != nil {
				return err
			}
		}
	}

	// Close the key array and emit the tab that separates the two arrays.
	for _, b := range []byte("]\t") {
		if err := w.WriteByte(b); err != nil {
			return err
		}
	}
	if _, err := w.Write(values.Bytes()); err != nil {
		return err
	}

	return w.WriteByte(']')
}
// PgStatLiveTuples returns the approximate number of rows in the given table,
// read from the n_live_tup column of pg_stat_all_tables inside the pgTx
// transaction.
//
// A scan error is returned unchanged together with 0. If the scanned value is
// NULL the result is 0 with a nil error.
func PgStatLiveTuples(pgTx *pgx.Tx, name config.PgTableName) (uint64, error) {
	const query = "select n_live_tup from pg_stat_all_tables where schemaname = $1 and relname = $2"

	var liveTuples sql.NullInt64
	row := pgTx.QueryRow(query, name.SchemaName, name.TableName)
	if err := row.Scan(&liveTuples); err != nil {
		return 0, err
	}
	if !liveTuples.Valid {
		return 0, nil
	}

	return uint64(liveTuples.Int64), nil
}