[CARBONDATA-300] Support read batch row in CSDK
            1. support read batch row in SDK
            2. support read batch row in CSDK
            3. improve CSDK read performance

This closes #2816
xubo245 authored and jackylk committed Nov 6, 2018
1 parent c9fb4bc commit a3a83dc
Showing 18 changed files with 629 additions and 34 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -61,7 +61,7 @@ CarbonData is built using Apache Maven, to [build CarbonData](https://github.com
* [CarbonData Pre-aggregate DataMap](https://github.com/apache/carbondata/blob/master/docs/preaggregate-datamap-guide.md)
* [CarbonData Timeseries DataMap](https://github.com/apache/carbondata/blob/master/docs/timeseries-datamap-guide.md)
* [SDK Guide](https://github.com/apache/carbondata/blob/master/docs/sdk-guide.md)
* [CSDK Guide](https://github.com/apache/carbondata/blob/master/docs/csdk-guide.md)
* [C++ SDK Guide](https://github.com/apache/carbondata/blob/master/docs/csdk-guide.md)
* [Performance Tuning](https://github.com/apache/carbondata/blob/master/docs/performance-tuning.md)
* [S3 Storage](https://github.com/apache/carbondata/blob/master/docs/s3-guide.md)
* [Carbon as Spark's Datasource](https://github.com/apache/carbondata/blob/master/docs/carbon-as-spark-datasource-guide.md)
@@ -100,4 +100,17 @@ public int getSize() {
counter++;
return row;
}

/**
* read next batch
*
* @return rows
*/
public List<Object[]> nextBatch() {
if (!hasNext()) {
throw new NoSuchElementException();
}
counter = counter + rows.size();
return rows;
}
}
@@ -17,6 +17,8 @@

package org.apache.carbondata.core.scan.result.iterator;

import java.util.List;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.scan.result.RowBatch;

@@ -74,4 +76,13 @@ public ChunkRowIterator(CarbonIterator<RowBatch> iterator) {
return currentChunk.next();
}

/**
* read next batch
*
* @return list of batch result
*/
public List<Object[]> nextBatch() {
return currentChunk.nextBatch();
}

}
37 changes: 27 additions & 10 deletions docs/csdk-guide.md
@@ -15,20 +15,20 @@
limitations under the License.
-->

# CSDK Guide
# C++ SDK Guide

CarbonData CSDK provides C++ interface to write and read carbon file.
CSDK use JNI to invoke java SDK in C++ code.
The CarbonData C++ SDK provides a C++ interface for writing and reading CarbonData files.
The C++ SDK uses JNI to invoke the Java SDK from C++ code.


# CSDK Reader
This CSDK reader reads CarbonData file and carbonindex file at a given path.
# C++ SDK Reader
This C++ SDK reader reads CarbonData file and carbonindex file at a given path.
External client can make use of this reader to read CarbonData files in C++
code and without CarbonSession.


In the carbon jars package, there exist a carbondata-sdk.jar,
including SDK reader for CSDK.
including SDK reader for C++ SDK.
## Quick example

Please find example code at [main.cpp](https://github.com/apache/carbondata/blob/master/store/CSDK/test/main.cpp) of CSDK module
@@ -38,6 +38,8 @@ carbon reader and read data.There are some example code of read data from local
and read data from S3 at main.cpp of CSDK module. Finally, users need to
release the memory and destroy JVM.

The C++ SDK supports reading rows in batches. Users can set the batch size by calling withBatch(int batch) before build(), and then read one batch at a time by calling readNextBatchRow().
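
The call order described above can be sketched without a JVM. The example below is only an illustration: `FakeBatchReader`, the `Row` alias, the `countRows` helper and the default batch size are hypothetical names and values that are not part of the SDK. The class mirrors the order of calls (`withBatch` first, then `hasNext` and `readNextBatchRow` in a loop) and the `batch < 1` check from this commit, whereas the real `CarbonReader` returns a `jobjectArray` through JNI and needs an initialised `JNIEnv`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical row type; the real SDK hands back JNI objects instead.
using Row = std::vector<std::string>;

// Hypothetical in-memory stand-in that mimics the call order of CarbonReader.
class FakeBatchReader {
public:
    explicit FakeBatchReader(std::vector<Row> rows) : rows_(std::move(rows)) {}

    // Mirrors withBatch(int): sizes below 1 are rejected, as in the commit.
    void withBatch(int batch) {
        if (batch < 1) {
            throw std::runtime_error("batch size must be greater than 0");
        }
        batch_ = static_cast<std::size_t>(batch);
    }

    bool hasNext() const { return pos_ < rows_.size(); }

    // Mirrors readNextBatchRow(): returns up to batch_ rows and advances.
    std::vector<Row> readNextBatchRow() {
        if (!hasNext()) {
            throw std::out_of_range("no more rows");
        }
        const std::size_t end = std::min(pos_ + batch_, rows_.size());
        std::vector<Row> out(rows_.begin() + static_cast<std::ptrdiff_t>(pos_),
                             rows_.begin() + static_cast<std::ptrdiff_t>(end));
        pos_ = end;
        return out;
    }

private:
    std::vector<Row> rows_;
    std::size_t batch_ = 100;  // assumed default, not taken from the SDK
    std::size_t pos_ = 0;
};

// Drains the reader batch by batch and returns the number of rows seen.
std::size_t countRows(FakeBatchReader &reader) {
    std::size_t total = 0;
    while (reader.hasNext()) {
        const std::vector<Row> batch = reader.readNextBatchRow();
        assert(!batch.empty());  // hasNext() guarantees at least one row
        total += batch.size();
    }
    return total;
}
```

With the real SDK the loop has the same shape, but each element of the returned array would be handed to a `CarbonRow` before its fields are read.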

## API List
### CarbonReader
```
@@ -73,6 +75,14 @@ release the memory and destroy JVM.
**/
jobject withHadoopConf(int argc, char *argv[]);
/**
* set batch size
*
* @param batch batch size
*/
void withBatch(int batch);
/**
* build carbonReader object for reading data
* it support read data from load disk
@@ -94,6 +104,13 @@ release the memory and destroy JVM.
*/
jobject readNextRow();
/**
* read Next Batch Row
*
* @return rows
*/
jobjectArray readNextBatchRow();
/**
* close the carbon reader
*
@@ -103,13 +120,13 @@ release the memory and destroy JVM.
```

# CSDK Writer
This CSDK writer writes CarbonData file and carbonindex file at a given path.
# C++ SDK Writer
This C++ SDK writer writes CarbonData file and carbonindex file at a given path.
External client can make use of this writer to write CarbonData files in C++
code and without CarbonSession. CSDK already supports S3 and local disk.
code and without CarbonSession. C++ SDK already supports S3 and local disk.

In the carbon jars package, there exist a carbondata-sdk.jar,
including SDK writer for CSDK.
including SDK writer for C++ SDK.

## Quick example
Please find example code at [main.cpp](https://github.com/apache/carbondata/blob/master/store/CSDK/test/main.cpp) of CSDK module
2 changes: 1 addition & 1 deletion docs/ddl-of-carbondata.md
@@ -565,7 +565,7 @@ CarbonData DDL statements are documented here,which includes:
```

Here writer path will have carbondata and index files.
This can be SDK output or CSDK output. Refer [SDK Guide](./sdk-guide.md) and [CSDK Guide](./csdk-guide.md).
This can be SDK output or C++ SDK output. Refer [SDK Guide](./sdk-guide.md) and [C++ SDK Guide](./csdk-guide.md).

**Note:**
1. Dropping of the external table should not delete the files present in the location.
2 changes: 1 addition & 1 deletion docs/quick-start-guide.md
@@ -294,7 +294,7 @@ hdfs://<host_name>:port/user/hive/warehouse/carbon.store
## Installing and Configuring CarbonData on Presto

**NOTE:** **CarbonData tables cannot be created nor loaded from Presto. User need to create CarbonData Table and load data into it
either with [Spark](#installing-and-configuring-carbondata-to-run-locally-with-spark-shell) or [SDK](./sdk-guide.md) or [CSDK](./csdk-guide.md).
either with [Spark](#installing-and-configuring-carbondata-to-run-locally-with-spark-shell) or [SDK](./sdk-guide.md) or [C++ SDK](./csdk-guide.md).
Once the table is created,it can be queried from Presto.**


@@ -131,6 +131,20 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
return readSupport.readRow(carbonIterator.next());
}

/**
* get batch result
*
* @return rows
*/
public List<Object[]> getBatchValue() {
List<Object[]> objects = ((ChunkRowIterator) carbonIterator).nextBatch();
if (null != inputMetricsStats) {
// count every row in the batch rather than a single record
inputMetricsStats.incrementRecordRead((long) objects.size());
}
rowCount += objects.size();
return objects;
}

@Override public float getProgress() throws IOException, InterruptedException {
// TODO : Implement it based on total number of rows it is going to retrieve.
return 0;
15 changes: 15 additions & 0 deletions store/CSDK/CMakeLists.txt
@@ -1,3 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cmake_minimum_required(VERSION 2.8)
project(CJDK)
set(CMAKE_BUILD_TYPE Debug)
43 changes: 41 additions & 2 deletions store/CSDK/src/CarbonReader.cpp
@@ -15,10 +15,10 @@
* limitations under the License.
*/

#include "CarbonReader.h"
#include <jni.h>
#include <mach/mach_types.h>
#include <stdexcept>
#include <sys/time.h>
#include "CarbonReader.h"

void CarbonReader::builder(JNIEnv *env, char *path, char *tableName) {
if (env == NULL) {
@@ -126,6 +126,33 @@ void CarbonReader::withHadoopConf(char *key, char *value) {
carbonReaderBuilderObject = jniEnv->CallObjectMethodA(carbonReaderBuilderObject, id, args);
}

void CarbonReader::withBatch(int batch) {
checkBuilder();
if (batch < 1) {
throw std::runtime_error("batch parameter must be greater than 0.");
}
jclass carbonReaderBuilderClass = jniEnv->GetObjectClass(carbonReaderBuilderObject);
jmethodID buildID = jniEnv->GetMethodID(carbonReaderBuilderClass, "withBatch",
"(I)Lorg/apache/carbondata/sdk/file/CarbonReaderBuilder;");
if (buildID == NULL) {
throw std::runtime_error("Can't find the method in java: withBatch.");
}
jvalue args[1];
args[0].i = batch;
carbonReaderBuilderObject = jniEnv->CallObjectMethodA(carbonReaderBuilderObject, buildID, args);
}

void CarbonReader::withRowRecordReader() {
checkBuilder();
jclass carbonReaderBuilderClass = jniEnv->GetObjectClass(carbonReaderBuilderObject);
jmethodID buildID = jniEnv->GetMethodID(carbonReaderBuilderClass, "withRowRecordReader",
"()Lorg/apache/carbondata/sdk/file/CarbonReaderBuilder;");
if (buildID == NULL) {
throw std::runtime_error("Can't find the method in java: withRowRecordReader.");
}
carbonReaderBuilderObject = jniEnv->CallObjectMethod(carbonReaderBuilderObject, buildID);
}

jobject CarbonReader::build() {
checkBuilder();
jclass carbonReaderBuilderClass = jniEnv->GetObjectClass(carbonReaderBuilderObject);
@@ -180,6 +207,18 @@ jobject CarbonReader::readNextRow() {
return result;
}

jobjectArray CarbonReader::readNextBatchRow() {
if (readNextBatchRowID == NULL) {
jclass carbonReader = jniEnv->GetObjectClass(carbonReaderObject);
readNextBatchRowID = jniEnv->GetMethodID(carbonReader, "readNextBatchRow",
"()[Ljava/lang/Object;");
if (readNextBatchRowID == NULL) {
throw std::runtime_error("Can't find the method in java: readNextBatchRow");
}
}
return (jobjectArray) jniEnv->CallObjectMethod(carbonReaderObject, readNextBatchRowID);
}

void CarbonReader::close() {
checkReader();
jclass carbonReader = jniEnv->GetObjectClass(carbonReaderObject);
25 changes: 25 additions & 0 deletions store/CSDK/src/CarbonReader.h
@@ -29,6 +29,11 @@ class CarbonReader {
*/
jmethodID readNextRowID = NULL;

/**
* readNextBatchRow jmethodID
*/
jmethodID readNextBatchRowID = NULL;

/**
* carbonReaderBuilder object for building carbonReader
* it can configure some operation
@@ -97,6 +102,19 @@ class CarbonReader {
*/
void withHadoopConf(char *key, char *value);

/**
* set batch size
*
* @param batch batch size
*/
void withBatch(int batch);

/**
* Configure Row Record Reader for reading.
*/
void withRowRecordReader();

/**
* build carbonReader object for reading data
* it support read data from load disk
@@ -118,6 +136,13 @@ class CarbonReader {
*/
jobject readNextRow();

/**
* read Next Batch Row
*
* @return rows
*/
jobjectArray readNextBatchRow();

/**
* close the carbon reader
*
16 changes: 16 additions & 0 deletions store/CSDK/src/CarbonRow.cpp
@@ -102,7 +102,14 @@ void CarbonRow::checkOrdinal(int ordinal) {
}
}

void CarbonRow::checkCarbonRow() {
if (carbonRow == NULL) {
throw std::runtime_error("carbonRow is NULL! Please set carbonRow first.");
}
}

short CarbonRow::getShort(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -111,6 +118,7 @@ short CarbonRow::getShort(int ordinal) {
}

int CarbonRow::getInt(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -119,6 +127,7 @@ int CarbonRow::getInt(int ordinal) {
}

long CarbonRow::getLong(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -127,6 +136,7 @@ long CarbonRow::getLong(int ordinal) {
}

double CarbonRow::getDouble(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -136,6 +146,7 @@ double CarbonRow::getDouble(int ordinal) {


float CarbonRow::getFloat(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -144,6 +155,7 @@ float CarbonRow::getFloat(int ordinal) {
}

jboolean CarbonRow::getBoolean(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -152,6 +164,7 @@ jboolean CarbonRow::getBoolean(int ordinal) {
}

char *CarbonRow::getString(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -163,6 +176,7 @@ char *CarbonRow::getString(int ordinal) {
}

char *CarbonRow::getDecimal(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -174,6 +188,7 @@ char *CarbonRow::getDecimal(int ordinal) {
}

char *CarbonRow::getVarchar(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
@@ -185,6 +200,7 @@ char *CarbonRow::getVarchar(int ordinal) {
}

jobjectArray CarbonRow::getArray(int ordinal) {
checkCarbonRow();
checkOrdinal(ordinal);
jvalue args[2];
args[0].l = carbonRow;
