Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs,sql: create jobs in a batch in a transactions #67991

Merged
merged 2 commits into from
Aug 5, 2021

Conversation

sajjadrizvi
Copy link

@sajjadrizvi sajjadrizvi commented Jul 23, 2021

This commit change the way jobs are created for a transaction.
Previously, the jobs were created individually during the
execution of a transaction. A job creation incurs a round trip
to insert an entry in system.jobs table.

This commit introduces a new function in Registry, 'CreateJobsWithTxn',
that allows creating jobs in a batch using a single SQL statement. Using
this function, this commit creates jobs in a batch during a transaction, which
reduces the number of round-trips between distant nodes during a transaction
execution. This is a step forward to improve the performance of GRANT/REVOKE
queries in large databases.

Release note: None

Fixes: #64389

@sajjadrizvi sajjadrizvi requested a review from a team July 23, 2021 16:23
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@sajjadrizvi sajjadrizvi removed the request for review from a team July 23, 2021 16:43
@sajjadrizvi sajjadrizvi marked this pull request as draft July 23, 2021 16:44
@sajjadrizvi sajjadrizvi marked this pull request as ready for review July 24, 2021 10:40
@sajjadrizvi sajjadrizvi requested a review from a team July 24, 2021 10:40
@sajjadrizvi sajjadrizvi changed the title jobs: create jobs in a batch [DNW] jobs: create jobs in a batch Jul 24, 2021
@sajjadrizvi sajjadrizvi changed the title [DNW] jobs: create jobs in a batch [DNM] jobs: create jobs in a batch Jul 24, 2021
@sajjadrizvi sajjadrizvi marked this pull request as draft July 24, 2021 23:51
@sajjadrizvi sajjadrizvi changed the title [DNM] jobs: create jobs in a batch [DNM] jobs,sql: create jobs in a batch in a transactions Jul 25, 2021
@sajjadrizvi sajjadrizvi marked this pull request as ready for review July 25, 2021 00:28
@sajjadrizvi sajjadrizvi requested a review from a team July 25, 2021 00:28
@sajjadrizvi
Copy link
Author

Although some tests are failing, I appreciate if I can get some feedback. I might be pushing too hard in the tests that are failing. If not, they may lead to further changes or adding bounds on the size of a batch.

@sajjadrizvi sajjadrizvi requested review from a team and removed request for a team July 25, 2021 00:48
Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preliminary pass on the subway back from the dentist

pkg/jobs/jobs.go Outdated
@@ -76,6 +76,50 @@ type Record struct {
CreatedBy *CreatedByInfo
}

// Specification specifies a job, which will be created in a batch with other jobs.
type Specification struct {
jobID jobspb.JobID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm starting to think maybe we should just stick ID as a field in Record. It'd simplify the code and some function signatures. I don't see any good reason not to. The history here is that the jobs package used to allocate the id for the user. That turned out to be unfortunate so we changed the client to allocate the id. May as well just move the ID into the record.

// in a single insert to the jobs table.
// There must be at least one job to create, otherwise the function returns an error.
// If an error occurs while preparing the insert statement, no job is created.
//TODO(sajjad): To discuss: What batch size do we expect? Should we limit the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we'll want a maximum batch size. Start with just some constant. Maybe 100? We'll want to make sure we test using multiple batches.

// column's value will be large due to large statement size. Are there any
// implications that can result in exceeding limits such as columns size limits?
func (r *Registry) CreateJobsWithTxn(
ctx context.Context, jobSpecs []*Specification, txn *kv.Txn,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put the transaction before the jobs

@@ -93,7 +93,7 @@ type extendedEvalContext struct {
Jobs *jobsCollection

// SchemaChangeJobCache refers to schemaChangeJobsCache in extraTxnState.
SchemaChangeJobCache map[descpb.ID]*jobs.Job
SchemaChangeJobCache map[descpb.ID]*jobs.Specification
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess is that we're going to want to nuance this data structure a bit. There are places where we delete from this cache. Those places are trying to be a bit clever and used to rely on the fact that we had already written the job. I'm thinking what we want is to create a slice of job records and a map from descriptor ID to the record. Then in the places where today we delete from the map we'd retain the record in the slice.

I suspect this is the cause of some of the failing tests.

}
log.Infof(ctx, "queued new schema change job %d for schema %d", newJob.ID(), desc.ID)
newSpec := jobs.NewSpecification(p.extendedEvalCtx.ExecCfg.JobRegistry.MakeJobID(), jobRecord)
//TODO(sajjad): To discuss: Previous code didn't cache the new job. I think
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you look at what QueueJob does?

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick review.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner)


pkg/jobs/jobs.go, line 81 at r2 (raw file):

Previously, ajwerner wrote…

I'm starting to think maybe we should just stick ID as a field in Record. It'd simplify the code and some function signatures. I don't see any good reason not to. The history here is that the jobs package used to allocate the id for the user. That turned out to be unfortunate so we changed the client to allocate the id. May as well just move the ID into the record.

I have not built a sufficient context yet. I am following your judgement here.


pkg/jobs/registry.go, line 375 at r2 (raw file):

Previously, ajwerner wrote…

Yeah we'll want a maximum batch size. Start with just some constant. Maybe 100? We'll want to make sure we test using multiple batches.

I will set a max batch size. We may want to make it a cluster setting, but not for now.

I already have a test case that checks multiple batches. The test fails for larger batches.


pkg/jobs/registry.go, line 380 at r2 (raw file):

Previously, ajwerner wrote…

nit: put the transaction before the jobs

Done.


pkg/sql/planner.go, line 96 at r2 (raw file):

Previously, ajwerner wrote…

My guess is that we're going to want to nuance this data structure a bit. There are places where we delete from this cache. Those places are trying to be a bit clever and used to rely on the fact that we had already written the job. I'm thinking what we want is to create a slice of job records and a map from descriptor ID to the record. Then in the places where today we delete from the map we'd retain the record in the slice.

I suspect this is the cause of some of the failing tests.

OK. I guess that clever function is markTableMutationJobsSuccessful. We will end up deleting the slice only inresetExtraTxnState.

It might be the cause of error! However, the test logs show messages related to throttling and ranges going down.


pkg/sql/schema.go, line 87 at r2 (raw file):

Previously, ajwerner wrote…

Can you look at what QueueJob does?

Yeah, it creates a new job and adds the job to Jobs collection. So this is a place where want the job to be only in the Jobs collection but not in the cache. I will have to preserve this.

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @sajjadrizvi)


pkg/jobs/jobs.go, line 81 at r2 (raw file):

Previously, sajjadrizvi (Sajjad Rizvi) wrote…

I have not built a sufficient context yet. I am following your judgement here.

Yeah, my advice is that we add ID as a field to record. It should simplify the interfaces.


pkg/jobs/registry.go, line 375 at r2 (raw file):

Previously, sajjadrizvi (Sajjad Rizvi) wrote…

I will set a max batch size. We may want to make it a cluster setting, but not for now.

I already have a test case that checks multiple batches. The test fails for larger batches.

It should not fail. Instead this function should have a loop to send multiple batches.


pkg/sql/conn_executor_exec.go, line 807 at r2 (raw file):

Quoted 10 lines of code…

	// 1. I am not sure if I am returning the error correctly.
	// 2. Is this the right place to create jobs? I have placed it outside of the
	//    the following code block because the jobs were being created outside of
	//    it in the previous code.
	// 3. If I have add any ex.phaseTimes, which one should it be? Do I have to
	//    create a new one?
	if err := ex.createJobs(ctx); err != nil {
		return ex.makeErrEvent(err, ast)
	}

This logic should be pulled into commitSQLTransactionInternal


pkg/sql/conn_executor_exec.go, line 824 at r2 (raw file):

	}
	var specList []*jobs.Specification
	//TODO(sajjad): To discuss: Does the order of jobs creation matter?

No, it does not matter.


pkg/sql/planner.go, line 96 at r2 (raw file):

the test logs show messages related to throttling and ranges going down.

I have a feeling that that's just noise at the end of the test and is not the true failure.


pkg/sql/schema.go, line 87 at r2 (raw file):

Previously, sajjadrizvi (Sajjad Rizvi) wrote…

Yeah, it creates a new job and adds the job to Jobs collection. So this is a place where want the job to be only in the Jobs collection but not in the cache. I will have to preserve this.

My guess is that not putting the descriptor into the cache was an oversight.


pkg/sql/schema.go, line 90 at r2 (raw file):

		// the newSpec should be cached.
		p.extendedEvalCtx.SchemaChangeJobCache[desc.ID] = newSpec
		log.Infof(ctx, "cached specification of new schema change job %d for schema %d", newSpec.ID(), desc.ID)

This is not a good log message. It would be extremely confusing for a read. I think the old message was pretty reasonable. Your change of the implementation here doesn't affect the behavior really at all.

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)


pkg/jobs/registry.go, line 375 at r2 (raw file):

Previously, ajwerner wrote…

It should not fail. Instead this function should have a loop to send multiple batches.

I meant the batch size set in the test function. Nevertheless, I have implemented as suggested.


pkg/sql/conn_executor_exec.go, line 807 at r2 (raw file):

Previously, ajwerner wrote…

	// 1. I am not sure if I am returning the error correctly.
	// 2. Is this the right place to create jobs? I have placed it outside of the
	//    the following code block because the jobs were being created outside of
	//    it in the previous code.
	// 3. If I have add any ex.phaseTimes, which one should it be? Do I have to
	//    create a new one?
	if err := ex.createJobs(ctx); err != nil {
		return ex.makeErrEvent(err, ast)
	}

This logic should be pulled into commitSQLTransactionInternal

Done.


pkg/sql/conn_executor_exec.go, line 824 at r2 (raw file):

Previously, ajwerner wrote…

No, it does not matter.

Done.


pkg/sql/schema.go, line 90 at r2 (raw file):

Previously, ajwerner wrote…

This is not a good log message. It would be extremely confusing for a read. I think the old message was pretty reasonable. Your change of the implementation here doesn't affect the behavior really at all.

Done.

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I forgot to publish the comments before pushing the changes.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is now ready for another review.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)

@sajjadrizvi sajjadrizvi changed the title [DNM] jobs,sql: create jobs in a batch in a transactions jobs,sql: create jobs in a batch in a transactions Jul 27, 2021
Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this is getting close.

Reviewed 5 of 14 files at r8.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @sajjadrizvi)


pkg/bench/rttanalysis/testdata/benchmark_expectations, line 80 at r8 (raw file):

Quoted 6 lines of code…
26,Truncate/truncate_1_column_0_rows
26,Truncate/truncate_1_column_1_row
26,Truncate/truncate_1_column_2_rows
26,Truncate/truncate_2_column_0_rows
26,Truncate/truncate_2_column_1_rows
26,Truncate/truncate_2_column_2_rows

I have a feeling we're going to want to retain the range here of round-trips here (put the 26-27 back).


pkg/jobs/registry.go, line 375 at r8 (raw file):

no further jobs are created and the error is returned.

I find this language confusing. The caller shouldn't need to care about other jobs having been created. It makes it sound like it is not a transactional operation.


pkg/jobs/registry.go, line 378 at r8 (raw file):

func (r *Registry) CreateJobsWithTxn(
	ctx context.Context, txn *kv.Txn, records []*Record,
) ([]*Job, error) {

Do callers want the *Job or would they be happy with just a slice of IDs? I feel like IDs might be better.


pkg/jobs/registry.go, line 382 at r8 (raw file):

		return nil, errors.Errorf("no jobs to create.")
	}

can you refactor this a bit into a helper to write a batch and a helper to generate the statement and args. I think it should read something like:

created := make([]*Job, 0, len(records)
for toCreate := records; len(toCreate) > 0; {
    const maxBatchSize = 100
    batchSize := len(toCreate)
    if batchSize > maxBatchSize {
        batchSize = maxBatchSize
    }
    createdInBatch, err := r.createJobsWithTxnInBatch(ctx, toCreate[:batchSize])
    if err != nil {
        return nil, err
    }
    created = append(created, createdInBatch...)
    toCreate = toCreate[batchSize:]
}

func (r *Registry) createJobsWithTxnInBatch(ctx context.Context, txn *kv.Txn records []*Record) ([]*Job, error) {
    s, err  := r.sqlInstance.Session(ctx)
    if err != nil {
        return nil, errors.Wrap(err, "error getting live session")
    }
    stmt, args := batchJobInsertStmt(s, records)
    if err := r.ex.Exec(ctx, "job-rows-batch-insert", txn, stmt, args...); err != nil {
        return nil, err
    }
    // return the jobs etc. Maybe construct them when you construct the statement and args. 
}

pkg/jobs/registry.go, line 454 at r8 (raw file):

			if _, err = r.ex.Exec(ctx, "job-rows-batch-insert", txn, stmtSB.String(), args...,
			); err != nil {

nit: I find this wrapping to be bizarre. My preference is:

		if _, err = r.ex.Exec(
			ctx, "job-rows-batch-insert", txn, stmtSB.String(), args...,
		); err != nil {

pkg/jobs/registry_test.go, line 302 at r8 (raw file):

				}

				skip.UnderStress(t)

Isn't this covered above?


pkg/jobs/registry_test.go, line 334 at r8 (raw file):

				// Create jobs in a batch.
				var jobs []*Job
				require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {

I think you're going to have a bad time if the callback gets called more than once. You should reset the jobs slice in the callback.


pkg/jobs/registry_test.go, line 350 at r8 (raw file):

Quoted 4 lines of code…

				// and description? I was expecting that the type will be "IMPORT" in
				// the jobs table when the job is created, but it occurred to be NULL. Is
				// that expected? Similarly, description is NULL.

When you say type what do you mean? Which table? The description is a field in the record so it should be what you set it to. Maybe set it to some expected values.


pkg/sql/drop_table.go, line 509 at r8 (raw file):

	for _, mj := range tableDesc.MutationJobs {
		jobID := jobspb.JobID(mj.JobID)
		if record, exists := p.ExtendedEvalContext().SchemaChangeJobCache[tableDesc.ID]; exists {

This deserves some commentary for future readers as well as some commentary about this behavior on the function itself.


pkg/sql/table.go, line 119 at r8 (raw file):

Quoted 4 lines of code…
	var record *jobs.Record
	if cachedRecord, ok := p.extendedEvalCtx.SchemaChangeJobCache[tableDesc.ID]; ok {
		record = cachedRecord
	}

nit: replace this with the following code to allow you to eliminate some code.

record, recordExists :=  p.extendedEvalCtx.SchemaChangeJobCache[tableDesc.ID]

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner)


pkg/bench/rttanalysis/testdata/benchmark_expectations, line 80 at r8 (raw file):

Previously, ajwerner wrote…
26,Truncate/truncate_1_column_0_rows
26,Truncate/truncate_1_column_1_row
26,Truncate/truncate_1_column_2_rows
26,Truncate/truncate_2_column_0_rows
26,Truncate/truncate_2_column_1_rows
26,Truncate/truncate_2_column_2_rows

I have a feeling we're going to want to retain the range here of round-trips here (put the 26-27 back).

Done.


pkg/jobs/jobs.go, line 81 at r2 (raw file):

Previously, ajwerner wrote…

Yeah, my advice is that we add ID as a field to record. It should simplify the interfaces.

Done.


pkg/jobs/registry.go, line 375 at r8 (raw file):

Previously, ajwerner wrote…

no further jobs are created and the error is returned.

I find this language confusing. The caller shouldn't need to care about other jobs having been created. It makes it sound like it is not a transactional operation.

Done.


pkg/jobs/registry.go, line 378 at r8 (raw file):

Previously, ajwerner wrote…

Do callers want the *Job or would they be happy with just a slice of IDs? I feel like IDs might be better.

That's right. I have changed it to IDs.


pkg/jobs/registry.go, line 382 at r8 (raw file):

Previously, ajwerner wrote…

can you refactor this a bit into a helper to write a batch and a helper to generate the statement and args. I think it should read something like:

created := make([]*Job, 0, len(records)
for toCreate := records; len(toCreate) > 0; {
    const maxBatchSize = 100
    batchSize := len(toCreate)
    if batchSize > maxBatchSize {
        batchSize = maxBatchSize
    }
    createdInBatch, err := r.createJobsWithTxnInBatch(ctx, toCreate[:batchSize])
    if err != nil {
        return nil, err
    }
    created = append(created, createdInBatch...)
    toCreate = toCreate[batchSize:]
}

func (r *Registry) createJobsWithTxnInBatch(ctx context.Context, txn *kv.Txn records []*Record) ([]*Job, error) {
    s, err  := r.sqlInstance.Session(ctx)
    if err != nil {
        return nil, errors.Wrap(err, "error getting live session")
    }
    stmt, args := batchJobInsertStmt(s, records)
    if err := r.ex.Exec(ctx, "job-rows-batch-insert", txn, stmt, args...); err != nil {
        return nil, err
    }
    // return the jobs etc. Maybe construct them when you construct the statement and args. 
}

This is much better. Thanks for the direction. I have updated.


pkg/jobs/registry.go, line 454 at r8 (raw file):

Previously, ajwerner wrote…
			if _, err = r.ex.Exec(ctx, "job-rows-batch-insert", txn, stmtSB.String(), args...,
			); err != nil {

nit: I find this wrapping to be bizarre. My preference is:

		if _, err = r.ex.Exec(
			ctx, "job-rows-batch-insert", txn, stmtSB.String(), args...,
		); err != nil {

Thanks, I have updated.


pkg/jobs/registry_test.go, line 302 at r8 (raw file):

Previously, ajwerner wrote…

Isn't this covered above?

Good catch, it's by mistake.


pkg/jobs/registry_test.go, line 334 at r8 (raw file):

Previously, ajwerner wrote…

I think you're going to have a bad time if the callback gets called more than once. You should reset the jobs slice in the callback.

Done.


pkg/jobs/registry_test.go, line 350 at r8 (raw file):

Previously, ajwerner wrote…

				// and description? I was expecting that the type will be "IMPORT" in
				// the jobs table when the job is created, but it occurred to be NULL. Is
				// that expected? Similarly, description is NULL.

When you say type what do you mean? Which table? The description is a field in the record so it should be what you set it to. Maybe set it to some expected values.

It was job_type in the internal table as we discussed offline.


pkg/sql/drop_table.go, line 509 at r8 (raw file):

Previously, ajwerner wrote…

This deserves some commentary for future readers as well as some commentary about this behavior on the function itself.

OK. Done.


pkg/sql/planner.go, line 96 at r2 (raw file):

Previously, ajwerner wrote…

the test logs show messages related to throttling and ranges going down.

I have a feeling that that's just noise at the end of the test and is not the true failure.

I have updated the structure.


pkg/sql/table.go, line 119 at r8 (raw file):

Previously, ajwerner wrote…
	var record *jobs.Record
	if cachedRecord, ok := p.extendedEvalCtx.SchemaChangeJobCache[tableDesc.ID]; ok {
		record = cachedRecord
	}

nit: replace this with the following code to allow you to eliminate some code.

record, recordExists :=  p.extendedEvalCtx.SchemaChangeJobCache[tableDesc.ID]

Done.

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get the linter passing and fix the nits and I think this is good to go.

Reviewed 1 of 3 files at r1, 1 of 14 files at r10.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)


pkg/jobs/registry_test.go, line 334 at r8 (raw file):

Previously, sajjadrizvi (Sajjad Rizvi) wrote…

Done.

Not sure what I was talking about here


pkg/sql/drop_table.go, line 515 at r10 (raw file):

) error {
	for _, mj := range tableDesc.MutationJobs {
		mutationID := jobspb.JobID(mj.JobID)

mutationID is confusing as the name of a variable storing a job ID. MutatationID is a differnent concept which is related to jobs. jobID was a pretty good name, no?


pkg/sql/drop_table.go, line 521 at r10 (raw file):

		if record, exists := p.ExtendedEvalContext().SchemaChangeJobCache[tableDesc.ID]; exists {
			if record.JobID == mutationID {

nit: inline the condition with &&

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner)


pkg/sql/drop_table.go, line 515 at r10 (raw file):

Previously, ajwerner wrote…

mutationID is confusing as the name of a variable storing a job ID. MutatationID is a differnent concept which is related to jobs. jobID was a pretty good name, no?

Yeah, I didn't realize mutation ID is an existing term.

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)


pkg/bench/rttanalysis/testdata/benchmark_expectations, line 74 at r10 (raw file):

1,SystemDatabaseQueries/select_system.users_with_empty_database_Name
1,SystemDatabaseQueries/select_system.users_with_schema_Name
2,SystemDatabaseQueries/select_system.users_without_schema_Name

What is this about?


pkg/jobs/registry.go, line 416 at r10 (raw file):

	sessionID sqlliveness.SessionID, txn *kv.Txn, records []*Record,
) (string, []interface{}, []jobspb.JobID, error) {
	const columnValues = " ($%d, $%d, $%d, $%d, $%d, $%d)"

I don't think we're going to need this with the refactor I'm about to propose.


pkg/jobs/registry.go, line 424 at r10 (raw file):

Quoted 5 lines of code…
	start := timeutil.Now()
	if txn != nil {
		start = txn.ReadTimestamp().GoTime()
	}
	modifiedMicros := timeutil.ToUnixMicros(start)

nit: lift this above this call and just pass in the timestamp


pkg/jobs/registry.go, line 427 at r10 (raw file):

	// We have six columns per job.
	args := make([]interface{}, 0, len(records)*6)

I'm going to make a nit-picky suggestion for how to make this feel a little cleaner:

instanceID := r.ID()
const  numColumns = 6
columns := [numColumns]string{`id`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`}
marshalPanic = func(m proto.Message) []byte {
      data, err := protoutil.Marshal(m)
       if err != nil {
           panic(err)
       }
       return data
 }
valueFns := map[string]func(*Record) interface{}{
       `id`: func(r *Record) interface{} { return r.ID },
       `status`: func(r *Record) interface{} { return StatusRunning },
       `payload`: func(r *Record) interface{} { return marshalPanic(r.Payload) },
       `progress`: func(r *Record) interface{} { return marshalPanic(r.Progress) },
       `claim_session_id`: func(r *Record) interface{} { return sessionID.UnsafeBytes() },
       `claim_instance_id`: func(r *Record) interface{} { return instanceID },
 }
appendValues := func(r *Record, val *[]interface{}) (err error) {
        defer func() {
            switch r := recover(); t.(type) {
            case nil:
            case error:
                err = errors.CombineErrors(err, errors.Wrap(r, "encoding job %d", r.ID)
            default:
                panic(r)
            }
        }()
        for _, c := range columns {
            (*vals) = append(*vals, valueFns[c](r)
        }
        return nil
 }
args := make([]interface{}, 0, len(records) * numColumns)
jobIDs := make([]jobspb.JobID, 0, len(records))

var buf strings.Builder
buf.WriteString(`INSERT INTO system.jobs (`
buf.WriteString(strings.Join(columns, ", "))
buf.WriteString(`) VALUES `)
argIdx := 0
for i, r := range records {
    if i > 0 {
        buf.WriteString(", ")
    }
    buf.WriteString("("
    for _, c := range columns {
         buf.WriteString("$")
         buf.WriteString(strconv.Itoa(argIdx))
         argIdx++
    }
    buf.WriteString(")")
    r.Progress.ModifiedMicros = modifiedMicros
    if err := appendValues(r, &args); err != nil {
        return "", nil, nil, err
    }
    jobIDs = append(jobIDs, job.ID)
}
return stmtSB.String(), args, jobIDs, nil

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner)


pkg/bench/rttanalysis/testdata/benchmark_expectations, line 74 at r10 (raw file):

Previously, ajwerner wrote…
1,SystemDatabaseQueries/select_system.users_with_empty_database_Name
1,SystemDatabaseQueries/select_system.users_with_schema_Name
2,SystemDatabaseQueries/select_system.users_without_schema_Name

What is this about?

I am not sure how it happened. I have now updated only Grant test numbers.


pkg/jobs/registry.go, line 416 at r10 (raw file):

Previously, ajwerner wrote…

I don't think we're going to need this with the refactor I'm about to propose.

OK.


pkg/jobs/registry.go, line 424 at r10 (raw file):

Previously, ajwerner wrote…
	start := timeutil.Now()
	if txn != nil {
		start = txn.ReadTimestamp().GoTime()
	}
	modifiedMicros := timeutil.ToUnixMicros(start)

nit: lift this above this call and just pass in the timestamp

Done.


pkg/jobs/registry.go, line 427 at r10 (raw file):

Previously, ajwerner wrote…

I'm going to make a nit-picky suggestion for how to make this feel a little cleaner:

instanceID := r.ID()
const  numColumns = 6
columns := [numColumns]string{`id`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`}
marshalPanic = func(m proto.Message) []byte {
      data, err := protoutil.Marshal(m)
       if err != nil {
           panic(err)
       }
       return data
 }
valueFns := map[string]func(*Record) interface{}{
       `id`: func(r *Record) interface{} { return r.ID },
       `status`: func(r *Record) interface{} { return StatusRunning },
       `payload`: func(r *Record) interface{} { return marshalPanic(r.Payload) },
       `progress`: func(r *Record) interface{} { return marshalPanic(r.Progress) },
       `claim_session_id`: func(r *Record) interface{} { return sessionID.UnsafeBytes() },
       `claim_instance_id`: func(r *Record) interface{} { return instanceID },
 }
appendValues := func(r *Record, val *[]interface{}) (err error) {
        defer func() {
            switch r := recover(); t.(type) {
            case nil:
            case error:
                err = errors.CombineErrors(err, errors.Wrap(r, "encoding job %d", r.ID)
            default:
                panic(r)
            }
        }()
        for _, c := range columns {
            (*vals) = append(*vals, valueFns[c](r)
        }
        return nil
 }
args := make([]interface{}, 0, len(records) * numColumns)
jobIDs := make([]jobspb.JobID, 0, len(records))

var buf strings.Builder
buf.WriteString(`INSERT INTO system.jobs (`
buf.WriteString(strings.Join(columns, ", "))
buf.WriteString(`) VALUES `)
argIdx := 0
for i, r := range records {
    if i > 0 {
        buf.WriteString(", ")
    }
    buf.WriteString("("
    for _, c := range columns {
         buf.WriteString("$")
         buf.WriteString(strconv.Itoa(argIdx))
         argIdx++
    }
    buf.WriteString(")")
    r.Progress.ModifiedMicros = modifiedMicros
    if err := appendValues(r, &args); err != nil {
        return "", nil, nil, err
    }
    jobIDs = append(jobIDs, job.ID)
}
return stmtSB.String(), args, jobIDs, nil

I have updated accordingly.

@sajjadrizvi sajjadrizvi force-pushed the batch_job_creation branch 2 times, most recently from c48274c to 4e6017e Compare July 30, 2021 20:38
Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm: mod some nits and CI

Reviewed 6 of 14 files at r12.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)


pkg/bench/rttanalysis/testdata/benchmark_expectations, line 43 at r12 (raw file):

19,DropSequence/drop_1_sequence
27,DropSequence/drop_2_sequences
35,DropSequence/drop_3_sequences

seems like some of these have changed.


pkg/jobs/registry.go, line 462 at r12 (raw file):

	appendValues := func(rec *Record, vals *[]interface{}) (err error) {
		defer func() {
			switch ret := recover(); ret.(type) {

ret is a confusing name for the recovered value. It's explicitly not something being returned.


pkg/sql/conn_executor.go, line 1092 at r12 (raw file):

		jobs jobsCollection

		// schemaChangeJobsCache is a map of descriptor IDs to job Records.

I wonder if this deserves a new name.


pkg/sql/conn_executor_exec.go, line 809 at r12 (raw file):

// createJobs creates jobs for the records cached in schemaChangeJobsCache
// during this transaction.
func (ex *connExecutor) createJobs(ctx context.Context) error {

nit: I'd put this below commitSQLTransactionInternal. I usually like to read functions which invoke helpers before the helpers.


pkg/sql/planner.go, line 96 at r12 (raw file):

	// SchemaChangeJobCache refers to schemaChangeJobsCache in extraTxnState.
	SchemaChangeJobCache map[descpb.ID]*jobs.Record

In parallel to the other question about names, I wonder if this deserves a new name. SchemaChangeJobRecords?

Copy link
Author

@sajjadrizvi sajjadrizvi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)


pkg/sql/planner.go, line 96 at r12 (raw file):

Previously, ajwerner wrote…

In parallel to the other question about names, I wonder if this deserves a new name. SchemaChangeJobRecords?

Done, I have updated the name to SchemaChangeJobRecords.

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:, bors it when CI passes.

Reviewed 13 of 14 files at r14.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @sajjadrizvi)

Sajjad Rizvi added 2 commits August 5, 2021 14:18
This commit adds a new function in Registry, 'CreateJobsWithTxn',
that allows creating jobs in a batch using a single SQL statement.
This function is a precursor to creating schema-changing jobs
in a batch, which will result in reducing the number of round-trips
between nodes during schema-changes.

Release note: None
This commit changes the way schema-change jobs are created in a
transaction. Previously, the jobs were created individually during the
execution of a transaction. A job creation incurs a round trip to
insert an entry in jobs table. This commit accumulates the jobs
to be created during a transaction and creates them in a batch, which
is expected to reduce the number of round-trips between distant nodes
during a transaction execution. This is a step forward to improve the
performance of GRANT/REVOKE queries in large databases.

Release note: None

Fixes: cockroachdb#64389
@sajjadrizvi
Copy link
Author

TFTR!

bors r=ajwerner

@craig
Copy link
Contributor

craig bot commented Aug 5, 2021

Build succeeded:

@craig craig bot merged commit d0f2981 into cockroachdb:master Aug 5, 2021
sajjadrizvi pushed a commit to sajjadrizvi/cockroach that referenced this pull request Aug 16, 2021
Commit cockroachdb#67991 introduced a test that turned out to be flaky.
The test runs out of memory sometimes as it creates a very
large batch of jobs. Nevertheless, the test doesn't contribute
much value to the test suite because it only creates a large
batch of jobs. The core functionality is already being tested
with moderately large batch sizes. Therefore, the test is now
removed in this commit.

Release note: None

Fixes: cockroachdb#68962
sajjadrizvi pushed a commit to sajjadrizvi/cockroach that referenced this pull request Aug 17, 2021
Commit cockroachdb#67991 introduced a test that turned out to be flaky.
The test runs out of memory sometimes as it creates a very
large batch of jobs. This fix disables job adoptions to
avoid large memory use.

Release note: None

Fixes: cockroachdb#68962
craig bot pushed a commit that referenced this pull request Aug 17, 2021
68803: sql: add ON UPDATE syntax r=otan a=pawalt

Previously, we did not have parsing implemented for ON UPDATE
expressions on columns. This PR adds parsing for ON UPDATE expressions.

Release note: None

69013: opt: fix panic in findJoinFilterRange r=mgartner a=mgartner

This commit fixes a panic produced in `findJoinFilterRange` when
attempting to access the 0-th constraint in an empty constraint set.

Fixes #68975

There is no release note because the bug is not present in any releases.

Release note: None

69014: jobs: fix test for batch jobs creation, marked as flaky r=ajwerner a=sajjadrizvi

Commit #67991 introduced a test that turned out to be flaky.
The test runs out of memory sometimes as it creates a very
large batch of jobs. This fix disables job adoptions to avoid 
large memory use.

Release note: None

Fixes: #68962

69022: sql: implement combined iterator for transaction statistics r=maryliag a=Azhng

Follow up to #68675

Release note: None

69031: ccl/backupccl: skip TestBackupRestoreSystemJobsProgress r=otan a=otan

Refs: #68571

Reason: flaky test (this is flaking regularly on PRs)

Generated by bin/skip-test.

Release justification: non-production code changes

Release note: None

69034: roachtest/tests/cdc: effectively disable exponential backoff in jobs r=ajwerner a=ajwerner

Fixes #68963.

Release note: None

69045: bazel: swap order of interfaces in generated mocks in `pkg/roachpb` r=rail a=rickystewart

This makes the order match up to what we have in the checked-in
generated code, quashing a diff when you
`dev build --hoist-generated-code`.

Release note: None

Co-authored-by: Peyton Walters <peyton.walters@cockroachlabs.com>
Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
Co-authored-by: Sajjad Rizvi <sajjad@cockroachlabs.com>
Co-authored-by: Azhng <archer.xn@gmail.com>
Co-authored-by: Oliver Tan <otan@cockroachlabs.com>
Co-authored-by: Andrew Werner <awerner32@gmail.com>
Co-authored-by: Ricky Stewart <ricky@cockroachlabs.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

jobs: batch job creation
3 participants