Serialize Compact; Run SwitchToImportMode periodically; Log progress periodically (pingcap#92)

* *: run Compact, SwitchToImportMode, and log progress periodically

* restore: ensure FullCompact and Level1Compact are also serialized

Fixed an invalid usage of metrics.

* tests: fixed integration test failure caused by pingcap#91

* config, restore: fixed comments

* config, restore: start compaction ASAP (but still in sequential order)
kennytm committed Dec 7, 2018
1 parent 986ca51 commit 7ca4e96
Showing 8 changed files with 187 additions and 16 deletions.
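The compaction changes in this commit hinge on a single int32 state guarded by compare-and-swap: per-table level-1 compactions run in the background but never overlap, and the final full compaction waits until that state is idle. Below is a minimal, self-contained sketch of the pattern in plain Go — the state constants mirror the diff, while tryLevel1Compact, fullCompact, and the sleeps are illustrative stand-ins, not Lightning's actual functions.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const (
	compactStateIdle int32 = iota
	compactStateDoing
)

var compactState int32

// tryLevel1Compact starts a background level-1 compaction only if no other
// compaction is in flight; otherwise the request is skipped, mirroring the
// CAS guard added to postProcess.
func tryLevel1Compact(wg *sync.WaitGroup) {
	if atomic.CompareAndSwapInt32(&compactState, compactStateIdle, compactStateDoing) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond) // stand-in for doCompact(ctx, Level1Compact)
			atomic.StoreInt32(&compactState, compactStateIdle)
		}()
	}
}

// fullCompact waits for any in-flight level-1 compaction to finish before
// claiming the state for itself, mirroring the loop added to fullCompact.
func fullCompact() {
	for !atomic.CompareAndSwapInt32(&compactState, compactStateIdle, compactStateDoing) {
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Println("full compaction can start now")
}

func main() {
	var wg sync.WaitGroup
	tryLevel1Compact(&wg)
	tryLevel1Compact(&wg) // skipped: the first compaction is still running
	wg.Wait()
	fullCompact()
}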
1 change: 1 addition & 0 deletions go.mod
@@ -20,6 +20,7 @@ require (
github.com/pingcap/tidb-enterprise-tools v1.0.1-0.20181116033341-5832f7307d74
github.com/pkg/errors v0.8.0
github.com/prometheus/client_golang v0.9.0
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/satori/go.uuid v1.2.0
github.com/siddontang/go v0.0.0-20170517070808-cb568a3e5cc0 // indirect
github.com/sirupsen/logrus v1.2.0
27 changes: 27 additions & 0 deletions lightning/config/config.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"runtime"
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/tidb-enterprise-tools/pkg/filter"
@@ -49,6 +50,7 @@ type Config struct {
BWList *filter.Rules `toml:"black-white-list" json:"black-white-list"`
TikvImporter TikvImporter `toml:"tikv-importer" json:"tikv-importer"`
PostRestore PostRestore `toml:"post-restore" json:"post-restore"`
Cron Cron `toml:"cron" json:"cron"`

// command line flags
ConfigFile string `json:"config-file"`
Expand Down Expand Up @@ -101,6 +103,27 @@ type Checkpoint struct {
KeepAfterSuccess bool `toml:"keep-after-success" json:"keep-after-success"`
}

type Cron struct {
SwitchMode Duration `toml:"switch-mode" json:"switch-mode"`
LogProgress Duration `toml:"log-progress" json:"log-progress"`
}

// A duration which can be deserialized from a TOML string.
// Implemented as https://github.com/BurntSushi/toml#using-the-encodingtextunmarshaler-interface
type Duration struct {
time.Duration
}

func (d *Duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
}

func (d *Duration) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, d.Duration)), nil
}

func NewConfig() *Config {
return &Config{
App: Lightning{
@@ -115,6 +138,10 @@ func NewConfig() *Config {
IndexSerialScanConcurrency: 20,
ChecksumTableConcurrency: 16,
},
Cron: Cron{
SwitchMode: Duration{Duration: 5 * time.Minute},
LogProgress: Duration{Duration: 5 * time.Minute},
},
}
}

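The Duration wrapper added above works because BurntSushi/toml honours encoding.TextUnmarshaler, so a string like "5m" in the [cron] table decodes straight into a time.Duration. Here is a small standalone sketch of that decoding; the cronConfig struct is a simplified stand-in, not Lightning's real Config.

package main

import (
	"fmt"
	"time"

	"github.com/BurntSushi/toml"
)

// Duration wraps time.Duration so that TOML strings such as "5m" decode
// through encoding.TextUnmarshaler, as in lightning/config/config.go.
type Duration struct {
	time.Duration
}

func (d *Duration) UnmarshalText(text []byte) error {
	var err error
	d.Duration, err = time.ParseDuration(string(text))
	return err
}

// cronConfig is a simplified stand-in for the Cron section of the config.
type cronConfig struct {
	SwitchMode  Duration `toml:"switch-mode"`
	LogProgress Duration `toml:"log-progress"`
}

func main() {
	var cfg struct {
		Cron cronConfig `toml:"cron"`
	}
	raw := `
[cron]
switch-mode = "5m"
log-progress = "7m30s"
`
	if _, err := toml.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	// Prints: 5m0s 7m30s
	fmt.Println(cfg.Cron.SwitchMode.Duration, cfg.Cron.LogProgress.Duration)
}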
21 changes: 21 additions & 0 deletions lightning/metric/metric.go
@@ -1,7 +1,10 @@
package metric

import (
"math"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

var (
@@ -157,3 +160,21 @@ func RecordTableCount(status string, err error) {
}
TableCounter.WithLabelValues(status, result).Inc()
}

// ReadCounter reports the current value of the counter.
func ReadCounter(counter prometheus.Counter) float64 {
var metric dto.Metric
if err := counter.Write(&metric); err != nil {
return math.NaN()
}
return metric.Counter.GetValue()
}

// ReadHistogramSum reports the sum of all observed values in the histogram.
func ReadHistogramSum(histogram prometheus.Histogram) float64 {
var metric dto.Metric
if err := histogram.Write(&metric); err != nil {
return math.NaN()
}
return metric.Histogram.GetSampleSum()
}
35 changes: 35 additions & 0 deletions lightning/metric/metric_test.go
@@ -0,0 +1,35 @@
package metric_test

import (
"testing"

. "github.com/pingcap/check"

"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/prometheus/client_golang/prometheus"
)

type testMetricSuite struct{}

func (s *testMetricSuite) SetUpSuite(c *C) {}
func (s *testMetricSuite) TearDownSuite(c *C) {}

var _ = Suite(&testMetricSuite{})

func TestMetric(t *testing.T) {
TestingT(t)
}

func (s *testMetricSuite) TestReadCounter(c *C) {
counter := prometheus.NewCounter(prometheus.CounterOpts{})
counter.Add(1256.0)
counter.Add(2214.0)
c.Assert(metric.ReadCounter(counter), Equals, 3470.0)
}

func (s *testMetricSuite) TestReadHistogramSum(c *C) {
histogram := prometheus.NewHistogram(prometheus.HistogramOpts{})
histogram.Observe(11131.5)
histogram.Observe(15261.0)
c.Assert(metric.ReadHistogramSum(histogram), Equals, 26392.5)
}
107 changes: 93 additions & 14 deletions lightning/restore/restore.go
@@ -39,6 +39,11 @@ const (
defaultGCLifeTime = 100 * time.Hour
)

const (
compactStateIdle int32 = iota
compactStateDoing
)

var (
requiredTiDBVersion = *semver.New("2.0.4")
requiredPDVersion = *semver.New("2.0.4")
@@ -95,6 +100,7 @@ type RestoreController struct {
tidbMgr *TiDBManager
postProcessLock sync.Mutex // a simple way to ensure post-processing is not concurrent without using complicated goroutines
alterTableLock sync.Mutex
compactState int32

errorSummaries errorSummaries

@@ -177,7 +183,6 @@ func (rc *RestoreController) Run(ctx context.Context) error {
timer := time.Now()
opts := []func(context.Context) error{
rc.checkRequirements,
rc.switchToImportMode,
rc.restoreSchema,
rc.restoreTables,
rc.fullCompact,
Expand All @@ -196,7 +201,7 @@ outside:
err = nil
break outside
default:
common.AppLogger.Errorf("run cause error : %s", errors.ErrorStack(err))
common.AppLogger.Errorf("run cause error : %v", err)
fmt.Fprintf(os.Stderr, "Error: %s\n", err)
break outside // ps : not continue
}
@@ -263,7 +268,7 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics() {
}
}
}
metric.ChunkCounter.WithLabelValues("estimated").Add(float64(estimatedChunkCount))
metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated).Add(float64(estimatedChunkCount))
}

func (rc *RestoreController) saveStatusCheckpoint(tableName string, err error, statusIfSucceed CheckpointStatus) {
@@ -322,6 +327,60 @@ func (rc *RestoreController) listenCheckpointUpdates(wg *sync.WaitGroup) {
}
}

func (rc *RestoreController) runPeriodicActions(ctx context.Context, stop <-chan struct{}) {
switchModeTicker := time.NewTicker(rc.cfg.Cron.SwitchMode.Duration)
logProgressTicker := time.NewTicker(rc.cfg.Cron.LogProgress.Duration)
defer func() {
switchModeTicker.Stop()
logProgressTicker.Stop()
}()

rc.switchToImportMode(ctx)

start := time.Now()

for {
select {
case <-ctx.Done():
common.AppLogger.Warnf("Stopping periodic actions due to %v", ctx.Err())
return
case <-stop:
common.AppLogger.Info("Everything imported, stopping periodic actions")
return

case <-switchModeTicker.C:
// periodically switch to import mode, as requested by TiKV 3.0
rc.switchToImportMode(ctx)

case <-logProgressTicker.C:
// log the current progress periodically, so OPS will know that we're still working
nanoseconds := float64(time.Since(start).Nanoseconds())
estimated := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateEstimated))
finished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStateFinished))
totalTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStatePending, metric.TableResultSuccess))
completedTables := metric.ReadCounter(metric.TableCounter.WithLabelValues(metric.TableStateCompleted, metric.TableResultSuccess))
bytesRead := metric.ReadHistogramSum(metric.BlockReadBytesHistogram)

var remaining string
if finished >= estimated {
remaining = ", post-processing"
} else if finished > 0 {
remainNanoseconds := (estimated/finished - 1) * nanoseconds
remaining = fmt.Sprintf(", remaining %s", time.Duration(remainNanoseconds).Round(time.Second))
}

// Note: a speed of 28 MiB/s roughly corresponds to 100 GiB/hour.
common.AppLogger.Infof(
"progress: %.0f/%.0f chunks (%.1f%%), %.0f/%.0f tables (%.1f%%), speed %.2f MiB/s%s",
finished, estimated, finished/estimated*100,
completedTables, totalTables, completedTables/totalTables*100,
bytesRead/(1048576e-9*nanoseconds),
remaining,
)
}
}
}

func (rc *RestoreController) restoreTables(ctx context.Context) error {
timer := time.Now()
var wg sync.WaitGroup
@@ -331,6 +390,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
restoreErr error
)

stopPeriodicActions := make(chan struct{}, 1)
go rc.runPeriodicActions(ctx, stopPeriodicActions)

for dbName, dbMeta := range rc.dbMetas {
dbInfo, ok := rc.dbInfos[dbName]
if !ok {
@@ -393,6 +455,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}

wg.Wait()
stopPeriodicActions <- struct{}{}
common.AppLogger.Infof("restore all tables data takes %v", time.Since(timer))

restoreErrLock.Lock()
@@ -538,11 +601,16 @@ func (t *TableRestore) postProcess(ctx context.Context, closedEngine *kv.ClosedE
return errors.Trace(err)
}

// 2. compact level 1
err = rc.doCompact(ctx, Level1Compact)
if err != nil {
// log it and continue
common.AppLogger.Warnf("[%s] do compact %d failed err %v", t.tableName, Level1Compact, errors.ErrorStack(err))
// 2. perform a level-1 compact if idling.
if atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
go func() {
err := rc.doCompact(ctx, Level1Compact)
if err != nil {
// log it and continue
common.AppLogger.Warnf("compact %d failed %v", Level1Compact, err)
}
atomic.StoreInt32(&rc.compactState, compactStateIdle)
}()
}
}

@@ -570,7 +638,7 @@ func (t *TableRestore) postProcess(ctx context.Context, closedEngine *kv.ClosedE
rc.saveStatusCheckpoint(t.tableName, nil, CheckpointStatusChecksumSkipped)
} else {
err := t.compareChecksum(ctx, rc.tidbMgr.db, cp)
rc.saveStatusCheckpoint(t.tableName, nil, CheckpointStatusChecksummed)
rc.saveStatusCheckpoint(t.tableName, err, CheckpointStatusChecksummed)
if err != nil {
common.AppLogger.Errorf("[%s] checksum failed: %v", t.tableName, err.Error())
return errors.Trace(err)
@@ -603,23 +671,34 @@ func (rc *RestoreController) fullCompact(ctx context.Context) error {
return nil
}

// wait for any existing level-1 compaction to complete first.
common.AppLogger.Info("Wait for existing level 1 compaction to finish")
start := time.Now()
for !atomic.CompareAndSwapInt32(&rc.compactState, compactStateIdle, compactStateDoing) {
time.Sleep(100 * time.Millisecond)
}
common.AppLogger.Infof("Wait for existing level 1 compaction to finish takes %v", time.Since(start))

return errors.Trace(rc.doCompact(ctx, FullLevelCompact))
}

func (rc *RestoreController) doCompact(ctx context.Context, level int32) error {
return errors.Trace(rc.importer.Compact(ctx, level))
}

func (rc *RestoreController) switchToImportMode(ctx context.Context) error {
return errors.Trace(rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import))
func (rc *RestoreController) switchToImportMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
}

func (rc *RestoreController) switchToNormalMode(ctx context.Context) error {
return errors.Trace(rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal))
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
return nil
}

func (rc *RestoreController) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) error {
return errors.Trace(rc.importer.SwitchMode(ctx, mode))
func (rc *RestoreController) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
if err := rc.importer.SwitchMode(ctx, mode); err != nil {
common.AppLogger.Warnf("cannot switch to %s mode: %v", mode.String(), err)
}
}

func (rc *RestoreController) checkRequirements(_ context.Context) error {
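The 1048576e-9 constant in the progress log above is pure unit conversion: bytes per nanosecond into MiB per second (1048576 bytes per MiB, 1e9 nanoseconds per second), which is also where the "28 MiB/s roughly corresponds to 100 GiB/hour" note comes from. A tiny standalone check of the arithmetic, using made-up numbers:

package main

import "fmt"

// Sanity check of the MiB/s formula used in runPeriodicActions:
// bytesRead / (1048576e-9 * nanoseconds) converts bytes-per-nanosecond
// into MiB-per-second.
func main() {
	nanoseconds := 3600.0 * 1e9        // one hour of wall time
	bytesRead := 28.0 * 1048576 * 3600 // 28 MiB/s sustained for that hour

	speed := bytesRead / (1048576e-9 * nanoseconds)
	fmt.Printf("speed: %.1f MiB/s\n", speed)                   // 28.0 MiB/s
	fmt.Printf("volume: %.1f GiB/hour\n", bytesRead/(1<<30))   // ≈ 98.4, i.e. roughly 100 GiB/hour
}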
2 changes: 1 addition & 1 deletion tests/checkpoint/run.sh
@@ -75,5 +75,5 @@ echo "******** Verify checkpoint no-op ********"
PIDFILE="$PIDFILE" run_lightning
run_sql "$PARTIAL_IMPORT_QUERY"
check_contains "s: $(( (1000 * $CHUNK_COUNT + 1001) * $CHUNK_COUNT * $TABLE_COUNT ))"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cppk.table_v1 WHERE status = 210"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cppk.table_v1 WHERE status >= 200"
check_contains "count(*): $TABLE_COUNT"
2 changes: 1 addition & 1 deletion tests/checkpoint_chunks/run.sh
@@ -52,7 +52,7 @@ PIDFILE="$PIDFILE" run_lightning
run_sql 'SELECT count(i), sum(i) FROM cpch_tsr.tbl;'
check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))"
check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v1 WHERE status = 210"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v1 WHERE status >= 200"
check_contains "count(*): 1"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.chunk_v3 WHERE pos = end_offset"
check_contains "count(*): $CHUNK_COUNT"
8 changes: 8 additions & 0 deletions tidb-lightning.toml
@@ -91,3 +91,11 @@ checksum = true
compact = true
# if set true, analyze will do ANALYZE TABLE <table> for each table.
analyze = true

# cron performs some periodic actions in the background
[cron]
# the interval at which Lightning will automatically refresh the import mode status.
# should be shorter than the corresponding TiKV setting
switch-mode = "5m"
# the interval at which the import progress will be printed to the log.
log-progress = "5m"
