diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java index 245d3e89a43..1aad7c80306 100644 --- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java @@ -52,7 +52,7 @@ public static void main(String[] args) throws Exception { CarbonProperties.getInstance() .addProperty(CarbonLoadOptionConstants.ENABLE_CARBON_LOAD_DIRECT_WRITE_TO_STORE_PATH, "true"); - String path = "s3a://sdk/WriterOutput"; + String path = "s3a://sdk/WriterOutput4"; if (args.length > 3) { path=args[3]; } @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception { num = Integer.parseInt(args[4]); } - Configuration conf = new Configuration(false); + Configuration conf = new Configuration(true); conf.set(Constants.ACCESS_KEY, args[0]); conf.set(Constants.SECRET_KEY, args[1]); conf.set(Constants.ENDPOINT, args[2]); @@ -70,8 +70,13 @@ public static void main(String[] args) throws Exception { Field[] fields = new Field[2]; fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path).withHadoopConf(conf); - CarbonWriter writer = builder.withCsvInput(new Schema(fields)).build(); + CarbonWriter writer = CarbonWriter + .builder() + .outputPath(path) + .withHadoopConf(conf) + .withCsvInput(new Schema(fields)) + .writtenBy("SDKS3Example") + .build(); for (int i = 0; i < num; i++) { writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)}); diff --git a/store/CSDK/CMakeLists.txt b/store/CSDK/CMakeLists.txt index bfda148abd7..ab1429d4cb4 100644 --- a/store/CSDK/CMakeLists.txt +++ b/store/CSDK/CMakeLists.txt @@ -8,7 +8,7 @@ find_package(JNI REQUIRED) include_directories(${JNI_INCLUDE_DIRS}) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") -set(SOURCE_FILES src/CarbonReader.cpp src/CarbonReader.h test/main.cpp src/CarbonRow.h src/CarbonRow.cpp) +set(SOURCE_FILES src/CarbonReader.cpp src/CarbonReader.h test/main.cpp src/CarbonRow.h src/CarbonRow.cpp src/CarbonWriter.h src/CarbonWriter.cpp) add_executable(CJDK ${SOURCE_FILES}) get_filename_component(JAVA_JVM_LIBRARY_DIR ${JAVA_JVM_LIBRARY} DIRECTORY) diff --git a/store/CSDK/src/CarbonWriter.cpp b/store/CSDK/src/CarbonWriter.cpp new file mode 100644 index 00000000000..05497c66a72 --- /dev/null +++ b/store/CSDK/src/CarbonWriter.cpp @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "CarbonWriter.h" + +void CarbonWriter::builder(JNIEnv *env) { + if (env == NULL) { + throw std::runtime_error("JNIEnv parameter can't be NULL."); + } + jniEnv = env; + carbonWriter = env->FindClass("org/apache/carbondata/sdk/file/CarbonWriter"); + if (carbonWriter == NULL) { + throw std::runtime_error("Can't find the class in java: org/apache/carbondata/sdk/file/CarbonWriter"); + } + jmethodID carbonWriterBuilderID = env->GetStaticMethodID(carbonWriter, "builder", + "()Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (carbonWriterBuilderID == NULL) { + throw std::runtime_error("Can't find the method in java: carbonWriterBuilder"); + } + carbonWriterBuilderObject = env->CallStaticObjectMethod(carbonWriter, carbonWriterBuilderID); +} + +bool CarbonWriter::checkBuilder() { + if (carbonWriterBuilderObject == NULL) { + throw std::runtime_error("carbonWriterBuilder Object can't be NULL. Please call builder method first."); + } +} + +void CarbonWriter::outputPath(char *path) { + if (path == NULL) { + throw std::runtime_error("path parameter can't be NULL."); + } + checkBuilder(); + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "outputPath", + "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: outputPath"); + } + jstring jPath = jniEnv->NewStringUTF(path); + jvalue args[1]; + args[0].l = jPath; + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); +} + +void CarbonWriter::withCsvInput(char *jsonSchema) { + if (jsonSchema == NULL) { + throw std::runtime_error("jsonSchema parameter can't be NULL."); + } + checkBuilder(); + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withCsvInput", + "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: withCsvInput"); + } + jstring jPath = jniEnv->NewStringUTF(jsonSchema); + jvalue args[1]; + args[0].l = jPath; + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } +}; + +void CarbonWriter::withHadoopConf(char *key, char *value) { + if (key == NULL) { + throw std::runtime_error("key parameter can't be NULL."); + } + if (value == NULL) { + throw std::runtime_error("value parameter can't be NULL."); + } + checkBuilder(); + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withHadoopConf", + "(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: withHadoopConf"); + } + jvalue args[2]; + args[0].l = jniEnv->NewStringUTF(key); + args[1].l = jniEnv->NewStringUTF(value); + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); +} + +void CarbonWriter::writtenBy(char *appName) { + checkBuilder(); + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "writtenBy", + "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: writtenBy"); + } + jvalue args[1]; + args[0].l = jniEnv->NewStringUTF(appName); + carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args); +} + +void CarbonWriter::build() { + checkBuilder(); + jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "build", + "()Lorg/apache/carbondata/sdk/file/CarbonWriter;"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: build"); + } + carbonWriterObject = jniEnv->CallObjectMethod(carbonWriterBuilderObject, methodID); + + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } +} + +void CarbonWriter::write(jobject obj) { + if (carbonWriterObject == NULL) { + throw std::runtime_error("Please call build first."); + } + if (writeID == NULL) { + carbonWriter = jniEnv->GetObjectClass(carbonWriterObject); + writeID = jniEnv->GetMethodID(carbonWriter, "write", "(Ljava/lang/Object;)V"); + if (writeID == NULL) { + throw std::runtime_error("Can't find the method in java: write"); + } + } + jvalue args[1]; + args[0].l = obj; + jniEnv->CallBooleanMethodA(carbonWriterObject, writeID, args); + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } +}; + +jboolean CarbonWriter::close() { + if (carbonWriterObject == NULL) { + throw std::runtime_error("Please call build first."); + } + jclass carbonWriter = jniEnv->GetObjectClass(carbonWriterObject); + jmethodID methodID = jniEnv->GetMethodID(carbonWriter, "close", "()V"); + if (methodID == NULL) { + throw std::runtime_error("Can't find the method in java: close"); + } + jniEnv->CallBooleanMethod(carbonWriterObject, methodID); + if (jniEnv->ExceptionCheck()) { + throw jniEnv->ExceptionOccurred(); + } +} \ No newline at end of file diff --git a/store/CSDK/src/CarbonWriter.h b/store/CSDK/src/CarbonWriter.h new file mode 100644 index 00000000000..1ea5baa5367 --- /dev/null +++ b/store/CSDK/src/CarbonWriter.h @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +class CarbonWriter { +private: + /** + * jni env + */ + JNIEnv *jniEnv; + + /** + * carbonWriterBuilder object for building carbonWriter + * it can configure some operation + */ + jobject carbonWriterBuilderObject = NULL; + + /** + * carbonWriter object for writing data + */ + jobject carbonWriterObject; + + /** + * carbon writer class + */ + jclass carbonWriter; + + /** + * write method id + */ + jmethodID writeID = NULL; + + /** + * check whether has called builder + * + * @return true or throw exception + */ + bool checkBuilder(); +public: + /** + * create a CarbonWriterBuilder object for building carbonWriter, + * CarbonWriterBuilder object can configure different parameter + * + * @param env JNIEnv + * @return CarbonWriterBuilder object + */ + void builder(JNIEnv *env); + + /** + * Sets the output path of the writer builder + * + * @param path is the absolute path where output files are written + * This method must be called when building CarbonWriterBuilder + * @return updated CarbonWriterBuilder + */ + void outputPath(char *path); + + /** + * configure the schema with json style schema + * + * @param jsonSchema json style schema + * @return updated CarbonWriterBuilder + */ + void withCsvInput(char *jsonSchema); + + /** + * configure parameter, including ak,sk and endpoint + * + * @param key key word + * @param value value + * @return CarbonWriterBuilder object + */ + void withHadoopConf(char *key, char *value); + + /** + * @param appName appName which is writing the carbondata files + */ + void writtenBy(char *appName); + + /** + * build carbonWriter object for writing data + * it support write data from load disk + * + * @return carbonWriter object + */ + void build(); + + /** + * Write an object to the file, the format of the object depends on the + * implementation. + * Note: This API is not thread safe + */ + void write(jobject obj); + + /** + * close the carbon Writer + * + * @return boolean value + */ + jboolean close(); +}; + + diff --git a/store/CSDK/test/main.cpp b/store/CSDK/test/main.cpp index 843155f678e..2766e5ccf70 100644 --- a/store/CSDK/test/main.cpp +++ b/store/CSDK/test/main.cpp @@ -20,8 +20,10 @@ #include #include #include +#include #include "../src/CarbonReader.h" #include "../src/CarbonRow.h" +#include "../src/CarbonWriter.h" using namespace std; @@ -210,6 +212,288 @@ bool tryCatchException(JNIEnv *env) { } printf("\nfinished handle exception\n"); } + +/** + * test write data to local disk + * + * @param env jni env + * @param path file path + * @param argc argument counter + * @param argv argument vector + * @return true or throw exception + */ +bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) { + + char *jsonSchema = "[{stringField:string},{shortField:short},{intField:int},{longField:long},{doubleField:double},{boolField:boolean},{dateField:date},{timeField:timestamp},{floatField:float},{arrayField:array}]"; + try { + CarbonWriter writer; + writer.builder(env); + if (argc > 3) { + writer.withHadoopConf("fs.s3a.access.key", argv[1]); + writer.withHadoopConf("fs.s3a.secret.key", argv[2]); + writer.withHadoopConf("fs.s3a.endpoint", argv[3]); + } + writer.outputPath(path); + writer.withCsvInput(jsonSchema); + writer.writtenBy("CSDK"); + writer.build(); + + int rowNum = 10; + int size = 10; + long longValue = 0; + double doubleValue = 0; + float floatValue = 0; + jclass objClass = env->FindClass("java/lang/String"); + for (int i = 0; i < rowNum; ++i) { + jobjectArray arr = env->NewObjectArray(size, objClass, 0); + char ctrInt[10]; + gcvt(i, 10, ctrInt); + + char a[15] = "robot"; + strcat(a, ctrInt); + jobject stringField = env->NewStringUTF(a); + env->SetObjectArrayElement(arr, 0, stringField); + + char ctrShort[10]; + gcvt(i % 10000, 10, ctrShort); + jobject shortField = env->NewStringUTF(ctrShort); + env->SetObjectArrayElement(arr, 1, shortField); + + jobject intField = env->NewStringUTF(ctrInt); + env->SetObjectArrayElement(arr, 2, intField); + + + char ctrLong[10]; + gcvt(longValue, 10, ctrLong); + longValue = longValue + 2; + jobject longField = env->NewStringUTF(ctrLong); + env->SetObjectArrayElement(arr, 3, longField); + + char ctrDouble[10]; + gcvt(doubleValue, 10, ctrDouble); + doubleValue = doubleValue + 2; + jobject doubleField = env->NewStringUTF(ctrDouble); + env->SetObjectArrayElement(arr, 4, doubleField); + + jobject boolField = env->NewStringUTF("true"); + env->SetObjectArrayElement(arr, 5, boolField); + + jobject dateField = env->NewStringUTF(" 2019-03-02"); + env->SetObjectArrayElement(arr, 6, dateField); + + jobject timeField = env->NewStringUTF("2019-02-12 03:03:34"); + env->SetObjectArrayElement(arr, 7, timeField); + + char ctrFloat[10]; + gcvt(floatValue, 10, ctrFloat); + floatValue = floatValue + 2; + jobject floatField = env->NewStringUTF(ctrFloat); + env->SetObjectArrayElement(arr, 8, floatField); + + jobject arrayField = env->NewStringUTF("Hello#World#From#Carbon"); + env->SetObjectArrayElement(arr, 9, arrayField); + + + writer.write(arr); + + env->DeleteLocalRef(stringField); + env->DeleteLocalRef(shortField); + env->DeleteLocalRef(intField); + env->DeleteLocalRef(longField); + env->DeleteLocalRef(doubleField); + env->DeleteLocalRef(floatField); + env->DeleteLocalRef(dateField); + env->DeleteLocalRef(timeField); + env->DeleteLocalRef(boolField); + env->DeleteLocalRef(arrayField); + env->DeleteLocalRef(arr); + } + writer.close(); + + CarbonReader carbonReader; + carbonReader.builder(env, path); + carbonReader.build(); + int i = 0; + CarbonRow carbonRow(env); + while (carbonReader.hasNext()) { + jobject row = carbonReader.readNextRow(); + i++; + carbonRow.setCarbonRow(row); + printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2)); + jobjectArray array1 = carbonRow.getArray(3); + jsize length = env->GetArrayLength(array1); + int j = 0; + for (j = 0; j < length; j++) { + jobject element = env->GetObjectArrayElement(array1, j); + char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE); + printf("%s\t", str); + } + printf("%d\t", carbonRow.getShort(4)); + printf("%d\t", carbonRow.getInt(5)); + printf("%ld\t", carbonRow.getLong(6)); + printf("%lf\t", carbonRow.getDouble(7)); + bool bool1 = carbonRow.getBoolean(8); + if (bool1) { + printf("true\t"); + } else { + printf("false\t"); + } + printf("%f\t\n", carbonRow.getFloat(9)); + env->DeleteLocalRef(row); + } + carbonReader.close(); + } catch (jthrowable ex) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } +} + +/** + * test write data to S3 + * + * @param env jni env + * @return + */ +bool writeToS3(JNIEnv *env, char *path, char *argv[]) { + printf("test write data ro S3"); + char *jsonSchema = "[{stringField:string},{shortField:short},{intField:int},{longField:long},{doubleField:double},{boolField:boolean},{dateField:date},{timeField:timestamp},{floatField:float},{arrayField:array}]"; + + char *args[3]; + // "your access key" + args[0] = argv[1]; + // "your secret key" + args[1] = argv[2]; + // "your endPoint" + args[2] = argv[3]; + + CarbonWriter carbonWriterClass; + carbonWriterClass.builder(env); + carbonWriterClass.outputPath(path); + carbonWriterClass.withHadoopConf("fs.s3a.access.key", argv[1]); + carbonWriterClass.withHadoopConf("fs.s3a.secret.key", argv[2]); + carbonWriterClass.withHadoopConf("fs.s3a.endpoint", argv[3]); + carbonWriterClass.withCsvInput(jsonSchema); + carbonWriterClass.writtenBy("CSDK"); + carbonWriterClass.build(); + + int rowNum = 10; + int size = 10; + long longValue = 0; + double doubleValue = 0; + float floatValue = 0; + jclass objClass = env->FindClass("java/lang/String"); + for (int i = 0; i < rowNum; ++i) { + jobjectArray arr = env->NewObjectArray(size, objClass, 0); + char ctrInt[10]; + gcvt(i, 10, ctrInt); + + char a[15] = "robot"; + strcat(a, ctrInt); + jobject stringField = env->NewStringUTF(a); + env->SetObjectArrayElement(arr, 0, stringField); + + char ctrShort[10]; + gcvt(i % 10000, 10, ctrShort); + jobject shortField = env->NewStringUTF(ctrShort); + env->SetObjectArrayElement(arr, 1, shortField); + + jobject intField = env->NewStringUTF(ctrInt); + env->SetObjectArrayElement(arr, 2, intField); + + + char ctrLong[10]; + gcvt(longValue, 10, ctrLong); + longValue = longValue + 2; + jobject longField = env->NewStringUTF(ctrLong); + env->SetObjectArrayElement(arr, 3, longField); + + char ctrDouble[10]; + gcvt(doubleValue, 10, ctrDouble); + doubleValue = doubleValue + 2; + jobject doubleField = env->NewStringUTF(ctrDouble); + env->SetObjectArrayElement(arr, 4, doubleField); + + jobject boolField = env->NewStringUTF("true"); + env->SetObjectArrayElement(arr, 5, boolField); + + jobject dateField = env->NewStringUTF(" 2019-03-02"); + env->SetObjectArrayElement(arr, 6, dateField); + + jobject timeField = env->NewStringUTF("2019-02-12 03:03:34"); + env->SetObjectArrayElement(arr, 7, timeField); + + char ctrFloat[10]; + gcvt(floatValue, 10, ctrFloat); + floatValue = floatValue + 2; + jobject floatField = env->NewStringUTF(ctrFloat); + env->SetObjectArrayElement(arr, 8, floatField); + + jobject arrayField = env->NewStringUTF("Hello#World#From#Carbon"); + env->SetObjectArrayElement(arr, 9, arrayField); + + + carbonWriterClass.write(arr); + + env->DeleteLocalRef(stringField); + env->DeleteLocalRef(shortField); + env->DeleteLocalRef(intField); + env->DeleteLocalRef(longField); + env->DeleteLocalRef(doubleField); + env->DeleteLocalRef(floatField); + env->DeleteLocalRef(dateField); + env->DeleteLocalRef(timeField); + env->DeleteLocalRef(boolField); + env->DeleteLocalRef(arrayField); + env->DeleteLocalRef(arr); + } + try { + carbonWriterClass.close(); + } catch (jthrowable e) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } + CarbonReader carbonReader; + carbonReader.builder(env, path); + carbonReader.withHadoopConf("fs.s3a.access.key", argv[1]); + carbonReader.withHadoopConf("fs.s3a.secret.key", argv[2]); + carbonReader.withHadoopConf("fs.s3a.endpoint", argv[3]); + try { + carbonReader.build(); + } catch (jthrowable e) { + env->ExceptionDescribe(); + env->ExceptionClear(); + } + int i = 0; + CarbonRow carbonRow(env); + while (carbonReader.hasNext()) { + jobject row = carbonReader.readNextRow(); + i++; + carbonRow.setCarbonRow(row); + printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2)); + jobjectArray array1 = carbonRow.getArray(3); + jsize length = env->GetArrayLength(array1); + int j = 0; + for (j = 0; j < length; j++) { + jobject element = env->GetObjectArrayElement(array1, j); + char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE); + printf("%s\t", str); + } + printf("%d\t", carbonRow.getShort(4)); + printf("%d\t", carbonRow.getInt(5)); + printf("%ld\t", carbonRow.getLong(6)); + printf("%lf\t", carbonRow.getDouble(7)); + bool bool1 = carbonRow.getBoolean(8); + if (bool1) { + printf("true\t"); + } else { + printf("false\t"); + } + printf("%f\t\n", carbonRow.getFloat(9)); + env->DeleteLocalRef(row); + } + carbonReader.close(); +} + /** * read data from S3 * parameter is ak sk endpoint @@ -250,12 +534,15 @@ int main(int argc, char *argv[]) { // init jvm JNIEnv *env; env = initJVM(); + char *S3WritePath = "s3a://sdk/csdk/"; if (argc > 3) { readFromS3(env, argv); + testWriteData(env, S3WritePath, 4, argv); } else { tryCatchException(env); readFromLocalWithoutProjection(env); + testWriteData(env, "./data", 1, argv); readFromLocal(env); } (jvm)->DestroyJavaVM(); diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index a47cc68907d..877777fbf5f 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -268,6 +268,21 @@ public CarbonWriterBuilder withHadoopConf(Configuration conf) { return this; } + /** + * configure hadoop configuration with key value + * + * @param key key word + * @param value value + * @return this object + */ + public CarbonWriterBuilder withHadoopConf(String key, String value) { + if (this.hadoopConf == null) { + this.hadoopConf = new Configuration(true); + } + this.hadoopConf.set(key, value); + return this; + } + /** * To set the carbondata file size in MB between 1MB-2048MB * @param blockSize is size in MB between 1MB to 2048 MB @@ -341,6 +356,19 @@ public CarbonWriterBuilder withCsvInput(Schema schema) { return this; } + /** + * to build a {@link CarbonWriter}, which accepts row in CSV format + * + * @param jsonSchema json Schema string + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder withCsvInput(String jsonSchema) { + Objects.requireNonNull(jsonSchema, "schema should not be null"); + this.schema = Schema.parseJson(jsonSchema); + this.writerType = WRITER_TYPE.CSV; + return this; + } + /** * to build a {@link CarbonWriter}, which accepts Avro object * diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index 483ec887e9b..90ea9095fb2 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -104,6 +104,37 @@ public void testWriteFilesJsonSchema() throws IOException { FileUtils.deleteDirectory(new File(path)); } + @Test + public void testWriteFilesBuildWithJsonSchema() throws IOException, InvalidLoadOptionException, InterruptedException { + String path = "./testWriteFilesJsonSchema"; + FileUtils.deleteDirectory(new File(path)); + + String schema = "[{name:string},{age:int},{height:double}]"; + CarbonWriterBuilder builder = CarbonWriter + .builder() + .outputPath(path) + .withCsvInput(schema) + .writtenBy("testWriteFilesBuildWithJsonSchema"); + + CarbonWriter writer = builder.build(); + for (int i = 0; i < 10; i++) { + writer.write(new String[]{ + "robot" + (i % 10), String.valueOf(i % 3000000), String.valueOf((double) i / 2)}); + } + writer.close(); + + CarbonReader carbonReader = CarbonReader.builder(path).build(); + int i = 0; + while (carbonReader.hasNext()) { + Object[] row = (Object[]) carbonReader.readNextRow(); + Assert.assertEquals(row[0], "robot" + i % 10); + System.out.println(); + i++; + } + carbonReader.close(); + FileUtils.deleteDirectory(new File(path)); + } + @Test public void testAllPrimitiveDataType() throws IOException { // TODO: write all data type and read by CarbonRecordReader to verify the content