diff --git a/common/src/main/java/org/apache/carbondata/common/annotations/Since.java b/common/src/main/java/org/apache/carbondata/common/annotations/Since.java new file mode 100644 index 00000000000..b7e43919c4a --- /dev/null +++ b/common/src/main/java/org/apache/carbondata/common/annotations/Since.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * The annotation indicates the version since which a member or a type has been present. + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD}) +public @interface Since { + /** + * the version since which this member + * or type has been present. + */ + String value(); +} diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index f2b9308bf24..e10d1d5c732 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -971,6 +971,8 @@ public final class CarbonCommonConstants { */ public static final String DICTIONARY_PATH = "dictionary_path"; public static final String SORT_COLUMNS = "sort_columns"; + // file format for the data files + public static final String FORMAT = "format"; public static final String PARTITION_TYPE = "partition_type"; public static final String NUM_PARTITIONS = "num_partitions"; public static final String RANGE_INFO = "range_info"; @@ -993,6 +995,8 @@ public final class CarbonCommonConstants { // Flat folder support on table. when it is true all carbondata files store directly under table // path instead of sub folders.
public static final String FLAT_FOLDER = "flat_folder"; + // this will be used in hadoop conf to pass the format type to executor + public static final String CARBON_EXTERNAL_FORMAT_CONF_KEY = "carbon_external_format_type"; /** * 16 mb size diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java index 38145e5ab22..46328f714f6 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java @@ -25,11 +25,13 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.carbondata.common.annotations.Since; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -89,6 +91,17 @@ public class TableInfo implements Serializable, Writable { * */ private boolean isTransactionalTable = true; + /** + * The format of the fact table. + * By default it is carbondata, and we also support other format like CSV + */ + @Since("1.4.1") + private String format = "carbondata"; + /** + * properties for the format, such as delimiter/header for csv format + */ + @Since("1.4.1") + private Map formatProperties; // this identifier is a lazy field which will be created when it is used first time private AbsoluteTableIdentifier identifier; @@ -104,6 +117,7 @@ public class TableInfo implements Serializable, Writable { public TableInfo() { dataMapSchemaList = new ArrayList<>(); + formatProperties = new HashMap<>(); isTransactionalTable = true; } @@ -196,6 +210,22 @@ public void setTablePath(String tablePath) { this.tablePath = tablePath; } + public String getFormat() { + return format; + } + + public void setFormat(String format) { + this.format = format; + } + + public Map getFormatProperties() { + return formatProperties; + } + + public void setFormatProperties(Map formatProperties) { + this.formatProperties = formatProperties; + } + public List getDataMapSchemaList() { return dataMapSchemaList; } @@ -291,6 +321,17 @@ public void write(DataOutput out) throws IOException { } } out.writeBoolean(isSchemaModified); + + out.writeUTF(format); + boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0; + out.writeBoolean(isFormatPropertiesExists); + if (isFormatPropertiesExists) { + out.writeShort(formatProperties.size()); + for (Map.Entry entry : formatProperties.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } } @Override public void readFields(DataInput in) throws IOException { @@ -327,6 +368,17 @@ public void write(DataOutput out) throws IOException { } } this.isSchemaModified = in.readBoolean(); + + this.format = in.readUTF(); + boolean isFormatPropertiesExists = in.readBoolean(); + if (isFormatPropertiesExists) { + short size = in.readShort(); + for (int i = 0; i < size; i++) { + String key = in.readUTF(); + String value = in.readUTF(); + this.formatProperties.put(key, value); + } + } } public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() { diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java index c154c5f65dd..2b61f0d05f3 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java @@ -18,7 +18,8 @@ package org.apache.carbondata.core.statusmanager; /** - * The data file format supported in carbondata project + * The data file format supported in carbondata project. + * The file format along with its properties will be stored in TableInfo */ public enum FileFormat { @@ -26,7 +27,10 @@ public enum FileFormat { COLUMNAR_V3, // carbondata row file format, optimized for write - ROW_V1; + ROW_V1, + + // external file format, such as parquet/csv + EXTERNAL; public static FileFormat getByOrdinal(int ordinal) { if (ordinal < 0 || ordinal >= FileFormat.values().length) { @@ -38,6 +42,8 @@ public static FileFormat getByOrdinal(int ordinal) { return COLUMNAR_V3; case 1: return ROW_V1; + case 2: + return EXTERNAL; } return COLUMNAR_V3; diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java new file mode 100644 index 00000000000..862c36c09e9 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormatProperties.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.statusmanager; + +/** + * Provides the constant names for the file format properties + */ +public class FileFormatProperties { + public static class CSV { + public static final String HEADER = "csv.header"; + public static final String DELIMITER = "csv.delimiter"; + public static final String COMMENT = "csv.comment"; + public static final String SKIP_EMPTY_LINE = "csv.skipemptyline"; + public static final String QUOTE = "csv.quote"; + public static final String ESCAPE = "csv.escape"; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java index 9dc8fe60576..4339e342b96 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java @@ -122,6 +122,11 @@ public void setIndexSize(String indexSize) { * the file format of this segment */ private FileFormat fileFormat = FileFormat.COLUMNAR_V3; + /** + * paths of the fact files. + * Since format and formatProperties are stored in tableInfo, they are not stored in each segment + */ + private String factFilePath; /** * Segment file name where it has the information of partition information.
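(Illustrative aside, not part of the patch: a minimal sketch of how the pieces above are meant to be consumed, assuming a TableInfo instance named tableInfo and a LoadMetadataDetails instance named load. The CSV keys are looked up in the table-level format properties, and the fact files of a segment are kept as a comma-separated list:
  Map<String, String> props = tableInfo.getFormatProperties();
  // fall back to a comma when no csv.delimiter was specified in the table properties
  String delimiter = props.getOrDefault(FileFormatProperties.CSV.DELIMITER, ",");
  String[] factFiles = load.getFactFilePath().split(",");  // one segment may reference several fact files
The CsvRecordReader and CarbonInputFormat changes further below consume these same values.)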
@@ -429,8 +434,17 @@ public void setSegmentFile(String segmentFile) { this.segmentFile = segmentFile; } + public String getFactFilePath() { + return factFilePath; + } + + public void setFactFilePath(String factFilePath) { + this.factFilePath = factFilePath; + } + @Override public String toString() { return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", loadName='" + loadName + '\'' - + ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + segmentFile + '\'' + '}'; + + ", loadStartTime='" + loadStartTime + '\'' + ", factFilePath='" + factFilePath + '\'' + + ", segmentFile='" + segmentFile + '\'' + '}'; } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java index 0b991cb9f9f..ae11cf29b2c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -64,12 +64,18 @@ public CarbonMultiBlockSplit(List blocks, String hostname) { this.splitList.add((CarbonInputSplit)block); } this.locations = new String[]{hostname}; + if (splitList.size() > 0) { + this.fileFormat = splitList.get(0).getFileFormat(); + } } public CarbonMultiBlockSplit(List splitList, String[] locations) { this.splitList = splitList; this.locations = locations; + if (splitList.size() > 0) { + this.fileFormat = splitList.get(0).getFileFormat(); + } calculateLength(); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java new file mode 100644 index 00000000000..70f58c3a518 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * scan csv file and filter on it + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvRecordReader extends AbstractRecordReader { + private static final LogService LOGGER = LogServiceFactory.getLogService( + CsvRecordReader.class.getName()); + private static final int MAX_BATCH_SIZE = + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + // vector reader + private boolean isVectorReader; + private T columnarBatch; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] carbonColumns; + // input + private QueryModel queryModel; + private CarbonReadSupport readSupport; + private FileSplit fileSplit; + private Configuration hadoopConf; + // the index is schema ordinal, the value is the csv ordinal + private int[] schema2csvIdx; + + // filter + private FilterExecuter filter; + // the index is the dimension ordinal, the value is the schema ordinal + private int[] filterColumn2SchemaIdx; + private Object[] 
internalValues; + private RowIntf internalRow; + + // output + private CarbonColumn[] projection; + // the index is the projection column ordinal, the value is the schema ordinal + private int[] projectionColumn2SchemaIdx; + private Object[] outputValues; + private Object[][] batchOutputValues; + private T outputRow; + + // inputMetricsStats + private InputMetricsStats inputMetricsStats; + + // scan + private Reader reader; + private CsvParser csvParser; + + public CsvRecordReader(QueryModel queryModel, CarbonReadSupport readSupport) { + this.queryModel = queryModel; + this.readSupport = readSupport; + } + + public CsvRecordReader(QueryModel queryModel, CarbonReadSupport readSupport, + InputMetricsStats inputMetricsStats) { + this(queryModel, readSupport); + this.inputMetricsStats = inputMetricsStats; + } + + public boolean isVectorReader() { + return isVectorReader; + } + + public void setVectorReader(boolean vectorReader) { + isVectorReader = vectorReader; + } + + public void setQueryModel(QueryModel queryModel) { + this.queryModel = queryModel; + } + + public void setInputMetricsStats(InputMetricsStats inputMetricsStats) { + this.inputMetricsStats = inputMetricsStats; + } + + public void setReadSupport(CarbonReadSupport readSupport) { + this.readSupport = readSupport; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + if (split instanceof CarbonInputSplit) { + fileSplit = (CarbonInputSplit) split; + } else if (split instanceof CarbonMultiBlockSplit) { + fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0); + } else { + fileSplit = (FileSplit) split; + } + + hadoopConf = context.getConfiguration(); + if (queryModel == null) { + CarbonTableInputFormat inputFormat = new CarbonTableInputFormat(); + queryModel = inputFormat.createQueryModel(split, context); + } + + carbonTable = queryModel.getTable(); + + // since the sequence of csv header, schema, carbon internal row, projection are different, + // we need to init the column mappings + initializedIdxMapping(); + + // init filter + if (null != queryModel.getFilterExpressionResolverTree()) { + initializeFilter(); + } + + // init reading + initializeCsvReader(); + + this.readSupport.initialize(projection, carbonTable); + } + + private void initializedIdxMapping() { + carbonColumns = + carbonTable.getCreateOrderColumn(carbonTable.getTableName()).toArray(new CarbonColumn[0]); + // for schema to csv mapping + schema2csvIdx = new int[carbonColumns.length]; + if (!carbonTable.getTableInfo().getFormatProperties().containsKey( + FileFormatProperties.CSV.HEADER)) { + // if no header specified, it means that they are the same + LOGGER.info("no header specified, will take the schema from table as header"); + for (int i = 0; i < carbonColumns.length; i++) { + schema2csvIdx[i] = i; + } + } else { + String[] csvHeader = carbonTable.getTableInfo().getFormatProperties().get( + FileFormatProperties.CSV.HEADER).split(","); + for (int i = 0; i < csvHeader.length; i++) { + boolean found = false; + for (int j = 0; j < carbonColumns.length; j++) { + if (StringUtils.strip(csvHeader[i]).equalsIgnoreCase(carbonColumns[j].getColName())) { + schema2csvIdx[carbonColumns[j].getSchemaOrdinal()] = i; + found = true; + break; + } + } + if (!found) { + throw new RuntimeException( + String.format("Can not find csv header '%s' in table fields", csvHeader[i])); + } + } + } + + // for carbon internal row to schema mapping + filterColumn2SchemaIdx = new int[carbonColumns.length]; + 
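// the carbon internal row lays out dimensions first and measures after them, so record each column's schema ordinal in that order +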
int filterIdx = 0; + for (CarbonDimension dimension : carbonTable.getDimensions()) { + filterColumn2SchemaIdx[filterIdx++] = dimension.getSchemaOrdinal(); + } + for (CarbonMeasure measure : carbonTable.getMeasures()) { + filterColumn2SchemaIdx[filterIdx++] = measure.getSchemaOrdinal(); + } + + // for projects to schema mapping + projection = queryModel.getProjectionColumns(); + projectionColumn2SchemaIdx = new int[projection.length]; + + for (int i = 0; i < projection.length; i++) { + for (int j = 0; j < carbonColumns.length; j++) { + if (projection[i].getColName().equals(carbonColumns[j].getColName())) { + projectionColumn2SchemaIdx[i] = projection[i].getSchemaOrdinal(); + break; + } + } + } + } + + private void initializeFilter() { + List wrapperColumnSchemaList = CarbonUtil + .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()), + carbonTable.getMeasureByTableName(carbonTable.getTableName())); + int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()]; + for (int i = 0; i < dimLensWithComplex.length; i++) { + dimLensWithComplex[i] = Integer.MAX_VALUE; + } + + int[] dictionaryColumnCardinality = + CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList); + SegmentProperties segmentProperties = + new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality); + Map complexDimensionInfoMap = new HashMap<>(); + + FilterResolverIntf resolverIntf = queryModel.getFilterExpressionResolverTree(); + filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, + complexDimensionInfoMap); + // for row filter, we need update column index + FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(), + carbonTable.getDimensionOrdinalMax()); + } + + private void initializeCsvReader() throws IOException { + internalValues = new Object[carbonColumns.length]; + internalRow = new RowImpl(); + internalRow.setValues(internalValues); + + outputValues = new Object[projection.length]; + batchOutputValues = new Object[MAX_BATCH_SIZE][projection.length]; + + Path file = fileSplit.getPath(); + FileSystem fs = file.getFileSystem(hadoopConf); + int bufferSize = Integer.parseInt( + hadoopConf.get(CSVInputFormat.READ_BUFFER_SIZE, CSVInputFormat.READ_BUFFER_SIZE_DEFAULT)); + // note that here we read the whole file, not a split of the file + FSDataInputStream fsStream = fs.open(file, bufferSize); + reader = new InputStreamReader(fsStream, CarbonCommonConstants.DEFAULT_CHARSET); + // use default csv settings first, then update it using user specified properties later + CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(hadoopConf); + initCsvSettings(settings); + csvParser = new CsvParser(settings); + csvParser.beginParsing(reader); + } + + /** + * update the settings using properties from user + */ + private void initCsvSettings(CsvParserSettings settings) { + Map csvProperties = carbonTable.getTableInfo().getFormatProperties(); + + if (csvProperties.containsKey(FileFormatProperties.CSV.DELIMITER)) { + settings.getFormat().setDelimiter( + csvProperties.get(FileFormatProperties.CSV.DELIMITER).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.COMMENT)) { + settings.getFormat().setComment( + csvProperties.get(FileFormatProperties.CSV.COMMENT).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.QUOTE)) { + settings.getFormat().setQuote( + csvProperties.get(FileFormatProperties.CSV.QUOTE).charAt(0)); + } + + if 
(csvProperties.containsKey(FileFormatProperties.CSV.ESCAPE)) { + settings.getFormat().setQuoteEscape( + csvProperties.get(FileFormatProperties.CSV.ESCAPE).charAt(0)); + } + + if (csvProperties.containsKey(FileFormatProperties.CSV.SKIP_EMPTY_LINE)) { + settings.setSkipEmptyLines( + Boolean.parseBoolean(csvProperties.get(FileFormatProperties.CSV.SKIP_EMPTY_LINE))); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (isVectorReader) { + return nextColumnarBatch(); + } + + return nextRow(); + } + + private boolean nextColumnarBatch() throws IOException { + return scanAndFillBatch(); + } + + private boolean scanAndFillBatch() throws IOException { + int rowNum = 0; + if (null == filter) { + while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) { + System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length); + } + } else { + try { + while (readRowFromFile() && rowNum < MAX_BATCH_SIZE) { + if (filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax())) { + System.arraycopy(outputValues, 0, batchOutputValues[rowNum++], 0, outputValues.length); + } + } + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in CsvRecordReader", e); + } + } + if (rowNum < MAX_BATCH_SIZE) { + // shrink the batch to the number of rows actually read + Object[][] tmpBatchOutputValues = new Object[rowNum][]; + System.arraycopy(batchOutputValues, 0, tmpBatchOutputValues, 0, rowNum); + columnarBatch = readSupport.readRow(tmpBatchOutputValues); + } else { + columnarBatch = readSupport.readRow(batchOutputValues); + } + return rowNum > 0; + } + + private boolean nextRow() throws IOException { + if (csvParser == null) { + return false; + } + + if (!readRowFromFile()) { + return false; + } + + if (null == filter) { + outputRow = readSupport.readRow(outputValues); + return true; + } else { + try { + boolean scanMore; + do { + scanMore = !filter.applyFilter(internalRow, carbonTable.getDimensionOrdinalMax()); + if (!scanMore) { + outputRow = readSupport.readRow(outputValues); + return true; + } + } while (readRowFromFile()); + // if we reach the end of the file and still need to scan more, there are no more matching rows + return false; + } catch (FilterUnsupportedException e) { + throw new IOException("Failed to filter row in CsvRecordReader", e); + } + } + } + + /** + * read from csv file and convert to internal row + * todo: prune with project/filter + * @return false, if it comes to an end + */ + private boolean readRowFromFile() { + String[] parsedOut = csvParser.parseNext(); + if (parsedOut == null) { + return false; + } else { + convertToInternalRow(parsedOut); + convertToOutputRow(parsedOut); + return true; + } + } + + /** + * convert the original csv string row to a carbondata internal row. + * The row will be used for filtering. Note that the dimensions are at the head + * while measures are at the end, so we need to adjust the values.
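+ * For example, for a table defined as (empno, empname) with 'csv.header'='empname,empno', + * schema2csvIdx is [1, 0], so each internal value is taken from the matching csv column.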
+ */ + private void convertToInternalRow(String[] csvLine) { + try { + for (int i = 0; i < carbonColumns.length; i++) { + internalValues[i] = convertOriginValue2Carbon( + csvLine[schema2csvIdx[filterColumn2SchemaIdx[i]]], + carbonColumns[filterColumn2SchemaIdx[i]].getDataType()); + } + } catch (UnsupportedEncodingException e) { + LOGGER.error(e, "Error occurred while converting input to internal row"); + throw new RuntimeException(e); + } + internalRow.setValues(internalValues); + } + + /** + * Since the sequence of output columns is not the same as the input, we need to adjust them + */ + private void convertToOutputRow(String[] csvLine) { + for (int i = 0; i < projection.length; i++) { + outputValues[i] = csvLine[schema2csvIdx[projectionColumn2SchemaIdx[i]]]; + } + } + + private Object convertOriginValue2Carbon(String value, + org.apache.carbondata.core.metadata.datatype.DataType t) throws UnsupportedEncodingException { + if (null == value) { + return null; + } else { + if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) { + return Boolean.parseBoolean(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE) { + return Byte.parseByte(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) { + return Short.parseShort(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) { + return Integer.parseInt(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) { + return Long.parseLong(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT) { + return Float.parseFloat(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) { + return Double.parseDouble(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) { + return value.getBytes(CarbonCommonConstants.DEFAULT_CHARSET); + } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(t)) { + BigDecimal javaDecimal = new BigDecimal(value); + return DataTypeUtil.bigDecimalToByte(javaDecimal); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) { + return Integer.parseInt(value); + } else if (t == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) { + return Long.parseLong(value); + } else { + throw new RuntimeException("Unsupported data type in CsvRecordReader"); + } + } + } + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { + return null; + } + + @Override + public T getCurrentValue() throws IOException, InterruptedException { + if (isVectorReader) { + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead(1L); + } + return (T) columnarBatch; + } else { + if (inputMetricsStats != null) { + inputMetricsStats.incrementRecordRead(1L); + } + return (T) outputRow; + } + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void close() throws IOException { + try { + if (reader != null) { + reader.close(); + } + if (csvParser != null) { + csvParser.stopParsing(); + } + if (readSupport != null) { + readSupport.close(); + } + } finally { + reader = null; + csvParser = null; + } + } +} diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index 0f02e12c972..605b681a0b6 100644 ---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -172,9 +172,14 @@ private List getSplits(JobContext job, FilterResolverIntf filterReso List result = new LinkedList(); // for each segment fetch blocks matching filter in Driver BTree - List dataBlocksOfSegment = - getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions, - validSegments, partitionInfo, oldPartitionIdList); + List dataBlocksOfSegment; + if (carbonTable.getTableInfo().getFormat().equals("carbondata")) { + dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable, filterResolver, + matchedPartitions, validSegments, partitionInfo, oldPartitionIdList); + } else { + dataBlocksOfSegment = getDataBlocksOfSegment4ExternalFormat(job, carbonTable, filterResolver, + validSegments); + } numBlocks = dataBlocksOfSegment.size(); result.addAll(dataBlocksOfSegment); return result; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index eeb3ae84611..5fdc5229860 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; import org.apache.carbondata.core.datamap.dev.expr.DataMapWrapperSimpleInfo; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.exception.InvalidConfigurationException; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; @@ -55,6 +56,9 @@ import org.apache.carbondata.core.stats.QueryStatisticsConstants; import org.apache.carbondata.core.stats.QueryStatisticsRecorder; import org.apache.carbondata.core.util.BlockletDataMapUtil; +import org.apache.carbondata.core.statusmanager.FileFormat; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; @@ -66,6 +70,7 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; import org.apache.carbondata.hadoop.CarbonProjection; import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.CsvRecordReader; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; @@ -73,6 +78,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -361,6 +368,31 @@ protected Expression getFilterPredicates(Configuration configuration) { } } + protected List getDataBlocksOfSegment4ExternalFormat(JobContext job, + CarbonTable carbonTable, FilterResolverIntf resolver, List segmentIds) + throws IOException { + + QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); + QueryStatistic statistic = new 
QueryStatistic(); + + // get tokens for all the required FileSystem for table path + TokenCache.obtainTokensForNamenodes(job.getCredentials(), + new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration()); + List prunedFiles = getPrunedFiles4ExternalFormat(job, carbonTable, + resolver, segmentIds); + List resultFilteredFiles = new ArrayList<>(); + + for (ExtendedBlocklet blocklet : prunedFiles) { + List inputSplits = convertToInputSplit4ExternalFormat(job, blocklet); + resultFilteredFiles.addAll(inputSplits); + } + + statistic + .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); + recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); + return resultFilteredFiles; + } + /** * get data blocks of given segment */ @@ -512,6 +544,32 @@ private List intersectFilteredBlocklets(CarbonTable carbonTabl return prunedBlocklets; } + private List getPrunedFiles4ExternalFormat(JobContext job, + CarbonTable carbonTable, + FilterResolverIntf resolver, List segmentIds) throws IOException { + ExplainCollector.addPruningInfo(carbonTable.getTableName()); + if (resolver != null) { + ExplainCollector.setFilterStatement(resolver.getFilterExpression().getStatement()); + } else { + ExplainCollector.setFilterStatement("none"); + } + + // there is no default datamap for external format, so return all files + List prunedFiles = new ArrayList<>(); + LoadMetadataDetails[] loadMetadatas = SegmentStatusManager.readTableStatusFile( + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath())); + for (LoadMetadataDetails loadMetadata : loadMetadatas) { + for (String file : loadMetadata.getFactFilePath().split(",")) { + ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet(file, "0"); + extendedBlocklet.setSegmentId(loadMetadata.getLoadName()); + prunedFiles.add(extendedBlocklet); + } + } + + // todo: skip datamap prune now, will add it back later + return prunedFiles; + } + /** * Prune the segments from the already pruned blocklets. 
* @param segments @@ -552,12 +610,72 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th return split; } + private List convertToInputSplit4ExternalFormat(JobContext jobContext, + ExtendedBlocklet extendedBlocklet) throws IOException { + List splits = new ArrayList(); + String factFilePath = extendedBlocklet.getFilePath(); + Path path = new Path(factFilePath); + FileSystem fs = FileFactory.getFileSystem(path); + FileStatus fileStatus = fs.getFileStatus(path); + long length = fileStatus.getLen(); + if (length != 0) { + BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, length); + long blkSize = fileStatus.getBlockSize(); + long minSplitSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(jobContext)); + long maxSplitSize = getMaxSplitSize(jobContext); + long splitSize = computeSplitSize(blkSize, minSplitSize, maxSplitSize); + long bytesRemaining = fileStatus.getLen(); + while (((double) bytesRemaining) / splitSize > 1.1) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, + length - bytesRemaining, + splitSize, blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); + bytesRemaining -= splitSize; + } + if (bytesRemaining != 0) { + int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, + length - bytesRemaining, + bytesRemaining, blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); + } + } else { + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, 0, length, + new String[0], FileFormat.EXTERNAL)); + } + return splits; + } + @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); CarbonReadSupport readSupport = getReadSupportClass(configuration); - return new CarbonRecordReader(queryModel, readSupport); + if (inputSplit instanceof CarbonMultiBlockSplit + && ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); + } else if (inputSplit instanceof CarbonInputSplit + && ((CarbonInputSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); + } else { + return new CarbonRecordReader(queryModel, readSupport); + } + } + + private RecordReader createRecordReaderForExternalFormat(QueryModel queryModel, + CarbonReadSupport readSupport, String format) { + try { + if ("csv".equals(format)) { + return new CsvRecordReader(queryModel, readSupport); + } else { + throw new RuntimeException("Unsupported external file format " + format); + } + } catch (Throwable e) { + throw new RuntimeException("Failed to create recordReader for format " + format, e); + } } public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index be7298305f2..2c9c1afd018 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -535,9 +535,15 @@ private List getSplits(JobContext job, FilterResolverIntf filterReso isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); // for each segment fetch blocks matching filter in Driver BTree - List dataBlocksOfSegment = - getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions, - validSegments, partitionInfo, oldPartitionIdList); + List dataBlocksOfSegment; + if (carbonTable.getTableInfo().getFormat().equals("") + || carbonTable.getTableInfo().getFormat().equals("carbondata")) { + dataBlocksOfSegment = getDataBlocksOfSegment(job, carbonTable, filterResolver, + matchedPartitions, validSegments, partitionInfo, oldPartitionIdList); + } else { + dataBlocksOfSegment = getDataBlocksOfSegment4ExternalFormat(job, carbonTable, filterResolver, + validSegments); + } numBlocks = dataBlocksOfSegment.size(); for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) { diff --git a/integration/spark-common-test/src/test/resources/datawithoutheader_delimiter_separator.csv b/integration/spark-common-test/src/test/resources/datawithoutheader_delimiter_separator.csv new file mode 100644 index 00000000000..62efa686830 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/datawithoutheader_delimiter_separator.csv @@ -0,0 +1,10 @@ +11|arvind|SE|17-01-2007|1|developer|10|network|928478|17-02-2007|29-11-2016|96|96|5040 +12|krithin|SSE|29-05-2008|1|developer|11|protocol|928378|29-06-2008|30-12-2016|85|95|7124 +13|madhan|TPL|07-07-2009|2|tester|10|network|928478|07-08-2009|30-12-2016|88|99|9054 +14|anandh|SA|29-12-2010|3|manager|11|protocol|928278|29-01-2011|29-06-2016|77|92|11248 +15|ayushi|SSA|09-11-2011|1|developer|12|security|928375|09-12-2011|29-05-2016|99|91|13245 +16|pramod|SE|14-10-2012|1|developer|13|configManagement|928478|14-11-2012|29-12-2016|86|93|5040 +17|gawrav|PL|22-09-2013|2|tester|12|security|928778|22-10-2013|15-11-2016|78|97|9574 +18|sibi|TL|15-08-2014|2|tester|14|Learning|928176|15-09-2014|29-05-2016|84|98|7245 +19|shivani|PL|12-05-2015|1|developer|10|network|928977|12-06-2015|12-11-2016|88|91|11254 +20|bill|PM|01-12-2015|3|manager|14|Learning|928479|01-01-2016|30-11-2016|75|94|13547 diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala new file mode 100644 index 00000000000..7f07878ccb3 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/externalformat/CsvBasedCarbonTableSuite.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.externalformat + + +import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.test.Spark2TestQueryExecutor +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath + +class CsvBasedCarbonTableSuite extends QueryTest + with BeforeAndAfterEach with BeforeAndAfterAll { + + val carbonTable = "fact_carbon_table" + val csvCarbonTable = "fact_carbon_csv_table" + val csvFile = s"$resourcesPath/datawithoutheader.csv" + val csvFile_delimiter_separator = s"$resourcesPath/datawithoutheader_delimiter_separator.csv" + + // prepare normal carbon table for comparison + override protected def beforeAll(): Unit = { + sql(s"DROP TABLE IF EXISTS $carbonTable") + sql( + s""" + | CREATE TABLE $carbonTable(empno smallint, empname String, designation string, + | doj String, workgroupcategory int, workgroupcategoryname String,deptno int, + | deptname String, projectcode int, projectjoindate String,projectenddate String, + | attendance String, utilization String,salary String) + | STORED BY 'carbondata' + """.stripMargin + ) + sql( + s""" + | LOAD DATA LOCAL INPATH '$csvFile' INTO TABLE $carbonTable + | OPTIONS('DELIMITER'=',', + | 'QUOTECHAR'='\"', + | 'FILEHEADER'='EMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY') + """.stripMargin) + } + + override protected def afterAll(): Unit = { + sql(s"DROP TABLE IF EXISTS $carbonTable") + } + + override protected def beforeEach(): Unit = { + sql(s"DROP TABLE IF EXISTS $csvCarbonTable") + } + + override protected def afterEach(): Unit = { + sql(s"DROP TABLE IF EXISTS $csvCarbonTable") + } + + private def checkQuery() { + // query all the columns + checkAnswer(sql(s"SELECT eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY FROM $csvCarbonTable WHERE empno = 15"), + sql(s"SELECT eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY FROM $carbonTable WHERE empno = 15")) + // query part of the columns + checkAnswer(sql(s"SELECT empno,empname, deptname, doj FROM $csvCarbonTable WHERE empno = 15"), + sql(s"SELECT empno,empname, deptname, doj FROM $carbonTable WHERE empno = 15")) + // sequence of projection column are not same with that in DDL + checkAnswer(sql(s"SELECT empname, empno, deptname, doj FROM $csvCarbonTable WHERE empno = 15"), + sql(s"SELECT empname, empno, deptname, doj FROM $carbonTable WHERE empno = 15")) + // query with greater + checkAnswer(sql(s"SELECT empname, empno, deptname, doj FROM $csvCarbonTable WHERE empno > 15"), + 
sql(s"SELECT empname, empno, deptname, doj FROM $carbonTable WHERE empno > 15")) + // query with filter on dimension + checkAnswer(sql(s"SELECT empname, empno, deptname, doj FROM $csvCarbonTable WHERE empname = 'ayushi'"), + sql(s"SELECT empname, empno, deptname, doj FROM $carbonTable WHERE empname = 'ayushi'")) + // aggreate query + checkAnswer(sql(s"SELECT designation, sum(empno), avg(empno) FROM $csvCarbonTable GROUP BY designation"), + sql(s"SELECT designation, sum(empno), avg(empno) FROM $carbonTable GROUP BY designation")) + } + + test("test csv based carbon table") { + // create csv based carbon table + sql( + s""" + | CREATE TABLE $csvCarbonTable(empno smallint, empname String, designation string, + | doj String, workgroupcategory int, workgroupcategoryname String,deptno int, + | deptname String, projectcode int, projectjoindate String,projectenddate String, + | attendance String, utilization String,salary String) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'format'='csv', + | 'csv.header'='eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY' + | ) + """.stripMargin + ) + // check that the external format info is stored in tableinfo + val tblInfo = + CarbonEnv.getCarbonTable(Option("default"), csvCarbonTable)(Spark2TestQueryExecutor.spark) + assertResult("csv")(tblInfo.getTableInfo.getFormat) + assertResult(1)(tblInfo.getTableInfo.getFormatProperties.size()) + assertResult( + "eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY".toLowerCase)( + tblInfo.getTableInfo.getFormatProperties.get("csv.header")) + + // add segment for csv based carbontable + sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION '$csvFile'") + + // check that the fact files has been stored in tablestatus + val metadataPath = CarbonTablePath.getMetadataPath(tblInfo.getTablePath) + val details = SegmentStatusManager.readLoadMetadata(metadataPath) + assertResult(1)(details.length) + assertResult(csvFile)(details(0).getFactFilePath) + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") + // check query on csv based carbontable + // query with vector reader on + checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") + // query with vector reader off + checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) + } + + test("test csv based carbon table: only support csv now") { + val expectedException = intercept[Exception] { + sql( + s""" + | CREATE TABLE $csvCarbonTable(empname String, empno smallint, designation string, + | deptname String, projectcode int, projectjoindate String,projectenddate String, + | doj String, workgroupcategory int, workgroupcategoryname String,deptno int, + | attendance String, utilization String,salary String) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'format'='parquet', + | 'csv.header'='eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY' + | ) + """.stripMargin + ) + } + + assert(expectedException.getMessage.contains("Currently we only support csv as external file format")) + } + + test("test csv based carbon table: the 
sequence of header does not match schema") { + // create csv based carbon table, the sequence in schema is not the same in csv.header + sql( + s""" + | CREATE TABLE $csvCarbonTable(empname String, empno smallint, designation string, + | deptname String, projectcode int, projectjoindate String,projectenddate String, + | doj String, workgroupcategory int, workgroupcategoryname String,deptno int, + | attendance String, utilization String,salary String) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'format'='csv', + | 'csv.header'='eMPno, empname,designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, SALARY' + | ) + """.stripMargin + ) + // add segment for csv based carbontable + sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION '$csvFile'") + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") + // check query on csv based carbontable + // query with vector reader on + checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") + // query with vector reader off + checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) + } + + test("test csv based carbon table: not specify the header") { + // create csv based carbon table, the sequence in schema is not the same in csv.header + sql( + s""" + | CREATE TABLE $csvCarbonTable(empno smallint, empname String, designation string, + | doj String, workgroupcategory int, workgroupcategoryname String,deptno int, + | deptname String, projectcode int, projectjoindate String,projectenddate String, + | attendance String, utilization String,salary String) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'format'='csv' + | ) + """.stripMargin + ) + + // add segment for csv based carbontable + sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION '$csvFile'") + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") + // check query on csv based carbontable + // query with vector reader on + checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") + // query with vector reader off + checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) + } + + test("test csv based carbon table: user specified delimiter") { + // create csv based carbon table, the sequence in schema is not the same in csv.header + sql( + s""" + | CREATE TABLE $csvCarbonTable(empno smallint, empname String, designation string, + | doj String, workgroupcategory int, workgroupcategoryname String,deptno int, + | deptname String, projectcode int, projectjoindate String,projectenddate String, + | attendance String, utilization String,salary String) + | STORED BY 'carbondata' + | TBLPROPERTIES( + | 'format'='csv', + | 'csv.delimiter'='|' + | ) + """.stripMargin + ) + + // add segment for csv based carbontable + sql(s"ALTER TABLE $csvCarbonTable ADD SEGMENT LOCATION '$csvFile_delimiter_separator'") + + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true") + // check query on csv based carbontable + // query with vector reader on + checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "false") + // query with vector reader off + 
checkQuery() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) + } +} \ No newline at end of file diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/CsvReadSupport.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/CsvReadSupport.java new file mode 100644 index 00000000000..53d6d7f9aa4 --- /dev/null +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/CsvReadSupport.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.format; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl; + +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * read support for csv + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvReadSupport implements CarbonReadSupport { + private CarbonColumn[] carbonColumns; + private StructType outputSchema; + private Object[] finalOutputValues; + @Override + public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) + throws IOException { + this.carbonColumns = carbonColumns; + this.finalOutputValues = new Object[carbonColumns.length]; + outputSchema = new StructType(convertCarbonColumnSpark(carbonColumns)); + } + + private StructField[] convertCarbonColumnSpark(CarbonColumn[] columns) { + return (StructField[]) new SparkDataTypeConverterImpl().convertCarbonSchemaToSparkSchema( + columns); + } + + @Override + public T readRow(Object[] data) { + for (int i = 0; i < carbonColumns.length; i++) { + Object originValue = data[i]; + org.apache.spark.sql.types.DataType t = outputSchema.apply(i).dataType(); + finalOutputValues[i] = convertToSparkValue(originValue, t); + } + return (T) new GenericInternalRow(finalOutputValues); + } + private Object convertToSparkValue(Object originValue, org.apache.spark.sql.types.DataType t) { + if (null == originValue) { + return null; + } else { + String value = String.valueOf(originValue); + if (t 
== org.apache.spark.sql.types.DataTypes.BooleanType) { + return Boolean.parseBoolean(value); + } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { + return Byte.parseByte(value); + } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { + return Short.parseShort(value); + } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { + return Integer.parseInt(value); + } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { + return Long.parseLong(value); + } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { + return Float.parseFloat(value); + } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { + return Double.parseDouble(value); + } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { + return UTF8String.fromString(value); + } else if (t instanceof org.apache.spark.sql.types.DecimalType) { + return Decimal.fromDecimal(value); + } else if (t instanceof CalendarIntervalType) { + return CalendarInterval.fromString(value); + } else if (t instanceof org.apache.spark.sql.types.DateType) { + return Integer.parseInt(value); + } else if (t instanceof org.apache.spark.sql.types.TimestampType) { + return Long.parseLong(value); + } else { + return null; + } + } + } + + @Override + public void close() { + + } +} diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/VectorCsvReadSupport.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/VectorCsvReadSupport.java new file mode 100644 index 00000000000..81bd25dbb2e --- /dev/null +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/format/VectorCsvReadSupport.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.format; + +import java.io.IOException; +import java.math.BigInteger; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * read support for csv vector reader + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class VectorCsvReadSupport implements CarbonReadSupport { + private static final int MAX_BATCH_SIZE = + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + private CarbonColumn[] carbonColumns; + private ColumnarBatch columnarBatch; + private StructType outputSchema; + + @Override + public void initialize(CarbonColumn[] carbonColumns, CarbonTable carbonTable) + throws IOException { + this.carbonColumns = carbonColumns; + outputSchema = new StructType(convertCarbonColumnSpark(carbonColumns)); + } + + private StructField[] convertCarbonColumnSpark(CarbonColumn[] columns) { + return (StructField[]) new SparkDataTypeConverterImpl().convertCarbonSchemaToSparkSchema( + columns); + } + + @Override + public T readRow(Object[] data) { + columnarBatch = ColumnarBatch.allocate(outputSchema, MemoryMode.OFF_HEAP, MAX_BATCH_SIZE); + int rowId = 0; + for (; rowId < data.length; rowId++) { + for (int colIdx = 0; colIdx < carbonColumns.length; colIdx++) { + Object originValue = ((Object[]) data[rowId])[colIdx]; + ColumnVector col = columnarBatch.column(colIdx); + org.apache.spark.sql.types.DataType t = col.dataType(); + if (null == originValue) { + col.putNull(rowId); + } else { + String value = String.valueOf(originValue); + if (t == org.apache.spark.sql.types.DataTypes.BooleanType) { + col.putBoolean(rowId, Boolean.parseBoolean(value)); + } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) { + col.putByte(rowId, Byte.parseByte(value)); + } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) { + col.putShort(rowId, Short.parseShort(value)); + } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) { + col.putInt(rowId, Integer.parseInt(value)); + } else if (t == org.apache.spark.sql.types.DataTypes.LongType) { + col.putLong(rowId, Long.parseLong(value)); + } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) { + col.putFloat(rowId, Float.parseFloat(value)); + } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) { + col.putDouble(rowId, Double.parseDouble(value)); + } else if (t == org.apache.spark.sql.types.DataTypes.StringType) { + UTF8String v = UTF8String.fromString(value); + col.putByteArray(rowId, v.getBytes()); + } else if (t instanceof org.apache.spark.sql.types.DecimalType) { + 
DecimalType dt = (DecimalType)t; + Decimal d = Decimal.fromDecimal(value); + if (dt.precision() <= Decimal.MAX_INT_DIGITS()) { + col.putInt(rowId, (int)d.toUnscaledLong()); + } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) { + col.putLong(rowId, d.toUnscaledLong()); + } else { + final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); + byte[] bytes = integer.toByteArray(); + col.putByteArray(rowId, bytes, 0, bytes.length); + } + } else if (t instanceof CalendarIntervalType) { + CalendarInterval c = CalendarInterval.fromString(value); + col.getChildColumn(0).putInt(rowId, c.months); + col.getChildColumn(1).putLong(rowId, c.microseconds); + } else if (t instanceof org.apache.spark.sql.types.DateType) { + col.putInt(rowId, Integer.parseInt(value)); + } else if (t instanceof org.apache.spark.sql.types.TimestampType) { + col.putLong(rowId, Long.parseLong(value)); + } + } + } + } + columnarBatch.setNumRows(rowId); + return (T) columnarBatch; + } + + @Override + public void close() { + if (columnarBatch != null) { + columnarBatch.close(); + } + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 6b439999fd3..3461d3aaf9b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -59,6 +59,7 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.InitInputMetrics +import org.apache.carbondata.spark.format.{CsvReadSupport, VectorCsvReadSupport} import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util} import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} @@ -88,6 +89,7 @@ class CarbonScanRDD[T: ClassTag]( private var vectorReader = false private val bucketedTable = tableInfo.getFactTable.getBucketingInfo + private val storageFormat = tableInfo.getFormat @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -134,10 +136,13 @@ class CarbonScanRDD[T: ClassTag]( // 2. 
for stream splits, create partition for each split by default val columnarSplits = new ArrayList[InputSplit]() val streamSplits = new ArrayBuffer[InputSplit]() + val externalSplits = new ArrayBuffer[InputSplit]() splits.asScala.foreach { split => val carbonInputSplit = split.asInstanceOf[CarbonInputSplit] if (FileFormat.ROW_V1 == carbonInputSplit.getFileFormat) { streamSplits += split + } else if (FileFormat.EXTERNAL == carbonInputSplit.getFileFormat) { + externalSplits += split } else { columnarSplits.add(split) } @@ -147,31 +152,39 @@ class CarbonScanRDD[T: ClassTag]( distributeEndTime = System.currentTimeMillis() // check and remove InExpression from filterExpression checkAndRemoveInExpressinFromFilterExpression(batchPartitions) - if (streamSplits.isEmpty) { - partitions = batchPartitions.toArray - } else { - val index = batchPartitions.length - val streamPartitions: mutable.Buffer[Partition] = - streamSplits.zipWithIndex.map { splitWithIndex => - val multiBlockSplit = - new CarbonMultiBlockSplit( - Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, - splitWithIndex._1.getLocations, - FileFormat.ROW_V1) - new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit) - } - if (batchPartitions.isEmpty) { - partitions = streamPartitions.toArray - } else { - logInfo( - s""" - | Identified no.of Streaming Blocks: ${ streamPartitions.size }, - """.stripMargin) - // should keep the order by index of partition - batchPartitions.appendAll(streamPartitions) - partitions = batchPartitions.toArray + + def generateNonBatchPartitions(index: Int, splits : ArrayBuffer[InputSplit], + format: FileFormat): mutable.Buffer[Partition] = { + splits.zipWithIndex.map { splitWithIndex => + val multiBlockSplit = + new CarbonMultiBlockSplit( + Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, + splitWithIndex._1.getLocations, + format) + new CarbonSparkPartition(id, splitWithIndex._2 + index, multiBlockSplit) } } + + val allPartitions: mutable.Buffer[Partition] = mutable.Buffer() + val index = batchPartitions.length + val streamPartitions: mutable.Buffer[Partition] = generateNonBatchPartitions( + index, streamSplits, FileFormat.ROW_V1) + val externalPartitions: mutable.Buffer[Partition] = generateNonBatchPartitions( + index + streamPartitions.length, externalSplits, FileFormat.EXTERNAL) + + if (batchPartitions.nonEmpty) { + LOGGER.info(s"Identified no.of batch blocks: ${batchPartitions.size}") + allPartitions.appendAll(batchPartitions) + } + if (streamPartitions.nonEmpty) { + LOGGER.info(s"Identified no.of stream blocks: ${streamPartitions.size}") + allPartitions.appendAll(streamPartitions) + } + if (externalPartitions.nonEmpty) { + LOGGER.info(s"Identified no.of external blocks: ${externalPartitions.size}") + allPartitions.appendAll(externalPartitions) + } + partitions = allPartitions.toArray partitions } finally { Profiler.invokeIfEnable { @@ -359,7 +372,7 @@ class CarbonScanRDD[T: ClassTag]( } logInfo( s""" - | Identified no.of.blocks: $noOfBlocks, + | Identified no.of.blocks(columnar): $noOfBlocks, | no.of.tasks: $noOfTasks, | no.of.nodes: $noOfNodes, | parallelism: $parallelism @@ -427,6 +440,22 @@ class CarbonScanRDD[T: ClassTag]( CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId)) streamReader.setQueryModel(model) streamReader + case FileFormat.EXTERNAL => + require(storageFormat.equals("csv"), + "Currently we only support csv as external file format") + attemptContext.getConfiguration.set( + CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY, 
storageFormat) + val externalRecordReader = format.createRecordReader(inputSplit, attemptContext) + .asInstanceOf[CsvRecordReader[Object]] + externalRecordReader.setVectorReader(vectorReader) + externalRecordReader.setInputMetricsStats(inputMetricsStats) + externalRecordReader.setQueryModel(model) + if (vectorReader) { + externalRecordReader.setReadSupport(new VectorCsvReadSupport[Object]()) + } else { + externalRecordReader.setReadSupport(new CsvReadSupport[Object]()) + } + externalRecordReader case _ => // create record reader for CarbonData file format if (vectorReader) { diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 12999d052c2..8466bfce1ba 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -187,6 +187,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val STREAM = carbonKeyWord("STREAM") protected val STREAMS = carbonKeyWord("STREAMS") protected val STMPROPERTIES = carbonKeyWord("STMPROPERTIES") + protected val LOCATION = carbonKeyWord("LOCATION") protected val doubleQuotedString = "\"([^\"]+)\"".r protected val singleQuotedString = "'([^']+)'".r diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 1b48c08cdec..56e91f9ab7a 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -890,6 +890,16 @@ class TableNewProcessor(cm: TableModel) { cm.tableName)) tableInfo.setLastUpdatedTime(System.currentTimeMillis()) tableInfo.setFactTable(tableSchema) + val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT) + if (format.isDefined) { + if (!format.get.equalsIgnoreCase("csv")) { + CarbonException.analysisException(s"Currently we only support csv as external file format") + } + tableInfo.setFormat(format.get) + val formatProperties = cm.tableProperties.filter(pair => + pair._1.startsWith(s"${format.get.toLowerCase}.")).asJava + tableInfo.setFormatProperties(formatProperties) + } tableInfo } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala new file mode 100644 index 00000000000..e7f6c7fc868 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
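For context on the TableNewProcessor change above: a table opts into an external fact-file format purely through table properties. The 'format' property (CarbonCommonConstants.FORMAT) selects the format and must currently be 'csv', and every table property whose key starts with the lowercase format name plus a dot is copied into TableInfo.formatProperties. A minimal usage sketch follows; the concrete keys 'csv.delimiter' and 'csv.header' are illustrative assumptions, since only the 'format' key and the 'csv.' prefix are fixed by this patch.

// A sketch under assumed names: create a carbon table whose fact data will be kept in
// external csv files. 'format' is validated to be "csv" by TableNewProcessor, and the
// "csv."-prefixed properties end up in TableInfo.formatProperties.
spark.sql(
  s"""
     | CREATE TABLE IF NOT EXISTS source_csv_table(id INT, name STRING)
     | STORED BY 'carbondata'
     | TBLPROPERTIES('format'='csv', 'csv.delimiter'=',', 'csv.header'='id,name')
   """.stripMargin)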
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.management + +import java.util.UUID + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.execution.command.AtomicRunnableCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.util.FileUtils + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.status.DataMapStatusManager +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.util.CarbonLoaderUtil + +/** + * Supports the `alter table tableName add segment location 'path'` command. + * It will create a new segment and map the path of the data files to the segment's storage. + */ +case class CarbonAddSegmentCommand( + dbNameOp: Option[String], + tableName: String, + filePathFromUser: String, + var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + var carbonTable: CarbonTable = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession) + carbonTable = { + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + if (relation == null) { + LOGGER.error(s"Add segment failed due to table $dbName.$tableName not found") + throw new NoSuchTableException(dbName, tableName) + } + relation.carbonTable + } + + if (carbonTable.isHivePartitionTable) { + LOGGER.error("Ignore hive partition table for now") + } + + operationContext.setProperty("isOverwrite", false) + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { + val loadMetadataEvent = new LoadMetadataEvent(carbonTable, false) + OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext) + } + Seq.empty + } + + // will just map external files to segment metadata + override def processData(sparkSession: SparkSession): Seq[Row] = { + // clean up invalid segment before creating a new entry + SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, false, null) + val currentLoadMetadataDetails = SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(carbonTable.getTablePath)) + val newSegmentId = SegmentStatusManager.createNewSegmentId(currentLoadMetadataDetails).toString + // create new segment folder in carbon store + 
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(newSegmentId, carbonTable) + + val factFilePath = FileUtils.getPaths(filePathFromUser) + + val uuid = if (carbonTable.isChildDataMap) { + Option(operationContext.getProperty("uuid")).getOrElse("").toString + } else if (carbonTable.hasAggregationDataMap) { + UUID.randomUUID().toString + } else { + "" + } + // associate segment metadata with the file paths; multiple files are separated by comma + val loadModel: CarbonLoadModel = new CarbonLoadModel + loadModel.setSegmentId(newSegmentId) + loadModel.setDatabaseName(carbonTable.getDatabaseName) + loadModel.setTableName(carbonTable.getTableName) + loadModel.setTablePath(carbonTable.getTablePath) + loadModel.setCarbonTransactionalTable(carbonTable.isTransactionalTable) + loadModel.readAndSetLoadMetadataDetails() + loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime()) + val loadSchema: CarbonDataLoadSchema = new CarbonDataLoadSchema(carbonTable) + loadModel.setCarbonDataLoadSchema(loadSchema) + + val newLoadMetadataDetail: LoadMetadataDetails = new LoadMetadataDetails + + // for external datasource table, there are no index files, so no need to write segment file + + // update table status file + newLoadMetadataDetail.setSegmentFile(null) + newLoadMetadataDetail.setSegmentStatus(SegmentStatus.SUCCESS) + newLoadMetadataDetail.setLoadStartTime(loadModel.getFactTimeStamp) + newLoadMetadataDetail.setLoadEndTime(CarbonUpdateUtil.readCurrentTime()) + newLoadMetadataDetail.setIndexSize("1") + newLoadMetadataDetail.setDataSize("1") + newLoadMetadataDetail.setFileFormat(FileFormat.EXTERNAL) + newLoadMetadataDetail.setFactFilePath(factFilePath) + + val done = CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetadataDetail, loadModel, true, + false, uuid) + if (!done) { + val errorMsg = + s""" + | Data load failed due to table status update failure for + | ${loadModel.getDatabaseName}.${loadModel.getTableName} + """.stripMargin + throw new Exception(errorMsg) + } else { + DataMapStatusManager.disableAllLazyDataMaps(carbonTable) + } + Seq.empty + } +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 8eb47fc276f..d6691f68c9e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { alterPartition | datamapManagement | alterTableFinishStreaming | stream protected lazy val loadManagement: Parser[LogicalPlan] = - deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew + deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew | addSegment protected lazy val restructure: Parser[LogicalPlan] = alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns @@ -443,6 +443,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { partition = partitionSpec) } + /** + * The syntax of + * ALTER TABLE [dbName.]tableName ADD SEGMENT LOCATION 'path/to/data' + */ + protected lazy val addSegment: Parser[LogicalPlan] = + ALTER ~> TABLE ~> (ident <~ ".").? 
~ ident ~ + (ADD ~> SEGMENT ~> LOCATION ~> stringLit) <~ opt(";") ^^ { + case dbName ~ tableName ~ filePath => + CarbonAddSegmentCommand(convertDbNameToLowerCase(dbName), tableName, filePath) + } + protected lazy val deleteLoadsByID: Parser[LogicalPlan] = DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~ (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~
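Taken together, the addSegment rule and CarbonAddSegmentCommand allow existing csv files to be mapped into a segment of such a table and read back through the EXTERNAL file format branch of CarbonScanRDD. A rough end-to-end sketch, with the table name and file path being illustrative:

// A sketch under assumed names: map an existing csv file into a new segment. The parser turns
// this statement into CarbonAddSegmentCommand(dbName, tableName, path), which only records an
// EXTERNAL-format entry in the table status file; no data is rewritten.
spark.sql("ALTER TABLE source_csv_table ADD SEGMENT LOCATION '/data/input/part-0000.csv'")

// Scans of that segment go through CsvRecordReader, wired with CsvReadSupport or
// VectorCsvReadSupport depending on whether the vector reader is enabled.
spark.sql("SELECT count(*) FROM source_csv_table").show()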