importccl: implement EXPORT csv #24755

Closed · wants to merge 2 commits into master
+1,086 −274
@@ -955,8 +955,9 @@ func doDistributedCSVTransform(
         }
     }()
-    if err := p.DistLoader().LoadCSV(
+    if err := sql.LoadCSV(
         ctx,
+        p.DistSQLPlanner(),
         job,
         p.ExecCfg().DB,
         evalCtx,
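(This hunk moves IMPORT's distributed CSV load from the DistLoader helper to a sql.LoadCSV entry point that takes the DistSQLPlanner as an explicit argument.)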
@@ -0,0 +1,322 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package importccl

import (
    "bytes"
    "context"
    "encoding/csv"
    "fmt"
    "strconv"
    "strings"
    "sync"

    "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
    "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
    "github.com/cockroachdb/cockroach/pkg/sql"
    "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
    "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
    "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
    "github.com/cockroachdb/cockroach/pkg/sql/sem/types"
    "github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
    "github.com/cockroachdb/cockroach/pkg/util"
    "github.com/cockroachdb/cockroach/pkg/util/tracing"
    "github.com/pkg/errors"
)
// exportHeader is the header for EXPORT stmt results.
var exportHeader = sqlbase.ResultColumns{
    {Name: "filename", Typ: types.String},
    {Name: "rows", Typ: types.Int},
    {Name: "bytes", Typ: types.Int},
}

const (
    exportOptionDelimiter = "delimiter"
    exportOptionChunkSize = "chunk_rows"
    exportOptionFileName  = "filename"
)

var exportOptionExpectValues = map[string]bool{
    exportOptionDelimiter: true,
    exportOptionChunkSize: true,
    exportOptionFileName:  true,
}

const exportChunkSizeDefault = 100000
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"
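
// With the default pattern, each file written by the CSVWriter processor is
// named "n<node>.<chunk>.csv" -- e.g. "n1.0.csv" for the first chunk emitted
// by node 1 (see the %part% substitution in Run below).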
// exportPlanHook implements sql.PlanHook.
func exportPlanHook(
    ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
    exportStmt, ok := stmt.(*tree.Export)
    if !ok {
        return nil, nil, nil, nil
    }

    fileFn, err := p.TypeAsString(exportStmt.File, "EXPORT")
    if err != nil {
        return nil, nil, nil, err
    }

    if exportStmt.FileFormat != "CSV" {
        // Not possible with current parser rules.
        return nil, nil, nil, errors.Errorf("unsupported export format: %q", exportStmt.FileFormat)
    }

    optsFn, err := p.TypeAsStringOpts(exportStmt.Options, exportOptionExpectValues)
    if err != nil {
        return nil, nil, nil, err
    }

    sel, err := p.Select(ctx, exportStmt.Query, nil)
    if err != nil {
        return nil, nil, nil, err
    }

    if !p.DistSQLPlanner().CheckPossible(sel) {
        return nil, nil, nil, errors.Errorf("unsupported EXPORT query -- as an alternative try `cockroach sql --format=csv`")
    }

    fn := func(ctx context.Context, plans []sql.PlanNode, resultsCh chan<- tree.Datums) error {
        ctx, span := tracing.ChildSpan(ctx, exportStmt.StatementTag())
        defer tracing.FinishSpan(span)

        if err := utilccl.CheckEnterpriseEnabled(
            p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "EXPORT",
        ); err != nil {
            return err
        }

        if err := p.RequireSuperUser(ctx, "EXPORT"); err != nil {
            return err
        }

        if !p.ExtendedEvalContext().TxnImplicit {
            return errors.Errorf("EXPORT cannot be used inside a transaction")
        }

        file, err := fileFn()
        if err != nil {
            return err
        }

        opts, err := optsFn()
        if err != nil {
            return err
        }

        delim := ','
        if override, ok := opts[exportOptionDelimiter]; ok {
            delim, err = util.GetSingleRune(override)
            if err != nil {
                return pgerror.NewError(pgerror.CodeInvalidParameterValueError, "invalid delimiter")
            }
        }

        chunk := exportChunkSizeDefault
        if override, ok := opts[exportOptionChunkSize]; ok {
            chunk, err = strconv.Atoi(override)
            if err != nil {
                return pgerror.NewError(pgerror.CodeInvalidParameterValueError, err.Error())
            }
            if chunk < 1 {
                return pgerror.NewError(pgerror.CodeInvalidParameterValueError, "invalid csv chunk size")
            }
        }

        out := distsqlrun.ProcessorCoreUnion{CSVWriter: &distsqlrun.CSVWriterSpec{
            Destination: file,
            NamePattern: exportFilePatternDefault,
            Delimiter:   delim,
            ChunkRows:   int64(chunk),
        }}

        rows := sqlbase.NewRowContainer(
            p.ExtendedEvalContext().Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColTypes(sql.ExportPlanResultTypes), 0,
        )
        rw := sql.NewRowResultWriter(rows)
        if err := sql.PlanAndRunExport(
            ctx, p.DistSQLPlanner(), p.ExecCfg(), p.Txn(), p.ExtendedEvalContext(), plans[0], out, rw,
        ); err != nil {
            return err
        }
        for i := 0; i < rows.Len(); i++ {
            resultsCh <- rows.At(i)
        }
        rows.Close(ctx)
        return rw.Err()
    }

    return fn, exportHeader, []sql.PlanNode{sel}, nil
}
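
// newCSVWriterProcessor returns the processor that the CSVWriter core above
// maps to: it encodes its input rows as CSV and writes the resulting files to
// the export storage destination named in the spec.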
func newCSVWriterProcessor(
    flowCtx *distsqlrun.FlowCtx,
    spec distsqlrun.CSVWriterSpec,
    input distsqlrun.RowSource,
    output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
    c := &csvWriter{
        flowCtx: flowCtx,
        spec:    spec,
        input:   input,
        output:  output,
    }
    if err := c.out.Init(&distsqlrun.PostProcessSpec{}, sql.ExportPlanResultTypes, flowCtx.NewEvalCtx(), output); err != nil {
        return nil, err
    }
    return c, nil
}

type csvWriter struct {
    flowCtx *distsqlrun.FlowCtx
    spec    distsqlrun.CSVWriterSpec
    input   distsqlrun.RowSource
    out     distsqlrun.ProcOutputHelper
    output  distsqlrun.RowReceiver
}

var _ distsqlrun.Processor = &csvWriter{}

func (sp *csvWriter) OutputTypes() []sqlbase.ColumnType {
    return sql.ExportPlanResultTypes
}
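
// Run drains the input, buffering encoded rows and flushing a CSV file to the
// destination every ChunkRows rows; for each file written it emits a single
// (filename, rows, bytes) row to its output.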
func (sp *csvWriter) Run(ctx context.Context, wg *sync.WaitGroup) {
    ctx, span := tracing.ChildSpan(ctx, "csvWriter")
    defer tracing.FinishSpan(span)

    if wg != nil {
        defer wg.Done()
    }

    err := func() error {
        pattern := exportFilePatternDefault
        if sp.spec.NamePattern != "" {
            pattern = sp.spec.NamePattern
        }

        types := sp.input.OutputTypes()
        sp.input.Start(ctx)
        input := distsqlrun.MakeNoMetadataRowSource(sp.input, sp.output)

        alloc := &sqlbase.DatumAlloc{}

        var buf bytes.Buffer
        writer := csv.NewWriter(&buf)
        writer.Comma = sp.spec.Delimiter

        f := tree.NewFmtCtxWithBuf(tree.FmtBareStrings)
        defer f.Close()

        csvRow := make([]string, len(types))

        chunk := 0
        done := false
        for {
            var rows int64
            buf.Reset()
            for {
                if sp.spec.ChunkRows > 0 && rows >= sp.spec.ChunkRows {
                    break
                }
                row, err := input.NextRow()
                if err != nil {
                    return err
                }
                if row == nil {
                    done = true
                    break
                }
                rows++
                for i, ed := range row {
                    if err := ed.EnsureDecoded(&types[i], alloc); err != nil {
                        return err
                    }
                    ed.Datum.Format(&f.FmtCtx)
                    // TODO(mjibson, dt): this relies on the ctx above's FmtBareStrings meaning that it
                    // will round-trip with IMPORT. We should replace all the csv/dump encode/decode
                    // handling with a centralized pair of handlers in sql that are tested, in isolation,
                    // to always round-trip with each other via ASCII-only representations that look like
                    // the example literals in our docs / SQL if possible.
                    csvRow[i] = f.String()
                    f.Reset()
                }
                if err := writer.Write(csvRow); err != nil {
                    return err
                }
            }
            if rows < 1 {
                break
            }
            writer.Flush()

            conf, err := storageccl.ExportStorageConfFromURI(sp.spec.Destination)
            if err != nil {
                return err
            }
            es, err := storageccl.MakeExportStorage(ctx, conf, sp.flowCtx.Settings)
            if err != nil {
                return err
            }
            defer es.Close()

            size := buf.Len()

            part := fmt.Sprintf("n%d.%d", sp.flowCtx.EvalCtx.NodeID, chunk)
            chunk++
            filename := strings.Replace(pattern, exportFilePatternPart, part, -1)
            if err := es.WriteFile(ctx, filename, bytes.NewReader(buf.Bytes())); err != nil {
                return err
            }
            res := sqlbase.EncDatumRow{
                sqlbase.DatumToEncDatum(
                    sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_STRING},
                    tree.NewDString(filename),
                ),
                sqlbase.DatumToEncDatum(
                    sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT},
                    tree.NewDInt(tree.DInt(rows)),
                ),
                sqlbase.DatumToEncDatum(
                    sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT},
                    tree.NewDInt(tree.DInt(size)),
                ),
            }

            cs, err := sp.out.EmitRow(ctx, res)
            if err != nil {
                return err
            }
            if cs != distsqlrun.NeedMoreRows {
                return errors.New("unexpected closure of consumer")
            }
            if done {
                break
            }
        }
        return nil
    }()

    // TODO(dt): pick up tracing info in trailing meta
    distsqlrun.DrainAndClose(
        ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input)
}
func init() {
    sql.AddPlanHook(exportPlanHook)
    distsqlrun.NewCSVWriterProcessor = newCSVWriterProcessor
}
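
Taken together, the plan hook and processor wire up a statement of roughly the
following shape (a sketch based on the options defined above; the destination
URL and query are placeholders):

    EXPORT INTO CSV 'nodelocal:///export1' WITH delimiter = '|', chunk_rows = '100'
    FROM SELECT * FROM bank.accounts;

Each node that runs a CSVWriter writes its chunks directly to the destination,
and the statement returns one (filename, rows, bytes) row per file written.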