New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: implement EXPORT csv #24755

Closed
wants to merge 2 commits into
base: master
from
Jump to file or symbol
Failed to load files and symbols.
+15 −21
Diff settings

Always

Just for now

Next

sql: remove DistLoader wrapper

I concur with Dan’s year-old TODO: DistLoader doesn’t seem like
a useful abstraction and we’re better off just getting an unwrapped
DistSQLPlanner into the caller’s hands so they can use its public API
without fighting though additional indirection.

Indeed, Ideally I’d like to follow this with exporting more of the
DistSQL API, with the goal of eventually moving LoadCSV method to a
function in importccl defined entirely in terms of the public DistSQL
API, though that will obviously require some refactoring and careful
thinking about where to draw maintainable lines between these packages.

Release note: none.
  • Loading branch information...
dt committed Apr 10, 2018
commit 4a16f641ffa1990530a4158636dedc6c99e63735
@@ -955,8 +955,9 @@ func doDistributedCSVTransform(
}
}()
if err := p.DistLoader().LoadCSV(
if err := sql.LoadCSV(
ctx,
p.DistSQLPlanner(),
job,
p.ExecCfg().DB,
evalCtx,
@@ -33,12 +33,6 @@ import (
"github.com/pkg/errors"
)
// DistLoader uses DistSQL to convert external data formats (csv, etc) into
// sstables of our mvcc-format key values.
type DistLoader struct {
distSQLPlanner *DistSQLPlanner
}
// RowResultWriter is a thin wrapper around a RowContainer.
type RowResultWriter struct {
rowContainer *sqlbase.RowContainer
@@ -110,8 +104,9 @@ var colTypeBytes = sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_BYTES}
// LoadCSV performs a distributed transformation of the CSV files at from
// and stores them in enterprise backup format at to.
func (l *DistLoader) LoadCSV(
func LoadCSV(
ctx context.Context,
dsp *DistSQLPlanner,
job *jobs.Job,
db *client.DB,
evalCtx *extendedEvalContext,
@@ -170,7 +165,7 @@ func (l *DistLoader) LoadCSV(
}
}
planCtx := l.distSQLPlanner.newPlanningCtx(ctx, evalCtx, nil /* txn */)
planCtx := dsp.newPlanningCtx(ctx, evalCtx, nil /* txn */)
// Because we're not going through the normal pathways, we have to set up
// the nodeID -> nodeAddress map ourselves.
for _, node := range nodes {
@@ -183,7 +178,7 @@ func (l *DistLoader) LoadCSV(
samples := details.Tables[0].Samples
if samples == nil {
var err error
samples, err = l.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, &planCtx, csvSpecs, sstSpecs)
samples, err = dsp.loadCSVSamplingPlan(ctx, job, db, evalCtx, thisNode, nodes, from, splitSize, &planCtx, csvSpecs, sstSpecs)
if err != nil {
return err
}
@@ -318,7 +313,7 @@ func (l *DistLoader) LoadCSV(
return err
}
l.distSQLPlanner.FinalizePlan(&planCtx, &p)
dsp.FinalizePlan(&planCtx, &p)
recv := makeDistSQLReceiver(
ctx,
@@ -334,12 +329,12 @@ func (l *DistLoader) LoadCSV(
// TODO(dan): We really don't need the txn for this flow, so remove it once
// Run works without one.
return db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
l.distSQLPlanner.Run(&planCtx, txn, &p, recv, evalCtx)
dsp.Run(&planCtx, txn, &p, recv, evalCtx)
return resultRows.Err()
})
}
func (l *DistLoader) loadCSVSamplingPlan(
func (dsp *DistSQLPlanner) loadCSVSamplingPlan(
ctx context.Context,
job *jobs.Job,
db *client.DB,
@@ -439,7 +434,7 @@ func (l *DistLoader) loadCSVSamplingPlan(
// TODO(dan): Consider making FinalizePlan take a map explicitly instead
// of this PlanCtx. https://reviewable.io/reviews/cockroachdb/cockroach/17279#-KqOrLpy9EZwbRKHLYe6:-KqOp00ntQEyzwEthAsl:bd4nzje
l.distSQLPlanner.FinalizePlan(planCtx, &p)
dsp.FinalizePlan(planCtx, &p)
recv := makeDistSQLReceiver(
ctx,
@@ -456,7 +451,7 @@ func (l *DistLoader) loadCSVSamplingPlan(
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
// Clear the stage 2 data in case this function is ever restarted (it shouldn't be).
samples = nil
l.distSQLPlanner.Run(planCtx, txn, &p, recv, evalCtx)
dsp.Run(planCtx, txn, &p, recv, evalCtx)
return rowResultWriter.Err()
}); err != nil {
return nil, err
View
@@ -70,7 +70,7 @@ type PlanHookState interface {
ExtendedEvalContext() *extendedEvalContext
SessionData() *sessiondata.SessionData
ExecCfg() *ExecutorConfig
DistLoader() *DistLoader
DistSQLPlanner() *DistSQLPlanner
LeaseMgr() *LeaseManager
TypeAsString(e tree.Expr, op string) (func() (string, error), error)
TypeAsStringArray(e tree.Exprs, op string) (func() ([]string, error), error)
View
@@ -322,11 +322,9 @@ func (p *planner) User() string {
return p.SessionData().User
}
// TODO(dan): This is here to implement PlanHookState, but it's not clear that
// this is the right abstraction. We could also export DistSQLPlanner, for
// example. Revisit.
func (p *planner) DistLoader() *DistLoader {
return &DistLoader{distSQLPlanner: p.extendedEvalCtx.DistSQLPlanner}
// DistSQLPlanner returns the DistSQLPlanner
func (p *planner) DistSQLPlanner() *DistSQLPlanner {
return p.extendedEvalCtx.DistSQLPlanner
}
// makeInternalPlan initializes a plan from a SQL statement string.
ProTip! Use n and p to navigate between commits in a pull request.