Skip to content

Commit

Permalink
*: Set collation to uint16
Browse files Browse the repository at this point in the history
  • Loading branch information
dveeden committed Apr 29, 2024
1 parent c60f97d commit 2555083
Show file tree
Hide file tree
Showing 237 changed files with 1,577 additions and 1,579 deletions.
24 changes: 12 additions & 12 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,13 +966,13 @@ func (bc *Client) FindTargetPeer(ctx context.Context, key []byte, isRawKv bool,
var leader *metapb.Peer
key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
state := utils.InitialRetryState(60, 100*time.Millisecond, 2*time.Second)
failpoint.Inject("retry-state-on-find-target-peer", func(v failpoint.Value) {
if v, _err_ := failpoint.Eval(_curpkg_("retry-state-on-find-target-peer")); _err_ == nil {
logutil.CL(ctx).Info("reset state for FindTargetPeer")
state = utils.InitialRetryState(v.(int), 100*time.Millisecond, 100*time.Millisecond)
})
}
err := utils.WithRetry(ctx, func() error {
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
failpoint.Inject("return-region-on-find-target-peer", func(v failpoint.Value) {
if v, _err_ := failpoint.Eval(_curpkg_("return-region-on-find-target-peer")); _err_ == nil {
switch v.(string) {
case "nil":
{
Expand Down Expand Up @@ -1017,7 +1017,7 @@ func (bc *Client) FindTargetPeer(ctx context.Context, key []byte, isRawKv bool,
}
}
}
})
}
if err != nil || region == nil {
logutil.CL(ctx).Error("find region failed", zap.Error(err), zap.Reflect("region", region))
return errors.Annotate(berrors.ErrPDLeaderNotFound, "cannot find region from pd client")
Expand Down Expand Up @@ -1067,7 +1067,7 @@ func (bc *Client) fineGrainedBackup(
ctx = opentracing.ContextWithSpan(ctx, span1)
}

failpoint.Inject("hint-fine-grained-backup", func(v failpoint.Value) {
if v, _err_ := failpoint.Eval(_curpkg_("hint-fine-grained-backup")); _err_ == nil {
log.Info("failpoint hint-fine-grained-backup injected, "+
"process will sleep for 3s and notify the shell.", zap.String("file", v.(string)))
if sigFile, ok := v.(string); ok {
Expand All @@ -1080,7 +1080,7 @@ func (bc *Client) fineGrainedBackup(
}
time.Sleep(3 * time.Second)
}
})
}

bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown)
for {
Expand Down Expand Up @@ -1298,7 +1298,7 @@ func doSendBackup(
req backuppb.BackupRequest,
respFn func(*backuppb.BackupResponse) error,
) error {
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
if v, _err_ := failpoint.Eval(_curpkg_("hint-backup-start")); _err_ == nil {
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
if sigFile, ok := v.(string); ok {
Expand All @@ -1311,9 +1311,9 @@ func doSendBackup(
}
}
time.Sleep(3 * time.Second)
})
}
bCli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("reset-retryable-error")); _err_ == nil {
switch val.(string) {
case "Unavaiable":
{
Expand All @@ -1326,13 +1326,13 @@ func doSendBackup(
err = status.Error(codes.Internal, "Internal error")
}
}
})
failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
}
if val, _err_ := failpoint.Eval(_curpkg_("reset-not-retryable-error")); _err_ == nil {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.")
err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3")
}
})
}
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,9 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
// PrepareConnections prepares the connections for each store.
// This will pause the admin commands for each store.
func (p *Preparer) PrepareConnections(ctx context.Context) error {
failpoint.Inject("PrepareConnectionsErr", func() {
failpoint.Return(errors.New("mock PrepareConnectionsErr"))
})
if _, _err_ := failpoint.Eval(_curpkg_("PrepareConnectionsErr")); _err_ == nil {
return errors.New("mock PrepareConnectionsErr")
}
log.Info("Preparing connections to stores.")
stores, err := p.env.GetAllLiveStores(ctx)
if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ func (push *pushDown) pushBackup(
}

// Push down backup tasks to all tikv instances.
failpoint.Inject("noop-backup", func(_ failpoint.Value) {
if _, _err_ := failpoint.Eval(_curpkg_("noop-backup")); _err_ == nil {
logutil.CL(ctx).Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
failpoint.Return(nil)
})
return nil
}

wg := new(sync.WaitGroup)
errContext := utils.NewErrorContext("pushBackup", 10)
Expand Down Expand Up @@ -128,28 +128,28 @@ func (push *pushDown) pushBackup(
// Finished.
return nil
}
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("backup-timeout-error")); _err_ == nil {
msg := val.(string)
logutil.CL(ctx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
}
if val, _err_ := failpoint.Eval(_curpkg_("backup-storage-error")); _err_ == nil {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
}
if val, _err_ := failpoint.Eval(_curpkg_("tikv-rw-error")); _err_ == nil {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
}
if val, _err_ := failpoint.Eval(_curpkg_("tikv-region-error")); _err_ == nil {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Expand All @@ -160,7 +160,7 @@ func (push *pushDown) pushBackup(
},
},
}
})
}
if resp.GetError() == nil {
// None error means range has been backuped successfully.
if checkpointRunner != nil {
Expand Down
22 changes: 11 additions & 11 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (r *CheckpointRunner[K, V]) startCheckpointMainLoop(
tickDurationForChecksum,
tickDurationForLock time.Duration,
) {
failpoint.Inject("checkpoint-more-quickly-flush", func(_ failpoint.Value) {
if _, _err_ := failpoint.Eval(_curpkg_("checkpoint-more-quickly-flush")); _err_ == nil {
tickDurationForChecksum = 1 * time.Second
tickDurationForFlush = 3 * time.Second
if tickDurationForLock > 0 {
Expand All @@ -402,7 +402,7 @@ func (r *CheckpointRunner[K, V]) startCheckpointMainLoop(
zap.Duration("checksum", tickDurationForChecksum),
zap.Duration("lock", tickDurationForLock),
)
})
}
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
defer r.wg.Done()
Expand Down Expand Up @@ -506,9 +506,9 @@ func (r *CheckpointRunner[K, V]) doChecksumFlush(ctx context.Context, checksumIt
return errors.Annotatef(err, "failed to write file %s for checkpoint checksum", fname)
}

failpoint.Inject("failed-after-checkpoint-flushes-checksum", func(_ failpoint.Value) {
failpoint.Return(errors.Errorf("failpoint: failed after checkpoint flushes checksum"))
})
if _, _err_ := failpoint.Eval(_curpkg_("failed-after-checkpoint-flushes-checksum")); _err_ == nil {
return errors.Errorf("failpoint: failed after checkpoint flushes checksum")
}
return nil
}

Expand Down Expand Up @@ -570,9 +570,9 @@ func (r *CheckpointRunner[K, V]) doFlush(ctx context.Context, meta map[K]*RangeG
}
}

failpoint.Inject("failed-after-checkpoint-flushes", func(_ failpoint.Value) {
failpoint.Return(errors.Errorf("failpoint: failed after checkpoint flushes"))
})
if _, _err_ := failpoint.Eval(_curpkg_("failed-after-checkpoint-flushes")); _err_ == nil {
return errors.Errorf("failpoint: failed after checkpoint flushes")
}
return nil
}

Expand Down Expand Up @@ -663,9 +663,9 @@ func (r *CheckpointRunner[K, V]) updateLock(ctx context.Context) error {
return errors.Trace(err)
}

failpoint.Inject("failed-after-checkpoint-updates-lock", func(_ failpoint.Value) {
failpoint.Return(errors.Errorf("failpoint: failed after checkpoint updates lock"))
})
if _, _err_ := failpoint.Eval(_curpkg_("failed-after-checkpoint-updates-lock")); _err_ == nil {
return errors.Errorf("failpoint: failed after checkpoint updates lock")
}

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,12 @@ func (exec *Executor) Execute(
vars.BackOffWeight = exec.backoffWeight
}
resp, err = sendChecksumRequest(ctx, client, req, vars)
failpoint.Inject("checksumRetryErr", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("checksumRetryErr")); _err_ == nil {
// first time reach here. return error
if val.(bool) {
err = errors.New("inject checksum error")
}
})
}
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 4 additions & 4 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,23 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,
ctx,
func() error {
stores, err = util.GetAllTiKVStores(ctx, pdClient, storeBehavior)
failpoint.Inject("hint-GetAllTiKVStores-error", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("hint-GetAllTiKVStores-error")); _err_ == nil {
logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.")
if val.(bool) {
err = status.Error(codes.Unknown, "Retryable error")
} else {
err = context.Canceled
}
})
}

failpoint.Inject("hint-GetAllTiKVStores-cancel", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("hint-GetAllTiKVStores-cancel")); _err_ == nil {
logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.")
if val.(bool) {
err = status.Error(codes.Canceled, "Cancel Retry")
} else {
err = context.Canceled
}
})
}

return errors.Trace(err)
},
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ func parseVersion(versionStr string) *semver.Version {
zap.String("version", versionStr), zap.Error(err))
version = &semver.Version{Major: 0, Minor: 0, Patch: 0}
}
failpoint.Inject("PDEnabledPauseConfig", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("PDEnabledPauseConfig")); _err_ == nil {
if val.(bool) {
// test pause config is enable
version = &semver.Version{Major: 5, Minor: 0, Patch: 0}
}
})
}
return version
}

Expand Down
44 changes: 22 additions & 22 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,11 +735,11 @@ func (rc *Client) GetTSWithRetry(ctx context.Context) (uint64, error) {

err := utils.WithRetry(ctx, func() error {
startTS, getTSErr = rc.GetTS(ctx)
failpoint.Inject("get-ts-error", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("get-ts-error")); _err_ == nil {
if val.(bool) && retry < 3 {
getTSErr = errors.Errorf("rpc error: code = Unknown desc = [PD:tso:ErrGenerateTimestamp]generate timestamp failed, requested pd is not leader of cluster")
}
})
}

retry++
if getTSErr != nil {
Expand Down Expand Up @@ -1139,11 +1139,11 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma
workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error {
db := rc.dbPool[id%uint64(len(rc.dbPool))]
cts, err := rc.createTables(ectx, db, dom, tableSlice, newTS) // ddl job for [lastSent:i)
failpoint.Inject("restore-createtables-error", func(val failpoint.Value) {
if val, _err_ := failpoint.Eval(_curpkg_("restore-createtables-error")); _err_ == nil {
if val.(bool) {
err = errors.New("sample error without extra message")
}
})
}
if err != nil {
log.Error("create tables fail", zap.Error(err))
return err
Expand Down Expand Up @@ -2287,9 +2287,9 @@ func (rc *Client) getRuleID(tableID int64) string {

// IsFull returns whether this backup is full.
func (rc *Client) IsFull() bool {
failpoint.Inject("mock-incr-backup-data", func() {
failpoint.Return(false)
})
if _, _err_ := failpoint.Eval(_curpkg_("mock-incr-backup-data")); _err_ == nil {
return false
}
return !rc.IsIncremental()
}

Expand Down Expand Up @@ -3011,9 +3011,9 @@ func (rc *Client) RestoreMetaKVFiles(
filesInDefaultCF = SortMetaKVFiles(filesInDefaultCF)
filesInWriteCF = SortMetaKVFiles(filesInWriteCF)

failpoint.Inject("failed-before-id-maps-saved", func(_ failpoint.Value) {
failpoint.Return(errors.New("failpoint: failed before id maps saved"))
})
if _, _err_ := failpoint.Eval(_curpkg_("failed-before-id-maps-saved")); _err_ == nil {
return errors.New("failpoint: failed before id maps saved")
}

log.Info("start to restore meta files",
zap.Int("total files", len(files)),
Expand All @@ -3031,9 +3031,9 @@ func (rc *Client) RestoreMetaKVFiles(
return errors.Trace(err)
}
}
failpoint.Inject("failed-after-id-maps-saved", func(_ failpoint.Value) {
failpoint.Return(errors.New("failpoint: failed after id maps saved"))
})
if _, _err_ := failpoint.Eval(_curpkg_("failed-after-id-maps-saved")); _err_ == nil {
return errors.New("failpoint: failed after id maps saved")
}

// run the rewrite and restore meta-kv into TiKV cluster.
if err := rc.RestoreMetaKVFilesWithBatchMethod(
Expand Down Expand Up @@ -3276,18 +3276,18 @@ func (rc *Client) restoreMetaKvEntries(
log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)),
zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key))

failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
})
if _, _err_ := failpoint.Eval(_curpkg_("failed-to-restore-metakv")); _err_ == nil {
return 0, 0, errors.Errorf("failpoint: failed to restore metakv")
}
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}
// for failpoint, we need to flush the cache in rawKVClient every time
failpoint.Inject("do-not-put-metakv-in-batch", func(_ failpoint.Value) {
if _, _err_ := failpoint.Eval(_curpkg_("do-not-put-metakv-in-batch")); _err_ == nil {
if err := rc.rawKVClient.PutRest(ctx); err != nil {
failpoint.Return(0, 0, errors.Trace(err))
return 0, 0, errors.Trace(err)
}
})
}
kvCount++
size += uint64(len(newEntry.Key) + len(newEntry.Value))
}
Expand Down Expand Up @@ -3544,11 +3544,11 @@ NEXTSQL:
return errors.Trace(err)
}
}
failpoint.Inject("failed-before-create-ingest-index", func(v failpoint.Value) {
if v, _err_ := failpoint.Eval(_curpkg_("failed-before-create-ingest-index")); _err_ == nil {
if v != nil && v.(bool) {
failpoint.Return(errors.New("failed before create ingest index"))
return errors.New("failed before create ingest index")
}
})
}
// create the repaired index when first execution or not found it
if err := rc.db.se.ExecuteInternal(ctx, sql.AddSQL, sql.AddArgs...); err != nil {
return errors.Trace(err)
Expand Down

0 comments on commit 2555083

Please sign in to comment.