// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package importccl

import (
"bytes"
"context"
"encoding/csv"
"fmt"
"strconv"
"strings"
"sync"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/pkg/errors"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
// exportHeader is the header for EXPORT stmt results.
var exportHeader = sqlbase.ResultColumns{
{Name: "filename", Typ: types.String},
{Name: "rows", Typ: types.Int},
{Name: "bytes", Typ: types.Int},
}
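
// Names of the options supported by EXPORT.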
const (
exportOptionDelimiter = "delimiter"
exportOptionChunkSize = "chunk_rows"
exportOptionFileName = "filename"
)
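
// exportOptionExpectValues describes which EXPORT options expect a value.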
var exportOptionExpectValues = map[string]bool{
exportOptionDelimiter: true,
exportOptionChunkSize: true,
exportOptionFileName: true,
}

// exportChunkSizeDefault is the default number of rows written per exported
// file when chunk_rows is not specified.
const exportChunkSizeDefault = 100000

// exportFilePatternPart is the portion of a file name pattern that is
// replaced with a per-node, per-chunk identifier as each file is written.
const exportFilePatternPart = "%part%"
const exportFilePatternDefault = exportFilePatternPart + ".csv"

// exportPlanHook implements sql.PlanHook.
func exportPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, sqlbase.ResultColumns, []sql.PlanNode, error) {
exportStmt, ok := stmt.(*tree.Export)
if !ok {
return nil, nil, nil, nil
}
fileFn, err := p.TypeAsString(exportStmt.File, "EXPORT")
if err != nil {
return nil, nil, nil, err
}
if exportStmt.FileFormat != "CSV" {
// not possible with current parser rules.
return nil, nil, nil, errors.Errorf("unsupported import format: %q", exportStmt.FileFormat)
}
optsFn, err := p.TypeAsStringOpts(exportStmt.Options, exportOptionExpectValues)
if err != nil {
return nil, nil, nil, err
}
sel, err := p.Select(ctx, exportStmt.Query, nil)
if err != nil {
return nil, nil, nil, err
}
if !p.DistSQLPlanner().CheckPossible(sel) {
return nil, nil, nil, errors.Errorf("unsupported EXPORT query -- as an alternative try `cockroach sql --format=csv`")
}
fn := func(ctx context.Context, plans []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, exportStmt.StatementTag())
defer tracing.FinishSpan(span)
if err := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "EXPORT",
); err != nil {
return err
}
if err := p.RequireSuperUser(ctx, "EXPORT"); err != nil {
return err
}
if !p.ExtendedEvalContext().TxnImplicit {
return errors.Errorf("EXPORT cannot be used inside a transaction")
}
file, err := fileFn()
if err != nil {
return err
}
opts, err := optsFn()
if err != nil {
return err
}
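		// Apply the writer options, defaulting to a comma delimiter and
		// exportChunkSizeDefault rows per file.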
delim := ','
if override, ok := opts[exportOptionDelimiter]; ok {
delim, err = util.GetSingleRune(override)
if err != nil {
return pgerror.NewError(pgerror.CodeInvalidParameterValueError, "invalid delimiter")
}
}
chunk := exportChunkSizeDefault
if override, ok := opts[exportOptionChunkSize]; ok {
chunk, err = strconv.Atoi(override)
if err != nil {
return pgerror.NewError(pgerror.CodeInvalidParameterValueError, err.Error())
}
if chunk < 1 {
return pgerror.NewError(pgerror.CodeInvalidParameterValueError, "invalid csv chunk size")
}
}
out := distsqlrun.ProcessorCoreUnion{CSVWriter: &distsqlrun.CSVWriterSpec{
Destination: file,
NamePattern: exportFilePatternDefault,
Delimiter: delim,
ChunkRows: int64(chunk),
}}
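		// Run the export flow, buffering the (filename, rows, bytes) results
		// in a container before streaming them back to the client.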
rows := sqlbase.NewRowContainer(
p.ExtendedEvalContext().Mon.MakeBoundAccount(), sqlbase.ColTypeInfoFromColTypes(sql.ExportPlanResultTypes), 0,
)
rw := sql.NewRowResultWriter(rows)
if err := sql.PlanAndRunExport(
ctx, p.DistSQLPlanner(), p.ExecCfg(), p.Txn(), p.ExtendedEvalContext(), plans[0], out, rw,
); err != nil {
return err
}
for i := 0; i < rows.Len(); i++ {
resultsCh <- rows.At(i)
}
rows.Close(ctx)
return rw.Err()
}
return fn, exportHeader, []sql.PlanNode{sel}, nil
}
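
// newCSVWriterProcessor returns a csvWriter processor that drains its input,
// encodes the rows as CSV files, and uploads each file to the destination in
// spec, emitting one summary row per file written.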
func newCSVWriterProcessor(
flowCtx *distsqlrun.FlowCtx,
spec distsqlrun.CSVWriterSpec,
input distsqlrun.RowSource,
output distsqlrun.RowReceiver,
) (distsqlrun.Processor, error) {
c := &csvWriter{
flowCtx: flowCtx,
spec: spec,
input: input,
output: output,
}
if err := c.out.Init(&distsqlrun.PostProcessSpec{}, sql.ExportPlanResultTypes, flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
}
return c, nil
}
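
// csvWriter is a distsqlrun.Processor that serializes its input rows to CSV,
// writes the resulting files to external storage, and reports a (filename,
// rows, bytes) row for each file it writes.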
type csvWriter struct {
flowCtx *distsqlrun.FlowCtx
spec distsqlrun.CSVWriterSpec
input distsqlrun.RowSource
out distsqlrun.ProcOutputHelper
output distsqlrun.RowReceiver
}

var _ distsqlrun.Processor = &csvWriter{}
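
// OutputTypes is part of the distsqlrun.Processor interface.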
func (sp *csvWriter) OutputTypes() []sqlbase.ColumnType {
return sql.ExportPlanResultTypes
}
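
// Run is part of the distsqlrun.Processor interface. It consumes the entire
// input, writing at most ChunkRows rows per output file.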
func (sp *csvWriter) Run(ctx context.Context, wg *sync.WaitGroup) {
ctx, span := tracing.ChildSpan(ctx, "csvWriter")
defer tracing.FinishSpan(span)
if wg != nil {
defer wg.Done()
}
err := func() error {
pattern := exportFilePatternDefault
if sp.spec.NamePattern != "" {
pattern = sp.spec.NamePattern
}
types := sp.input.OutputTypes()
sp.input.Start(ctx)
input := distsqlrun.MakeNoMetadataRowSource(sp.input, sp.output)
alloc := &sqlbase.DatumAlloc{}
var buf bytes.Buffer
writer := csv.NewWriter(&buf)
writer.Comma = sp.spec.Delimiter
f := tree.NewFmtCtxWithBuf(tree.FmtBareStrings)
defer f.Close()
csvRow := make([]string, len(types))
chunk := 0
done := false
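		// Each pass through this loop buffers up to ChunkRows rows of CSV in
		// memory, flushes them to a single file in the destination storage,
		// and emits a summary row downstream.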
for {
var rows int64
buf.Reset()
for {
if sp.spec.ChunkRows > 0 && rows >= sp.spec.ChunkRows {
break
}
row, err := input.NextRow()
if err != nil {
return err
}
if row == nil {
done = true
break
}
rows++
for i, ed := range row {
if err := ed.EnsureDecoded(&types[i], alloc); err != nil {
return err
}
ed.Datum.Format(&f.FmtCtx)
// TODO(mjibson, dt): this relies on the ctx above's FmtBareStrings meaning that it
// will round-trip with IMPORT. We should replace all the csv/dump encode/decode
// handling with a centralized pair of handlers in sql that are tested, in isolation,
// to always round-trip with each other via ASCII-only representations that look like
// the example literals in our docs / SQL if possible.
csvRow[i] = f.String()
f.Reset()
}
if err := writer.Write(csvRow); err != nil {
return err
}
}
if rows < 1 {
break
}
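			// Flush the buffered CSV and upload it as its own file, deriving
			// the file name by substituting a node/chunk identifier into the
			// name pattern.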
writer.Flush()
conf, err := storageccl.ExportStorageConfFromURI(sp.spec.Destination)
if err != nil {
return err
}
es, err := storageccl.MakeExportStorage(ctx, conf, sp.flowCtx.Settings)
if err != nil {
return err
}
defer es.Close()
size := buf.Len()
part := fmt.Sprintf("n%d.%d", sp.flowCtx.EvalCtx.NodeID, chunk)
chunk++
filename := strings.Replace(pattern, exportFilePatternPart, part, -1)
if err := es.WriteFile(ctx, filename, bytes.NewReader(buf.Bytes())); err != nil {
return err
}
res := sqlbase.EncDatumRow{
sqlbase.DatumToEncDatum(
sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_STRING},
tree.NewDString(filename),
),
sqlbase.DatumToEncDatum(
sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT},
tree.NewDInt(tree.DInt(rows)),
),
sqlbase.DatumToEncDatum(
sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT},
tree.NewDInt(tree.DInt(size)),
),
}
cs, err := sp.out.EmitRow(ctx, res)
if err != nil {
return err
}
if cs != distsqlrun.NeedMoreRows {
return errors.New("unexpected closure of consumer")
}
if done {
break
}
}
return nil
}()
distsqlrun.DrainAndClose(
ctx, sp.output, err, func(context.Context) {} /* pushTrailingMeta */, sp.input)
}
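
// init registers the EXPORT plan hook and the CSV writer processor factory.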
func init() {
sql.AddPlanHook(exportPlanHook)
distsqlrun.NewCSVWriterProcessor = newCSVWriterProcessor
}