
importccl: add direct-ingest, no sort IMPORT mode prototype #34751

Merged
merged 3 commits into cockroachdb:master from dt:import-sst on Feb 14, 2019

Conversation

4 participants
@dt
Member

commented Feb 8, 2019

This adds a prototype of a no-sort IMPORT where SSTs are generated and added directly from the readers. Each reader builds a batch of keys, sorts it and makes it into an SST and sends it.
This means ranges receive many overlapping SSTs — i.e. one from each batch in each reader — and are charged with compacting them, potentially increasing write amplification — but avoids pushing all the KV data into distsql and sorting and buffering it in its entirety to produce perfectly non-overlapping SSTs.

The lack of a sort and conflict detection in adding SSTs also means that we may no longer observe uniqueness/PK conflicts — if two different reader batches produce the same key, the one in the second SST applied would simply shadow the first. This issue was worked around in index backfills by counting the resulting index entries and comparing that to the number of rows in the table — since every row produced an entry, if one entry shadowed another, the counts would not line up. However, we will need to determine a way to detect conflicts during IMPORT before we could use this approach as the default, if we want to correctly return uniqueness violations.

This is an early prototype. All of the new functionality is behind a flag, so we potentially could merge it with the above issue unaddressed — to allow easier experimenting, tuning and benchmarking.

Release note: none.

@dt dt requested review from mjibson and danhhz Feb 8, 2019

@dt dt requested review from cockroachdb/core-prs as code owners Feb 8, 2019

@cockroach-teamcity

Member

commented Feb 8, 2019

This change is Reviewable

@danhhz
Contributor

left a comment

This also is going to need a cluster version check to be safe against one being started during a rolling upgrade. It also needs some amount of testing, but I'm assuming you were just waiting on that to see what we thought of the approach.

I'm in favor of merging it, so we can easily run larger-scale correctness and performance tests. Also assuming we can keep this from making it harder to reason about the production codepath (mainly, the big if cp.spec.IngestDirectly in readImportDataProcessor makes it harder to read than it already was).

For our own internal usage, can you think of any reason this would be incorrect as-is for loading tpcc? We know tpcc doesn't make any uniqueness violations.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)


pkg/ccl/importccl/read_import_proc.go, line 481 at r3 (raw file):

		defer tracing.FinishSpan(span)

		if cp.spec.IngestDirectly {

we should find a way to clean this up before merging


pkg/ccl/importccl/read_import_proc.go, line 525 at r3 (raw file):

			if len(buf) > 0 {
				sort.Sort(buf)
				for i := range buf {

pull this bit into a method-local func


pkg/jobs/jobspb/jobs.proto, line 99 at r3 (raw file):

  // to add ranges with an old split point within them.
  repeated bytes samples = 8;
  bool ingest_directly = 11;

needs a comment


pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):

	dsp.FinalizePlan(planCtx, &p)

	// Update job details with the sampled keys, as well as any parsed tables,

no sampled keys, right?

what happens to the job progress as this thing runs?


pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):

  optional int64 walltimeNanos = 11 [(gogoproto.nullable) = false];

  optional bool ingestDirectly = 12 [(gogoproto.nullable) = false];

ditto this wants a comment

@dt

Member Author

commented Feb 9, 2019

I thought about putting the whole thing behind a cluster setting instead of a query param -- that would avoid the churn in the job record and distsql spec, but I thought maybe it'd be easier to turn it on query-by-query? I dunno, could go either way.

I should have also noted: this scrapped most progress stuff. I think we can wrap reads on the readers through a counter so we have an idea of how much we've read on each reader. When we know the file length, I think for this single-pass import that almost works out to be the actual progress, give or take the flushing of the chan and SST intermediate buffers, so when we have a file length, I think we can get progress back. When we don't, I think it will just have to jump from 0 to done (per reader, at least).

I think if/when we actually switch to this approach and delete the old code, we'll want to scrap transform -- with how different this ends up looking from a backup, I think we'd stop trying to make it produce one. If we still want to generate backups for fixture generation, I'd vote we should import -> backup -- that will burn a few more CPU cycles, but only when we regenerate fixtures, and would simplify the code.

@dt

Member Author

commented Feb 9, 2019

it occurs to me, even without help from addsstable, I think we could fix the uniqueness violation with the same approach we currently use in addindex: counting the results. Initially I thought that didn't apply here since we didn't have a known row-count to compare to, but we actually do -- we can count rows as we import, in each reader, as we pass them to the row-converter to ingest, so we could pretty trivially know how many rows we expect to get back.

that said, I think I'd still rather teach addSSTable to help us here, since it'd make it much easier to report the offending row than just a "got 5, expected 6".

@mjibson
Member

left a comment

Overall approach lgtm. Can you add at least an on-laptop perf-improvement benchstat to the commit message? Also, in light of Pete's recent email about release cycles, is this something that is planned to be in the next release?

// TODO(dt): buffer to disk instead of all in-mem.

writeTS := hlc.Timestamp{WallTime: cp.spec.WalltimeNanos}
adder, err := cp.flowCtx.BulkAdder(ctx, cp.flowCtx.ClientDB, 32<<20, writeTS)


mjibson Feb 11, 2019

Member

inline comment on the 32<<20

buf = append(buf, kvBatch...)

if len(buf) > sortBatchSize {
sort.Sort(buf)


mjibson Feb 11, 2019

Member

This inner block could be moved to a common func so it shares impl code with the same block below.

@dt

Member Author

commented Feb 11, 2019

w.r.t. the stabilization-period merge policy -- modulo cleanup concerns, I think if we keep it behind a setting or flag, the stability risk to what we're shipping is minimal. If we do go with a flag over a setting, having the additional proto fields in the proto when we cut the branch would reduce back-port conflicts.

@danhhz
Contributor

left a comment

I thought about putting the whole thing behind a cluster setting instead of a query param -- that would avoid the churn in the job record and distsql spec, but I thought maybe it'd be easier to turn it on query-by-query?

I lean toward query param, being able to opt in a single query on a cluster seems like good flexibility given how experimental this will merge as.

I think if/when we actually switch to this approach and delete the old code, we'll want to scrap transform.

Fine with me. fixtures import is way less hassle than fixtures restore and at some point I hope to make it as fast or faster. As you mention, import followed by backup is a fine replacement.

it occurs to me, even without help from addsstable, I think we could fix the uniqueness violation with the same approach we currently use in addindex: counting the results. Initially I thought that didn't apply here since we didn't have a known row-count to compare to, but we actually do -- we can count rows as we import, in each reader, as we pass them to the row-converter to ingest, so we could pretty trivially know how many rows we expect to get back.

that said, I think I'd still rather teach addSSTable to help us here, since it'd make it much easier to report the offending row than just a "got 5, expected 6".

Oooh, I like that. Yes, the better error message would be lovely, but any changes to AddSSTable will require careful thought, so this might be an okay interim placeholder. I also would like to keep flexibility for now about splitting as much work as we can out of AddSSTable (mvcc stat estimates, etc) while we're investigating perf stuff. If we moved to a world where mvcc stats were estimated during AddSSTable and fixed up in a later pass, that pass could also do these uniqueness violations, to reduce the number of times we have to read through every bit of data in rocksdb to once.

I think if we keep it behind a setting or flag, the stability risk to what we're shipping is minimal. If we do go with a flag over a setting, having the additional proto fields in the proto when we cut the branch would reduce back-port conflicts.

Agreed. I'm fine with merging this now behind a flag, modulo addressing my comments about making it a little more clear that the code changes don't affect the prod codepath.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt and @mjibson)

@mjibson

Member

commented Feb 11, 2019

SGTM, merge away.

@dt
Member Author

left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @mjibson)


pkg/ccl/importccl/read_import_proc.go, line 481 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

we should find a way to clean this up before merging

Moving buffer/flush/etc logic to a func helped a little. Moving the whole thing (i.e. the stuff that reads the spec and emits to the outputs) felt a little clumsy. If we wanted to go further, I'd just pull this out and start one of two goroutines instead of the if, but I think pulling the actual drain out helped some.


pkg/ccl/importccl/read_import_proc.go, line 489 at r3 (raw file):

Previously, mjibson (Matt Jibson) wrote…

inline comment on the 32<<20

Done.


pkg/ccl/importccl/read_import_proc.go, line 504 at r3 (raw file):

Previously, mjibson (Matt Jibson) wrote…

This inner block could be moved to a common func so it shares impl code with the same block below.

Done.


pkg/ccl/importccl/read_import_proc.go, line 525 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

pull this bit into a method-local func

Done.


pkg/jobs/jobspb/jobs.proto, line 99 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

needs a comment

Done.


pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

no sampled keys, right?

what happens to the job progress as this thing runs?

Right.
Progress-wise, we don't do much -- we no longer really have "steps" we complete along the way, since it is just a single pass now. If we know the file length, I think we can update progress with how much we've read. I'm fine with minimal progress support while this is experimental, though, and with only focusing on that once it stops being side-by-side with the more complicated impl.


pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

ditto this wants a comment

Done.

@mjibson
Member

left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)


pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):

Previously, dt (David Taylor) wrote…

Right.
Progress-wise, we don't do much -- we no longer really have "steps" we complete along the way, since it is just a single pass now. If we know the file length, I think we can update progress with how much we've read. I'm fine with minimal progress support while this is experimental, though, and with only focusing on that once it stops being side-by-side with the more complicated impl.

We should definitely be able to do the normal file length progress just like the sampling phase does. Also 100% ok with not shipping that until later.

@danhhz
Contributor

left a comment

:lgtm_strong:

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)


pkg/ccl/importccl/import_stmt.go, line 699 at r5 (raw file):

		}

		var ingestDirectly bool

nit: _, ingestDirectly := opts[importOptionDirectIngest]


pkg/ccl/importccl/read_import_proc.go, line 476 at r5 (raw file):

	// Sample KVs
	group.GoCtx(func(ctx context.Context) error {

I think we could go one step further. Something like:

if cp.spec.IngestDirectly {
  group.GoCtx(func(ctx context.Context) error {
    ...
  })
} else {
  group.GoCtx(func(ctx context.Context) error {
    ...
  })
}
return group.Wait()

but I'll make my peace if you decide to merge as-is.


pkg/ccl/importccl/read_import_proc.go, line 482 at r5 (raw file):

		defer tracing.FinishSpan(span)

		// IngestDirectly means this reader will just ingest the whatever the KVs

nit: "ingest the whatever the"


pkg/ccl/importccl/read_import_proc.go, line 484 at r5 (raw file):

		// IngestDirectly means this reader will just ingest the whatever the KVs
		// producer emitted, and the only result we push into distsql at the end is
		// an encoded BulkOpSummary of what we ingested.

nit: "is an encoded" -> "is one row with an encoded" (or otherwise specify one row gets output)


pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):

Previously, mjibson (Matt Jibson) wrote…

We should definitely be able to do the normal file length progress just like the sampling phase does. Also 100% ok with not shipping that until later.

The workload ExportStorage impl is the one I most care about and doesn't support file length, so also okay with saving this for later


pkg/sql/distsql_plan_csv.go, line 662 at r5 (raw file):

}

// DistIngest blah.

"blah"


pkg/sql/distsql_plan_csv.go, line 674 at r5 (raw file):

	ctx = logtags.AddTag(ctx, "import-distsql-ingest", nil)

	ctx = logtags.AddTag(ctx, "import-distsql", nil)

adding both of these tags intentional?


pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):

Previously, dt (David Taylor) wrote…

Done.

Doesn't look like this one made it

@dt dt force-pushed the dt:import-sst branch from bd7ef1c to bd80ded Feb 14, 2019

@dt
Member Author

left a comment

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @mjibson)


pkg/ccl/importccl/import_stmt.go, line 699 at r5 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: _, ingestDirectly := opts[importOptionDirectIngest]

Done.


pkg/ccl/importccl/read_import_proc.go, line 476 at r5 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I think we could go one step further. Something like:

if cp.spec.IngestDirectly {
  group.GoCtx(func(ctx context.Context) error {
    ...
  })
} else {
  group.GoCtx(func(ctx context.Context) error {
    ...
  })
}
return group.Wait()

but I'll make my peace if you decide to merge as-is.

yeah, that's what I meant below with "If we wanted to go further, I'd just pull this out and start one of two goroutines instead of the if". I was initially avoiding doing that since wrapping the whole old block in an else made a big diff, and I was initially trying to keep the churn in existing code to a minimum. Did it now, though, since there are two of us who like the pattern and the majority of that diff is just the whitespace of indenting it.


pkg/ccl/importccl/read_import_proc.go, line 482 at r5 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: "ingest the whatever the"

Done.


pkg/ccl/importccl/read_import_proc.go, line 484 at r5 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: "is an encoded" -> "is one row with an encoded" (or otherwise specify one row gets output)

Done.


pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

The workload ExportStorage impl is the one I most care about and doesn't support file length, so also okay with saving this for later

yeah, I'm punting for now -- experimental direct-ingest jobs will just be done when they're done, unless file read progress just happens to work; I don't want to mess with it for now.

I'm happy to work on actually making progress work after the 19.1 branch is cut, if we're confident we'll be making this the default and can delete the old IMPORT code and its more complex notions of progress (sampling vs read vs write). For now, I don't want to mess with it.

fwiw, I've been thinking a bit about the workload use case, and I think the API separation that I introduced when we added the different format frontends (mysqlout/pgdump/etc) is actually pretty well suited for making a workload frontend: it gets a chan that it puts KVs on (which I think is along the lines of what dan has mentioned as far as skipping the strings/csv/parsing/datum/etc overhead), as well as a func(progress float64) it can call to indicate how far it is, which I think a workload generator could trivially do. Anyway, this is just to say: when we get there, I'm confident we can "fix" progress, but I don't want to think about a nice-to-have yet.


pkg/sql/distsql_plan_csv.go, line 662 at r5 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

"blah"

Done.


pkg/sql/distsql_plan_csv.go, line 674 at r5 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

adding both of these tags intentional?

Nope, thanks.


pkg/sql/distsqlpb/processors.proto, line 664 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Doesn't look like this one made it

Done.

@danhhz
Contributor

left a comment

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)


pkg/ccl/importccl/read_import_proc.go, line 476 at r5 (raw file):

Previously, dt (David Taylor) wrote…

yeah, that's what I meant below with "If we wanted to go further, I'd just pull this out and start one of two goroutines instead of the if". I was initially avoiding doing that since wrapping the whole old block in an else made a big diff, and I was initially trying to keep the churn in existing code to a minimum. Did it now, though, since there are two of us who like the pattern and the majority of that diff is just the whitespace of indenting it.

reviewable has many faults, but it's pretty good about making it obvious when something simply got indented

@dt
Member Author

left a comment

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz, @dt, and @mjibson)


pkg/sql/distsql_plan_csv.go, line 703 at r3 (raw file):

Previously, dt (David Taylor) wrote…

yeah, I'm punting for now -- experimental direct-ingest jobs will just be done when they're done, unless file read progress just happens to work; I don't want to mess with it for now.

I'm happy to work on actually making progress work after the 19.1 branch is cut, if we're confident we'll be making this the default and can delete the old IMPORT code and its more complex notions of progress (sampling vs read vs write). For now, I don't want to mess with it.

fwiw, I've been thinking a bit about the workload use case, and I think the API separation that I introduced when we added the different format frontends (mysqlout/pgdump/etc) is actually pretty well suited for making a workload frontend: it gets a chan that it puts KVs on (which I think is along the lines of what dan has mentioned as far as skipping the strings/csv/parsing/datum/etc overhead), as well as a func(progress float64) it can call to indicate how far it is, which I think a workload generator could trivially do. Anyway, this is just to say: when we get there, I'm confident we can "fix" progress, but I don't want to think about a nice-to-have yet.

I got curious to see whether a dedicated frontend would indeed make it as easy as I thought to fix progress and, indeed, it looks like this shouldn't be too tricky: a451831#diff-e37e9ef47c748ff28608fcc3e8407922R72

@danhhz

Contributor

commented Feb 14, 2019

Oh, I almost forgot. This should get a cluster version check to protect against using the new ingest_directly field on ImportDetails during a rolling upgrade.

I got curious to see whether a dedicated frontend would indeed make it as easy as I thought to fix progress and, indeed, it looks like this shouldn't be too tricky: a451831#diff-e37e9ef47c748ff28608fcc3e8407922R72

Nice!

@dt
Member Author

left a comment

Added cluster version check.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @danhhz and @mjibson)

@dt dt force-pushed the dt:import-sst branch from 671ecdf to 2def7ae Feb 14, 2019

dt added some commits Feb 7, 2019

importccl: add direct-ingest, no sort IMPORT mode

@dt dt force-pushed the dt:import-sst branch 2 times, most recently from 1c1eec3 to dda42c9 Feb 14, 2019

importccl: add a cluster version check to direct_ingest
direct_ingest import jobs should only be run on clusters where all nodes support them.

Release note: none.

@dt dt force-pushed the dt:import-sst branch from dda42c9 to 34776d0 Feb 14, 2019

@dt

Member Author

commented Feb 14, 2019

anything left to do here?

@danhhz

Contributor

commented Feb 14, 2019

merge it!

@dt

Member Author

commented Feb 14, 2019

bors r+

craig bot pushed a commit that referenced this pull request Feb 14, 2019

Merge #34751
34751:  importccl: add direct-ingest, no sort IMPORT mode prototype r=dt a=dt


Co-authored-by: David Taylor <tinystatemachine@gmail.com>
@craig


commented Feb 14, 2019

Build succeeded

@craig craig bot merged commit 34776d0 into cockroachdb:master Feb 14, 2019

3 checks passed

- GitHub CI (Cockroach): TeamCity build finished
- bors: Build succeeded
- license/cla: Contributor License Agreement is signed.

@dt dt deleted the dt:import-sst branch Feb 14, 2019
