Skip to content

Commit

Permalink
[CARBONDATA-2558] Optimize carbon schema reader interface of SDK
Browse files Browse the repository at this point in the history
Optimize carbon schema reader interface of SDK

1. Create CarbonSchemaReader and move the schema-reading interface from CarbonReader to CarbonSchemaReader
2. Change the return type from List<ColumnSchema> to the SDK Schema, and remove the TableInfo return type
3. Improve the documentation

This closes #2353
  • Loading branch information
xubo245 authored and jackylk committed May 29, 2018
1 parent 2993034 commit e740182
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 143 deletions.
107 changes: 93 additions & 14 deletions docs/sdk-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,20 +435,6 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
public static CarbonReaderBuilder builder(String tablePath, String tableName);
```

```
/**
* Read carbondata file and return the schema
*/
public static List<ColumnSchema> readSchemaInDataFile(String dataFilePath);
```

```
/**
* Read schema file and return table info object
*/
public static TableInfo readSchemaFile(String schemaFilePath);
```

```
/**
* Return true if there is a next row
Expand Down Expand Up @@ -598,4 +584,97 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
*/
public <T> CarbonReader<T> build();
```
### Class org.apache.carbondata.sdk.file.CarbonSchemaReader
```
/**
* Read schema file and return the schema
*
* @param schemaFilePath complete path including schema file name
* @return schema object
* @throws IOException
*/
public static Schema readSchemaInSchemaFile(String schemaFilePath);
```

```
/**
* Read carbondata file and return the schema
*
* @param dataFilePath complete path including carbondata file name
* @return Schema object
* @throws IOException
*/
public static Schema readSchemaInDataFile(String dataFilePath);
```

```
/**
* Read carbonindex file and return the schema
*
* @param indexFilePath complete path including index file name
* @return schema object
* @throws IOException
*/
public static Schema readSchemaInIndexFile(String indexFilePath);
```

### Class org.apache.carbondata.sdk.file.Schema
```
/**
* construct a schema with fields
* @param fields
*/
public Schema(Field[] fields);
```

```
/**
* construct a schema with List<ColumnSchema>
*
* @param columnSchemaList column schema list
*/
public Schema(List<ColumnSchema> columnSchemaList);
```

```
/**
* Create a Schema using JSON string, for example:
* [
* {"name":"string"},
* {"age":"int"}
* ]
* @param json specified as string
* @return Schema
*/
public static Schema parseJson(String json);
```

```
/**
* Sort the fields of the schema back into their original order
*
* @return Schema object
*/
public Schema asOriginOrder();
```

### Class org.apache.carbondata.sdk.file.Field
```
/**
* Field Constructor
* @param name name of the field
* @param type datatype of field, specified in strings.
*/
public Field(String name, String type);
```

```
/**
* Construct Field from ColumnSchema
*
* @param columnSchema the ColumnSchema that stores the metadata of the column
*/
public Field(ColumnSchema columnSchema);
```

Find S3 example code at [SDKS3Example](https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java) in the CarbonData repo.
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,11 @@

package org.apache.carbondata.sdk.file;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;

import org.apache.hadoop.mapreduce.RecordReader;

Expand Down Expand Up @@ -103,76 +88,6 @@ public static CarbonReaderBuilder builder(String tablePath, String tableName) {
return new CarbonReaderBuilder(tablePath, tableName);
}

/**
 * Reads the column schema from a carbondata data file.
 *
 * @param dataFilePath path handed to {@link CarbonHeaderReader}
 * @return the column schema list produced by the header reader
 * @throws IOException if the header reader fails to read the schema
 */
public static List<ColumnSchema> readSchemaInDataFile(String dataFilePath) throws IOException {
  // The header reader is only needed for this single call, so no local is kept for it.
  return new CarbonHeaderReader(dataFilePath).readSchema();
}

/**
 * Read carbonindex file and return the schema.
 *
 * <p>The whole index file is loaded into memory, its thrift index header is decoded and every
 * thrift column schema found in the header is converted to a wrapper {@link ColumnSchema}.
 *
 * @param indexFilePath complete path including index file name
 * @return the column schemas listed in the index header
 * @throws IOException if the file name does not end with the index file extension, if the file
 *                     is too large to be buffered in a single byte array, or if reading fails
 */
public static List<ColumnSchema> readSchemaInIndexFile(String indexFilePath) throws IOException {
  CarbonFile indexFile =
      FileFactory.getCarbonFile(indexFilePath, FileFactory.getFileType(indexFilePath));
  if (!indexFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
    throw new IOException("Not an index file name");
  }
  // Guard the narrowing cast below: a size above Integer.MAX_VALUE would otherwise be
  // truncated silently and produce a wrong (or negative) buffer length.
  long fileSize = indexFile.getSize();
  if (fileSize > Integer.MAX_VALUE) {
    throw new IOException("Index file is too large to read into memory: " + indexFilePath);
  }
  // Allocate the buffer before opening the stream, and let try-with-resources own the
  // stream, so that no failure between open and close can leak it.
  byte[] bytes = new byte[(int) fileSize];
  try (DataInputStream dataInputStream =
      FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath))) {
    // get the file in byte buffer
    dataInputStream.readFully(bytes);
  }
  CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
  // read from byte buffer.
  indexReader.openThriftReader(bytes);
  // get the index header
  org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
  List<org.apache.carbondata.format.ColumnSchema> tableColumns =
      readIndexHeader.getTable_columns();
  List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(tableColumns.size());
  for (org.apache.carbondata.format.ColumnSchema columnSchema : tableColumns) {
    columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(columnSchema));
  }
  return columnSchemaList;
}

/**
 * Reads the schema from a carbonindex file and returns its columns sorted by ascending
 * schema ordinal.
 *
 * @param indexFilePath complete path including index file name
 * @return the column schemas of the index file, ordered by schema ordinal
 * @throws IOException if the index file cannot be read
 */
public static List<ColumnSchema> readUserSchema(String indexFilePath) throws IOException {
  // Orders two columns by their schema ordinal.
  Comparator<ColumnSchema> bySchemaOrdinal = new Comparator<ColumnSchema>() {
    @Override
    public int compare(ColumnSchema left, ColumnSchema right) {
      return Integer.compare(left.getSchemaOrdinal(), right.getSchemaOrdinal());
    }
  };
  List<ColumnSchema> orderedColumns = readSchemaInIndexFile(indexFilePath);
  // The list returned by readSchemaInIndexFile is sorted in place and handed back.
  Collections.sort(orderedColumns, bySchemaOrdinal);
  return orderedColumns;
}

/**
 * Reads a schema file and converts its thrift table info into a wrapper {@link TableInfo}.
 *
 * @param schemaFilePath path of the schema file to read
 * @return the converted table info object
 * @throws IOException if the schema file cannot be read
 */
public static TableInfo readSchemaFile(String schemaFilePath) throws IOException {
  org.apache.carbondata.format.TableInfo thriftTableInfo =
      CarbonUtil.readSchemaFile(schemaFilePath);
  // NOTE(review): the three empty strings are passed through to the converter unchanged;
  // their meaning is defined by SchemaConverter - confirm against its signature.
  return new ThriftWrapperSchemaConverterImpl()
      .fromExternalToWrapperTableInfo(thriftTableInfo, "", "", "");
}

/**
* Close reader
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.sdk.file;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.reader.CarbonIndexFileReader;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;

/**
 * Schema reader for carbon files, including carbondata file, carbonindex file, and schema file.
 * Every method is static and returns the columns it finds wrapped in an SDK {@link Schema}.
 */
public class CarbonSchemaReader {

  /**
   * Read schema file and return the schema
   *
   * @param schemaFilePath complete path including schema file name
   * @return schema object built from the column list of the fact table
   * @throws IOException if the schema file cannot be read
   */
  public static Schema readSchemaInSchemaFile(String schemaFilePath) throws IOException {
    org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.readSchemaFile(schemaFilePath);
    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    // NOTE(review): the three empty strings are passed through to the converter unchanged;
    // only the fact table's column list is used from the result - confirm their meaning
    // against SchemaConverter.
    List<ColumnSchema> schemaList = schemaConverter
        .fromExternalToWrapperTableInfo(tableInfo, "", "", "")
        .getFactTable()
        .getListOfColumns();
    return new Schema(schemaList);
  }

  /**
   * Read carbondata file and return the schema
   *
   * @param dataFilePath complete path including carbondata file name
   * @return Schema object
   * @throws IOException if the header of the data file cannot be read
   */
  public static Schema readSchemaInDataFile(String dataFilePath) throws IOException {
    CarbonHeaderReader reader = new CarbonHeaderReader(dataFilePath);
    return new Schema(reader.readSchema());
  }

  /**
   * Read carbonindex file and return the schema
   *
   * <p>The whole index file is loaded into memory, its thrift index header is decoded and every
   * thrift column schema found in the header is converted to a wrapper {@link ColumnSchema}.
   *
   * @param indexFilePath complete path including index file name
   * @return schema object
   * @throws IOException if the file name does not end with the index file extension, if the
   *                     file is too large to be buffered in a single byte array, or if reading
   *                     fails
   */
  public static Schema readSchemaInIndexFile(String indexFilePath) throws IOException {
    CarbonFile indexFile =
        FileFactory.getCarbonFile(indexFilePath, FileFactory.getFileType(indexFilePath));
    if (!indexFile.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
      throw new IOException("Not an index file name");
    }
    // Guard the narrowing cast below: a size above Integer.MAX_VALUE would otherwise be
    // truncated silently and produce a wrong (or negative) buffer length.
    long fileSize = indexFile.getSize();
    if (fileSize > Integer.MAX_VALUE) {
      throw new IOException("Index file is too large to read into memory: " + indexFilePath);
    }
    // Allocate the buffer before opening the stream, and let try-with-resources own the
    // stream, so that no failure between open and close can leak it.
    byte[] bytes = new byte[(int) fileSize];
    try (DataInputStream dataInputStream =
        FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath))) {
      // get the file in byte buffer
      dataInputStream.readFully(bytes);
    }
    CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
    // read from byte buffer.
    indexReader.openThriftReader(bytes);
    // get the index header
    org.apache.carbondata.format.IndexHeader readIndexHeader = indexReader.readIndexHeader();
    List<org.apache.carbondata.format.ColumnSchema> tableColumns =
        readIndexHeader.getTable_columns();
    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(tableColumns.size());
    for (org.apache.carbondata.format.ColumnSchema columnSchema : tableColumns) {
      columnSchemaList.add(thriftColumnSchemaToWrapperColumnSchema(columnSchema));
    }
    return new Schema(columnSchemaList);
  }

}
16 changes: 16 additions & 0 deletions store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.carbondata.sdk.file;

import java.util.LinkedList;
import java.util.List;

import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;

/**
* A field represent one column
Expand Down Expand Up @@ -126,6 +128,20 @@ public Field(String name, DataType type) {
this.type = type;
}

/**
 * Creates a Field from the metadata held by a ColumnSchema.
 *
 * <p>Column name, data type, schema ordinal, precision and scale are copied from the given
 * column schema; the children list starts out empty.
 *
 * @param columnSchema column metadata to copy into this field
 */
public Field(ColumnSchema columnSchema) {
  this.name = columnSchema.getColumnName();
  this.type = columnSchema.getDataType();
  // No child fields are derived from the column schema here.
  this.children = new LinkedList<>();
  this.schemaOrdinal = columnSchema.getSchemaOrdinal();
  this.precision = columnSchema.getPrecision();
  this.scale = columnSchema.getScale();
}

/**
 * Returns the name of this field.
 *
 * @return the value of the {@code name} member
 */
public String getFieldName() {
  return this.name;
}
Expand Down

0 comments on commit e740182

Please sign in to comment.