Skip to content

Commit

Permalink
correctly 'copy from' and 'insert from subquery' into partitioned tables
Browse files Browse the repository at this point in the history
  • Loading branch information
msbt committed Mar 9, 2015
1 parent a62ca1c commit c0c2b4f
Show file tree
Hide file tree
Showing 18 changed files with 225 additions and 69 deletions.
5 changes: 4 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ Changes for Crate
Unreleased
==========

- Copy from / copy to will now automatically create a quoted URI if only a
- Fix: COPY FROM and INSERT from subquery into a partitioned table created
separate tables with a wrong name instead of correct partitions

- COPY FROM / COPY TO will now automatically create a quoted URI if only a
path is given.

- Upgrade Elasticsearch to v1.4.4
Expand Down
3 changes: 2 additions & 1 deletion sql/src/main/java/io/crate/metadata/TableIdent.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

public class TableIdent implements Comparable<TableIdent>, Streamable {

@Nullable
private String schema;
private String name;

Expand All @@ -52,7 +53,7 @@ public TableIdent() {

}

public TableIdent(String schema, String name) {
public TableIdent(@Nullable String schema, String name) {
Preconditions.checkNotNull(name, "table name is null");
this.schema = schema;
this.name = name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.crate.core.collections.Row1;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.PartitionName;
import io.crate.metadata.TableIdent;
import io.crate.operation.Input;
import io.crate.operation.ProjectorUpstream;
import io.crate.operation.collect.CollectExpression;
Expand Down Expand Up @@ -60,7 +61,9 @@ public abstract class AbstractIndexWriterProjector implements Projector, Project

private final AtomicInteger remainingUpstreams = new AtomicInteger(0);
private final CollectExpression<?>[] collectExpressions;
private final String tableName;
private final TableIdent tableIdent;
@Nullable
private final String partitionIdent;
private final Object lock = new Object();
private final List<Input<?>> partitionedByInputs;
private final Function<Input<?>, BytesRef> inputToBytesRef = new Function<Input<?>, BytesRef>() {
Expand All @@ -76,13 +79,24 @@ public BytesRef apply(Input<?> input) {

private final LoadingCache<List<BytesRef>, String> partitionIdentCache;

protected AbstractIndexWriterProjector(final String tableName,
/**
* 3 states:
*
* <ul>
* <li> writing into a normal table - <code>partitionIdent = null, partitionedByInputs = []</code>
* <li> writing into a partitioned table - <code>partitionIdent = null, partitionedByInputs = [...]</code>
* <li> writing into a single partition - <code>partitionIdent = "...", partitionedByInputs = []</code>
*
*/
protected AbstractIndexWriterProjector(final TableIdent tableIdent,
@Nullable String partitionIdent,
List<ColumnIdent> primaryKeyIdents,
List<Symbol> primaryKeySymbols,
List<Input<?>> partitionedByInputs,
@Nullable Symbol routingSymbol,
CollectExpression<?>[] collectExpressions) {
this.tableName = tableName;
this.tableIdent = tableIdent;
this.partitionIdent = partitionIdent;
this.partitionedByInputs = partitionedByInputs;
this.collectExpressions = collectExpressions;
if (partitionedByInputs.size() > 0) {
Expand All @@ -92,7 +106,7 @@ protected AbstractIndexWriterProjector(final String tableName,
.build(new CacheLoader<List<BytesRef>, String>() {
@Override
public String load(@Nonnull List<BytesRef> key) throws Exception {
return new PartitionName(tableName, key).stringValue();
return new PartitionName(tableIdent, key).stringValue();
}
});
} else {
Expand Down Expand Up @@ -195,8 +209,10 @@ private String getIndexName() {
} catch (ExecutionException e) {
throw ExceptionsHelper.convertToRuntime(e);
}
} else if (partitionIdent != null) {
return PartitionName.fromPartitionIdent(tableIdent.schema(), tableIdent.name(), partitionIdent).stringValue();
} else {
return tableName;
return tableIdent.esName();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.crate.core.collections.Row;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.TableIdent;
import io.crate.operation.Input;
import io.crate.operation.collect.CollectExpression;
import io.crate.planner.symbol.Reference;
Expand All @@ -43,7 +44,8 @@ protected ColumnIndexWriterProjector(ClusterService clusterService,
Settings settings,
TransportShardUpsertActionDelegate transportShardUpsertActionDelegate,
TransportCreateIndexAction transportCreateIndexAction,
String tableName,
TableIdent tableIdent,
@Nullable String partitionIdent,
List<ColumnIdent> primaryKeyIdents,
List<Symbol> primaryKeySymbols,
List<Input<?>> partitionedByInputs,
Expand All @@ -55,7 +57,7 @@ protected ColumnIndexWriterProjector(ClusterService clusterService,
Map<Reference, Symbol> updateAssignments,
@Nullable Integer bulkActions,
boolean autoCreateIndices) {
super(tableName, primaryKeyIdents, primaryKeySymbols,
super(tableIdent, partitionIdent, primaryKeyIdents, primaryKeySymbols,
partitionedByInputs, routingSymbol, collectExpressions);
assert columnReferences.size() == columnSymbols.size();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.crate.core.collections.Row;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.TableIdent;
import io.crate.operation.Input;
import io.crate.operation.collect.CollectExpression;
import io.crate.planner.symbol.InputColumn;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class IndexWriterProjector extends AbstractIndexWriterProjector {

private final SourceInjectorRow row;

private static final class SourceInjectorRow implements Row{
private static final class SourceInjectorRow implements Row {

private final int idx;
private final BytesRefGenerator generator;
Expand Down Expand Up @@ -83,7 +84,8 @@ public IndexWriterProjector(ClusterService clusterService,
Settings settings,
TransportShardUpsertActionDelegate transportShardUpsertActionDelegate,
TransportCreateIndexAction transportCreateIndexAction,
String tableName,
TableIdent tableIdent,
@Nullable String partitionIdent,
Reference rawSourceReference,
List<ColumnIdent> primaryKeyIdents,
List<Symbol> primaryKeySymbols,
Expand All @@ -97,7 +99,7 @@ public IndexWriterProjector(ClusterService clusterService,
@Nullable String[] excludes,
boolean autoCreateIndices,
boolean overwriteDuplicates) {
super(tableName, primaryKeyIdents, primaryKeySymbols, partitionedByInputs,
super(tableIdent, partitionIdent, primaryKeyIdents, primaryKeySymbols, partitionedByInputs,
routingSymbol, collectExpressions);

if (includes == null && excludes == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ public Projector visitSourceIndexWriterProjection(SourceIndexWriterProjection pr
settings,
transportActionProvider.transportShardUpsertActionDelegate(),
transportActionProvider.transportCreateIndexAction(),
projection.tableName(),
projection.tableIdent(),
projection.partitionIdent(),
projection.rawSourceReference(),
projection.primaryKeys(),
projection.ids(),
Expand Down Expand Up @@ -259,7 +260,8 @@ public Projector visitColumnIndexWriterProjection(ColumnIndexWriterProjection pr
settings,
transportActionProvider.transportShardUpsertActionDelegate(),
transportActionProvider.transportCreateIndexAction(),
projection.tableName(),
projection.tableIdent(),
projection.partitionIdent(),
projection.primaryKeys(),
projection.ids(),
partitionedByInputs,
Expand Down
10 changes: 6 additions & 4 deletions sql/src/main/java/io/crate/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ private void copyFromPlan(CopyAnalyzedStatement analysis, IterablePlan plan) {
TableInfo table = analysis.table();
int clusteredByPrimaryKeyIdx = table.primaryKey().indexOf(analysis.table().clusteredBy());
List<String> partitionedByNames;
String tableName;
String partitionIdent = null;

List<BytesRef> partitionValues;
if (analysis.partitionIdent() == null) {
tableName = table.ident().esName();

if (table.isPartitioned()) {
partitionedByNames = Lists.newArrayList(
Lists.transform(table.partitionedBy(), ColumnIdent.GET_FQN_NAME_FUNCTION));
Expand All @@ -239,12 +239,14 @@ private void copyFromPlan(CopyAnalyzedStatement analysis, IterablePlan plan) {
// partitionIdent is present -> possible to index raw source into concrete es index
PartitionName partitionName = PartitionName.fromPartitionIdent(table.ident().schema(), table.ident().name(), analysis.partitionIdent());
partitionValues = partitionName.values();
tableName = partitionName.stringValue();

partitionIdent = partitionName.ident();
partitionedByNames = Collections.emptyList();
}

SourceIndexWriterProjection sourceIndexWriterProjection = new SourceIndexWriterProjection(
tableName,
table.ident(),
partitionIdent,
new Reference(table.getReferenceInfo(DocSysColumns.RAW)),
table.primaryKey(),
table.partitionedBy(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public Visitor(AnalysisMetaData analysisMetaData){
public AnalyzedRelation visitInsertFromQuery(InsertFromSubQueryAnalyzedStatement insertFromSubQueryAnalyzedStatement, Context context) {

ColumnIndexWriterProjection indexWriterProjection = new ColumnIndexWriterProjection(
insertFromSubQueryAnalyzedStatement.tableInfo().ident().name(),
insertFromSubQueryAnalyzedStatement.tableInfo().ident(),
null,
insertFromSubQueryAnalyzedStatement.tableInfo().primaryKey(),
insertFromSubQueryAnalyzedStatement.columns(),
insertFromSubQueryAnalyzedStatement.onDuplicateKeyAssignments(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.TableIdent;
import io.crate.planner.symbol.InputColumn;
import io.crate.planner.symbol.Symbol;
import io.crate.planner.symbol.Value;
Expand All @@ -47,7 +48,8 @@ public abstract class AbstractIndexWriterProjection extends Projection {
protected final static int BULK_SIZE_DEFAULT = 10000;

protected Integer bulkActions;
protected String tableName;
protected TableIdent tableIdent;
protected String partitionIdent;
protected List<ColumnIdent> primaryKeys;
protected @Nullable ColumnIdent clusteredByColumn;

Expand All @@ -59,12 +61,14 @@ public abstract class AbstractIndexWriterProjection extends Projection {

protected AbstractIndexWriterProjection() {}

protected AbstractIndexWriterProjection(String tableName,
protected AbstractIndexWriterProjection(TableIdent tableIdent,
@Nullable String partitionIdent,
List<ColumnIdent> primaryKeys,
@Nullable ColumnIdent clusteredByColumn,
Settings settings,
boolean autoCreateIndices) {
this.tableName = tableName;
this.tableIdent = tableIdent;
this.partitionIdent = partitionIdent;
this.primaryKeys = primaryKeys;
this.clusteredByColumn = clusteredByColumn;
this.autoCreateIndices = autoCreateIndices;
Expand Down Expand Up @@ -129,44 +133,21 @@ public Integer bulkActions() {
return bulkActions;
}

public String tableName() {
return tableName;
public TableIdent tableIdent() {
return tableIdent;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AbstractIndexWriterProjection)) return false;

AbstractIndexWriterProjection that = (AbstractIndexWriterProjection) o;

if (!bulkActions.equals(that.bulkActions)) return false;
if (clusteredBySymbol != null ? !clusteredBySymbol.equals(that.clusteredBySymbol) : that.clusteredBySymbol != null)
return false;
if (!idSymbols.equals(that.idSymbols)) return false;
if (!partitionedBySymbols.equals(that.partitionedBySymbols))
return false;
if (!primaryKeys.equals(that.primaryKeys)) return false;
if (!tableName.equals(that.tableName)) return false;

return true;
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + bulkActions.hashCode();
result = 31 * result + tableName.hashCode();
result = 31 * result + primaryKeys.hashCode();
result = 31 * result + idSymbols.hashCode();
result = 31 * result + partitionedBySymbols.hashCode();
result = 31 * result + (clusteredBySymbol != null ? clusteredBySymbol.hashCode() : 0);
return result;
@Nullable
public String partitionIdent() {
return partitionIdent;
}

@Override
public void readFrom(StreamInput in) throws IOException {
tableName = in.readString();
tableIdent = new TableIdent();
tableIdent.readFrom(in);

partitionIdent = in.readOptionalString();
int numIdSymbols = in.readVInt();
idSymbols = new ArrayList<>(numIdSymbols);
for (int i = 0; i < numIdSymbols; i++) {
Expand Down Expand Up @@ -200,9 +181,49 @@ public void readFrom(StreamInput in) throws IOException {
autoCreateIndices = in.readBoolean();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof AbstractIndexWriterProjection)) return false;

AbstractIndexWriterProjection that = (AbstractIndexWriterProjection) o;

if (autoCreateIndices != that.autoCreateIndices) return false;
if (!bulkActions.equals(that.bulkActions)) return false;
if (clusteredByColumn != null ? !clusteredByColumn.equals(that.clusteredByColumn) : that.clusteredByColumn != null)
return false;
if (clusteredBySymbol != null ? !clusteredBySymbol.equals(that.clusteredBySymbol) : that.clusteredBySymbol != null)
return false;
if (!idSymbols.equals(that.idSymbols)) return false;
if (!partitionedBySymbols.equals(that.partitionedBySymbols))
return false;
if (!primaryKeys.equals(that.primaryKeys)) return false;
if (!tableIdent.equals(that.tableIdent)) return false;
if (partitionIdent != null ? !partitionIdent.equals(that.partitionIdent) : that.partitionIdent != null)
return false;

return true;
}

@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + bulkActions.hashCode();
result = 31 * result + tableIdent.hashCode();
result = 31 * result + (partitionIdent != null ? partitionIdent.hashCode() : 0);
result = 31 * result + primaryKeys.hashCode();
result = 31 * result + (clusteredByColumn != null ? clusteredByColumn.hashCode() : 0);
result = 31 * result + idSymbols.hashCode();
result = 31 * result + partitionedBySymbols.hashCode();
result = 31 * result + (clusteredBySymbol != null ? clusteredBySymbol.hashCode() : 0);
result = 31 * result + (autoCreateIndices ? 1 : 0);
return result;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(tableName);
tableIdent.writeTo(out);
out.writeOptionalString(partitionIdent);

out.writeVInt(idSymbols.size());
for (Symbol idSymbol : idSymbols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.carrotsearch.hppc.IntSet;
import com.google.common.collect.Lists;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.TableIdent;
import io.crate.planner.symbol.InputColumn;
import io.crate.planner.symbol.Reference;
import io.crate.planner.symbol.Symbol;
Expand Down Expand Up @@ -57,7 +58,7 @@ protected ColumnIndexWriterProjection() {}

/**
*
* @param tableName
* @param tableIdent identifying the table to write to
* @param primaryKeys
* @param columns the columnReferences of all the columns to be written in order of appearance
* @param onDuplicateKeyAssignments reference to symbol map used for update on duplicate key
Expand All @@ -66,7 +67,8 @@ protected ColumnIndexWriterProjection() {}
* @param clusteredByIndex
* @param settings
*/
public ColumnIndexWriterProjection(String tableName,
public ColumnIndexWriterProjection(TableIdent tableIdent,
@Nullable String partitionIdent,
List<ColumnIdent> primaryKeys,
List<Reference> columns,
@Nullable
Expand All @@ -77,7 +79,7 @@ public ColumnIndexWriterProjection(String tableName,
int clusteredByIndex,
Settings settings,
boolean autoCreateIndices) {
super(tableName, primaryKeys, clusteredByColumn, settings, autoCreateIndices);
super(tableIdent, partitionIdent, primaryKeys, clusteredByColumn, settings, autoCreateIndices);
generateSymbols(primaryKeyIndices.toArray(), partitionedByIndices.toArray(), clusteredByIndex);

this.onDuplicateKeyAssignments = onDuplicateKeyAssignments;
Expand Down
Loading

0 comments on commit c0c2b4f

Please sign in to comment.