Skip to content

Commit

Permalink
[ASTERIXDB-2713][EXT] CSV & TSV support for external dataset p3
Browse files Browse the repository at this point in the history
- user model changes: no
- storage format changes: no
- interface changes: yes
	IRecordDataParser, IRecordReader, IRecordConverter

Details:
- record parser:
	- delimited-data (CSV/TSV) parser: ignore and warn for invalid records.
	- other parses: continue to use their existing behaviour.
- stream parser:
	continue to use their existing behaviour.
- fixes:
	- fixed S3 stream read() to properly advance to next files and also
	  to notify consumers to handle properties like header properly.
	- fixed localfs stream read() when reached end of current file
	  and notifying of a new file source.
	- extracted the read() of both streams since now they are identical.
- report file, record number and field number in warnings of parser
- propagate stream name to parsers that need report stream name
- add test cases

Change-Id: Ie1ba545d753d8afef9cef4e290e058019a465201
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5926
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Reviewed-by: Murtadha Hubail <mhubail@apache.org>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
  • Loading branch information
AliSolaiman committed Apr 22, 2020
1 parent 36ae101 commit 2bbcdd8
Show file tree
Hide file tree
Showing 145 changed files with 1,930 additions and 254 deletions.
5 changes: 0 additions & 5 deletions asterixdb/asterix-app/data/csv/empty.csv
Original file line number Diff line number Diff line change
@@ -1,5 +0,0 @@





5 changes: 5 additions & 0 deletions asterixdb/asterix-app/data/csv/empty_lines.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@





4 changes: 4 additions & 0 deletions asterixdb/asterix-app/data/csv/header/h_mul_rec.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
f1,f2,f3,f4
1,2,3,"str"
4,5,6,"rts"
7,8,9,"srt"
4 changes: 4 additions & 0 deletions asterixdb/asterix-app/data/csv/header/h_mul_rec_with_ln.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
f1,f2,f3,f4
1,2,3,"str"
4,5,6,"rts"
7,8,9,"srt"
2 changes: 2 additions & 0 deletions asterixdb/asterix-app/data/csv/header/h_one_rec.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
f1,f2,f3,f4
1,2,3,"str"
2 changes: 2 additions & 0 deletions asterixdb/asterix-app/data/csv/header/h_one_rec_with_ln.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
f1,f2,f3,f4
1,2,3,"str"
1 change: 1 addition & 0 deletions asterixdb/asterix-app/data/csv/header/h_only.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f1,f2,f3,f4
1 change: 1 addition & 0 deletions asterixdb/asterix-app/data/csv/header/h_only_with_ln.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f1,f2,f3,f4
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,2,3,"str"
4,5,6
7,8,9,"srt"
3 changes: 3 additions & 0 deletions asterixdb/asterix-app/data/csv/no_header/no_h_mul_rec.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,2,3,"str"
4,5,6,"rts"
7,8,9,"srt"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1,2,3,"str"
4,5,6,"rts"
7,8,9,"srt"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,2,3,"5
1 change: 1 addition & 0 deletions asterixdb/asterix-app/data/csv/no_header/no_h_one_rec.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,2,3,"str"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1,2,3,"str"
Empty file.
5 changes: 5 additions & 0 deletions asterixdb/asterix-app/data/tsv/empty_lines.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@





4 changes: 4 additions & 0 deletions asterixdb/asterix-app/data/tsv/header/h_mul_rec.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
f1 f2 f3 f4
1 2 3 "str"
4 5 6 "rts"
7 8 9 "srt"
4 changes: 4 additions & 0 deletions asterixdb/asterix-app/data/tsv/header/h_mul_rec_with_ln.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
f1 f2 f3 f4
1 2 3 "str"
4 5 6 "rts"
7 8 9 "srt"
2 changes: 2 additions & 0 deletions asterixdb/asterix-app/data/tsv/header/h_one_rec.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
f1 f2 f3 f4
1 2 3 "str"
2 changes: 2 additions & 0 deletions asterixdb/asterix-app/data/tsv/header/h_one_rec_with_ln.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
f1 f2 f3 f4
1 2 3 "str"
1 change: 1 addition & 0 deletions asterixdb/asterix-app/data/tsv/header/h_only.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f1 f2 f3 f4
1 change: 1 addition & 0 deletions asterixdb/asterix-app/data/tsv/header/h_only_with_ln.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
f1 f2 f3 f4
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 2 3 "str"
4 5 6
7 8 9 "srt"
3 changes: 3 additions & 0 deletions asterixdb/asterix-app/data/tsv/no_header/no_h_mul_rec.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 2 3 "str"
4 5 6 "rts"
7 8 9 "srt"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
1 2 3 "str"
4 5 6 "rts"
7 8 9 "srt"
1 change: 1 addition & 0 deletions asterixdb/asterix-app/data/tsv/no_header/no_h_one_rec.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1 2 3 "str"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1 2 3 "str"
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.test.external_dataset.aws;

import java.util.Collection;

import org.apache.asterix.test.runtime.LangExecutionUtil;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.FixMethodOrder;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.junit.runners.Parameterized;

/**
* Runs an AWS S3 mock server and test it as an external dataset using one node one partition.
*/
@RunWith(Parameterized.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class AwsS3ExternalDatasetOnePartitionTest extends AwsS3ExternalDatasetTest {

private static final Logger LOGGER = LogManager.getLogger();
private static final String SUITE_PATH = "testsuite_external_dataset_one_partition.xml";

@Parameterized.Parameters(name = "SqlppExecutionTest {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
TEST_CONFIG_FILE_NAME = "src/test/resources/cc-single.conf";
PREPARE_S3_BUCKET = AwsS3ExternalDatasetOnePartitionTest::prepareS3Bucket;
return LangExecutionUtil.tests("only_external_dataset.xml", SUITE_PATH);
}

public AwsS3ExternalDatasetOnePartitionTest(TestCaseContext tcCtx) {
super(tcCtx);
}

private static void prepareS3Bucket() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,36 @@

import static org.apache.hyracks.util.file.FileUtil.joinPath;

import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import org.apache.asterix.test.runtime.LangExecutionUtil;
import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.asterix.testframework.context.TestFileContext;
import org.apache.asterix.testframework.xml.TestCase;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

Expand All @@ -50,17 +60,21 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

/**
* Runs an AWS S3 mock server and test it as an external dataset
*/
@RunWith(Parameterized.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class AwsS3ExternalDatasetTest {

private static final Logger LOGGER = LogManager.getLogger();

protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
protected static String TEST_CONFIG_FILE_NAME;
static Runnable PREPARE_S3_BUCKET;

// S3 mock server
private static S3Mock s3MockServer;
Expand All @@ -76,10 +90,14 @@ public class AwsS3ExternalDatasetTest {
private static final String S3_MOCK_SERVER_HOSTNAME = "http://localhost:" + S3_MOCK_SERVER_PORT;
private static final String CSV_DATA_PATH = joinPath("data", "csv");
private static final String TSV_DATA_PATH = joinPath("data", "tsv");
private static final Set<String> fileNames = new HashSet<>();
private static final CreateBucketRequest.Builder CREATE_BUCKET_BUILDER = CreateBucketRequest.builder();
private static final DeleteBucketRequest.Builder DELETE_BUCKET_BUILDER = DeleteBucketRequest.builder();
private static final PutObjectRequest.Builder PUT_OBJECT_BUILDER = PutObjectRequest.builder();

@BeforeClass
public static void setUp() throws Exception {
final TestExecutor testExecutor = new TestExecutor();
final TestExecutor testExecutor = new AwsTestExecutor();
LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
setNcEndpoints(testExecutor);
startAwsS3MockServer();
Expand All @@ -102,6 +120,8 @@ public static void tearDown() throws Exception {

@Parameters(name = "SqlppExecutionTest {index}: {0}")
public static Collection<Object[]> tests() throws Exception {
TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf";
PREPARE_S3_BUCKET = AwsS3ExternalDatasetTest::prepareS3Bucket;
return LangExecutionUtil.tests("only_external_dataset.xml", "testsuite_external_dataset.xml");
}

Expand Down Expand Up @@ -149,7 +169,7 @@ private static void startAwsS3MockServer() {
LOGGER.info("Client created successfully");

// Create the bucket and upload some json files
prepareS3Bucket();
PREPARE_S3_BUCKET.run();
}

/**
Expand Down Expand Up @@ -239,4 +259,56 @@ private static void prepareS3Bucket() {
RequestBody.fromFile(Paths.get(TSV_DATA_PATH, "02.tsv")));
LOGGER.info("Files added successfully");
}

static class AwsTestExecutor extends TestExecutor {

public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx,
String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, TestCase.CompilationUnit cUnit,
MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath,
MutableInt actualWarnCount) throws Exception {
String[] lines;
switch (ctx.getType()) {
case "s3bucket":
// <bucket_name> <def_name> <file1,file2,file3>
lines = TestExecutor.stripAllComments(statement).trim().split("\n");
String lastLine = lines[lines.length - 1];
String[] command = lastLine.trim().split(" ");
int length = command.length;
if (length != 3) {
throw new Exception("invalid create bucket format");
}
dropRecreateBucket(command[0], command[1], command[2]);
break;
default:
super.executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit,
queryCount, expectedResultFileCtxs, testFile, actualPath, actualWarnCount);
}
}
}

private static void dropRecreateBucket(String bucketName, String definition, String files) {
String definitionPath = definition + (definition.endsWith("/") ? "" : "/");
String[] fileSplits = files.split(",");

LOGGER.info("Dropping bucket");
try {
client.deleteBucket(DELETE_BUCKET_BUILDER.bucket(bucketName).build());
} catch (NoSuchBucketException e) {
// ignore
}
LOGGER.info("Creating bucket " + bucketName);
client.createBucket(CREATE_BUCKET_BUILDER.bucket(bucketName).build());
LOGGER.info("Uploading to bucket " + bucketName + " definition " + definitionPath);
fileNames.clear();
for (int i = 0; i < fileSplits.length; i++) {
String fileName = FilenameUtils.getName(fileSplits[i]);
while (fileNames.contains(fileName)) {
fileName = (i + 1) + fileName;
}
fileNames.add(fileName);
client.putObject(PUT_OBJECT_BUILDER.bucket(bucketName).key(definitionPath + fileName).build(),
RequestBody.fromFile(Paths.get(fileSplits[i])));
}
LOGGER.info("Done creating bucket with data");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ CREATE EXTERNAL DATASET ds2(t2) USING localfs(("path"="asterix_nc1://data/csv/sa
CREATE EXTERNAL DATASET ds3(t1) USING localfs(("path"="asterix_nc1://data/csv/sample_11.csv"), ("format"="csv"), ("header"="FALSE"));
CREATE EXTERNAL DATASET ds4(t3) USING localfs(("path"="asterix_nc1://data/csv/sample_12.csv"), ("format"="csv"), ("header"="True"));
CREATE EXTERNAL DATASET ds5(t4) USING localfs(("path"="asterix_nc1://data/csv/sample_13.csv"), ("format"="csv"), ("header"="True"));
CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty.csv"), ("format"="csv"), ("header"="false"));
CREATE EXTERNAL DATASET ds6(t4) USING localfs(("path"="asterix_nc1://data/csv/empty_lines.csv"), ("format"="csv"), ("header"="false"));
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// create S3 bucket with data
playground data_dir data/csv/empty.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

DROP DATAVERSE test IF EXISTS;
CREATE DATAVERSE test;
USE test;

DROP TYPE t1 IF EXISTS;
CREATE TYPE t1 AS {f1: int, f2: int, f3: int, f4: string};

DROP DATASET ds1 IF EXISTS;
CREATE EXTERNAL DATASET ds1(t1) USING S3 (
("accessKey"="dummyAccessKey"),
("secretKey"="dummySecretKey"),
("region"="us-west-2"),
("serviceEndpoint"="http://localhost:8001"),
("container"="playground"),
("definition"="data_dir"),
("format"="CSV"),
("header"="true")
);

0 comments on commit 2bbcdd8

Please sign in to comment.