Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-2.1: various import/export fixes #30425

Merged
merged 5 commits into from
Sep 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -572,6 +573,47 @@ COPY t (a, b, c) FROM stdin;
data: "create table s.t (i INT)",
err: `non-public schemas unsupported: s`,
},
{
name: "unsupported type",
typ: "PGDUMP",
data: "create table t (t time with time zone)",
err: `create table t \(t time with time zone\)
\^`,
},
{
name: "various create ignores",
typ: "PGDUMP",
data: `
CREATE TRIGGER conditions_set_updated_at BEFORE UPDATE ON conditions FOR EACH ROW EXECUTE PROCEDURE set_updated_at();
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM PUBLIC;
REVOKE ALL ON SEQUENCE knex_migrations_id_seq FROM database;
GRANT ALL ON SEQUENCE knex_migrations_id_seq TO database;
GRANT SELECT ON SEQUENCE knex_migrations_id_seq TO opentrials_readonly;

CREATE FUNCTION public.isnumeric(text) RETURNS boolean
LANGUAGE sql
AS $_$
SELECT $1 ~ '^[0-9]+$'
$_$;
ALTER FUNCTION public.isnumeric(text) OWNER TO roland;

CREATE TABLE t (i INT);
`,
query: map[string][][]string{
`SHOW TABLES`: {{"t"}},
},
},
{
name: "many tables",
typ: "PGDUMP",
data: func() string {
var sb strings.Builder
for i := 1; i <= 100; i++ {
fmt.Fprintf(&sb, "CREATE TABLE t%d ();\n", i)
}
return sb.String()
}(),
},

// Error
{
Expand Down Expand Up @@ -1759,8 +1801,6 @@ func TestImportLivenessWithRestart(t *testing.T) {
t.Fatalf("not all rows were present. Expecting %d, had %d", rows, rowCount)
}

rescheduled := jobutils.GetJobPayload(t, sqlDB, jobID).Details.(*jobspb.Payload_Import).Import

// Verify that all write progress coalesced into a single span
// encompassing the entire table.
spans := rescheduledProgress.Details.(*jobspb.Progress_Import).Import.SpanProgress
Expand All @@ -1769,7 +1809,10 @@ func TestImportLivenessWithRestart(t *testing.T) {
}

// Ensure that an entire table range is marked as complete
tableSpan := rescheduled.Tables[0].Desc.TableSpan()
tableSpan := roachpb.Span{
Key: keys.MinKey,
EndKey: keys.MaxKey,
}
if !tableSpan.EqualValue(spans[0]) {
t.Fatalf("expected entire table to be marked complete, had %s", spans[0])
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (p *postgreStream) Next() (interface{}, error) {
if isIgnoredStatement(t) {
continue
}
return nil, errors.Errorf("%v: (%s)", err, t)
return nil, err
}
switch len(stmts) {
case 0:
Expand Down Expand Up @@ -134,10 +134,15 @@ func (p *postgreStream) Next() (interface{}, error) {
// Patterns used when scanning a pg_dump file: input matching these is
// silently skipped instead of being surfaced as a parse error, so dumps
// containing Postgres-only DDL can still be imported.
var (
	// ignoreComments matches a SQL line comment ("--"), optionally preceded
	// by whitespace.
	ignoreComments = regexp.MustCompile(`^\s*(--.*)`)
	// ignoreStatements holds case-insensitive prefixes of statements that
	// CockroachDB does not support (functions, triggers, grants on
	// sequences, ownership changes, extensions, COMMENT ON) but that are
	// safe to drop during import.
	ignoreStatements = []*regexp.Regexp{
		regexp.MustCompile("(?i)^alter function"),
		regexp.MustCompile("(?i)^alter sequence .* owned by"),
		regexp.MustCompile("(?i)^alter table .* owner to"),
		regexp.MustCompile("(?i)^comment on"),
		regexp.MustCompile("(?i)^create extension"),
		regexp.MustCompile("(?i)^create function"),
		regexp.MustCompile("(?i)^create trigger"),
		regexp.MustCompile("(?i)^grant .* on sequence"),
		regexp.MustCompile("(?i)^revoke .* on sequence"),
	}
)

Expand Down Expand Up @@ -287,6 +292,9 @@ func readPostgresCreateTable(
return ret, nil
}
if err != nil {
if pg, ok := pgerror.GetPGCause(err); ok {
return nil, errors.Errorf("%s\n%s", pg.Message, pg.Detail)
}
return nil, errors.Wrap(err, "postgres parse error")
}
switch stmt := stmt.(type) {
Expand Down
32 changes: 20 additions & 12 deletions pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,19 +272,27 @@ func (sp *sstWriter) Run(ctx context.Context, wg *sync.WaitGroup) {
key := roachpb.Key(samples[i])
return key.Compare(span.End) >= 0
})
finished := roachpb.Span{EndKey: span.End}
// Special case: if we're processing the first span, we want
// to mark the table key itself as being complete. This
// means that at the end of importing, the table's entire
// key-space will be marked as complete.
if idx == 0 {
_, table, err := keys.DecodeTablePrefix(span.End)
if err != nil {
return errors.Wrapf(err, "expected a table key, had %s", span.End)
var finished roachpb.Span
// Mark the processed span as done for resume. If it was the first or last
// span, use min or max key. This is easier than trying to correctly determine
// the table ID we imported and getting its start span because span.End
// might be (in the case of an empty table) the start key of the next table.
switch idx {
case 0:
finished = roachpb.Span{
Key: keys.MinKey,
EndKey: span.End,
}
case len(samples):
finished = roachpb.Span{
Key: samples[idx-1],
EndKey: keys.MaxKey,
}
default:
finished = roachpb.Span{
Key: samples[idx-1],
EndKey: span.End,
}
finished.Key = keys.MakeTablePrefix(uint32(table))
} else {
finished.Key = samples[idx-1]
}
var sg roachpb.SpanGroup
sg.Add(d.SpanProgress...)
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/distsql_plan_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package sql

import (
"bytes"
"context"
"fmt"
"math"
Expand Down Expand Up @@ -325,6 +326,16 @@ func LoadCSV(
splits = append(splits, samples...)
sort.Slice(splits, func(a, b int) bool { return roachpb.Key(splits[a]).Compare(splits[b]) < 0 })

// Remove duplicates. These occur when the end span of a descriptor is the
// same as the start span of another.
origSplits := splits
splits = splits[:0]
for _, x := range origSplits {
if len(splits) == 0 || !bytes.Equal(x, splits[len(splits)-1]) {
splits = append(splits, x)
}
}

// jobSpans is a slice of split points, including table start and end keys
// for the table. We create router range spans then from taking each pair
// of adjacent keys.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -1741,7 +1741,7 @@ import_stmt:
// %Help: EXPORT - export data to file in a distributed manner
// %Category: CCL
// %Text:
// EXPORT <format> (<datafile> [WITH <option> [= value] [,...]]) FROM <query>
// EXPORT INTO <format> (<datafile> [WITH <option> [= value] [,...]]) FROM <query>
//
// Formats:
// CSV
Expand Down
11 changes: 0 additions & 11 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,3 @@ func GetJobProgress(t *testing.T, db *sqlutils.SQLRunner, jobID int64) *jobspb.P
}
return ret
}

// GetJobPayload loads the Payload message associated with the job.
func GetJobPayload(t *testing.T, db *sqlutils.SQLRunner, jobID int64) *jobspb.Payload {
	var raw []byte
	db.QueryRow(t, `SELECT payload FROM system.jobs WHERE id = $1`, jobID).Scan(&raw)
	payload := &jobspb.Payload{}
	if err := protoutil.Unmarshal(raw, payload); err != nil {
		t.Fatal(err)
	}
	return payload
}