importccl: implement EXPORT csv #24755

Closed
wants to merge 2 commits into base: master
+1,073 −255

importccl: implement EXPORT CSV

This adds a DistSQL processor that writes the rows it receives as CSV
files to a specified storage location.

This processor can be appended as a sink for arbitrary SELECT queries.

The primary intended use case is dumping tables as CSV for bulk export,
but because the sink accepts any SELECT query, it can also be used for
report generation or other workflows.

Release note (enterprise change): prototype support for EXPORT CSV.
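
For context, a rough usage sketch (not part of this diff): driving the prototype statement from a Go pgwire client. The `EXPORT INTO CSV ... FROM SELECT` form, the nodelocal destination, the option values, and the table name below are placeholders/assumptions, and the exact grammar in this prototype may differ.

// Hypothetical usage sketch: run the prototype EXPORT statement from a Go
// client and read back the per-file results. All names and values here are
// placeholders.
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks pgwire.
)

func main() {
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// One result row is returned per file written: (filename, rows, bytes),
	// matching exportHeader in the plan hook below.
	rows, err := db.Query(
		`EXPORT INTO CSV 'nodelocal:///export/t'
		 WITH delimiter = '|', chunk_rows = '50000'
		 FROM SELECT * FROM t`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var file string
		var rowCount, byteCount int64
		if err := rows.Scan(&file, &rowCount, &byteCount); err != nil {
			log.Fatal(err)
		}
		fmt.Println(file, rowCount, byteCount)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}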
dt committed Apr 11, 2018
commit c40ab73b623aaac32faf0bb3c94ed0b0c5cb356d
@@ -0,0 +1,322 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package importccl
import (
"bytes"
"context"
"encoding/csv"
"fmt"
"strconv"
"strings"
"sync"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/pkg/errors"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
// exportHeader is the header for EXPORT stmt results.
var exportHeader = sqlbase.ResultColumns{
{Name: "filename", Typ: types.String},
{Name: "rows", Typ: types.Int},
{Name: "bytes", Typ: types.Int},
}
const (
exportOptionDelimiter = "delimiter"
exportOptionChunkSize = "chunk_rows"
exportOptionFileName = "filename"
)
var exportOptionExpectValues = map[string]bool{
exportOptionDelimiter: true,
exportOptionChunkSize: true,
exportOptionFileName: true,
}
const exportChunkSizeDefault = 100000
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"
// exportPlanHook implements sql.PlanHook.
func exportPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
exportStmt, ok := stmt.(*tree.Export)
if !ok {
return nil, nil, nil, nil
}
fileFn, err := p.TypeAsString(exportStmt.File, "EXPORT")
if err != nil {
return nil, nil, nil, err
}
if exportStmt.FileFormat != "CSV" {
// not possible with current parser rules.
return nil, nil, nil, errors.Errorf("unsupported import format: %q", exportStmt.FileFormat)
}
optsFn, err := p.TypeAsStringOpts(exportStmt.Options, exportOptionExpectValues)
if err != nil {
return nil, nil, nil, err
}
sel, err := p.Select(ctx, exportStmt.Query, nil)
if err != nil {
return nil, nil, nil, err
}
if !p.DistSQLPlanner().CheckPossible(sel) {
return nil, nil, nil, errors.Errorf("unsupported EXPORT query -- as an alternative try `cockroach sql --format=csv`")
}
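// fn is the execution-time closure: it checks the enterprise license and
// user privileges, resolves the destination and options, and then plans and
// runs the wrapped SELECT with the CSV writer attached as the final stage.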
fn := func(ctx context.Context, plans []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, exportStmt.StatementTag())
defer tracing.FinishSpan(span)
if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "EXPORT",
); err != nil {
return err
}
if err := p.RequireSuperUser(ctx, "EXPORT"); err != nil {
return err
}
if !p.ExtendedEvalContext().TxnImplicit {
return errors.Errorf("EXPORT cannot be used inside a transaction")
}
file, err := fileFn()
if err != nil {
return err
}
opts, err := optsFn()
if err != nil {
return err
}
delim := ','
if override, ok := opts[exportOptionDelimiter]; ok {
delim, err = util.GetSingleRune(override)
if err != nil {
return pgerror.NewError(pgerror.CodeInvalidParameterValueError, "invalid delimiter")
}
}
chunk := exportChunkSizeDefault
if override, ok := opts[exportOptionChunkSize]; ok {
chunk, err = strconv.Atoi(override)
if err != nil {
return pgerror.NewError(pgerror.CodeInvalidParameterValueError, err.Error())
}
if chunk < 1 {
return pgerror.NewError(pgerror.CodeInvalidParameterValueError, "invalid csv chunk size")
}
}
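// Wire a CSVWriter processor in as the sink of the distributed plan for the
// wrapped SELECT.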
out := distsqlrun.ProcessorCoreUnion{CSVWriter: &distsqlrun.CSVWriterSpec{
Destination: file,
NamePattern: exportFilePatternDefault,
Delimiter: delim,
ChunkRows: int64(chunk),
}}
rows := sqlbase.NewRowContainer(
p.ExtendedEvalContext().Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColTypes(sql.ExportPlanResultTypes), 0,
)
rw := sql.NewRowResultWriter(rows)
if err := sql.PlanAndRunExport(
ctx, p.DistSQLPlanner(), p.ExecCfg(), p.Txn(), p.ExtendedEvalContext(), plans[0], out, rw,
); err != nil {
return err
}
for i := 0; i < rows.Len(); i++ {
resultsCh <- rows.At(i)
}
rows.Close(ctx)
return rw.Err()
}
return fn, exportHeader, []sql.PlanNode{sel}, nil
}
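// newCSVWriterProcessor constructs the DistSQL processor that renders its
// input rows as CSV and uploads the resulting files to the configured
// export storage destination.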
func newCSVWriterProcessor(
flowCtx *distsqlrun.FlowCtx,
spec distsqlrun.CSVWriterSpec,
input distsqlrun.RowSource,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
c := &csvWriter{
flowCtx: flowCtx,
spec: spec,
input: input,
output: output,
}
if err := c.out.Init(&distsqlrun.PostProcessSpec{}, sql.ExportPlanResultTypes, flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
}
return c, nil
}
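// csvWriter buffers its input rows as CSV chunks and writes each chunk as a
// separate file to the spec's Destination.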
type csvWriter struct {
flowCtx *distsqlrun.FlowCtx
spec distsqlrun.CSVWriterSpec
input distsqlrun.RowSource
out distsqlrun.ProcOutputHelper
output distsqlrun.RowReceiver
}
var _ distsqlrun.Processor = &csvWriter{}
func (sp *csvWriter) OutputTypes() []sqlbase.ColumnType {
return sql.ExportPlanResultTypes
}
func (sp *csvWriter) Run(ctx context.Context, wg *sync.WaitGroup) {
ctx, span := tracing.ChildSpan(ctx, "csvWriter")
defer tracing.FinishSpan(span)
if wg != nil {
defer wg.Done()
}
err := func() error {
pattern := exportFilePatternDefault
if sp.spec.NamePattern != "" {
pattern = sp.spec.NamePattern
}
types := sp.input.OutputTypes()
sp.input.Start(ctx)
input := distsqlrun.MakeNoMetadataRowSource(sp.input, sp.output)
alloc := &sqlbase.DatumAlloc{}
var buf bytes.Buffer
writer := csv.NewWriter(&buf)
writer.Comma = sp.spec.Delimiter
f := tree.NewFmtCtxWithBuf(tree.FmtBareStrings)
defer f.Close()
csvRow := make([]string, len(types))
chunk := 0
done := false
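// Each iteration of the outer loop produces one file: the inner loop buffers
// up to ChunkRows rows of CSV in memory, and the chunk is then flushed to a
// uniquely named file in the export storage.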
for {
var rows int64
buf.Reset()
for {
if sp.spec.ChunkRows > 0 && rows >= sp.spec.ChunkRows {

@mjibson

mjibson Apr 17, 2018

Member

this could produce some very large csv files in memory if this sink gets many rows. is it possible to stream the writes to the export storage instead so we have more constant memory usage?

edit: oh i see it needs a io.ReadSeeker. hmm. well then we need some hard coded size after which we do a write or something.

@dt

dt Apr 17, 2018

Member

Yeah, when we added retry to our uploads, we lost streaming since we don't want to obligate the producer be able to re-produce arbitrary amounts.

Maybe we just add a default chunk size of e.g. 10000.

@mjibson

mjibson Apr 17, 2018

Member

I was thinking buf.Len so we care about bytes more than row count. But whatever.
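
A rough sketch of that byte-based threshold (not part of this diff; chunkBytesDefault below is a hypothetical constant), applied to the row-count check at the top of the inner loop:

// Hypothetical variant: end the chunk on either row count or buffered CSV
// size. Note that csv.Writer buffers internally, so buf.Len() can lag by up
// to that internal buffer's size unless writer.Flush() is called per row.
const chunkBytesDefault = 16 << 20 // 16 MiB; illustrative value only
if (sp.spec.ChunkRows > 0 && rows >= sp.spec.ChunkRows) ||
	buf.Len() >= chunkBytesDefault {
	break
}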

break
}
row, err := input.NextRow()
if err != nil {
return err
}
if row == nil {
done = true
break
}
rows++
for i, ed := range row {
if err := ed.EnsureDecoded(&types[i], alloc); err != nil {
return err
}
ed.Datum.Format(&f.FmtCtx)
// TODO(mjibson, dt): this relies on the ctx above's FmtBareStrings meaning that it
// will round-trip with IMPORT. We should replace all the csv/dump encode/decode
// handling with a centralized pair of handlers in sql that are tested, in isolation,
// to always round-trip with each other via ASCII-only representations that look like
// the example literals in our docs / SQL if possible.
csvRow[i] = f.String()
f.Reset()
}
if err := writer.Write(csvRow); err != nil {
return err
}
}
if rows < 1 {
break
}
writer.Flush()
conf, err := storageccl.ExportStorageConfFromURI(sp.spec.Destination)
if err != nil {
return err
}
es, err := storageccl.MakeExportStorage(ctx, conf, sp.flowCtx.Settings)
if err != nil {
return err
}
defer es.Close()
size := buf.Len()
part := fmt.Sprintf("n%d.%d", sp.flowCtx.EvalCtx.NodeID, chunk)
chunk++
filename := strings.Replace(pattern, exportFilePatternPart, part, -1)
if err := es.WriteFile(ctx, filename, bytes.NewReader(buf.Bytes())); err != nil {
return err
}
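// Report the written file back as a (filename, rows, bytes) result row.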
res := sqlbase.EncDatumRow{
sqlbase.DatumToEncDatum(
sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_STRING},
tree.NewDString(filename),
),
sqlbase.DatumToEncDatum(
sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT},
tree.NewDInt(tree.DInt(rows)),
),
sqlbase.DatumToEncDatum(
sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT},
tree.NewDInt(tree.DInt(size)),
),
}
cs, err := sp.out.EmitRow(ctx, res)
if err != nil {
return err
}
if cs != distsqlrun.NeedMoreRows {
return errors.New("unexpected closure of consumer")
}
if done {
break
}
}
return nil
}()
// TODO(dt): pick up tracing info in trailing meta
distsqlrun.DrainAndClose(
ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input)
}
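// init registers the EXPORT plan hook and the CSVWriter processor
// constructor with the sql and distsqlrun packages.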
func init() {
sql.AddPlanHook(exportPlanHook)
distsqlrun.NewCSVWriterProcessor = newCSVWriterProcessor
}