forked from pingcap/tidb-tools
/
meta.go
126 lines (107 loc) · 3.26 KB
/
meta.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"context"
"fmt"
"os"
"path"
"time"
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/siddontang/go/ioutil2"
"go.uber.org/zap"
)
const (
	// physicalShiftBits is the number of bits composeTS shifts the physical
	// part of a timestamp to the left before adding the logical part.
	physicalShiftBits = 18

	// slowDist is the elapsed-time threshold above which GetTSO logs a
	// "get timestamp too slow" warning.
	slowDist = 30 * time.Millisecond
)
// generateMetaInfo fetches a timestamp from PD and stores it in the
// "savepoint" file under cfg.DataDir.
//
// The data directory is created first (mode 0700) if it does not exist. Any
// failure from creating the directory, fetching the timestamp, or writing the
// file is returned wrapped with errors.Trace.
func generateMetaInfo(cfg *Config) error {
	mkdirErr := os.MkdirAll(cfg.DataDir, 0700)
	if mkdirErr != nil {
		return errors.Trace(mkdirErr)
	}

	// Ask PD for its newest timestamp.
	ts, tsoErr := GetTSO(cfg)
	if tsoErr != nil {
		log.Error("get tso failed", zap.Error(tsoErr))
		return errors.Trace(tsoErr)
	}

	// Persist the timestamp into the meta file.
	savepoint := path.Join(cfg.DataDir, "savepoint")
	if saveErr := saveMeta(savepoint, ts, cfg.TimeZone); saveErr != nil {
		return errors.Trace(saveErr)
	}
	return nil
}
// GetTSO fetches one timestamp from PD and returns it as a single composed
// value.
//
// It parses cfg.EtcdURLs into endpoints, creates a PD client configured with
// cfg.SSLCA, cfg.SSLCert and cfg.SSLKey, requests a timestamp, and combines the
// returned physical and logical parts with composeTS. The client is closed
// before the function returns.
//
// If the whole operation (endpoint parsing, client creation and the timestamp
// request) takes longer than slowDist, a warning is logged; the timestamp is
// still returned.
//
// A non-nil error is returned, wrapped with errors.Trace, when the endpoints
// cannot be parsed, the client cannot be created, or the timestamp request
// fails. The returned timestamp is 0 in those cases.
func GetTSO(cfg *Config) (int64, error) {
	start := time.Now()

	etcdEndpoints, err := utils.ParseHostPortAddr(cfg.EtcdURLs)
	if err != nil {
		return 0, errors.Trace(err)
	}

	pdCli, err := pd.NewClient(etcdEndpoints, pd.SecurityOption{
		CAPath:   cfg.SSLCA,
		CertPath: cfg.SSLCert,
		KeyPath:  cfg.SSLKey,
	})
	// The client must not be used when its construction failed; previously
	// this error was silently overwritten by the GetTS call below.
	if err != nil {
		return 0, errors.Trace(err)
	}
	// The client is only needed for this single request, so release it here
	// instead of leaking it.
	defer pdCli.Close()

	physical, logical, err := pdCli.GetTS(context.Background())
	if err != nil {
		return 0, errors.Trace(err)
	}

	dist := time.Since(start)
	if dist > slowDist {
		log.Warn("get timestamp too slow", zap.Duration("dist", dist))
	}

	return int64(composeTS(physical, logical)), nil
}
// composeTS packs a physical/logical timestamp pair into one uint64: the
// physical part is shifted left by physicalShiftBits and the logical part is
// added to the result.
func composeTS(physical, logical int64) uint64 {
	shifted := physical << physicalShiftBits
	return uint64(shifted + logical)
}
// Meta holds the commit TS that specifies the location of the synchronized
// data. It is the structure that saveMeta encodes into the meta file.
// TODO: improve meta later, like adding offset of kafka topic that corresponds to each pump node
type Meta struct {
	CommitTS int64 `toml:"commitTS" json:"commitTS"`
}

// String renders the Meta in the form "commitTS: <value>".
func (m *Meta) String() string {
	return "commitTS: " + fmt.Sprint(m.CommitTS)
}
// saveMeta writes ts, wrapped in a Meta and TOML-encoded, to metaFileName.
//
// When timeZone is non-empty and can be loaded as a location, two extra lines
// are appended after the encoded Meta: the rough time of ts in UTC and the
// same time in the requested location. If the location cannot be loaded, a
// warning is logged and the file is written without those lines.
//
// The file is written through ioutil2.WriteFileAtomic with mode 0644. Encoding
// and write failures are returned annotated with the meta and the file name.
//
// NOTE(review): the appended time strings are written as raw text after the
// TOML payload — confirm that readers of this file tolerate that.
func saveMeta(metaFileName string, ts int64, timeZone string) error {
	meta := &Meta{CommitTS: ts}

	var content bytes.Buffer
	if encodeErr := toml.NewEncoder(&content).Encode(meta); encodeErr != nil {
		return errors.Annotatef(encodeErr, "save meta %+v into %s", meta, metaFileName)
	}

	if timeZone != "" {
		roughTime := utils.TSOToRoughTime(ts)
		if loc, locErr := time.LoadLocation(timeZone); locErr == nil {
			// Human-readable form of the timestamp: UTC first, then local.
			content.WriteString(roughTime.UTC().String())
			content.WriteByte('\n')
			content.WriteString(roughTime.In(loc).String())
		} else {
			log.Warn("fail to load location", zap.String("time zone", timeZone), zap.Error(locErr))
		}
	}

	if writeErr := ioutil2.WriteFileAtomic(metaFileName, content.Bytes(), 0644); writeErr != nil {
		return errors.Annotatef(writeErr, "save meta %+v into %s", meta, metaFileName)
	}

	log.Info("save meta", zap.Stringer("meta", meta))
	return nil
}