Skip to content

Commit

Permalink
sql: create jobs in a batch in a transactions
Browse files Browse the repository at this point in the history
This commit change the way jobs are created for a transaction.
Previously, the jobs were created individually during the
execution of a transaction. A job creation incurs a round trip
to insert an entry in system.jobs table. This commit accumulates
the jobs to be created during a transaction and creates them in a
batch, which is expected to reduce the number of round-trips between
distant nodes during a transaction execution. This is a step forward
to improve the performance of GRANT/REVOKE queries in large databases.

Release note: None

Fixes: #64389
  • Loading branch information
Sajjad Rizvi committed Jul 27, 2021
1 parent 02a2ee1 commit 441032c
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 238 deletions.
66 changes: 33 additions & 33 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
exp,benchmark
18,AlterRole/alter_role_with_1_option
19,AlterRole/alter_role_with_2_options
25,AlterRole/alter_role_with_3_options
24,AlterRole/alter_role_with_3_options
17,AlterTableAddCheckConstraint/alter_table_add_1_check_constraint
17,AlterTableAddCheckConstraint/alter_table_add_2_check_constraints
17,AlterTableAddCheckConstraint/alter_table_add_3_check_constraints
17,AlterTableAddColumn/alter_table_add_1_column
17,AlterTableAddColumn/alter_table_add_2_columns
17,AlterTableAddColumn/alter_table_add_3_columns
23,AlterTableAddForeignKey/alter_table_add_1_foreign_key
29,AlterTableAddForeignKey/alter_table_add_2_foreign_keys
35,AlterTableAddForeignKey/alter_table_add_3_foreign_keys
23,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns
22,AlterTableAddForeignKey/alter_table_add_1_foreign_key
27,AlterTableAddForeignKey/alter_table_add_2_foreign_keys
32,AlterTableAddForeignKey/alter_table_add_3_foreign_keys
22,AlterTableAddForeignKey/alter_table_add_foreign_key_with_3_columns
21,AlterTableConfigureZone/alter_table_configure_zone_5_replicas
21,AlterTableConfigureZone/alter_table_configure_zone_7_replicas_
21,AlterTableConfigureZone/alter_table_configure_zone_ranges
Expand All @@ -27,30 +27,30 @@ exp,benchmark
8,AlterTableUnsplit/alter_table_unsplit_at_1_value
10,AlterTableUnsplit/alter_table_unsplit_at_2_values
12,AlterTableUnsplit/alter_table_unsplit_at_3_values
22-23,CreateRole/create_role_with_1_option
24,CreateRole/create_role_with_2_options
25,CreateRole/create_role_with_3_options
23,CreateRole/create_role_with_no_options
21,CreateRole/create_role_with_1_option
23,CreateRole/create_role_with_2_options
24,CreateRole/create_role_with_3_options
22,CreateRole/create_role_with_no_options
20,DropDatabase/drop_database_0_tables
29,DropDatabase/drop_database_1_table
38,DropDatabase/drop_database_2_tables
47,DropDatabase/drop_database_3_tables
28,DropRole/drop_1_role
34,DropRole/drop_2_roles
40,DropRole/drop_3_roles
27,DropRole/drop_1_role
33,DropRole/drop_2_roles
39,DropRole/drop_3_roles
19,DropSequence/drop_1_sequence
27,DropSequence/drop_2_sequences
35,DropSequence/drop_3_sequences
26,DropSequence/drop_2_sequences
33,DropSequence/drop_3_sequences
22,DropTable/drop_1_table
33,DropTable/drop_2_tables
44,DropTable/drop_3_tables
23,DropView/drop_1_view
38,DropView/drop_2_views
53,DropView/drop_3_views
32,DropTable/drop_2_tables
42,DropTable/drop_3_tables
22,DropView/drop_1_view
30,DropView/drop_2_views
38,DropView/drop_3_views
18,Grant/grant_all_on_1_table
21,Grant/grant_all_on_2_tables
24,Grant/grant_all_on_3_tables
21-22,GrantRole/grant_1_role
20,Grant/grant_all_on_2_tables
22,Grant/grant_all_on_3_tables
21,GrantRole/grant_1_role
24,GrantRole/grant_2_roles
2,ORMQueries/activerecord_type_introspection_query
2,ORMQueries/django_table_introspection_1_table
Expand All @@ -65,18 +65,18 @@ exp,benchmark
2,ORMQueries/pg_namespace
2,ORMQueries/pg_type
18,Revoke/revoke_all_on_1_table
21,Revoke/revoke_all_on_2_tables
24,Revoke/revoke_all_on_3_tables
20,Revoke/revoke_all_on_2_tables
22,Revoke/revoke_all_on_3_tables
20,RevokeRole/revoke_1_role
22,RevokeRole/revoke_2_roles
1,SystemDatabaseQueries/select_system.users_with_empty_database_name
1,SystemDatabaseQueries/select_system.users_with_schema_name
2,SystemDatabaseQueries/select_system.users_without_schema_name
26-27,Truncate/truncate_1_column_0_rows
26-27,Truncate/truncate_1_column_1_row
26-27,Truncate/truncate_1_column_2_rows
26-27,Truncate/truncate_2_column_0_rows
26-27,Truncate/truncate_2_column_1_rows
26-27,Truncate/truncate_2_column_2_rows
1,SystemDatabaseQueries/select_system.users_with_empty_database_Name
1,SystemDatabaseQueries/select_system.users_with_schema_Name
2,SystemDatabaseQueries/select_system.users_without_schema_Name
26,Truncate/truncate_1_column_0_rows
26,Truncate/truncate_1_column_1_row
26,Truncate/truncate_1_column_2_rows
26,Truncate/truncate_2_column_0_rows
26,Truncate/truncate_2_column_1_rows
26,Truncate/truncate_2_column_2_rows
1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
50 changes: 15 additions & 35 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type CreatedByInfo struct {

// Record bundles together the user-managed fields in jobspb.Payload.
type Record struct {
JobID jobspb.JobID
Description string
Statements []string
Username security.SQLUsername
Expand All @@ -76,10 +77,20 @@ type Record struct {
CreatedBy *CreatedByInfo
}

// Specification specifies a job, which will be created in a batch with other jobs.
type Specification struct {
jobID jobspb.JobID
record Record
// AppendDescription appends description to this records Description with a
// ';' separator.
func (r *Record) AppendDescription(description string) {
if len(r.Description) == 0 {
r.Description = description
return
}
r.Description = r.Description + ";" + description
}

// SetNonCancelable sets NonCancelable of this Record to the value returned from
// updateFn.
func (r *Record) SetNonCancelable(ctx context.Context, updateFn NonCancelableUpdateFn) {
r.NonCancelable = updateFn(ctx, r.NonCancelable)
}

// StartableJob is a job created with a transaction to be started later.
Expand Down Expand Up @@ -289,37 +300,6 @@ func (j *Job) RunningStatus(
})
}

// SetDescription updates the description of a created job.
func (j *Job) SetDescription(ctx context.Context, txn *kv.Txn, updateFn DescriptionUpdateFn) error {
return j.Update(ctx, txn, func(_ *kv.Txn, md JobMetadata, ju *JobUpdater) error {
prev := md.Payload.Description
desc, err := updateFn(ctx, prev)
if err != nil {
return err
}
if prev != desc {
md.Payload.Description = desc
ju.UpdatePayload(md.Payload)
}
return nil
})
}

// SetNonCancelable updates the NonCancelable field of a created job.
func (j *Job) SetNonCancelable(
ctx context.Context, txn *kv.Txn, updateFn NonCancelableUpdateFn,
) error {
return j.Update(ctx, txn, func(_ *kv.Txn, md JobMetadata, ju *JobUpdater) error {
prev := md.Payload.Noncancelable
newStatus := updateFn(ctx, prev)
if prev != newStatus {
md.Payload.Noncancelable = newStatus
ju.UpdatePayload(md.Payload)
}
return nil
})
}

// RunningStatusFn is a callback that computes a job's running status
// given its details. It is safe to modify details in the callback; those
// modifications will be automatically persisted to the database record.
Expand Down
120 changes: 71 additions & 49 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,70 +368,92 @@ func (r *Registry) newJob(record Record, jobID jobspb.JobID) *Job {
return job
}

// CreateJobsWithTxn creates jobs in a batch by including all the given job records
// in a single insert to the jobs table.
// There must be at least one job to create, otherwise the function returns an error.
// If an error occurs while preparing the insert statement, no job is created.
//TODO(sajjad): To discuss: What batch size do we expect? Should we limit the
// batch size? Large batch sizes can have side effects, e.g., description
// column's value will be large due to large statement size. Are there any
// implications that can result in exceeding limits such as columns size limits?
// CreateJobsWithTxn creates jobs in fixed-size batches. There must be at least
// one job to create, otherwise the function returns an error.
// If an error occurs while preparing the insert statement, no further jobs
// are created and the error is returned. The function returns the list of
// created jobs.
func (r *Registry) CreateJobsWithTxn(
ctx context.Context, jobRecords []*Specification, txn *kv.Txn,
ctx context.Context, txn *kv.Txn, records []*Record,
) ([]*Job, error) {
if len(jobRecords) == 0 {
if len(records) == 0 {
return nil, errors.Errorf("no jobs to create.")
}

// maxBatchSize limits the number of jobs created in one batch.
// Batch size 100 was picked arbitrarily.
const maxBatchSize = 100
const columnValues = " ($%d, $%d, $%d, $%d, $%d, $%d)"
const insertStmt = `
INSERT INTO system.jobs (id, status, payload, progress, claim_session_id, claim_instance_id)
VALUES`

log.Infof(ctx, "creating %d jobs in batches of %d", len(records), maxBatchSize)
s, err := r.sqlInstance.Session(ctx)
if err != nil {
return nil, errors.Wrap(err, "error getting live session")
}
sessionID := s.ID()
start := timeutil.Now()
if txn != nil {
start = txn.ReadTimestamp().GoTime()
}
modifiedMicros := timeutil.ToUnixMicros(start)

var sb strings.Builder
sb.WriteString(`
INSERT INTO system.jobs (id, status, payload, progress, claim_session_id, claim_instance_id)
VALUES`)

argIdx := 0
var args []interface{}
var jobs []*Job
for i, jr := range jobRecords {
j := r.newJob(jr.record, jr.jobID)
j.sessionID = sessionID
j.mu.progress.ModifiedMicros = modifiedMicros
payloadBytes, err := protoutil.Marshal(&j.mu.payload)
if err != nil {
return nil, err
// Jobs to return.
var retJobs []*Job
// To create INSERT statement.
var stmtSB strings.Builder
numBatches := len(records) / maxBatchSize
for iBatch := 0; iBatch <= numBatches; iBatch++ {
start := timeutil.Now()
if txn != nil {
start = txn.ReadTimestamp().GoTime()
}
progressBytes, err := protoutil.Marshal(&j.mu.progress)
if err != nil {
return nil, err
modifiedMicros := timeutil.ToUnixMicros(start)

argIdx := 0
var args []interface{}
for i := 0; i < maxBatchSize; i++ {
iRecord := i + iBatch*maxBatchSize
if iRecord == len(records) {
break
}
// Prepare the statement in the first and last batches only, reuse in
// the middle batches.
if iBatch == 0 || iBatch == numBatches {
// First record in the batch.
if i == 0 {
stmtSB.Reset()
stmtSB.WriteString(insertStmt)
} else {
stmtSB.WriteString(", ")
}
// The number of arguments must match the number of columns in the insert stmt.
stmtSB.WriteString(fmt.Sprintf(columnValues, argIdx+1, argIdx+2, argIdx+3, argIdx+4, argIdx+5, argIdx+6))
}
record := records[iRecord]
job := r.newJob(*record, record.JobID)
job.sessionID = sessionID
job.mu.progress.ModifiedMicros = modifiedMicros
payloadBytes, err := protoutil.Marshal(&job.mu.payload)
if err != nil {
return nil, err
}
progressBytes, err := protoutil.Marshal(&job.mu.progress)
if err != nil {
return nil, err
}
args = append(args, record.JobID, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID())
argIdx += 6 // We have six columns to insert.
retJobs = append(retJobs, job)
}
if i > 0 {
sb.WriteString(", ")
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "executing batch insert with statement: %s", stmtSB.String())
}
if haveJobs := len(args) > 0; haveJobs {
if _, err = r.ex.Exec(ctx, "job-rows-batch-insert", txn, stmtSB.String(), args...,
); err != nil {
return nil, err
}
}
// The number of arguments must match the number of columns in the insert stmt.
sb.WriteString(fmt.Sprintf(" ($%d, $%d, $%d, $%d, $%d, $%d)",
argIdx+1, argIdx+2, argIdx+3, argIdx+4, argIdx+5, argIdx+6))
args = append(args, jr.jobID, StatusRunning, payloadBytes, progressBytes, s.ID().UnsafeBytes(), r.ID())
argIdx += 6 // We have six columns to insert.
jobs = append(jobs, j)
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "[SR] executing batch insert with statement: %s", sb.String())
}
if _, err = r.ex.Exec(ctx, "job-rows-batch-insert", txn, sb.String(), args...,
); err != nil {
return nil, err
}
return jobs, nil
return retJobs, nil
}

// CreateJobWithTxn creates a job to be started later with StartJob. It stores
Expand Down
29 changes: 18 additions & 11 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -287,12 +288,17 @@ func TestBatchJobsCreation(t *testing.T) {
batchSize int
}{
{"small batch", 10},
{"medium batch", 500},
{"large batch", 1000},
{"extra large batch", 10000},
{"medium batch", 501},
{"large batch", 1001},
{"extra large batch", 5001},
} {
t.Run(test.name, func(t *testing.T) {
{
if test.batchSize > 10 {
skip.UnderStress(t, "skipping stress test for batch size %d", test.batchSize)
}

skip.UnderStress(t)
args := base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: NewTestingKnobsWithShortIntervals(),
Expand All @@ -314,22 +320,19 @@ func TestBatchJobsCreation(t *testing.T) {
})

// Create a batch of job specifications.
var jd []*Specification
var records []*Record
for i := 0; i < test.batchSize; i++ {
jr := Record{
// TODO (sajjad): To discuss: I guess this job's type should be set in its
// payload based on Details. I was hoping that the type will be "IMPORT" in
// the jobs table when the job is created, but it occurred to be NULL. Why is that?
records = append(records, &Record{
JobID: r.MakeJobID(),
Details: jobspb.ImportDetails{},
Progress: jobspb.ImportProgress{},
}
jd = append(jd, &Specification{r.MakeJobID(), jr})
})
}
// Create jobs in a batch.
var jobs []*Job
require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
var err error
jobs, err = r.CreateJobsWithTxn(ctx, jd, txn)
jobs, err = r.CreateJobsWithTxn(ctx, txn, records)
return err
}))
require.Equal(t, len(jobs), test.batchSize)
Expand All @@ -340,6 +343,10 @@ func TestBatchJobsCreation(t *testing.T) {
tdb.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = '%d'", job.id),
[][]string{{"succeeded"}})
}
// TODO(sajjad): To discuss: What should we expect the values of job_type
// and description? I was expecting that the type will be "IMPORT" in
// the jobs table when the job is created, but it occurred to be NULL. Is
// that expected? Similarly, description is NULL.
}
})
}
Expand Down
Loading

0 comments on commit 441032c

Please sign in to comment.