From 8cd7d00629fdc2d2dacc9674e65dbd6201c9a692 Mon Sep 17 00:00:00 2001 From: xubo245 Date: Fri, 19 Oct 2018 15:40:04 +0800 Subject: [PATCH 1/2] [CARBONDATA-3000] Provide C++ interface for writing carbon data in CSDK 1.suport string, short, int, long, double, float, array, boolean data type 2.provide builder, build, write, close interface 3.TODO: support S3 fix error optimize --- store/CSDK/CMakeLists.txt | 2 +- store/CSDK/src/CarbonWriter.cpp | 90 ++++++ store/CSDK/src/CarbonWriter.h | 111 +++++++ store/CSDK/test/main.cpp | 275 ++++++++++++++++++ .../sdk/file/CarbonWriterBuilder.java | 28 ++ .../sdk/file/CSVCarbonWriterTest.java | 30 ++ 6 files changed, 535 insertions(+), 1 deletion(-) create mode 100644 store/CSDK/src/CarbonWriter.cpp create mode 100644 store/CSDK/src/CarbonWriter.h diff --git a/store/CSDK/CMakeLists.txt b/store/CSDK/CMakeLists.txt index bfda148abd7..ab1429d4cb4 100644 --- a/store/CSDK/CMakeLists.txt +++ b/store/CSDK/CMakeLists.txt @@ -8,7 +8,7 @@ find_package(JNI REQUIRED) include_directories(${JNI_INCLUDE_DIRS}) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") -set(SOURCE_FILES src/CarbonReader.cpp src/CarbonReader.h test/main.cpp src/CarbonRow.h src/CarbonRow.cpp) +set(SOURCE_FILES src/CarbonReader.cpp src/CarbonReader.h test/main.cpp src/CarbonRow.h src/CarbonRow.cpp src/CarbonWriter.h src/CarbonWriter.cpp) add_executable(CJDK ${SOURCE_FILES}) get_filename_component(JAVA_JVM_LIBRARY_DIR ${JAVA_JVM_LIBRARY} DIRECTORY) diff --git a/store/CSDK/src/CarbonWriter.cpp b/store/CSDK/src/CarbonWriter.cpp new file mode 100644 index 00000000000..2f2419c9da7 --- /dev/null +++ b/store/CSDK/src/CarbonWriter.cpp @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CarbonWriter.h" + +void CarbonWriter::builder(JNIEnv *env) { + jniEnv = env; + carbonWriter = env->FindClass("org/apache/carbondata/sdk/file/CarbonWriter"); + jmethodID carbonWriterBuilderID = env->GetStaticMethodID(carbonWriter, "builder", + "()Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + carbonWriterBuilderObject = env->CallStaticObjectMethod(carbonWriter, carbonWriterBuilderID); +} + +void CarbonWriter::outputPath(char *path) { + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID carbonWriterBuilderID = jniEnv->GetMethodID(carbonWriterBuilderClass, "outputPath", + "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + jstring jPath = jniEnv->NewStringUTF(path); + jvalue args[1]; + args[0].l = jPath; + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, carbonWriterBuilderID, args); +} + +void CarbonWriter::withCsvInput(char *jsonSchema) { + + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID carbonWriterBuilderID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withCsvInput", + "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + jstring jPath = jniEnv->NewStringUTF(jsonSchema); + jvalue args[1]; + args[0].l = jPath; + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, carbonWriterBuilderID, args); +}; + +void CarbonWriter::withHadoopConf(char *key, char *value) { + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withHadoopConf", + "(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + jvalue args[2]; + args[0].l = jniEnv->NewStringUTF(key); + args[1].l = jniEnv->NewStringUTF(value); + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); +} + +void CarbonWriter::writtenBy(char *appName) { + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "writtenBy", + "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + jvalue args[1]; + args[0].l = jniEnv->NewStringUTF(appName); + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); +} + +void CarbonWriter::build() { + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "build", + "()Lorg/apache/carbondata/sdk/file/CarbonWriter;"); + carbonWriterObject = jniEnv->CallObjectMethod(carbonWriterBuilderObject, methodID); + carbonWriter = jniEnv->GetObjectClass(carbonWriterObject); + writeID = jniEnv->GetMethodID(carbonWriter, "write", "(Ljava/lang/Object;)V"); +} + +void CarbonWriter::write(jobject obj) { + jvalue args[1]; + args[0].l = obj; + jniEnv->CallBooleanMethodA(carbonWriterObject, writeID, args); +}; + +jboolean CarbonWriter::close() { + jclass carbonWriter = jniEnv->GetObjectClass(carbonWriterObject); + jmethodID closeID = jniEnv->GetMethodID(carbonWriter, "close", "()V"); + jniEnv->CallBooleanMethod(carbonWriterObject, closeID); + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } +} \ No newline at end of file diff --git a/store/CSDK/src/CarbonWriter.h b/store/CSDK/src/CarbonWriter.h new file mode 100644 index 00000000000..b954849dba5 --- /dev/null +++ b/store/CSDK/src/CarbonWriter.h @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +class CarbonWriter { +private: + /** + * jni env + */ + JNIEnv *jniEnv; + + /** + * carbonWriterBuilder object for building carbonWriter + * it can configure some operation + */ + jobject carbonWriterBuilderObject; + + /** + * carbonWriter object for writing data + */ + jobject carbonWriterObject; + + /** + * carbon writer class + */ + jclass carbonWriter; + + /** + * write method id + */ + jmethodID writeID; +public: + /** + * create a CarbonWriterBuilder object for building carbonWriter, + * CarbonWriterBuilder object can configure different parameter + * + * @param env JNIEnv + * @return CarbonWriterBuilder object + */ + void builder(JNIEnv *env); + + /** + * Sets the output path of the writer builder + * + * @param path is the absolute path where output files are written + * This method must be called when building CarbonWriterBuilder + * @return updated CarbonWriterBuilder + */ + void outputPath(char *path); + + /** + * configure the schema with json style schema + * + * @param jsonSchema json style schema + * @return updated CarbonWriterBuilder + */ + void withCsvInput(char *jsonSchema); + + /** + * configure parameter, including ak,sk and endpoint + * + * @param key key word + * @param value value + * @return CarbonWriterBuilder object + */ + void withHadoopConf(char *key, char *value); + + /** + * @param appName appName which is writing the carbondata files + */ + void writtenBy(char *appName); + + /** + * build carbonWriter object for writing data + * it support write data from load disk + * + * @return carbonWriter object + */ + void build(); + + /** + * Write an object to the file, the format of the object depends on the + * implementation. + * Note: This API is not thread safe + */ + void write(jobject obj); + + /** + * close the carbon Writer + * + * @return boolean value + */ + jboolean close(); +}; + + diff --git a/store/CSDK/test/main.cpp b/store/CSDK/test/main.cpp index 843155f678e..15fd49aa069 100644 --- a/store/CSDK/test/main.cpp +++ b/store/CSDK/test/main.cpp @@ -20,8 +20,10 @@ #include #include #include +#include #include "../src/CarbonReader.h" #include "../src/CarbonRow.h" +#include "../src/CarbonWriter.h" using namespace std; @@ -210,6 +212,276 @@ bool tryCatchException(JNIEnv *env) { } printf("\nfinished handle exception\n"); } + +/** + * test write data to local disk + * + * @param env jni env + * @return + */ +bool writeToLocal(JNIEnv *env, char *path) { + + char *jsonSchema = "[{stringField:string},{shortField:short},{intField:int},{longField:long},{doubleField:double},{boolField:boolean},{dateField:date},{timeField:timestamp},{floatField:float},{arrayField:array}]"; + + CarbonWriter carbonWriterClass; + carbonWriterClass.builder(env); + carbonWriterClass.outputPath(path); + carbonWriterClass.withCsvInput(jsonSchema); + carbonWriterClass.writtenBy("CSDK"); + carbonWriterClass.build(); + + int rowNum = 10; + int size = 10; + long longValue = 0; + double doubleValue = 0; + float floatValue = 0; + jclass objClass = env->FindClass("java/lang/String"); + for (int i = 0; i < rowNum; ++i) { + jobjectArray arr = env->NewObjectArray(size, objClass, 0); + char ctrInt[10]; + gcvt(i, 10, ctrInt); + + char a[15] = "robot"; + strcat(a, ctrInt); + jobject stringField = env->NewStringUTF(a); + env->SetObjectArrayElement(arr, 0, stringField); + + char ctrShort[10]; + gcvt(i % 10000, 10, ctrShort); + jobject shortField = env->NewStringUTF(ctrShort); + env->SetObjectArrayElement(arr, 1, shortField); + + jobject intField = env->NewStringUTF(ctrInt); + env->SetObjectArrayElement(arr, 2, intField); + + + char ctrLong[10]; + gcvt(longValue, 10, ctrLong); + longValue = longValue + 2; + jobject longField = env->NewStringUTF(ctrLong); + env->SetObjectArrayElement(arr, 3, longField); + + char ctrDouble[10]; + gcvt(doubleValue, 10, ctrDouble); + doubleValue = doubleValue + 2; + jobject doubleField = env->NewStringUTF(ctrDouble); + env->SetObjectArrayElement(arr, 4, doubleField); + + jobject boolField = env->NewStringUTF("true"); + env->SetObjectArrayElement(arr, 5, boolField); + + jobject dateField = env->NewStringUTF(" 2019-03-02"); + env->SetObjectArrayElement(arr, 6, dateField); + + jobject timeField = env->NewStringUTF("2019-02-12 03:03:34"); + env->SetObjectArrayElement(arr, 7, timeField); + + char ctrFloat[10]; + gcvt(floatValue, 10, ctrFloat); + floatValue = floatValue + 2; + jobject floatField = env->NewStringUTF(ctrFloat); + env->SetObjectArrayElement(arr, 8, floatField); + + jobject arrayField = env->NewStringUTF("Hello#World#From#Carbon"); + env->SetObjectArrayElement(arr, 9, arrayField); + + + carbonWriterClass.write(arr); + + env->DeleteLocalRef(stringField); + env->DeleteLocalRef(shortField); + env->DeleteLocalRef(intField); + env->DeleteLocalRef(longField); + env->DeleteLocalRef(doubleField); + env->DeleteLocalRef(floatField); + env->DeleteLocalRef(dateField); + env->DeleteLocalRef(timeField); + env->DeleteLocalRef(boolField); + env->DeleteLocalRef(arrayField); + env->DeleteLocalRef(arr); + } + carbonWriterClass.close(); + + CarbonReader carbonReader; + carbonReader.builder(env, path); + carbonReader.build(); + int i = 0; + CarbonRow carbonRow(env); + while (carbonReader.hasNext()) { + jobject row = carbonReader.readNextRow(); + i++; + carbonRow.setCarbonRow(row); + printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2)); + jobjectArray array1 = carbonRow.getArray(3); + jsize length = env->GetArrayLength(array1); + int j = 0; + for (j = 0; j < length; j++) { + jobject element = env->GetObjectArrayElement(array1, j); + char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE); + printf("%s\t", str); + } + printf("%d\t", carbonRow.getShort(4)); + printf("%d\t", carbonRow.getInt(5)); + printf("%ld\t", carbonRow.getLong(6)); + printf("%lf\t", carbonRow.getDouble(7)); + bool bool1 = carbonRow.getBoolean(8); + if (bool1) { + printf("true\t"); + } else { + printf("false\t"); + } + printf("%f\t\n", carbonRow.getFloat(9)); + env->DeleteLocalRef(row); + } + carbonReader.close(); +} + +/** + * test write data to S3 + * + * @param env jni env + * @return + */ +bool writeToS3(JNIEnv *env, char *path, char *argv[]) { + printf("test write data ro S3"); + char *jsonSchema = "[{stringField:string},{shortField:short},{intField:int},{longField:long},{doubleField:double},{boolField:boolean},{dateField:date},{timeField:timestamp},{floatField:float},{arrayField:array}]"; + + char *args[3]; + // "your access key" + args[0] = argv[1]; + // "your secret key" + args[1] = argv[2]; + // "your endPoint" + args[2] = argv[3]; + + CarbonWriter carbonWriterClass; + carbonWriterClass.builder(env); + carbonWriterClass.outputPath(path); + carbonWriterClass.withHadoopConf("fs.s3a.access.key", argv[1]); + carbonWriterClass.withHadoopConf("fs.s3a.secret.key", argv[2]); + carbonWriterClass.withHadoopConf("fs.s3a.endpoint", argv[3]); + carbonWriterClass.withCsvInput(jsonSchema); + carbonWriterClass.writtenBy("CSDK"); + carbonWriterClass.build(); + + int rowNum = 10; + int size = 10; + long longValue = 0; + double doubleValue = 0; + float floatValue = 0; + jclass objClass = env->FindClass("java/lang/String"); + for (int i = 0; i < rowNum; ++i) { + jobjectArray arr = env->NewObjectArray(size, objClass, 0); + char ctrInt[10]; + gcvt(i, 10, ctrInt); + + char a[15] = "robot"; + strcat(a, ctrInt); + jobject stringField = env->NewStringUTF(a); + env->SetObjectArrayElement(arr, 0, stringField); + + char ctrShort[10]; + gcvt(i % 10000, 10, ctrShort); + jobject shortField = env->NewStringUTF(ctrShort); + env->SetObjectArrayElement(arr, 1, shortField); + + jobject intField = env->NewStringUTF(ctrInt); + env->SetObjectArrayElement(arr, 2, intField); + + + char ctrLong[10]; + gcvt(longValue, 10, ctrLong); + longValue = longValue + 2; + jobject longField = env->NewStringUTF(ctrLong); + env->SetObjectArrayElement(arr, 3, longField); + + char ctrDouble[10]; + gcvt(doubleValue, 10, ctrDouble); + doubleValue = doubleValue + 2; + jobject doubleField = env->NewStringUTF(ctrDouble); + env->SetObjectArrayElement(arr, 4, doubleField); + + jobject boolField = env->NewStringUTF("true"); + env->SetObjectArrayElement(arr, 5, boolField); + + jobject dateField = env->NewStringUTF(" 2019-03-02"); + env->SetObjectArrayElement(arr, 6, dateField); + + jobject timeField = env->NewStringUTF("2019-02-12 03:03:34"); + env->SetObjectArrayElement(arr, 7, timeField); + + char ctrFloat[10]; + gcvt(floatValue, 10, ctrFloat); + floatValue = floatValue + 2; + jobject floatField = env->NewStringUTF(ctrFloat); + env->SetObjectArrayElement(arr, 8, floatField); + + jobject arrayField = env->NewStringUTF("Hello#World#From#Carbon"); + env->SetObjectArrayElement(arr, 9, arrayField); + + + carbonWriterClass.write(arr); + + env->DeleteLocalRef(stringField); + env->DeleteLocalRef(shortField); + env->DeleteLocalRef(intField); + env->DeleteLocalRef(longField); + env->DeleteLocalRef(doubleField); + env->DeleteLocalRef(floatField); + env->DeleteLocalRef(dateField); + env->DeleteLocalRef(timeField); + env->DeleteLocalRef(boolField); + env->DeleteLocalRef(arrayField); + env->DeleteLocalRef(arr); + } + try { + carbonWriterClass.close(); + } catch (jthrowable e) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } + CarbonReader carbonReader; + carbonReader.builder(env, path); + carbonReader.withHadoopConf("fs.s3a.access.key", argv[1]); + carbonReader.withHadoopConf("fs.s3a.secret.key", argv[2]); + carbonReader.withHadoopConf("fs.s3a.endpoint", argv[3]); + try { + carbonReader.build(); + } catch (jthrowable e) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } + int i = 0; + CarbonRow carbonRow(env); + while (carbonReader.hasNext()) { + jobject row = carbonReader.readNextRow(); + i++; + carbonRow.setCarbonRow(row); + printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2)); + jobjectArray array1 = carbonRow.getArray(3); + jsize length = env->GetArrayLength(array1); + int j = 0; + for (j = 0; j < length; j++) { + jobject element = env->GetObjectArrayElement(array1, j); + char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE); + printf("%s\t", str); + } + printf("%d\t", carbonRow.getShort(4)); + printf("%d\t", carbonRow.getInt(5)); + printf("%ld\t", carbonRow.getLong(6)); + printf("%lf\t", carbonRow.getDouble(7)); + bool bool1 = carbonRow.getBoolean(8); + if (bool1) { + printf("true\t"); + } else { + printf("false\t"); + } + printf("%f\t\n", carbonRow.getFloat(9)); + env->DeleteLocalRef(row); + } + carbonReader.close(); +} + /** * read data from S3 * parameter is ak sk endpoint @@ -250,12 +522,15 @@ int main(int argc, char *argv[]) { // init jvm JNIEnv *env; env = initJVM(); + char *S3WritePath = "s3a://sdk/ges/write2"; if (argc > 3) { readFromS3(env, argv); + writeToS3(env, S3WritePath, argv); } else { tryCatchException(env); readFromLocalWithoutProjection(env); + writeToLocal(env, "./data"); readFromLocal(env); } (jvm)->DestroyJavaVM(); diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index a47cc68907d..877777fbf5f 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -268,6 +268,21 @@ public CarbonWriterBuilder withHadoopConf(Configuration conf) { return this; } + /** + * configure hadoop configuration with key value + * + * @param key key word + * @param value value + * @return this object + */ + public CarbonWriterBuilder withHadoopConf(String key, String value) { + if (this.hadoopConf == null) { + this.hadoopConf = new Configuration(true); + } + this.hadoopConf.set(key, value); + return this; + } + /** * To set the carbondata file size in MB between 1MB-2048MB * @param blockSize is size in MB between 1MB to 2048 MB @@ -341,6 +356,19 @@ public CarbonWriterBuilder withCsvInput(Schema schema) { return this; } + /** + * to build a {@link CarbonWriter}, which accepts row in CSV format + * + * @param jsonSchema json Schema string + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withCsvInput(String jsonSchema) { + Objects.requireNonNull(jsonSchema, "schema should not be null"); + this.schema = Schema.parseJson(jsonSchema); + this.writerType = WRITER_TYPE.CSV; + return this; + } + /** * to build a {@link CarbonWriter}, which accepts Avro object * diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 483ec887e9b..d066fce3c37 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -104,6 +104,36 @@ public void testWriteFilesJsonSchema() throws IOException { FileUtils.deleteDirectory(new File(path)); } + @Test + public void testWriteFilesBuildWithJsonSchema() throws IOException, InvalidLoadOptionException, InterruptedException { + String path = "./testWriteFilesJsonSchema"; + FileUtils.deleteDirectory(new File(path)); + + String schema = "[{name:string},{age:int},{height:double}]"; + CarbonWriterBuilder builder = CarbonWriter + .builder() + .outputPath(path) + .withCsvInput(schema); + + CarbonWriter writer = builder.build(); + for (int i = 0; i < 10; i++) { + writer.write(new String[]{ + "robot" + (i % 10), String.valueOf(i % 3000000), String.valueOf((double) i / 2)}); + } + writer.close(); + + CarbonReader carbonReader = CarbonReader.builder(path).build(); + int i = 0; + while (carbonReader.hasNext()) { + Object[] row = (Object[]) carbonReader.readNextRow(); + Assert.assertEquals(row[0], "robot" + i % 10); + System.out.println(); + i++; + } + carbonReader.close(); + FileUtils.deleteDirectory(new File(path)); + } + @Test public void testAllPrimitiveDataType() throws IOException { // TODO: write all data type and read by CarbonRecordReader to verify the content From 5f4115bf64ed4d9fc2b0ae8dc667937f18e5fbb0 Mon Sep 17 00:00:00 2001 From: xubo245 Date: Sat, 27 Oct 2018 12:10:31 +0800 Subject: [PATCH 2/2] optimize --- .../carbondata/examples/sdk/SDKS3Example.java | 13 +- store/CSDK/src/CarbonWriter.cpp | 91 ++++++- store/CSDK/src/CarbonWriter.h | 11 +- store/CSDK/test/main.cpp | 240 +++++++++--------- .../sdk/file/CSVCarbonWriterTest.java | 3 +- 5 files changed, 227 insertions(+), 131 deletions(-) diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java index 245d3e89a43..1aad7c80306 100644 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java @@ -52,7 +52,7 @@ public static void main(String[] args) throws Exception { CarbonProperties.getInstance() .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true"); - String path = "s3a://sdk/WriterOutput"; + String path = "s3a://sdk/WriterOutput4"; if (args.length > 3) { path=args[3]; } @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception { num = Integer.parseInt(args[4]); } - Configuration conf = new Configuration(false); + Configuration conf = new Configuration(true); conf.set(Constants.ACCESS_KEY, args[0]); conf.set(Constants.SECRET_KEY, args[1]); conf.set(Constants.ENDPOINT, args[2]); @@ -70,8 +70,13 @@ public static void main(String[] args) throws Exception { Field[] fields = new Field[2]; fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path).withHadoopConf(conf); - CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); + CarbonWriter writer = CarbonWriter + .builder() + .outputPath(path) + .withHadoopConf(conf) + .withCsvInput(new Schema(fields)) + .writtenBy("SDKS3Example") + .build(); for (int i = 0; i < num; i++) { writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)}); diff --git a/store/CSDK/src/CarbonWriter.cpp b/store/CSDK/src/CarbonWriter.cpp index 2f2419c9da7..05497c66a72 100644 --- a/store/CSDK/src/CarbonWriter.cpp +++ b/store/CSDK/src/CarbonWriter.cpp @@ -15,41 +15,83 @@ * limitations under the License. */ +#include #include "CarbonWriter.h" void CarbonWriter::builder(JNIEnv *env) { + if (env == NULL) { + throw std::runtime_error("JNIEnv parameter can't be NULL."); + } jniEnv = env; carbonWriter = env->FindClass("org/apache/carbondata/sdk/file/CarbonWriter"); + if (carbonWriter == NULL) { + throw std::runtime_error("Can't find the class in java: org/apache/carbondata/sdk/file/CarbonWriter"); + } jmethodID carbonWriterBuilderID = env->GetStaticMethodID(carbonWriter, "builder", "()Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (carbonWriterBuilderID == NULL) { + throw std::runtime_error("Can't find the method in java: carbonWriterBuilder"); + } carbonWriterBuilderObject = env->CallStaticObjectMethod(carbonWriter, carbonWriterBuilderID); } +bool CarbonWriter::checkBuilder() { + if (carbonWriterBuilderObject == NULL) { + throw std::runtime_error("carbonWriterBuilder Object can't be NULL. Please call builder method first."); + } +} + void CarbonWriter::outputPath(char *path) { + if (path == NULL) { + throw std::runtime_error("path parameter can't be NULL."); + } + checkBuilder(); jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); - jmethodID carbonWriterBuilderID = jniEnv->GetMethodID(carbonWriterBuilderClass, "outputPath", + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "outputPath", "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: outputPath"); + } jstring jPath = jniEnv->NewStringUTF(path); jvalue args[1]; args[0].l = jPath; - carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, carbonWriterBuilderID, args); + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); } void CarbonWriter::withCsvInput(char *jsonSchema) { - + if (jsonSchema == NULL) { + throw std::runtime_error("jsonSchema parameter can't be NULL."); + } + checkBuilder(); jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); - jmethodID carbonWriterBuilderID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withCsvInput", + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withCsvInput", "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: withCsvInput"); + } jstring jPath = jniEnv->NewStringUTF(jsonSchema); jvalue args[1]; args[0].l = jPath; - carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, carbonWriterBuilderID, args); + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } }; void CarbonWriter::withHadoopConf(char *key, char *value) { + if (key == NULL) { + throw std::runtime_error("key parameter can't be NULL."); + } + if (value == NULL) { + throw std::runtime_error("value parameter can't be NULL."); + } + checkBuilder(); jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withHadoopConf", "(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: withHadoopConf"); + } jvalue args[2]; args[0].l = jniEnv->NewStringUTF(key); args[1].l = jniEnv->NewStringUTF(value); @@ -57,33 +99,62 @@ void CarbonWriter::withHadoopConf(char *key, char *value) { } void CarbonWriter::writtenBy(char *appName) { + checkBuilder(); jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "writtenBy", - "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: writtenBy"); + } jvalue args[1]; args[0].l = jniEnv->NewStringUTF(appName); carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); } void CarbonWriter::build() { + checkBuilder(); jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "build", "()Lorg/apache/carbondata/sdk/file/CarbonWriter;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: build"); + } carbonWriterObject = jniEnv->CallObjectMethod(carbonWriterBuilderObject, methodID); - carbonWriter = jniEnv->GetObjectClass(carbonWriterObject); - writeID = jniEnv->GetMethodID(carbonWriter, "write", "(Ljava/lang/Object;)V"); + + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } } void CarbonWriter::write(jobject obj) { + if (carbonWriterObject == NULL) { + throw std::runtime_error("Please call build first."); + } + if (writeID == NULL) { + carbonWriter = jniEnv->GetObjectClass(carbonWriterObject); + writeID = jniEnv->GetMethodID(carbonWriter, "write", "(Ljava/lang/Object;)V"); + if (writeID == NULL) { + throw std::runtime_error("Can't find the method in java: write"); + } + } jvalue args[1]; args[0].l = obj; jniEnv->CallBooleanMethodA(carbonWriterObject, writeID, args); + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } }; jboolean CarbonWriter::close() { + if (carbonWriterObject == NULL) { + throw std::runtime_error("Please call build first."); + } jclass carbonWriter = jniEnv->GetObjectClass(carbonWriterObject); - jmethodID closeID = jniEnv->GetMethodID(carbonWriter, "close", "()V"); - jniEnv->CallBooleanMethod(carbonWriterObject, closeID); + jmethodID methodID = jniEnv->GetMethodID(carbonWriter, "close", "()V"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: close"); + } + jniEnv->CallBooleanMethod(carbonWriterObject, methodID); if (jniEnv->ExceptionCheck()) { throw jniEnv->ExceptionOccurred(); } diff --git a/store/CSDK/src/CarbonWriter.h b/store/CSDK/src/CarbonWriter.h index b954849dba5..1ea5baa5367 100644 --- a/store/CSDK/src/CarbonWriter.h +++ b/store/CSDK/src/CarbonWriter.h @@ -28,7 +28,7 @@ class CarbonWriter { * carbonWriterBuilder object for building carbonWriter * it can configure some operation */ - jobject carbonWriterBuilderObject; + jobject carbonWriterBuilderObject = NULL; /** * carbonWriter object for writing data @@ -43,7 +43,14 @@ class CarbonWriter { /** * write method id */ - jmethodID writeID; + jmethodID writeID = NULL; + + /** + * check whether has called builder + * + * @return true or throw exception + */ + bool checkBuilder(); public: /** * create a CarbonWriterBuilder object for building carbonWriter, diff --git a/store/CSDK/test/main.cpp b/store/CSDK/test/main.cpp index 15fd49aa069..2766e5ccf70 100644 --- a/store/CSDK/test/main.cpp +++ b/store/CSDK/test/main.cpp @@ -217,123 +217,135 @@ bool tryCatchException(JNIEnv *env) { * test write data to local disk * * @param env jni env - * @return + * @param path file path + * @param argc argument counter + * @param argv argument vector + * @return true or throw exception */ -bool writeToLocal(JNIEnv *env, char *path) { +bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) { char *jsonSchema = "[{stringField:string},{shortField:short},{intField:int},{longField:long},{doubleField:double},{boolField:boolean},{dateField:date},{timeField:timestamp},{floatField:float},{arrayField:array}]"; - - CarbonWriter carbonWriterClass; - carbonWriterClass.builder(env); - carbonWriterClass.outputPath(path); - carbonWriterClass.withCsvInput(jsonSchema); - carbonWriterClass.writtenBy("CSDK"); - carbonWriterClass.build(); - - int rowNum = 10; - int size = 10; - long longValue = 0; - double doubleValue = 0; - float floatValue = 0; - jclass objClass = env->FindClass("java/lang/String"); - for (int i = 0; i < rowNum; ++i) { - jobjectArray arr = env->NewObjectArray(size, objClass, 0); - char ctrInt[10]; - gcvt(i, 10, ctrInt); - - char a[15] = "robot"; - strcat(a, ctrInt); - jobject stringField = env->NewStringUTF(a); - env->SetObjectArrayElement(arr, 0, stringField); - - char ctrShort[10]; - gcvt(i % 10000, 10, ctrShort); - jobject shortField = env->NewStringUTF(ctrShort); - env->SetObjectArrayElement(arr, 1, shortField); - - jobject intField = env->NewStringUTF(ctrInt); - env->SetObjectArrayElement(arr, 2, intField); - - - char ctrLong[10]; - gcvt(longValue, 10, ctrLong); - longValue = longValue + 2; - jobject longField = env->NewStringUTF(ctrLong); - env->SetObjectArrayElement(arr, 3, longField); - - char ctrDouble[10]; - gcvt(doubleValue, 10, ctrDouble); - doubleValue = doubleValue + 2; - jobject doubleField = env->NewStringUTF(ctrDouble); - env->SetObjectArrayElement(arr, 4, doubleField); - - jobject boolField = env->NewStringUTF("true"); - env->SetObjectArrayElement(arr, 5, boolField); - - jobject dateField = env->NewStringUTF(" 2019-03-02"); - env->SetObjectArrayElement(arr, 6, dateField); - - jobject timeField = env->NewStringUTF("2019-02-12 03:03:34"); - env->SetObjectArrayElement(arr, 7, timeField); - - char ctrFloat[10]; - gcvt(floatValue, 10, ctrFloat); - floatValue = floatValue + 2; - jobject floatField = env->NewStringUTF(ctrFloat); - env->SetObjectArrayElement(arr, 8, floatField); - - jobject arrayField = env->NewStringUTF("Hello#World#From#Carbon"); - env->SetObjectArrayElement(arr, 9, arrayField); - - - carbonWriterClass.write(arr); - - env->DeleteLocalRef(stringField); - env->DeleteLocalRef(shortField); - env->DeleteLocalRef(intField); - env->DeleteLocalRef(longField); - env->DeleteLocalRef(doubleField); - env->DeleteLocalRef(floatField); - env->DeleteLocalRef(dateField); - env->DeleteLocalRef(timeField); - env->DeleteLocalRef(boolField); - env->DeleteLocalRef(arrayField); - env->DeleteLocalRef(arr); - } - carbonWriterClass.close(); - - CarbonReader carbonReader; - carbonReader.builder(env, path); - carbonReader.build(); - int i = 0; - CarbonRow carbonRow(env); - while (carbonReader.hasNext()) { - jobject row = carbonReader.readNextRow(); - i++; - carbonRow.setCarbonRow(row); - printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2)); - jobjectArray array1 = carbonRow.getArray(3); - jsize length = env->GetArrayLength(array1); - int j = 0; - for (j = 0; j < length; j++) { - jobject element = env->GetObjectArrayElement(array1, j); - char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE); - printf("%s\t", str); + try { + CarbonWriter writer; + writer.builder(env); + if (argc > 3) { + writer.withHadoopConf("fs.s3a.access.key", argv[1]); + writer.withHadoopConf("fs.s3a.secret.key", argv[2]); + writer.withHadoopConf("fs.s3a.endpoint", argv[3]); } - printf("%d\t", carbonRow.getShort(4)); - printf("%d\t", carbonRow.getInt(5)); - printf("%ld\t", carbonRow.getLong(6)); - printf("%lf\t", carbonRow.getDouble(7)); - bool bool1 = carbonRow.getBoolean(8); - if (bool1) { - printf("true\t"); - } else { - printf("false\t"); + writer.outputPath(path); + writer.withCsvInput(jsonSchema); + writer.writtenBy("CSDK"); + writer.build(); + + int rowNum = 10; + int size = 10; + long longValue = 0; + double doubleValue = 0; + float floatValue = 0; + jclass objClass = env->FindClass("java/lang/String"); + for (int i = 0; i < rowNum; ++i) { + jobjectArray arr = env->NewObjectArray(size, objClass, 0); + char ctrInt[10]; + gcvt(i, 10, ctrInt); + + char a[15] = "robot"; + strcat(a, ctrInt); + jobject stringField = env->NewStringUTF(a); + env->SetObjectArrayElement(arr, 0, stringField); + + char ctrShort[10]; + gcvt(i % 10000, 10, ctrShort); + jobject shortField = env->NewStringUTF(ctrShort); + env->SetObjectArrayElement(arr, 1, shortField); + + jobject intField = env->NewStringUTF(ctrInt); + env->SetObjectArrayElement(arr, 2, intField); + + + char ctrLong[10]; + gcvt(longValue, 10, ctrLong); + longValue = longValue + 2; + jobject longField = env->NewStringUTF(ctrLong); + env->SetObjectArrayElement(arr, 3, longField); + + char ctrDouble[10]; + gcvt(doubleValue, 10, ctrDouble); + doubleValue = doubleValue + 2; + jobject doubleField = env->NewStringUTF(ctrDouble); + env->SetObjectArrayElement(arr, 4, doubleField); + + jobject boolField = env->NewStringUTF("true"); + env->SetObjectArrayElement(arr, 5, boolField); + + jobject dateField = env->NewStringUTF(" 2019-03-02"); + env->SetObjectArrayElement(arr, 6, dateField); + + jobject timeField = env->NewStringUTF("2019-02-12 03:03:34"); + env->SetObjectArrayElement(arr, 7, timeField); + + char ctrFloat[10]; + gcvt(floatValue, 10, ctrFloat); + floatValue = floatValue + 2; + jobject floatField = env->NewStringUTF(ctrFloat); + env->SetObjectArrayElement(arr, 8, floatField); + + jobject arrayField = env->NewStringUTF("Hello#World#From#Carbon"); + env->SetObjectArrayElement(arr, 9, arrayField); + + + writer.write(arr); + + env->DeleteLocalRef(stringField); + env->DeleteLocalRef(shortField); + env->DeleteLocalRef(intField); + env->DeleteLocalRef(longField); + env->DeleteLocalRef(doubleField); + env->DeleteLocalRef(floatField); + env->DeleteLocalRef(dateField); + env->DeleteLocalRef(timeField); + env->DeleteLocalRef(boolField); + env->DeleteLocalRef(arrayField); + env->DeleteLocalRef(arr); } - printf("%f\t\n", carbonRow.getFloat(9)); - env->DeleteLocalRef(row); + writer.close(); + + CarbonReader carbonReader; + carbonReader.builder(env, path); + carbonReader.build(); + int i = 0; + CarbonRow carbonRow(env); + while (carbonReader.hasNext()) { + jobject row = carbonReader.readNextRow(); + i++; + carbonRow.setCarbonRow(row); + printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2)); + jobjectArray array1 = carbonRow.getArray(3); + jsize length = env->GetArrayLength(array1); + int j = 0; + for (j = 0; j < length; j++) { + jobject element = env->GetObjectArrayElement(array1, j); + char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE); + printf("%s\t", str); + } + printf("%d\t", carbonRow.getShort(4)); + printf("%d\t", carbonRow.getInt(5)); + printf("%ld\t", carbonRow.getLong(6)); + printf("%lf\t", carbonRow.getDouble(7)); + bool bool1 = carbonRow.getBoolean(8); + if (bool1) { + printf("true\t"); + } else { + printf("false\t"); + } + printf("%f\t\n", carbonRow.getFloat(9)); + env->DeleteLocalRef(row); + } + carbonReader.close(); + } catch (jthrowable ex) { + env->ExceptionDescribe(); + env->ExceptionClear(); } - carbonReader.close(); } /** @@ -522,15 +534,15 @@ int main(int argc, char *argv[]) { // init jvm JNIEnv *env; env = initJVM(); - char *S3WritePath = "s3a://sdk/ges/write2"; + char *S3WritePath = "s3a://sdk/csdk/"; if (argc > 3) { readFromS3(env, argv); - writeToS3(env, S3WritePath, argv); + testWriteData(env, S3WritePath, 4, argv); } else { tryCatchException(env); readFromLocalWithoutProjection(env); - writeToLocal(env, "./data"); + testWriteData(env, "./data", 1, argv); readFromLocal(env); } (jvm)->DestroyJavaVM(); diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index d066fce3c37..90ea9095fb2 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -113,7 +113,8 @@ public void testWriteFilesBuildWithJsonSchema() throws IOException, InvalidLoadO CarbonWriterBuilder builder = CarbonWriter .builder() .outputPath(path) - .withCsvInput(schema); + .withCsvInput(schema) + .writtenBy("testWriteFilesBuildWithJsonSchema"); CarbonWriter writer = builder.build(); for (int i = 0; i < 10; i++) {