From 7da8d12dc6bd0f6813aee094ec5e2b8f539246e6 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Thu, 6 Oct 2022 10:21:54 +0100 Subject: [PATCH] workload: add delete operations to kv workload Delete operation is useful when generating a mix of data in table. This commit adds --del-percent flag that will cause chosen percentage of operations to be delete. Delete operation uses the same semantics as read when picking keys to delete (keys that were previously created by write). Release note: None --- pkg/workload/kv/kv.go | 49 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/pkg/workload/kv/kv.go b/pkg/workload/kv/kv.go index 7494f4d5efed..b74fff6104a1 100644 --- a/pkg/workload/kv/kv.go +++ b/pkg/workload/kv/kv.go @@ -70,6 +70,7 @@ type kv struct { cycleLength int64 readPercent int spanPercent int + delPercent int spanLimit int writesUseSelectForUpdate bool seed int64 @@ -120,6 +121,8 @@ var kvMeta = workload.Meta{ `Percent (0-100) of operations that are reads of existing keys.`) g.flags.IntVar(&g.spanPercent, `span-percent`, 0, `Percent (0-100) of operations that are spanning queries of all ranges.`) + g.flags.IntVar(&g.delPercent, `del-percent`, 0, + `Percent (0-100) of operations that delete existing keys.`) g.flags.IntVar(&g.spanLimit, `span-limit`, 0, `LIMIT count for each spanning query, or 0 for no limit`) g.flags.BoolVar(&g.writesUseSelectForUpdate, `sfu-writes`, false, @@ -177,8 +180,11 @@ ALTER TABLE kv ADD COLUMN e enum_type NOT NULL AS ('v') STORED;`) if w.sequential && w.zipfian { return errors.New("'sequential' and 'zipfian' cannot both be enabled") } - if w.readPercent+w.spanPercent > 100 { - return errors.New("'read-percent' and 'span-percent' higher than 100") + if w.shards > 0 && !(w.sequential || w.zipfian) { + return errors.New("'shards' only work with 'sequential' or 'zipfian' key distributions") + } + if w.readPercent+w.spanPercent+w.delPercent > 100 { + return errors.New("'read-percent', 'span-percent' and 'del-precent' combined exceed 100%") } if w.targetCompressionRatio < 1.0 || math.IsNaN(w.targetCompressionRatio) { return errors.New("'target-compression-ratio' must be a number >= 1.0") @@ -342,6 +348,28 @@ func (w *kv) Ops( buf.WriteString(`]`) spanStmtStr := buf.String() + // Del statement + buf.Reset() + if w.shards == 0 { + buf.WriteString(`DELETE FROM kv WHERE k IN (`) + for i := 0; i < w.batchSize; i++ { + if i > 0 { + buf.WriteString(", ") + } + fmt.Fprintf(&buf, `$%d`, i+1) + } + } else { + buf.WriteString(`DELETE FROM kv WHERE (shard, k) in (`) + for i := 0; i < w.batchSize; i++ { + if i > 0 { + buf.WriteString(", ") + } + fmt.Fprintf(&buf, `(mod($%d, %d), $%d)`, i+1, w.shards, i+1) + } + } + buf.WriteString(`)`) + delStmtStr := buf.String() + ql := workload.QueryLoad{SQLDatabase: sqlDatabase} seq := &sequence{config: w, val: int64(writeSeq)} numEmptyResults := new(int64) @@ -357,6 +385,7 @@ func (w *kv) Ops( op.sfuStmt = op.sr.Define(sfuStmtStr) } op.spanStmt = op.sr.Define(spanStmtStr) + op.delStmt = op.sr.Define(delStmtStr) if err := op.sr.Init(ctx, "kv", mcp, w.connFlags); err != nil { return workload.QueryLoad{}, err } @@ -383,6 +412,7 @@ type kvOp struct { writeStmt workload.StmtHandle spanStmt workload.StmtHandle sfuStmt workload.StmtHandle + delStmt workload.StmtHandle g keyGenerator numEmptyResults *int64 // accessed atomically } @@ -413,6 +443,21 @@ func (o *kvOp) run(ctx context.Context) (retErr error) { // Since we know the statement is not a read, we recalibrate // statementProbability to only consider the other statements. statementProbability -= o.config.readPercent + if statementProbability < o.config.delPercent { + start := timeutil.Now() + args := make([]interface{}, o.config.batchSize) + for i := 0; i < o.config.batchSize; i++ { + args[i] = o.g.readKey() + } + _, err := o.delStmt.Exec(ctx, args...) + if err != nil { + return err + } + elapsed := timeutil.Since(start) + o.hists.Get(`del`).Record(elapsed) + return nil + } + statementProbability -= o.config.delPercent if statementProbability < o.config.spanPercent { start := timeutil.Now() var err error