From c169ca1b89d01ad9434f8bb664181a15e7a240d3 Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Mon, 12 Mar 2018 19:38:08 +0800 Subject: [PATCH 1/3] support sort_columns --- .../schema/table/TableSchemaBuilder.java | 1 + .../loading/model/CarbonLoadModelBuilder.java | 2 +- .../processing/loading/model/LoadOption.java | 1 + .../sdk/file/CSVCarbonWriterSuite.java | 31 +++++++++++++++---- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 88774ec59a4..ae123b366a4 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -71,6 +71,7 @@ public TableSchemaBuilder addColumn(StructField field, boolean isSortColumn) { if (isSortColumn) { sortColumns.add(newColumn); + newColumn.setSortColumn(true); } else { otherColumns.add(newColumn); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 78716438de4..8c005c39192 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -62,7 +62,7 @@ public CarbonLoadModelBuilder(CarbonTable table) { public CarbonLoadModel build( Map options) throws InvalidLoadOptionException, IOException { Map optionsFinal = LoadOption.fillOptionWithDefaultValue(options); - optionsFinal.put("sort_scope", "no_sort"); + if (!options.containsKey("fileheader")) { List csvHeader = table.getCreateOrderColumn(table.getTableName()); String[] columns = new String[csvHeader.size()]; diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java index 608d14708f7..e605b9eef4c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -194,6 +194,7 @@ public static Map fillOptionWithDefaultValue( } optionsFinal.put("single_pass", String.valueOf(singlePass)); + optionsFinal.put("sort_scope", "local_sort"); optionsFinal.put("sort_column_bounds", Maps.getOrDefault(options, "sort_column_bounds", "")); return optionsFinal; } diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java index aca2b2df376..022bb8129ff 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java @@ -67,14 +67,22 @@ public void testWriteFilesJsonSchema() throws IOException { } private void writeFilesAndVerify(Schema schema, String path) { + writeFilesAndVerify(schema, path, null); + } + + private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { try { - CarbonWriter writer = CarbonWriter.builder() + CarbonWriterBuilder builder = CarbonWriter.builder() .withSchema(schema) - .outputPath(path) - .buildWriterForCSVInput(); + .outputPath(path); + if (sortColumns != null) { + builder = builder.sortBy(sortColumns); + } + + CarbonWriter writer = builder.buildWriterForCSVInput(); for (int i = 0; i < 100; i++) { - writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)}); + writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); } writer.close(); } catch (Exception e) { @@ -110,8 +118,19 @@ public void test2Block() { } @Test - public void testSortColumns() { - // TODO: test sort column + public void testSortColumns() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path, new String[]{"name"}); + + // TODO: implement reader and verify the data is sorted + + FileUtils.deleteDirectory(new File(path)); } @Test From c732b6413918e1fdd0fc22ac0ad75240f49d4abf Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Mon, 12 Mar 2018 20:34:53 +0800 Subject: [PATCH 2/3] support blocksize and add test --- .../schema/table/TableSchemaBuilder.java | 30 +++- store/sdk/pom.xml | 5 +- .../sdk/file/CarbonWriterBuilder.java | 12 +- .../sdk/file/CSVCarbonWriterSuite.java | 129 ++++++++++++++++-- 4 files changed, 159 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index ae123b366a4..7a0e37277c8 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -18,11 +18,14 @@ package org.apache.carbondata.core.metadata.schema.table; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.UUID; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.datatype.StructField; @@ -42,6 +45,16 @@ public class TableSchemaBuilder { private List otherColumns = new LinkedList<>(); + private int blockSize; + + public TableSchemaBuilder blockSize(int blockSize) { + if (blockSize == 0) { + throw new IllegalArgumentException("blockSize should be greater than 0"); + } + this.blockSize = blockSize; + return this; + } + public TableSchema build() { TableSchema schema = new TableSchema(); schema.setTableId(UUID.randomUUID().toString()); @@ -53,6 +66,12 @@ public TableSchema build() { List allColumns = new LinkedList<>(sortColumns); allColumns.addAll(otherColumns); schema.setListOfColumns(allColumns); + + if (blockSize > 0) { + Map property = new HashMap<>(); + property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize)); + schema.setTableProperties(property); + } return schema; } @@ -62,13 +81,19 @@ public TableSchemaBuilder addColumn(StructField field, boolean isSortColumn) { ColumnSchema newColumn = new ColumnSchema(); newColumn.setColumnName(field.getFieldName()); newColumn.setDataType(field.getDataType()); - newColumn.setDimensionColumn(isSortColumn || field.getDataType() == DataTypes.STRING); + if (isSortColumn || + field.getDataType() == DataTypes.STRING || + field.getDataType() == DataTypes.DATE || + field.getDataType() == DataTypes.TIMESTAMP) { + newColumn.setDimensionColumn(true); + } else { + newColumn.setDimensionColumn(false); + } newColumn.setSchemaOrdinal(ordinal++); newColumn.setColumnar(true); newColumn.setColumnUniqueId(UUID.randomUUID().toString()); newColumn.setColumnReferenceId(newColumn.getColumnUniqueId()); newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn)); - if (isSortColumn) { sortColumns.add(newColumn); newColumn.setSortColumn(true); @@ -98,6 +123,7 @@ private List createEncoding(DataType dataType, boolean isSortColumn) { List encodings = new LinkedList<>(); if (dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE) { encodings.add(Encoding.DIRECT_DICTIONARY); + encodings.add(Encoding.DICTIONARY); } if (isSortColumn) { encodings.add(Encoding.INVERTED_INDEX); diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index b3dd4640c8e..9f7038aefa6 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -43,9 +43,6 @@ . - - CARBON_SPARK_INTERFACELogResource.properties - @@ -144,7 +141,7 @@ false true - carbondata-sdk.jar + target/carbondata-sdk.jar *:* diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index e06200afb05..87343412c2a 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -53,6 +53,8 @@ public class CarbonWriterBuilder { private String path; private String[] sortColumns; private boolean persistSchemaFile; + private int blockletSize; + private int blockSize; public CarbonWriterBuilder withSchema(Schema schema) { Objects.requireNonNull(schema, "schema should not be null"); @@ -84,14 +86,16 @@ public CarbonWriterBuilder withBlockSize(int blockSize) { if (blockSize <= 0) { throw new IllegalArgumentException("blockSize should be greater than zero"); } - throw new UnsupportedOperationException(); + this.blockSize = blockSize; + return this; } public CarbonWriterBuilder withBlockletSize(int blockletSize) { if (blockletSize <= 0) { throw new IllegalArgumentException("blockletSize should be greater than zero"); } - throw new UnsupportedOperationException(); + this.blockletSize = blockletSize; + return this; } /** @@ -128,6 +132,10 @@ public CarbonWriter buildWriterForAvroInput() throws IOException { */ private CarbonTable buildCarbonTable() { TableSchemaBuilder tableSchemaBuilder = TableSchema.builder(); + if (blockletSize > 0) { + tableSchemaBuilder = tableSchemaBuilder.blockSize(blockSize); + } + List sortColumnsList; if (sortColumns != null) { sortColumnsList = Arrays.asList(sortColumns); diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java index 022bb8129ff..6555421cf7d 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java @@ -71,6 +71,15 @@ private void writeFilesAndVerify(Schema schema, String path) { } private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { + writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1); + } + + private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { + writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); + } + + private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, + boolean persistSchema, int blockletSize, int blockSize) { try { CarbonWriterBuilder builder = CarbonWriter.builder() .withSchema(schema) @@ -78,10 +87,19 @@ private void writeFilesAndVerify(Schema schema, String path, String[] sortColumn if (sortColumns != null) { builder = builder.sortBy(sortColumns); } + if (persistSchema) { + builder = builder.persistSchemaFile(true); + } + if (blockletSize != -1) { + builder = builder.withBlockletSize(blockletSize); + } + if (blockSize != -1) { + builder = builder.withBlockSize(blockSize); + } CarbonWriter writer = builder.buildWriterForCSVInput(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < rows; i++) { writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)}); } writer.close(); @@ -99,22 +117,103 @@ private void writeFilesAndVerify(Schema schema, String path, String[] sortColumn } }); Assert.assertNotNull(dataFiles); - Assert.assertEquals(1, dataFiles.length); + Assert.assertTrue(dataFiles.length > 0); } @Test - public void testAllPrimitiveDataType() { + public void testAllPrimitiveDataType() throws IOException { // TODO: write all data type and read by CarbonRecordReader to verify the content + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[9]; + fields[0] = new Field("stringField", DataTypes.STRING); + fields[1] = new Field("intField", DataTypes.INT); + fields[2] = new Field("shortField", DataTypes.SHORT); + fields[3] = new Field("longField", DataTypes.LONG); + fields[4] = new Field("doubleField", DataTypes.DOUBLE); + fields[5] = new Field("boolField", DataTypes.BOOLEAN); + fields[6] = new Field("dateField", DataTypes.DATE); + fields[7] = new Field("timeField", DataTypes.TIMESTAMP); + fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2)); + + try { + CarbonWriterBuilder builder = CarbonWriter.builder() + .withSchema(new Schema(fields)) + .outputPath(path); + + CarbonWriter writer = builder.buildWriterForCSVInput(); + + for (int i = 0; i < 100; i++) { + String[] row = new String[]{ + "robot" + (i % 10), + String.valueOf(i), + String.valueOf(i), + String.valueOf(Long.MAX_VALUE - i), + String.valueOf((double) i / 2), + String.valueOf(true), + "2019-03-02", + "2019-02-12 03:03:34" + }; + writer.write(row); + } + writer.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertTrue(dataFiles.length > 0); + + FileUtils.deleteDirectory(new File(path)); } @Test - public void test2Blocklet() { - // TODO: write data with more than one blocklet + public void test2Blocklet() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100); + + // TODO: implement reader to verify the number of blocklet in the file + + FileUtils.deleteDirectory(new File(path)); } @Test - public void test2Block() { - // TODO: write data with more than one block + public void test2Block() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2); + + File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + File[] dataFiles = segmentFolder.listFiles(new FileFilter() { + @Override public boolean accept(File pathname) { + return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT); + } + }); + Assert.assertNotNull(dataFiles); + Assert.assertEquals(2, dataFiles.length); + + FileUtils.deleteDirectory(new File(path)); } @Test @@ -139,8 +238,20 @@ public void testPartitionOutput() { } @Test - public void testSchemaPersistence() { - // TODO: verify schema file is persisted in specified location + public void testSchemaPersistence() throws IOException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + writeFilesAndVerify(new Schema(fields), path, true); + + String schemaFile = CarbonTablePath.getSchemaFilePath(path); + Assert.assertTrue(new File(schemaFile).exists()); + + FileUtils.deleteDirectory(new File(path)); } } From 3ac562958ecf099303f824ba2f5bc7bbce1ab16d Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Tue, 13 Mar 2018 11:52:38 +0800 Subject: [PATCH 3/3] fix comment --- .../core/metadata/schema/table/TableSchemaBuilder.java | 2 +- .../carbondata/sdk/file/CSVCarbonWriterSuite.java | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 7a0e37277c8..8fdcbb1967e 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -48,7 +48,7 @@ public class TableSchemaBuilder { private int blockSize; public TableSchemaBuilder blockSize(int blockSize) { - if (blockSize == 0) { + if (blockSize <= 0) { throw new IllegalArgumentException("blockSize should be greater than 0"); } this.blockSize = blockSize; diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java index 6555421cf7d..0ac6f38d55e 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java @@ -78,6 +78,16 @@ private void writeFilesAndVerify(Schema schema, String path, boolean persistSche writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); } + /** + * Invoke CarbonWriter API to write carbon files and assert the file is rewritten + * @param rows number of rows to write + * @param schema schema of the file + * @param path local write path + * @param sortColumns sort columns + * @param persistSchema true if want to persist schema file + * @param blockletSize blockletSize in the file, -1 for default size + * @param blockSize blockSize in the file, -1 for default size + */ private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, boolean persistSchema, int blockletSize, int blockSize) { try {