Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement COPY <table> TO DIRECTORY #685

Merged
merged 1 commit into from Apr 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -5,6 +5,8 @@ Changes for Crate Data
Unreleased
==========

- added the ``DIRECTORY`` keyword to COPY TO

- updated crate-theme to 0.0.15 to fix documentation link in the admin-ui

- added s3 support for COPY TO
Expand Down
4 changes: 1 addition & 3 deletions docs/sql/dml.txt
Expand Up @@ -647,9 +647,7 @@ the table and shard id with gzip compression::

cr> refresh table quotes;
REFRESH OK...
cr> copy quotes to format('/tmp/%s_%s.json',
... sys.shards.table_name, sys.shards.id)
... with (compression='gzip');
cr> copy quotes to DIRECTORY '/tmp/' with (compression='gzip');
COPY OK, 3 rows affected ...

For further details see :ref:`copy_to`.
Expand Down
6 changes: 5 additions & 1 deletion docs/sql/reference/copy_to.txt
Expand Up @@ -11,7 +11,7 @@ Synopsis

.. code-block:: sql

COPY table_ident TO { output_uri }
COPY table_ident TO [DIRECTORY] { output_uri }
[ WITH ( copy_parameter [= value] [, ... ] ) ]

Description
Expand All @@ -28,6 +28,10 @@ containing data from the given table.
The created files are JSON formatted and contain one table row per
line.

If the ``DIRECTORY`` keyword is given, the uri is treated as a directory path.
This will generate one or more files in the given directory, named in such a
way to prevent filename conflicts.

Parameters
==========

Expand Down
9 changes: 9 additions & 0 deletions sql/src/main/java/io/crate/analyze/CopyAnalysis.java
Expand Up @@ -39,6 +39,7 @@ public static enum Mode {

private Symbol uri;
private Mode mode;
private boolean directoryUri;

public CopyAnalysis(ReferenceInfos referenceInfos, Functions functions, Object[] parameters, ReferenceResolver referenceResolver) {
super(referenceInfos, functions, parameters, referenceResolver);
Expand All @@ -48,6 +49,14 @@ public Symbol uri() {
return uri;
}

public void directoryUri(boolean directoryUri) {
this.directoryUri = directoryUri;
}

public boolean directoryUri() {
return this.directoryUri;
}

public void uri(Symbol uri) {
this.uri = uri;
}
Expand Down
Expand Up @@ -54,11 +54,8 @@ public Symbol visitCopyTo(CopyTo node, CopyAnalysis context) {
}
process(node.table(), context);
context.uri(process(node.targetUri(), context));
context.directoryUri(node.directoryUri());

if (node.directoryUri()) {
// TODO: add format symbols to generate shard specific uris
throw new UnsupportedFeatureException("directory URI not supprted");
}
return null;
}

Expand Down
Expand Up @@ -27,10 +27,7 @@
import io.crate.operation.Input;
import io.crate.operation.collect.CollectExpression;
import io.crate.planner.projection.*;
import io.crate.planner.symbol.Aggregation;
import io.crate.planner.symbol.StringValueSymbolVisitor;
import io.crate.planner.symbol.SymbolType;
import io.crate.planner.symbol.Symbol;
import io.crate.planner.symbol.*;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Injector;

Expand Down Expand Up @@ -137,12 +134,22 @@ public Projector visitAggregationProjection(AggregationProjection projection, Vo
@Override
public Projector visitWriterProjection(WriterProjection projection, Void context) {
projection = projection.normalize(normalizer);
assert projection.uri().symbolType() == SymbolType.STRING_LITERAL;
Projector projector = new WriterProjector(
StringValueSymbolVisitor.INSTANCE.process(projection.uri()),
projection.settings()
);
return projector;
String uri = StringValueSymbolVisitor.INSTANCE.process(projection.uri());
if (projection.isDirectoryUri()) {
StringBuilder sb = new StringBuilder(uri);
Symbol resolvedFileName = normalizer.normalize(WriterProjection.DIRECTORY_TO_FILENAME);
assert resolvedFileName instanceof StringLiteral;
String fileName = StringValueSymbolVisitor.INSTANCE.process(resolvedFileName);
if (!uri.endsWith("/")) {
sb.append("/");
}
sb.append(fileName);
if (projection.settings().get("compression", "").equalsIgnoreCase("gzip")) {
sb.append(".gz");
}
uri = sb.toString();
}
return new WriterProjector(uri, projection.settings());
}

public Projector visitIndexWriterProjection(IndexWriterProjection projection, Void context) {
Expand Down
1 change: 1 addition & 0 deletions sql/src/main/java/io/crate/planner/Planner.java
Expand Up @@ -156,6 +156,7 @@ protected Plan visitCopyAnalysis(final CopyAnalysis analysis, Void context) {

WriterProjection projection = new WriterProjection();
projection.uri(analysis.uri());
projection.isDirectoryUri(analysis.directoryUri());
projection.settings(analysis.settings());
PlannerContextBuilder contextBuilder = new PlannerContextBuilder()
.output(ImmutableList.<Symbol>of(rawReference));
Expand Down
Expand Up @@ -24,9 +24,13 @@
import com.google.common.collect.ImmutableList;
import io.crate.DataType;
import io.crate.analyze.EvaluatingNormalizer;
import io.crate.metadata.ColumnIdent;
import io.crate.metadata.FunctionIdent;
import io.crate.metadata.FunctionInfo;
import io.crate.metadata.sys.SysShardsTableInfo;
import io.crate.operation.scalar.FormatFunction;
import io.crate.planner.RowGranularity;
import io.crate.planner.symbol.Symbol;
import io.crate.planner.symbol.Value;
import io.crate.planner.symbol.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -35,14 +39,26 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WriterProjection extends Projection {

private static final List<Symbol> OUTPUTS = ImmutableList.<Symbol>of(
new Value(DataType.LONG) // number of lines written
);

private final static Reference SHARD_ID_REF = new Reference(SysShardsTableInfo.INFOS.get(new ColumnIdent("id")));
private final static Reference TABLE_NAME_REF = new Reference(SysShardsTableInfo.INFOS.get(new ColumnIdent("table_name")));

public static final Symbol DIRECTORY_TO_FILENAME = new Function(new FunctionInfo(
new FunctionIdent(FormatFunction.NAME, Arrays.asList(DataType.STRING, DataType.STRING, DataType.STRING)),
DataType.STRING),
Arrays.<Symbol>asList(new StringLiteral("%s_%s.json"), TABLE_NAME_REF, SHARD_ID_REF)
);

private Symbol uri;
private boolean isDirectoryUri = false;

@Nullable
private Settings settings = ImmutableSettings.EMPTY;
Expand Down Expand Up @@ -87,6 +103,14 @@ public Settings settings() {
return settings;
}

public void isDirectoryUri(boolean isDirectoryUri) {
this.isDirectoryUri = isDirectoryUri;
}

public boolean isDirectoryUri() {
return isDirectoryUri;
}

@Override
public List<Symbol> outputs() {
return OUTPUTS;
Expand All @@ -104,6 +128,7 @@ public <C, R> R accept(ProjectionVisitor<C, R> visitor, C context) {

@Override
public void readFrom(StreamInput in) throws IOException {
isDirectoryUri = in.readBoolean();
uri = Symbol.fromStream(in);
int size = in.readVInt();
if (size > 0) {
Expand All @@ -117,6 +142,7 @@ public void readFrom(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isDirectoryUri);
Symbol.toStream(uri, out);
if (outputNames != null) {
out.writeVInt(outputNames.size());
Expand All @@ -136,9 +162,11 @@ public boolean equals(Object o) {

WriterProjection that = (WriterProjection) o;

if (outputNames != null ? !outputNames.equals(that.outputNames) : that.outputNames != null) return false;
if (settings != null ? !settings.equals(that.settings) : that.settings != null) return false;
if (uri != null ? !uri.equals(that.uri) : that.uri != null) return false;
if (isDirectoryUri != that.isDirectoryUri) return false;
if (outputNames != null ? !outputNames.equals(that.outputNames) : that.outputNames != null)
return false;
if (!settings.equals(that.settings)) return false;
if (!uri.equals(that.uri)) return false;

return true;
}
Expand All @@ -147,6 +175,8 @@ public boolean equals(Object o) {
public int hashCode() {
int result = super.hashCode();
result = 31 * result + uri.hashCode();
result = 31 * result + (isDirectoryUri ? 1 : 0);
result = 31 * result + settings.hashCode();
result = 31 * result + (outputNames != null ? outputNames.hashCode() : 0);
return result;
}
Expand All @@ -157,6 +187,7 @@ public String toString() {
"uri=" + uri +
", settings=" + settings +
", outputNames=" + outputNames +
", isDirectory=" + isDirectoryUri +
'}';
}

Expand Down
8 changes: 6 additions & 2 deletions sql/src/test/java/io/crate/analyze/CopyAnalyzerTest.java
Expand Up @@ -104,6 +104,12 @@ public void testCopyToFile() throws Exception {
assertThat(((StringLiteral)analysis.uri()).valueAsString(), is("/blah.txt"));
}

@Test
public void testCopyToDirectory() throws Exception {
CopyAnalysis analysis = (CopyAnalysis)analyze("copy users to directory '/foo'");
assertThat(analysis.directoryUri(), is(true));
}

@Test
public void testCopyToFileWithParams() throws Exception {
CopyAnalysis analysis = (CopyAnalysis)analyze("copy users to '/blah.txt' with (compression='gzip')");
Expand All @@ -112,6 +118,4 @@ public void testCopyToFileWithParams() throws Exception {
assertThat(((StringLiteral)analysis.uri()).valueAsString(), is("/blah.txt"));
assertThat(analysis.settings().get("compression"), is("gzip"));
}


}
Expand Up @@ -41,14 +41,16 @@
import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.*;

public class TransportSQLActionClassLifecycleTest extends ClassLifecycleIntegrationTest {

static {
ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(true);
}

@Rule
public TemporaryFolder folder = new TemporaryFolder();

Expand Down Expand Up @@ -431,7 +433,6 @@ public void testCountWithGroupByOrderOnAggDescFuncAndLimit() throws Exception {

@Test
public void testCopyToFile() throws Exception {

String uriTemplate = Paths.get(folder.getRoot().toURI()).resolve("testCopyToFile%s.json").toAbsolutePath().toString();

SQLResponse response = executor.exec("copy characters to format(?, sys.shards.id)", uriTemplate);
Expand All @@ -448,8 +449,25 @@ public void testCopyToFile() throws Exception {
assertThat(line, startsWith("{"));
assertThat(line, endsWith("}"));
}

}

@Test
public void testCopyToDirectory() throws Exception {
String uriTemplate = Paths.get(folder.getRoot().toURI()).toAbsolutePath().toString();
SQLResponse response = executor.exec("copy characters to DIRECTORY ?", uriTemplate);
assertThat(response.rowCount(), is(7L));
List<String> lines = new ArrayList<>(7);
DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(folder.getRoot().toURI()), "*.json");
for (Path entry: stream) {
lines.addAll(Files.readAllLines(entry, StandardCharsets.UTF_8));
}
Path path = Paths.get(folder.getRoot().toURI().resolve("characters_1.json"));
assertTrue(path.toFile().exists());

assertThat(lines.size(), is(7));
for (String line : lines) {
assertThat(line, startsWith("{"));
assertThat(line, endsWith("}"));
}
}
}