Skip to content

Commit

Permalink
[CARBONDATA-2613] Support csv based carbon table
Browse files Browse the repository at this point in the history
1. create csv based carbon table using
CREATE TABLE fact_table (col1 bigint, col2 string, ..., col100 string)
STORED BY 'CarbonData'
TBLPROPERTIES(
  'foramt'='csv',
  'csv.delimiter'=',',
  'csv.header'='col1,col2,col100')

2. Load data to this table using
ALTER TABLE fact_table ADD SEGMENT LOCATION 'path/to/data1'

This closes #2374
  • Loading branch information
xuchuanyin authored and jackylk committed Aug 7, 2018
1 parent 7843845 commit 1a26ac1
Show file tree
Hide file tree
Showing 20 changed files with 1,499 additions and 35 deletions.
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.common.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* The annotation indicates that the version number since a member or a type has been present.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD})
public @interface Since {
/**
* the value indicating a version number since this member
* or type has been present.
*/
String value();
}
Expand Up @@ -971,6 +971,8 @@ public final class CarbonCommonConstants {
*/
public static final String DICTIONARY_PATH = "dictionary_path";
public static final String SORT_COLUMNS = "sort_columns";
// file format for the data files
public static final String FORMAT = "format";
public static final String PARTITION_TYPE = "partition_type";
public static final String NUM_PARTITIONS = "num_partitions";
public static final String RANGE_INFO = "range_info";
Expand All @@ -993,6 +995,8 @@ public final class CarbonCommonConstants {
// Flat folder support on table. when it is true all carbondata files store directly under table
// path instead of sub folders.
public static final String FLAT_FOLDER = "flat_folder";
// this will be used in hadoop conf to pass the format type to executor
public static final String CARBON_EXTERNAL_FORMAT_CONF_KEY = "carbon_external_format_type";

/**
* 16 mb size
Expand Down
Expand Up @@ -25,11 +25,13 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.common.annotations.Since;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
Expand Down Expand Up @@ -89,6 +91,17 @@ public class TableInfo implements Serializable, Writable {
*
*/
private boolean isTransactionalTable = true;
/**
* The format of the fact table.
* By default it is carbondata, and we also support other format like CSV
*/
@Since("1.4.1")
private String format = "carbondata";
/**
* properties for the format, such as delimiter/header for csv format
*/
@Since("1.4.1")
private Map<String, String> formatProperties;

// this identifier is a lazy field which will be created when it is used first time
private AbsoluteTableIdentifier identifier;
Expand All @@ -104,6 +117,7 @@ public class TableInfo implements Serializable, Writable {

public TableInfo() {
dataMapSchemaList = new ArrayList<>();
formatProperties = new HashMap<>();
isTransactionalTable = true;
}

Expand Down Expand Up @@ -196,6 +210,22 @@ public void setTablePath(String tablePath) {
this.tablePath = tablePath;
}

public String getFormat() {
return format;
}

public void setFormat(String format) {
this.format = format;
}

public Map<String, String> getFormatProperties() {
return formatProperties;
}

public void setFormatProperties(Map<String, String> formatProperties) {
this.formatProperties = formatProperties;
}

public List<DataMapSchema> getDataMapSchemaList() {
return dataMapSchemaList;
}
Expand Down Expand Up @@ -291,6 +321,17 @@ public void write(DataOutput out) throws IOException {
}
}
out.writeBoolean(isSchemaModified);

out.writeUTF(format);
boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0;
out.writeBoolean(isFormatPropertiesExists);
if (isFormatPropertiesExists) {
out.writeShort(formatProperties.size());
for (Map.Entry<String, String> entry : formatProperties.entrySet()) {
out.writeUTF(entry.getKey());
out.writeUTF(entry.getValue());
}
}
}

@Override public void readFields(DataInput in) throws IOException {
Expand Down Expand Up @@ -327,6 +368,17 @@ public void write(DataOutput out) throws IOException {
}
}
this.isSchemaModified = in.readBoolean();

this.format = in.readUTF();
boolean isFormatPropertiesExists = in.readBoolean();
if (isFormatPropertiesExists) {
short size = in.readShort();
for (int i = 0; i < size; i++) {
String key = in.readUTF();
String value = in.readUTF();
this.formatProperties.put(key, value);
}
}
}

public AbsoluteTableIdentifier getOrCreateAbsoluteTableIdentifier() {
Expand Down
Expand Up @@ -18,15 +18,19 @@
package org.apache.carbondata.core.statusmanager;

/**
* The data file format supported in carbondata project
* The data file format supported in carbondata project.
* The fileformat along with its property will be stored in tableinfo
*/
public enum FileFormat {

// carbondata columnar file format, optimized for read
COLUMNAR_V3,

// carbondata row file format, optimized for write
ROW_V1;
ROW_V1,

// external file format, such as parquet/csv
EXTERNAL;

public static FileFormat getByOrdinal(int ordinal) {
if (ordinal < 0 || ordinal >= FileFormat.values().length) {
Expand All @@ -38,6 +42,8 @@ public static FileFormat getByOrdinal(int ordinal) {
return COLUMNAR_V3;
case 1:
return ROW_V1;
case 2:
return EXTERNAL;
}

return COLUMNAR_V3;
Expand Down
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.core.statusmanager;

/**
* Provides the constant name for the file format properties
*/
public class FileFormatProperties {
public static class CSV {
public static final String HEADER = "csv.header";
public static final String DELIMITER = "csv.delimiter";
public static final String COMMENT = "csv.comment";
public static final String SKIP_EMPTY_LINE = "csv.skipemptyline";
public static final String QUOTE = "csv.quote";
public static final String ESCAPE = "csv.escape";
}
}
Expand Up @@ -122,6 +122,11 @@ public void setIndexSize(String indexSize) {
* the file format of this segment
*/
private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
/**
* path of the fact files.
* Since format and formatProperties are stored in tableInfo, we do not store it in each segment
*/
private String factFilePath;

/**
* Segment file name where it has the information of partition information.
Expand Down Expand Up @@ -429,8 +434,17 @@ public void setSegmentFile(String segmentFile) {
this.segmentFile = segmentFile;
}

public String getFactFilePath() {
return factFilePath;
}

public void setFactFilePath(String factFilePath) {
this.factFilePath = factFilePath;
}

@Override public String toString() {
return "LoadMetadataDetails{" + "loadStatus=" + loadStatus + ", loadName='" + loadName + '\''
+ ", loadStartTime='" + loadStartTime + '\'' + ", segmentFile='" + segmentFile + '\'' + '}';
+ ", loadStartTime='" + loadStartTime + '\'' + ", factFilePath='" + factFilePath + '\''
+ ", segmentFile='" + segmentFile + '\'' + '}';
}
}
Expand Up @@ -64,12 +64,18 @@ public CarbonMultiBlockSplit(List<Distributable> blocks, String hostname) {
this.splitList.add((CarbonInputSplit)block);
}
this.locations = new String[]{hostname};
if (splitList.size() > 0) {
this.fileFormat = splitList.get(0).getFileFormat();
}
}

public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList,
String[] locations) {
this.splitList = splitList;
this.locations = locations;
if (splitList.size() > 0) {
this.fileFormat = splitList.get(0).getFileFormat();
}
calculateLength();
}

Expand Down

0 comments on commit 1a26ac1

Please sign in to comment.