implement support for COPY FROM … RETURN SUMMARY
The source URI and a possible URI failure are collected by new collect
expressions. These expressions are kept separate as `sourceInfoExpressions`
because they must always be collected, whereas the normal ones could fail,
e.g. on an invalid JSON entry. Source URI processing failures result in NULL
values for `success_count` and `error_count` to indicate that no item/row was
processed at all.
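
As a usage sketch (table name and URI are illustrative), the new clause is
appended to a regular COPY FROM statement:

    COPY locations FROM 'file:///tmp/import_data/*.json' RETURN SUMMARY;

The result set then contains one row per node and URI with the columns
`node`, `uri`, `success_count`, `error_count` and `errors`.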
seut committed Jun 12, 2018
1 parent 3f77dee commit 66e5338
Showing 56 changed files with 1,735 additions and 273 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
@@ -18,6 +18,9 @@ Breaking Changes
Changes
=======

- Added support for ``COPY FROM ... RETURN SUMMARY`` which will return a result
set with detailed error reporting of imported rows.

- Added a new ``stats.jobs_log_filter`` setting which can be used to control
what kind of entries are recorded into the ``sys.jobs_log`` table.

74 changes: 70 additions & 4 deletions blackbox/docs/general/dml.rst
@@ -435,7 +435,7 @@ Import From a File URI

An example import from a file URI::

cr> copy quotes from 'file:///tmp/import_data/quotes.json';
cr> COPY quotes FROM 'file:///tmp/import_data/quotes.json';
COPY OK, 3 rows affected (... sec)

.. Hidden: delete imported data
@@ -448,7 +448,7 @@ An example import from a file URI::
If all files inside a directory should be imported a ``*`` wildcard has to be
used::

cr> copy quotes from '/tmp/import_data/*' with (bulk_size = 4);
cr> COPY quotes FROM '/tmp/import_data/*' WITH (bulk_size = 4);
COPY OK, 3 rows affected (... sec)

.. Hidden: delete imported data
@@ -462,9 +462,66 @@ used::
This wildcard can also be used to only match certain files::

cr> copy quotes from '/tmp/import_data/qu*.json';
cr> COPY quotes FROM '/tmp/import_data/qu*.json';
COPY OK, 3 rows affected (... sec)

.. Hidden: delete imported data
cr> refresh table quotes;
REFRESH OK, 1 row affected (... sec)
cr> delete from quotes;
DELETE OK, 3 rows affected (... sec)
cr> refresh table quotes;
REFRESH OK, 1 row affected (... sec)

Import With Detailed Error Reporting
....................................

If the ``RETURN SUMMARY`` clause is specified, a result set containing
information about failures and successfully imported records is returned.

.. Hidden: delete existing data
cr> refresh table locations;
REFRESH OK, 1 row affected (... sec)
cr> delete from locations;
DELETE OK, 8 rows affected (... sec)
cr> refresh table locations;
REFRESH OK, 1 row affected (... sec)
::

cr> COPY locations FROM '/tmp/import_data/locations_with_failure/locations*.json' RETURN SUMMARY;
+--...--+----------...--------+---------------+-------------+------------------------------------------------------------------+
| node | uri | success_count | error_count | errors |
+--...--+----------...--------+---------------+-------------+------------------------------------------------------------------+
| {...} | .../locations1.json | 6 | 0 | {} |
| {...} | .../locations2.json | 5 | 2 | {"failed to parse [date]": {"count": 2, "line_numbers": [1, 2]}} |
+--...--+----------...--------+---------------+-------------+------------------------------------------------------------------+
COPY 2 rows in set (... sec)

.. Hidden: delete imported data
cr> refresh table locations;
REFRESH OK, 1 row affected (... sec)
cr> delete from locations;
DELETE OK, ...
cr> refresh table locations;
REFRESH OK, 1 row affected (... sec)

If an error occurs while processing the URI itself, the ``error_count`` and
``success_count`` columns will contain ``NULL`` values to indicate that no
records were processed.

::

cr> COPY locations FROM '/tmp/import_data/not-existing.json' RETURN SUMMARY;
+--...--+-----------...---------+---------------+-------------+------------------------...------------------------+
| node | uri | success_count | error_count | errors |
+--...--+-----------...---------+---------------+-------------+------------------------...------------------------+
| {...} | .../not-existing.json | NULL | NULL | {"...not-existing.json (...)": {"count": 1, ...}} |
+--...--+-----------...---------+---------------+-------------+------------------------...------------------------+
COPY 1 row in set (... sec)

See :ref:`copy_from` for more information.

.. _exporting_data:
@@ -479,7 +536,16 @@ Replicated data is not exported. So every row of an exported table is stored
only once.

This example shows how to export a given table into files named after the table
and shard ID with gzip compression::
and shard ID with gzip compression:

.. Hidden: import data
cr> refresh table quotes;
REFRESH OK...
cr> copy quotes from '/tmp/import_data/*';
COPY OK, 3 rows affected (... sec)
::

cr> refresh table quotes;
REFRESH OK...
51 changes: 50 additions & 1 deletion blackbox/docs/sql/statements/copy-from.rst
@@ -18,7 +18,7 @@ Synopsis
::

COPY table_ident [ PARTITION (partition_column = value [ , ... ]) ]
FROM uri [ WITH ( option = value [, ...] ) ]
FROM uri [ WITH ( option = value [, ...] ) ] [ RETURN SUMMARY ]

where ``option`` can be one of:

@@ -296,6 +296,55 @@ This option specifies the format of the input file. Available formats are
``csv`` or ``json``. If a format is not specified and the format cannot be
guessed from the file extension, the file will be processed as JSON.
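
For illustration, a sketch that sets the input format explicitly instead of
relying on the file extension (table name and path are hypothetical)::

    COPY quotes FROM '/tmp/import_data/quotes.data' WITH (format = 'json');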

.. _return_summary:

``RETURN SUMMARY``
------------------

By using the optional ``RETURN SUMMARY`` clause, a per-node result set will be
returned containing information about possible failures and successfully
inserted records.

::

[ RETURN SUMMARY ]

.. rubric:: Schema

+---------------------------------------+------------------------------------------------+---------------+
| Column Name | Description | Return Type |
+=======================================+================================================+===============+
| ``node`` | Information about the node that has processed | ``OBJECT`` |
| | the URI resource. | |
+---------------------------------------+------------------------------------------------+---------------+
| ``node['id']`` | The id of the node. | ``STRING`` |
+---------------------------------------+------------------------------------------------+---------------+
| ``node['name']`` | The name of the node. | ``STRING`` |
+---------------------------------------+------------------------------------------------+---------------+
| ``uri`` | The URI the node has processed. | ``STRING`` |
+---------------------------------------+------------------------------------------------+---------------+
| ``error_count`` | The total number of records which failed. | ``LONG`` |
| | A NULL value indicates a general URI reading | |
|                                       | error; the error will be listed inside the    |               |
| | ``errors`` column. | |
+---------------------------------------+------------------------------------------------+---------------+
| ``success_count`` | The total number of records which were | ``LONG`` |
| | inserted. | |
| | A NULL value indicates a general URI reading | |
|                                       | error; the error will be listed inside the    |               |
| | ``errors`` column. | |
+---------------------------------------+------------------------------------------------+---------------+
| ``errors`` | Contains detailed information about all | ``OBJECT`` |
| | errors. | |
+---------------------------------------+------------------------------------------------+---------------+
| ``errors[ERROR_MSG]``                 | Contains information about one type of error. | ``OBJECT``    |
+---------------------------------------+------------------------------------------------+---------------+
| ``errors[ERROR_MSG]['count']``        | The number of records failed with this error.  | ``LONG``      |
+---------------------------------------+------------------------------------------------+---------------+
| ``errors[ERROR_MSG]['line_numbers']`` | The line numbers of the source URI where the | ``ARRAY`` |
| | error occurred. | |
+---------------------------------------+------------------------------------------------+---------------+
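
For example, a sketch of a complete statement using the clause (table name
and URI are illustrative)::

    COPY quotes FROM 'file:///tmp/import_data/quotes.json' RETURN SUMMARY;

The ``errors`` object groups failures by error message; a single entry looks
like ``{"failed to parse [date]": {"count": 2, "line_numbers": [1, 2]}}``.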

.. _`AWS documentation`: http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
.. _`AWS Java Documentation`: http://docs.aws.amazon.com/AmazonS3/latest/dev/AuthUsingAcctOrUserCredJava.html
.. _`RFC2396`: http://www.ietf.org/rfc/rfc2396.txt
6 changes: 6 additions & 0 deletions blackbox/docs/src/doc_tests/locations_import_summary1.json
@@ -0,0 +1,6 @@
{ "id": "1", "name": "North West Ripple", "date":308534400000, "kind": "Galaxy", "position": 1, "description": "Relative to life on NowWhat, living on an affluent world in the North West ripple of the Galaxy is said to be easier by a factor of about seventeen million.", "information":[{"population": 12, "evolution_level":4}, {"population": 42, "evolution_level":42}]}
{ "id": "2", "name": "Outer Eastern Rim", "date":308534400000, "kind": "Galaxy", "position": 2, "description": "The Outer Eastern Rim of the Galaxy where the Guide has supplanted the Encyclopedia Galactica among its more relaxed civilisations.", "information":[{"population": 5673745846, "evolution_level":2}]}
{ "id": "3", "name": "Galactic Sector QQ7 Active J Gamma", "date":1367366400000, "kind": "Galaxy", "position": 4, "description": "Galactic Sector QQ7 Active J Gamma contains the Sun Zarss, the planet Preliumtarn of the famed Sevorbeupstry and Quentulus Quazgar Mountains."}
{ "id": "4", "name": "Aldebaran", "date":1373932800000, "kind": "Star System", "position": 1, "description": "Max Quordlepleen claims that the only thing left after the end of the Universe will be the sweets trolley and a fine selection of Aldebaran liqueurs."}
{ "id": "5", "name": "Algol", "date":1373932800000, "kind": "Star System", "position": 2, "description": "Algol is the home of the Algolian Suntiger, the tooth of which is one of the ingredients of the Pan Galactic Gargle Blaster."}
{ "id": "6", "name": "Alpha Centauri", "date":308534400000, "kind": "Star System", "position": 3, "description": "4.1 light-years northwest of earth"}
7 changes: 7 additions & 0 deletions blackbox/docs/src/doc_tests/locations_import_summary2.json
@@ -0,0 +1,7 @@
{ "id": "7", "name": "Altair", "date": "May", "kind": "Star System", "position": 4, "description": "The Altairian dollar is one of three freely convertible currencies in the galaxy, though by the time of the novels it had apparently recently collapsed."}
{ "id": "8", "name": "Allosimanius Syneca", "date": "Juli", "kind": "Planet", "position": 1, "description": "Allosimanius Syneca is a planet noted for ice, snow, mind-hurtling beauty and stunning cold."}
{ "id": "9", "name": "Argabuthon", "date":1373932800000, "kind": "Planet", "position": 2, "description": "It is also the home of Prak, a man placed into solitary confinement after an overdose of truth drug caused him to tell the Truth in its absolute and final form, causing anyone to hear it to go insane."}
{ "id": "10", "name": "Arkintoofle Minor", "date":308534400000, "kind": "Planet", "position": 3, "description": "Motivated by the fact that the only thing in the Universe that travels faster than light is bad news, the Hingefreel people native to Arkintoofle Minor constructed a starship driven by bad news.", "race": {"name": "Minories", "description": "Giants, but with single eye.", "interests": ["baseball", "short stories"] } }
{ "id": "11", "name": "Bartledan", "date":1373932800000, "kind": "Planet", "position": 4, "description": "An Earthlike planet on which Arthur Dent lived for a short time, Bartledan is inhabited by Bartledanians, a race that appears human but only physically.", "race": {"name": "Bartledannians", "description": "Similar to humans, but do not breathe", "interests": ["netball", "books with 100.000 words"] } }
{ "id": "12", "name": "", "date":1373932800000, "kind": "Planet", "position": 5, "description": "This Planet doesn't really exist"}
{ "id": "13", "name": null, "date":1373932800000, "kind": "Galaxy", "position": 6, "description": "The end of the Galaxy.%"}
10 changes: 10 additions & 0 deletions blackbox/docs/src/doc_tests/tests.py
@@ -191,6 +191,16 @@ def setUpLocations(test):
    locations_file = get_abspath("locations.json")
    _execute_sql("""copy locations from '{0}'""".format(locations_file))
    _execute_sql("""refresh table locations""")
    import_failures_dir = '/tmp/import_data/locations_with_failure'
    os.makedirs(import_failures_dir, exist_ok=True)
    shutil.copy(
        get_abspath("locations_import_summary1.json"),
        os.path.join(import_failures_dir, "locations1.json")
    )
    shutil.copy(
        get_abspath("locations_import_summary2.json"),
        os.path.join(import_failures_dir, "locations2.json")
    )


def setUpUserVisits(test):
2 changes: 1 addition & 1 deletion shared/src/main/java/io/crate/exceptions/Exceptions.java
@@ -47,7 +47,7 @@ private static <T extends Throwable> void rethrow(final Throwable t) throws T {
* Does not contain a proper stacktrace.
* @return a String of format ExceptionName[msg];...
*/
public static String userFriendlyMessage(Throwable t) {
public static String userFriendlyMessageInclNested(Throwable t) {
if (t == null) {
return "Unknown";
}
7 changes: 5 additions & 2 deletions sql-parser/src/main/antlr/SqlBase.g4
@@ -64,7 +64,7 @@ statement
| INSERT INTO table ('(' ident (',' ident)* ')')? insertSource
(onDuplicate | onConflict)? #insert
| RESTORE SNAPSHOT qname (ALL | TABLE tableWithPartitions) withProperties? #restore
| COPY tableWithPartition FROM path=expr withProperties? #copyFrom
| COPY tableWithPartition FROM path=expr withProperties? (RETURN SUMMARY)? #copyFrom
| COPY tableWithPartition columns? where?
TO DIRECTORY? path=expr withProperties? #copyTo
| DROP BLOB TABLE (IF EXISTS)? table #dropBlobTable
@@ -622,7 +622,7 @@ nonReserved
| ISOLATION | TRANSACTION | CHARACTERISTICS | LEVEL | LANGUAGE | OPEN | CLOSE | RENAME
| PRIVILEGES | SCHEMA | INGEST | RULE | PREPARE
| REROUTE | MOVE | SHARD | ALLOCATE | REPLICA | CANCEL | CLUSTER | RETRY | FAILED
| DO | NOTHING | CONFLICT | TRANSACTION_ISOLATION
| DO | NOTHING | CONFLICT | TRANSACTION_ISOLATION | RETURN | SUMMARY
;

SELECT: 'SELECT';
@@ -850,6 +850,9 @@ SCHEMA: 'SCHEMA';
INGEST: 'INGEST';
RULE: 'RULE';

RETURN: 'RETURN';
SUMMARY: 'SUMMARY';

EQ : '=';
NEQ : '<>' | '!=';
LT : '<';
3 changes: 3 additions & 0 deletions sql-parser/src/main/java/io/crate/sql/SqlFormatter.java
@@ -130,6 +130,9 @@ public Void visitCopyFrom(CopyFrom node, Integer indent) {
append(indent, " ");
process(node.genericProperties(), indent);
}
if (node.isReturnSummary()) {
append(indent, " RETURN SUMMARY");
}
return null;
}

4 changes: 3 additions & 1 deletion sql-parser/src/main/java/io/crate/sql/parser/AstBuilder.java
@@ -398,10 +398,12 @@ public Node visitDropSnapshot(SqlBaseParser.DropSnapshotContext context) {

@Override
public Node visitCopyFrom(SqlBaseParser.CopyFromContext context) {
boolean returnSummary = context.SUMMARY() != null;
return new CopyFrom(
(Table) visit(context.tableWithPartition()),
(Expression) visit(context.path),
extractGenericProperties(context.withProperties()));
extractGenericProperties(context.withProperties()),
returnSummary);
}

@Override
30 changes: 17 additions & 13 deletions sql-parser/src/main/java/io/crate/sql/tree/CopyFrom.java
@@ -23,19 +23,24 @@

import com.google.common.base.MoreObjects;

import java.util.Objects;

public class CopyFrom extends Statement {

private final Table table;
private final Expression path;
private final GenericProperties genericProperties;
private final boolean returnSummary;

public CopyFrom(Table table,
Expression path,
GenericProperties genericProperties) {
GenericProperties genericProperties,
boolean returnSummary) {

this.table = table;
this.path = path;
this.genericProperties = genericProperties;
this.returnSummary = returnSummary;
}

public Table table() {
@@ -50,26 +50,24 @@ public GenericProperties genericProperties() {
return genericProperties;
}

public boolean isReturnSummary() {
return returnSummary;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CopyFrom that = (CopyFrom) o;

if (!genericProperties.equals(that.genericProperties)) return false;
if (!path.equals(that.path)) return false;
if (!table.equals(that.table)) return false;

return true;
CopyFrom copyFrom = (CopyFrom) o;
return returnSummary == copyFrom.returnSummary &&
Objects.equals(table, copyFrom.table) &&
Objects.equals(path, copyFrom.path) &&
Objects.equals(genericProperties, copyFrom.genericProperties);
}

@Override
public int hashCode() {
int result = table.hashCode();
result = 31 * result + path.hashCode();
result = 31 * result + genericProperties.hashCode();
return result;
return Objects.hash(table, path, genericProperties, returnSummary);
}

@Override
@@ -78,6 +81,7 @@ public String toString() {
.add("table", table)
.add("path", path)
.add("properties", genericProperties)
.add("returnSummary", returnSummary)
.toString();
}

@@ -759,6 +759,8 @@ public void testCopy() throws Exception {
printStatement("copy foo from ? with (some_property=1)");
printStatement("copy foo from ? with (some_property=false)");
printStatement("copy schemah.foo from '/folder/file.extension'");
printStatement("copy schemah.foo from '/folder/file.extension' return summary");
printStatement("copy schemah.foo from '/folder/file.extension' with (some_property=1) return summary");

printStatement("copy foo (nae) to '/folder/file.extension'");
printStatement("copy foo to '/folder/file.extension'");
4 changes: 4 additions & 0 deletions sql/src/main/java/io/crate/analyze/CopyAnalyzer.java
@@ -145,6 +145,10 @@ CopyFromAnalyzedStatement convertCopyFrom(CopyFrom node, Analysis analysis) {
FileUriCollectPhase.InputFormat inputFormat =
settingAsEnum(FileUriCollectPhase.InputFormat.class, settings.get(INPUT_FORMAT_SETTINGS.name(),INPUT_FORMAT_SETTINGS.defaultValue()));

if (node.isReturnSummary()) {
return new CopyFromReturnSummaryAnalyzedStatement(tableInfo, settings, uri, partitionIdent, nodeFilters, inputFormat);
}

return new CopyFromAnalyzedStatement(tableInfo, settings, uri, partitionIdent, nodeFilters, inputFormat);
}
