From 4a8af3f2052cb408a0d4bbbcf13e7d26a071eb8c Mon Sep 17 00:00:00 2001 From: ravipesala Date: Mon, 4 Dec 2017 16:07:03 +0530 Subject: [PATCH 1/2] Added outputformat for carbon --- .../core/metadata/datatype/StructType.java | 2 +- .../hadoop/api/CarbonOutputCommitter.java | 101 ++++++ .../hadoop/api/CarbonTableOutputFormat.java | 309 +++++++++++++++++- .../hadoop/ft/CarbonOutputMapperTest.java | 122 +++++++ .../hadoop/test/util/StoreCreator.java | 32 +- .../carbondata/spark/util/CommonUtil.scala | 45 --- .../spark/rdd/CarbonDataRDDFactory.scala | 6 +- .../management/CarbonLoadDataCommand.scala | 7 +- .../iterator/CarbonOutputIteratorWrapper.java | 118 +++++++ .../processing/util/CarbonLoaderUtil.java | 53 ++- .../streaming/StreamHandoffRDD.scala | 2 +- 11 files changed, 712 insertions(+), 85 deletions(-) create mode 100644 hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java create mode 100644 hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java create mode 100644 processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java index 6417f37f0cd..97cc4f03183 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructType.java @@ -19,7 +19,7 @@ import java.util.List; -class StructType extends DataType { +public class StructType extends DataType { private List fields; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java new file mode 100644 index 00000000000..6fd50f02ee6 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop.api; + +import java.io.IOException; + +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatus; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonLoaderUtil; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +/** + * Outputcommitter which manages the segments during loading. + */ +public class CarbonOutputCommitter extends FileOutputCommitter { + + public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + /** + * Update the tablestatus to in-progress while setting up the job. + * + * @param context + * @throws IOException + */ + @Override public void setupJob(JobContext context) throws IOException { + super.setupJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); + } catch (InterruptedException e) { + throw new IOException(e); + } + CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel); + } + + /** + * Update the tablestatus to success after the job succeeds. + * + * @param context + * @throws IOException + */ + @Override public void commitJob(JobContext context) throws IOException { + super.commitJob(context); + boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + LoadMetadataDetails newMetaEntry = + loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1); + CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, + loadModel.getFactTimeStamp(), true); + CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), + loadModel.getCarbonDataLoadSchema().getCarbonTable()); + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + /** + * Update the tablestatus to failure if the job fails.
+ * + * @param context + * @param state + * @throws IOException + */ + @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException { + super.abortJob(context, state); + CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); + try { + CarbonLoaderUtil.updateTableStatusForFailure(loadModel); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + +} diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 7c9b3edd3f9..3c1df16b0fc 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -18,22 +18,311 @@ package org.apache.carbondata.hadoop.api; import java.io.IOException; +import java.util.List; +import java.util.Random; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.util.Progressable; +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.datatype.StructField; +import org.apache.carbondata.core.metadata.datatype.StructType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.DataLoadExecutor; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper; +import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Base class for all output format for CarbonData file. 
- * @param */ -public abstract class CarbonTableOutputFormat extends FileOutputFormat { +public class CarbonTableOutputFormat extends FileOutputFormat { + + private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel"; + private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName"; + private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName"; + private static final String TABLE = "mapreduce.carbonoutputformat.table"; + private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath"; + private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema"; + private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations"; + private static final String OVERWRITE_SET = "carbon.load.set.overwrite"; + public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters"; + public static final String SERIALIZATION_NULL_FORMAT = + "mapreduce.carbonoutputformat.serialization.null.format"; + public static final String BAD_RECORDS_LOGGER_ENABLE = + "mapreduce.carbonoutputformat.bad.records.logger.enable"; + public static final String BAD_RECORDS_LOGGER_ACTION = + "mapreduce.carbonoutputformat.bad.records.logger.action"; + public static final String IS_EMPTY_DATA_BAD_RECORD = + "mapreduce.carbonoutputformat.empty.data.bad.record"; + public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line"; + public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope"; + public static final String BATCH_SORT_SIZE_INMB = + "mapreduce.carbonoutputformat.batch.sort.size.inmb"; + public static final String GLOBAL_SORT_PARTITIONS = + "mapreduce.carbonoutputformat.global.sort.partitions"; + public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path"; + public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format"; + public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format"; + public static final String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load"; + public static final String DICTIONARY_SERVER_HOST = + "mapreduce.carbonoutputformat.dict.server.host"; + public static final String DICTIONARY_SERVER_PORT = + "mapreduce.carbonoutputformat.dict.server.port"; + + private CarbonOutputCommitter committer; + + public static void setDatabaseName(Configuration configuration, String databaseName) { + if (null != databaseName) { + configuration.set(DATABASE_NAME, databaseName); + } + } + + public static String getDatabaseName(Configuration configuration) { + return configuration.get(DATABASE_NAME); + } + + public static void setTableName(Configuration configuration, String tableName) { + if (null != tableName) { + configuration.set(TABLE_NAME, tableName); + } + } + + public static String getTableName(Configuration configuration) { + return configuration.get(TABLE_NAME); + } + + public static void setTablePath(Configuration configuration, String tablePath) { + if (null != tablePath) { + configuration.set(TABLE_PATH, tablePath); + } + } + + public static String getTablePath(Configuration configuration) { + return configuration.get(TABLE_PATH); + } - @Override - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) throws IOException { + public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) + throws IOException { + if (carbonTable != 
null) { + configuration.set(TABLE, + ObjectSerializationUtil.convertObjectToString(carbonTable.getTableInfo().serialize())); + } + } + + public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + CarbonTable carbonTable = null; + String encodedString = configuration.get(TABLE); + if (encodedString != null) { + byte[] bytes = (byte[]) ObjectSerializationUtil.convertStringToObject(encodedString); + TableInfo tableInfo = TableInfo.deserialize(bytes); + carbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } + return carbonTable; + } + + public static void setLoadModel(Configuration configuration, CarbonLoadModel loadModel) + throws IOException { + if (loadModel != null) { + configuration.set(LOAD_MODEL, ObjectSerializationUtil.convertObjectToString(loadModel)); + } + } + + public static void setInputSchema(Configuration configuration, StructType inputSchema) + throws IOException { + if (inputSchema != null && inputSchema.getFields().size() > 0) { + configuration.set(INPUT_SCHEMA, ObjectSerializationUtil.convertObjectToString(inputSchema)); + } else { + throw new UnsupportedOperationException("Input schema must be set"); + } + } + + private static StructType getInputSchema(Configuration configuration) throws IOException { + String encodedString = configuration.get(INPUT_SCHEMA); + if (encodedString != null) { + return (StructType) ObjectSerializationUtil.convertStringToObject(encodedString); + } return null; } + + public static boolean isOverwriteSet(Configuration configuration) { + String overwrite = configuration.get(OVERWRITE_SET); + if (overwrite != null) { + return Boolean.parseBoolean(overwrite); + } + return false; + } + + public static void setOverwrite(Configuration configuration, boolean overwrite) { + configuration.set(OVERWRITE_SET, String.valueOf(overwrite)); + } + + public static void setTempStoreLocations(Configuration configuration, String[] tempLocations) + throws IOException { + if (tempLocations != null && tempLocations.length > 0) { + configuration + .set(TEMP_STORE_LOCATIONS, ObjectSerializationUtil.convertObjectToString(tempLocations)); + } + } + + private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptContext) + throws IOException { + String encodedString = taskAttemptContext.getConfiguration().get(TEMP_STORE_LOCATIONS); + if (encodedString != null) { + return (String[]) ObjectSerializationUtil.convertStringToObject(encodedString); + } + return new String[] { + System.getProperty("java.io.tmpdir") + "/" + System.nanoTime() + "_" + taskAttemptContext + .getTaskAttemptID().toString() }; + } + + @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) + throws IOException { + if (this.committer == null) { + Path output = getOutputPath(context); + this.committer = new CarbonOutputCommitter(output, context); + } + + return this.committer; + } + + @Override public RecordWriter getRecordWriter( + TaskAttemptContext taskAttemptContext) throws IOException { + final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); + loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + ""); + final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext); + final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper(); + final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor(); + CarbonRecordWriter recordWriter = new CarbonRecordWriter(iteratorWrapper, dataLoadExecutor); + new Thread() { + @Override public 
void run() { + try { + dataLoadExecutor + .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper }); + } catch (Exception e) { + dataLoadExecutor.close(); + throw new RuntimeException(e); + } + } + }.start(); + + return recordWriter; + } + + public static CarbonLoadModel getLoadModel(Configuration conf) throws IOException { + CarbonLoadModel model; + String encodedString = conf.get(LOAD_MODEL); + if (encodedString != null) { + model = (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString); + return model; + } + model = new CarbonLoadModel(); + CarbonProperties carbonProperty = CarbonProperties.getInstance(); + model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf)); + model.setTableName(CarbonTableOutputFormat.getTableName(conf)); + model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getOrCreateCarbonTable(conf))); + model.setTablePath(getTablePath(conf)); + + setFileHeader(conf, model); + model.setSerializationNullFormat(conf.get(SERIALIZATION_NULL_FORMAT, "\\N")); + model.setBadRecordsLoggerEnable(conf.get(BAD_RECORDS_LOGGER_ENABLE, carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))); + model.setBadRecordsAction(conf.get(BAD_RECORDS_LOGGER_ACTION, carbonProperty + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))); + + model.setIsEmptyDataBadRecord(conf.get(IS_EMPTY_DATA_BAD_RECORD, carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))); + + model.setSkipEmptyLine(conf.get(SKIP_EMPTY_LINE, + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))); + + String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\$" + "," + "\\:"); + String[] split = complexDelim.split(","); + model.setComplexDelimiterLevel1(split[0]); + if (split.length > 1) { + model.setComplexDelimiterLevel2(split[1]); + } + model.setDateFormat(conf.get(DATE_FORMAT, carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))); + + model.setTimestampformat(conf.get(TIMESTAMP_FORMAT, carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT))); + + model.setGlobalSortPartitions(conf.get(GLOBAL_SORT_PARTITIONS, carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))); + + model.setBatchSortSizeInMb(conf.get(BATCH_SORT_SIZE_INMB, carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, carbonProperty + .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))); + + model.setBadRecordsLocation(conf.get(BAD_RECORD_PATH, carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, carbonProperty + .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))); + + model.setUseOnePass(conf.getBoolean(IS_ONE_PASS_LOAD, Boolean.parseBoolean(carbonProperty + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, + CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)))); + return model; + } + + private
static void setFileHeader(Configuration configuration, CarbonLoadModel model) + throws IOException { + StructType inputSchema = getInputSchema(configuration); + if (inputSchema == null || inputSchema.getFields().size() == 0) { + throw new UnsupportedOperationException("Input schema must be set"); + } + List fields = inputSchema.getFields(); + StringBuilder builder = new StringBuilder(); + String[] columns = new String[fields.size()]; + int i = 0; + for (StructField field : fields) { + builder.append(field.getFieldName()); + builder.append(","); + columns[i++] = field.getFieldName(); + } + String header = builder.toString(); + model.setCsvHeader(header.substring(0, header.length() - 1)); + model.setCsvHeaderColumns(columns); + } + + private static class CarbonRecordWriter extends RecordWriter { + + private CarbonOutputIteratorWrapper iteratorWrapper; + + private DataLoadExecutor dataLoadExecutor; + + public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper, + DataLoadExecutor dataLoadExecutor) { + this.iteratorWrapper = iteratorWrapper; + this.dataLoadExecutor = dataLoadExecutor; + } + + @Override public void write(NullWritable aVoid, StringArrayWritable strings) + throws InterruptedException { + iteratorWrapper.write(strings.get()); + } + + @Override public void close(TaskAttemptContext taskAttemptContext) { + iteratorWrapper.close(); + dataLoadExecutor.close(); + } + } } diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java new file mode 100644 index 00000000000..0dca16ca69a --- /dev/null +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hadoop.ft; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.test.util.StoreCreator; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.Test; + +public class CarbonOutputMapperTest extends TestCase { + + CarbonLoadModel carbonLoadModel; + + // changed setUp to a static init block to avoid unwanted repeated store creation + static { + CarbonProperties.getInstance(). + addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); + } + + + @Test public void testOutputFormat() throws Exception { + runJob(""); + String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0"); + File file = new File(segmentPath); + assert (file.exists()); + File[] listFiles = file.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.endsWith(".carbondata") || name.endsWith(".carbonindex"); + } + }); + + assert (listFiles.length == 2); + + } + + + @Override public void tearDown() throws Exception { + super.tearDown(); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true"); + } + + @Override public void setUp() throws Exception { + super.setUp(); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false"); + carbonLoadModel = StoreCreator.getCarbonLoadModel(); + } + + public static class Map extends Mapper { + + @Override protected void map(NullWritable key, StringArrayWritable value, Context context) + throws IOException, InterruptedException { + context.write(key, value); + } + } + + private void runJob(String outPath) + throws Exception { + + Configuration configuration = new Configuration(); + configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath()); + Job job = Job.getInstance(configuration); + job.setJarByClass(CarbonOutputMapperTest.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(StringArrayWritable.class); + job.setMapperClass(Map.class); + job.setNumReduceTasks(0); + + FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath())); + CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), carbonLoadModel); + CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable()); + CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true); + job.setInputFormatClass(CSVInputFormat.class); + job.setOutputFormatClass(CarbonTableOutputFormat.class); + CarbonUtil.deleteFoldersAndFiles(new
File(carbonLoadModel.getTablePath() + "1")); + FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1")); + job.getConfiguration().set("outpath", outPath); + job.getConfiguration().set("query.id", String.valueOf(System.nanoTime())); + job.waitForCompletion(true); + } + + public static void main(String[] args) throws Exception { + + CarbonOutputMapperTest carbonOutputMapperTest = new CarbonOutputMapperTest(); + carbonOutputMapperTest.setUp(); + carbonOutputMapperTest.runJob(""); + } +} diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index 531bed5e238..d3fd0878b6d 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -164,17 +164,7 @@ public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String fac */ public static void createCarbonStore() { try { - String factFilePath = - new File("../hadoop/src/test/resources/data.csv").getCanonicalPath(); - File storeDir = new File(storePath); - CarbonUtil.deleteFoldersAndFiles(storeDir); - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, - storePath); - - CarbonTable table = createTable(absoluteTableIdentifier); - writeDictionary(factFilePath, table); - CarbonLoadModel loadModel = - buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier); + CarbonLoadModel loadModel = getCarbonLoadModel(); executeGraph(loadModel, storePath); @@ -183,6 +173,19 @@ public static void createCarbonStore() { } } + public static CarbonLoadModel getCarbonLoadModel() throws Exception { + String factFilePath = + new File("../hadoop/src/test/resources/data.csv").getCanonicalPath(); + File storeDir = new File(storePath); + CarbonUtil.deleteFoldersAndFiles(storeDir); + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, + storePath); + + CarbonTable table = createTable(absoluteTableIdentifier); + writeDictionary(factFilePath, table); + return buildCarbonLoadModel(table, factFilePath, absoluteTableIdentifier); + } + public static CarbonTable createTable( AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException { TableInfo tableInfo = new TableInfo(); @@ -198,6 +201,7 @@ public static CarbonTable createTable( id.setDataType(DataTypes.INT); id.setEncodingList(encodings); id.setColumnUniqueId(UUID.randomUUID().toString()); + id.setColumnReferenceId(id.getColumnUniqueId()); id.setDimensionColumn(true); id.setColumnGroup(1); columnSchemas.add(id); @@ -211,6 +215,7 @@ public static CarbonTable createTable( date.setDimensionColumn(true); date.setColumnGroup(2); date.setSortColumn(true); + date.setColumnReferenceId(id.getColumnUniqueId()); columnSchemas.add(date); ColumnSchema country = new ColumnSchema(); @@ -222,6 +227,7 @@ public static CarbonTable createTable( country.setDimensionColumn(true); country.setColumnGroup(3); country.setSortColumn(true); + country.setColumnReferenceId(id.getColumnUniqueId()); columnSchemas.add(country); ColumnSchema name = new ColumnSchema(); @@ -233,6 +239,7 @@ public static CarbonTable createTable( name.setDimensionColumn(true); name.setColumnGroup(4); name.setSortColumn(true); + name.setColumnReferenceId(id.getColumnUniqueId()); columnSchemas.add(name); ColumnSchema phonetype = new ColumnSchema(); @@ -244,6 +251,7 @@ public static CarbonTable createTable( 
phonetype.setDimensionColumn(true); phonetype.setColumnGroup(5); phonetype.setSortColumn(true); + phonetype.setColumnReferenceId(id.getColumnUniqueId()); columnSchemas.add(phonetype); ColumnSchema serialname = new ColumnSchema(); @@ -255,6 +263,7 @@ public static CarbonTable createTable( serialname.setDimensionColumn(true); serialname.setColumnGroup(6); serialname.setSortColumn(true); + serialname.setColumnReferenceId(id.getColumnUniqueId()); columnSchemas.add(serialname); ColumnSchema salary = new ColumnSchema(); @@ -264,6 +273,7 @@ public static CarbonTable createTable( salary.setEncodingList(new ArrayList()); salary.setColumnUniqueId(UUID.randomUUID().toString()); salary.setDimensionColumn(false); + salary.setColumnReferenceId(id.getColumnUniqueId()); salary.setColumnGroup(7); columnSchemas.add(salary); diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 943c0a5e0c0..ab475322064 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -514,51 +514,6 @@ object CommonUtil { parsedPropertyValueString } - def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel, - insertOverwrite: Boolean): Unit = { - val newLoadMetaEntry = new LoadMetadataDetails - val status = if (insertOverwrite) { - SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS - } else { - SegmentStatus.INSERT_IN_PROGRESS - } - - // reading the start time of data load. - val loadStartTime = CarbonUpdateUtil.readCurrentTime - model.setFactTimeStamp(loadStartTime) - CarbonLoaderUtil.populateNewLoadMetaEntry( - newLoadMetaEntry, status, model.getFactTimeStamp, false) - val entryAdded: Boolean = - CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite) - if (!entryAdded) { - sys.error(s"Failed to add entry in table status for " + - s"${ model.getDatabaseName }.${model.getTableName}") - } - } - - /** - * This method will update the load failure entry in the table status file - * - * @param model - */ - def updateTableStatusForFailure( - model: CarbonLoadModel): Unit = { - // in case if failure the load status should be "Marked for delete" so that it will be taken - // care during clean up - val loadStatus = SegmentStatus.MARKED_FOR_DELETE - // always the last entry in the load metadata details will be the current load entry - val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1) - CarbonLoaderUtil - .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp, true) - val updationStatus = CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false) - if (!updationStatus) { - sys - .error(s"Failed to update failure entry in table status for ${ - model - .getDatabaseName - }.${ model.getTableName }") - } - } def readLoadMetadataDetails(model: CarbonLoadModel): Unit = { val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 4933a45957f..d15514aa468 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -473,7 +473,7 @@ object CarbonDataRDDFactory { } if (loadStatus == SegmentStatus.LOAD_FAILURE) { // update the load entry in table status file for changing the status to marked for delete - CommonUtil.updateTableStatusForFailure(carbonLoadModel) + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) LOGGER.info("********starting clean up**********") CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) LOGGER.info("********clean up done**********") @@ -488,7 +488,7 @@ object CarbonDataRDDFactory { status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS && carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) { // update the load entry in table status file for changing the status to marked for delete - CommonUtil.updateTableStatusForFailure(carbonLoadModel) + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) LOGGER.info("********starting clean up**********") CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) LOGGER.info("********clean up done**********") @@ -527,7 +527,7 @@ object CarbonDataRDDFactory { newEntryLoadStatus, overwriteTable) if (!done) { - CommonUtil.updateTableStatusForFailure(carbonLoadModel) + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) LOGGER.info("********starting clean up**********") CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) LOGGER.info("********clean up done**********") diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 0c6aedae78d..58671b72f1c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -46,6 +46,7 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.exception.NoRetryException import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel} import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil} @@ -153,7 +154,7 @@ case class CarbonLoadDataCommand( GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata // add the start entry for the new load in the table status file if (updateModel.isEmpty) { - CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable) + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable) } if (isOverwriteTable) { LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") @@ -197,12 +198,12 @@ case class CarbonLoadDataCommand( } catch { case CausedBy(ex: NoRetryException) => // update the load entry in table status file for changing the status to marked for delete - CommonUtil.updateTableStatusForFailure(carbonLoadModel) + 
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) LOGGER.error(ex, s"Dataload failure for $dbName.$tableName") throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") case ex: Exception => // update the load entry in table status file for changing the status to marked for delete - CommonUtil.updateTableStatusForFailure(carbonLoadModel) + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) LOGGER.error(ex) LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs") throw ex diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java new file mode 100644 index 00000000000..ec945722919 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.processing.loading.iterator; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.carbondata.common.CarbonIterator; + +public class CarbonOutputIteratorWrapper extends CarbonIterator { + + private boolean close = false; + + /** + * Number of rows kept in memory at most will be batchSize * queue size + */ + private int batchSize = 1000; + + private RowBatch loadBatch = new RowBatch(batchSize); + + private RowBatch readBatch; + + private ArrayBlockingQueue queue = new ArrayBlockingQueue<>(10); + + public void write(String[] row) throws InterruptedException { + + if (!loadBatch.addRow(row)) { + loadBatch.readyRead(); + queue.put(loadBatch); + loadBatch = new RowBatch(batchSize); + } + } + + @Override public boolean hasNext() { + return !queue.isEmpty() || !close || readBatch != null && readBatch.hasNext(); + } + + @Override public String[] next() { + if (readBatch == null || !readBatch.hasNext()) { + try { + readBatch = queue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return readBatch.next(); + } + + @Override public void close() { + if (loadBatch.isLoading()) { + try { + loadBatch.readyRead(); + queue.put(loadBatch); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + close = true; + } + + private static class RowBatch extends CarbonIterator { + + private int counter; + + private String[][] batch; + + private int size; + + private boolean isLoading = true; + + public RowBatch(int size) { + batch = new String[size][]; + this.size = size; + } + + public boolean addRow(String[] row) { + batch[counter++] = row; + if (counter > size) { + return false; + } + return true; + } + + public void readyRead() { + size = counter; + counter = 0; + isLoading = false; + } + + public boolean isLoading() { + return isLoading; + } + + @Override public boolean hasNext() { + return counter < size; + } + + @Override public String[] next() { + assert (counter < size); + return batch[counter++]; + } + } + +} diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index b0c690b3a26..7d68fcfa239 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -24,17 +24,7 @@ import java.net.UnknownHostException; import java.nio.charset.Charset; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -373,6 +363,47 @@ public static String getEscapeChar(String escapeCharacter) { } + public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model, + boolean insertOverwrite) throws IOException, InterruptedException { + LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails(); + SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS; + if (insertOverwrite) { + status = SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS; + } + + // reading the start time of data load. 
+ long loadStartTime = CarbonUpdateUtil.readCurrentTime(); + model.setFactTimeStamp(loadStartTime); + CarbonLoaderUtil + .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false); + boolean entryAdded = + CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite); + if (!entryAdded) { + throw new IOException("Failed to add entry in table status for " + model.getTableName()); + } + } + + /** + * This method will update the load failure entry in the table status file + */ + public static void updateTableStatusForFailure(CarbonLoadModel model) + throws IOException, InterruptedException { + // in case if failure the load status should be "Marked for delete" so that it will be taken + // care during clean up + SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE; + // always the last entry in the load metadata details will be the current load entry + LoadMetadataDetails loadMetaEntry = + model.getLoadMetadataDetails().get(model.getLoadMetadataDetails().size() - 1); + CarbonLoaderUtil + .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true); + boolean updationStatus = + CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false); + if (!updationStatus) { + throw new IOException( + "Failed to update failure entry in table status for " + model.getTableName()); + } + } + public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier) throws IOException { Cache dictCache = diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 8c4d5bacd7c..37aaea5689e 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -315,7 +315,7 @@ object StreamHandoffRDD { } if (loadStatus == SegmentStatus.LOAD_FAILURE) { - CommonUtil.updateTableStatusForFailure(carbonLoadModel) + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel) LOGGER.info("********starting clean up**********") CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) LOGGER.info("********clean up done**********") From 5bdf3b451b43d334929c132a1b997103c0afc239 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Sat, 16 Dec 2017 10:06:46 +0530 Subject: [PATCH 2/2] Fixed comments --- .../carbondata/core/util/CarbonUtil.java | 15 -- .../hadoop/api/CarbonOutputCommitter.java | 39 ++-- .../hadoop/api/CarbonTableOutputFormat.java | 182 +++++++++++------- .../hadoop/ft/CarbonOutputMapperTest.java | 10 +- .../spark/rdd/CarbonDataRDDFactory.scala | 2 +- .../iterator/CarbonOutputIteratorWrapper.java | 32 +-- .../loading/model/CarbonLoadModel.java | 9 + .../merger/CarbonDataMergerUtil.java | 4 +- .../processing/util/CarbonLoaderUtil.java | 27 ++- 9 files changed, 180 insertions(+), 140 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 148098d8e0e..910efea228b 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2156,21 +2156,6 @@ public static String getNewTablePath( return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + newTableName; } - /* - * This method will add data size and index size into tablestatus for each segment 
- */ - public static void addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, - String segmentId, CarbonTable carbonTable) throws IOException { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier())); - Map dataIndexSize = - CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId); - loadMetadataDetails - .setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString()); - loadMetadataDetails - .setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString()); - } - /** * This method will calculate the data size and index size for carbon table */ diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 6fd50f02ee6..9bcb2be6b7d 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -19,9 +19,10 @@ import java.io.IOException; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; -import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.util.CarbonLoaderUtil; @@ -32,10 +33,14 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; /** - * Outputcommitter which manages the segments during loading. + * Outputcommitter which manages the segments during loading. It commits segment information to the + * tablestatus file upon success or failure.
*/ public class CarbonOutputCommitter extends FileOutputCommitter { + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonOutputCommitter.class.getName()); + public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException { super(outputPath, context); } @@ -50,11 +55,7 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws super.setupJob(context); boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); - try { - CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); - } catch (InterruptedException e) { - throw new IOException(e); - } + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, overwriteSet); CarbonTableOutputFormat.setLoadModel(context.getConfiguration(), loadModel); } @@ -68,17 +69,12 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws super.commitJob(context); boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); - try { - LoadMetadataDetails newMetaEntry = - loadModel.getLoadMetadataDetails().get(loadModel.getLoadMetadataDetails().size() - 1); - CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, - loadModel.getFactTimeStamp(), true); - CarbonUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), - loadModel.getCarbonDataLoadSchema().getCarbonTable()); - CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); - } catch (InterruptedException e) { - throw new IOException(e); - } + LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail(); + CarbonLoaderUtil.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, + loadModel.getFactTimeStamp(), true); + CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), + loadModel.getCarbonDataLoadSchema().getCarbonTable()); + CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); } /** @@ -91,11 +87,8 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException { super.abortJob(context, state); CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); - try { - CarbonLoaderUtil.updateTableStatusForFailure(loadModel); - } catch (InterruptedException e) { - throw new IOException(e); - } + CarbonLoaderUtil.updateTableStatusForFailure(loadModel); + LOGGER.error("Loading failed with job status : " + state); } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 3c1df16b0fc..95045023583 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -19,7 +19,6 @@ import java.io.IOException; import java.util.List; -import java.util.Random; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -45,41 +44,44 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** - * Base class for all output 
format for CarbonData file. + * This is a table-level output format which writes data to the store in a new segment. Each load + * creates a new segment folder and manages it through the tablestatus file. + * It also generates and writes dictionary data during the load, but only if a dictionary server is configured. */ +// TODO: Move the dictionary generator, which is currently coded in Spark, to the MR framework. public class CarbonTableOutputFormat extends FileOutputFormat { - private static final String LOAD_MODEL = "mapreduce.carbonoutputformat.load.mmodel"; - private static final String DATABASE_NAME = "mapreduce.carbonoutputformat.databaseName"; - private static final String TABLE_NAME = "mapreduce.carbonoutputformat.tableName"; - private static final String TABLE = "mapreduce.carbonoutputformat.table"; - private static final String TABLE_PATH = "mapreduce.carbonoutputformat.tablepath"; - private static final String INPUT_SCHEMA = "mapreduce.carbonoutputformat.inputschema"; - private static final String TEMP_STORE_LOCATIONS = "carbon.load.tempstore.locations"; - private static final String OVERWRITE_SET = "carbon.load.set.overwrite"; - public static final String COMPLEX_DELIMITERS = "mapreduce.carbonoutputformat.complex_delimiters"; + private static final String LOAD_MODEL = "mapreduce.carbontable.load.model"; + private static final String DATABASE_NAME = "mapreduce.carbontable.databaseName"; + private static final String TABLE_NAME = "mapreduce.carbontable.tableName"; + private static final String TABLE = "mapreduce.carbontable.table"; + private static final String TABLE_PATH = "mapreduce.carbontable.tablepath"; + private static final String INPUT_SCHEMA = "mapreduce.carbontable.inputschema"; + private static final String TEMP_STORE_LOCATIONS = "mapreduce.carbontable.tempstore.locations"; + private static final String OVERWRITE_SET = "mapreduce.carbontable.set.overwrite"; + public static final String COMPLEX_DELIMITERS = "mapreduce.carbontable.complex_delimiters"; public static final String SERIALIZATION_NULL_FORMAT = - "mapreduce.carbonoutputformat.serialization.null.format"; + "mapreduce.carbontable.serialization.null.format"; public static final String BAD_RECORDS_LOGGER_ENABLE = - "mapreduce.carbonoutputformat.bad.records.logger.enable"; + "mapreduce.carbontable.bad.records.logger.enable"; public static final String BAD_RECORDS_LOGGER_ACTION = - "mapreduce.carbonoutputformat.bad.records.logger.action"; + "mapreduce.carbontable.bad.records.logger.action"; public static final String IS_EMPTY_DATA_BAD_RECORD = - "mapreduce.carbonoutputformat.empty.data.bad.record"; - public static final String SKIP_EMPTY_LINE = "mapreduce.carbonoutputformat.skip.empty.line"; - public static final String SORT_SCOPE = "mapreduce.carbonoutputformat.load.sort.scope"; + "mapreduce.carbontable.empty.data.bad.record"; + public static final String SKIP_EMPTY_LINE = "mapreduce.carbontable.skip.empty.line"; + public static final String SORT_SCOPE = "mapreduce.carbontable.load.sort.scope"; public static final String BATCH_SORT_SIZE_INMB = - "mapreduce.carbonoutputformat.batch.sort.size.inmb"; + "mapreduce.carbontable.batch.sort.size.inmb"; public static final String GLOBAL_SORT_PARTITIONS = - "mapreduce.carbonoutputformat.global.sort.partitions"; - public static final String BAD_RECORD_PATH = "mapreduce.carbonoutputformat.bad.record.path"; - public static final String DATE_FORMAT = "mapreduce.carbonoutputformat.date.format"; - public static final String TIMESTAMP_FORMAT = "mapreduce.carbonoutputformat.timestamp.format"; - public static final
String IS_ONE_PASS_LOAD = "mapreduce.carbonoutputformat.one.pass.load"; + "mapreduce.carbontable.global.sort.partitions"; + public static final String BAD_RECORD_PATH = "mapreduce.carbontable.bad.record.path"; + public static final String DATE_FORMAT = "mapreduce.carbontable.date.format"; + public static final String TIMESTAMP_FORMAT = "mapreduce.carbontable.timestamp.format"; + public static final String IS_ONE_PASS_LOAD = "mapreduce.carbontable.one.pass.load"; public static final String DICTIONARY_SERVER_HOST = - "mapreduce.carbonoutputformat.dict.server.host"; + "mapreduce.carbontable.dict.server.host"; public static final String DICTIONARY_SERVER_PORT = - "mapreduce.carbonoutputformat.dict.server.port"; + "mapreduce.carbontable.dict.server.port"; private CarbonOutputCommitter committer; @@ -121,7 +123,7 @@ public static void setCarbonTable(Configuration configuration, CarbonTable carbo } } - public static CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + public static CarbonTable getCarbonTable(Configuration configuration) throws IOException { CarbonTable carbonTable = null; String encodedString = configuration.get(TABLE); if (encodedString != null) { @@ -187,20 +189,21 @@ private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptCont .getTaskAttemptID().toString() }; } - @Override public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) + @Override + public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException { if (this.committer == null) { Path output = getOutputPath(context); this.committer = new CarbonOutputCommitter(output, context); } - return this.committer; } - @Override public RecordWriter getRecordWriter( + @Override + public RecordWriter getRecordWriter( TaskAttemptContext taskAttemptContext) throws IOException { final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration()); - loadModel.setTaskNo(new Random().nextInt(Integer.MAX_VALUE) + ""); + loadModel.setTaskNo(taskAttemptContext.getTaskAttemptID().getTaskID().getId() + ""); final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext); final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper(); final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor(); @@ -208,8 +211,10 @@ private static String[] getTempStoreLocations(TaskAttemptContext taskAttemptCont new Thread() { @Override public void run() { try { - dataLoadExecutor - .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper }); + dataLoadExecutor.execute( + loadModel, + tempStoreLocations, + new CarbonIterator[] { iteratorWrapper }); } catch (Exception e) { dataLoadExecutor.close(); throw new RuntimeException(e); @@ -231,24 +236,35 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio CarbonProperties carbonProperty = CarbonProperties.getInstance(); model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf)); model.setTableName(CarbonTableOutputFormat.getTableName(conf)); - model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getOrCreateCarbonTable(conf))); + model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf))); model.setTablePath(getTablePath(conf)); setFileHeader(conf, model); model.setSerializationNullFormat(conf.get(SERIALIZATION_NULL_FORMAT, "\\N")); - model.setBadRecordsLoggerEnable(conf.get(BAD_RECORDS_LOGGER_ENABLE, carbonProperty - 
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))); - model.setBadRecordsAction(conf.get(BAD_RECORDS_LOGGER_ACTION, carbonProperty - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))); - - model.setIsEmptyDataBadRecord(conf.get(IS_EMPTY_DATA_BAD_RECORD, carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))); - - model.setSkipEmptyLine(conf.get(SKIP_EMPTY_LINE, - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))); + model.setBadRecordsLoggerEnable( + conf.get( + BAD_RECORDS_LOGGER_ENABLE, + carbonProperty.getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))); + model.setBadRecordsAction( + conf.get( + BAD_RECORDS_LOGGER_ACTION, + carbonProperty.getProperty( + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))); + + model.setIsEmptyDataBadRecord( + conf.get( + IS_EMPTY_DATA_BAD_RECORD, + carbonProperty.getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))); + + model.setSkipEmptyLine( + conf.get( + SKIP_EMPTY_LINE, + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))); String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\$" + "," + "\\:"); String[] split = complexDelim.split(","); @@ -256,30 +272,50 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio if (split.length > 1) { model.setComplexDelimiterLevel2(split[1]); } - model.setDateFormat(conf.get(DATE_FORMAT, carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))); - - model.setTimestampformat(conf.get(TIMESTAMP_FORMAT, carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT))); - - model.setGlobalSortPartitions(conf.get(GLOBAL_SORT_PARTITIONS, carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))); - - model.setBatchSortSizeInMb(conf.get(BATCH_SORT_SIZE_INMB, carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, carbonProperty - .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))); - - model.setBadRecordsLocation(conf.get(BAD_RECORD_PATH, carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, carbonProperty - .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))); - - model.setUseOnePass(conf.getBoolean(IS_ONE_PASS_LOAD, Boolean.parseBoolean(carbonProperty - .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)))); + model.setDateFormat( + conf.get( + DATE_FORMAT, + carbonProperty.getProperty( + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))); + + model.setTimestampformat( + conf.get(
            TIMESTAMP_FORMAT,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)));
+
+    model.setGlobalSortPartitions(
+        conf.get(
+            GLOBAL_SORT_PARTITIONS,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+                null)));
+
+    model.setBatchSortSizeInMb(
+        conf.get(
+            BATCH_SORT_SIZE_INMB,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+                carbonProperty.getProperty(
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));
+
+    model.setBadRecordsLocation(
+        conf.get(BAD_RECORD_PATH,
+            carbonProperty.getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+                carbonProperty.getProperty(
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
+
+    model.setUseOnePass(
+        conf.getBoolean(IS_ONE_PASS_LOAD,
+            Boolean.parseBoolean(
+                carbonProperty.getProperty(
+                    CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+                    CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT))));
     return model;
   }
@@ -315,12 +351,14 @@ public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper,
       this.dataLoadExecutor = dataLoadExecutor;
     }

-    @Override public void write(NullWritable aVoid, StringArrayWritable strings)
+    @Override
+    public void write(NullWritable aVoid, StringArrayWritable strings)
         throws InterruptedException {
       iteratorWrapper.write(strings.get());
     }

-    @Override public void close(TaskAttemptContext taskAttemptContext) {
+    @Override
+    public void close(TaskAttemptContext taskAttemptContext) {
       iteratorWrapper.close();
       dataLoadExecutor.close();
     }
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
index 0dca16ca69a..006ffd28572 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java
@@ -88,9 +88,7 @@ public static class Map extends Mapper {

 private boolean close = false;

@@ -37,7 +41,6 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator {
   private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);

   public void write(String[] row) throws InterruptedException {
-
     if (!loadBatch.addRow(row)) {
       loadBatch.readyRead();
       queue.put(loadBatch);
@@ -45,11 +48,13 @@ public void write(String[] row) throws InterruptedException {
     }
   }

-  @Override public boolean hasNext() {
+  @Override
+  public boolean hasNext() {
     return !queue.isEmpty() || !close || readBatch != null && readBatch.hasNext();
   }

-  @Override public String[] next() {
+  @Override
+  public String[] next() {
     if (readBatch == null || !readBatch.hasNext()) {
       try {
         readBatch = queue.take();
@@ -60,7 +65,8 @@ public void write(String[] row) throws InterruptedException {
     return readBatch.next();
   }

-  @Override public void close() {
+  @Override
+  public void close() {
     if (loadBatch.isLoading()) {
       try {
         loadBatch.readyRead();
@@ -82,17 +88,19 @@ private static class RowBatch extends CarbonIterator {

     private boolean isLoading = true;

-    public RowBatch(int size) {
+    private RowBatch(int size) {
       batch = new String[size][];
       this.size = size;
     }

+    /**
+     * Add a row to the batch; the batch buffers rows up to its configured size.
+     * @param row the row to buffer
+     * @return false if the batch is full after buffering this row.
+     */
     public boolean addRow(String[] row) {
       batch[counter++] = row;
-      if (counter > size) {
-        return false;
-      }
-      return true;
+      return counter < size;
     }

     public void readyRead() {
@@ -105,11 +113,13 @@ public boolean isLoading() {
       return isLoading;
     }

-    @Override public boolean hasNext() {
+    @Override
+    public boolean hasNext() {
       return counter < size;
     }

-    @Override public String[] next() {
+    @Override
+    public String[] next() {
       assert (counter < size);
       return batch[counter++];
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index c85d50f2143..7b952e3c7af 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -524,6 +524,15 @@ public List getLoadMetadataDetails() {
     return loadMetadataDetails;
   }

+  /**
+   * Get the load metadata detail of the segment currently being loaded.
+   *
+   * @return the most recently added load metadata detail
+   */
+  public LoadMetadataDetails getCurrentLoadMetadataDetail() {
+    return loadMetadataDetails.get(loadMetadataDetails.size() - 1);
+  }
+
   /**
    * setLoadMetadataDetails.
    *
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index d6f2d9a2fcd..3729b1d4b72 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -50,11 +50,11 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;

 /**
  * utility class for load merging.
@@ -327,7 +327,7 @@ public static boolean updateLoadMetadataWithMergeStatus(List
 updateLoadMetadataFromOldToNew(
     return newListMetadata;
   }

+  /**
+   * Add the data size and index size of the given segment to its tablestatus entry.
+   */
+  public static void addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
+      String segmentId, CarbonTable carbonTable) throws IOException {
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+    Map<String, Long> dataIndexSize =
+        CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+    loadMetadataDetails
+        .setDataSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE).toString());
+    loadMetadataDetails
+        .setIndexSize(dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE).toString());
+  }
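---
Reviewer note: below is a minimal, illustrative sketch of how a client job could
drive the new CarbonTableOutputFormat end to end; it is not part of the patch.
CarbonLoadJobSketch, CsvToCarbonMapper, the package of StringArrayWritable, and
its set(String[]) method are assumptions; setCarbonTable/setLoadModel and the
NullWritable/StringArrayWritable writer contract are taken from the patch itself.

  import java.io.IOException;

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
  import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
  // Package of StringArrayWritable is assumed here.
  import org.apache.carbondata.hadoop.io.StringArrayWritable;
  import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public final class CarbonLoadJobSketch {

    // Hypothetical mapper: splits each CSV line and emits the fields; the
    // CarbonRecordWriter then pushes them through CarbonOutputIteratorWrapper
    // into the DataLoadExecutor thread started by getRecordWriter().
    public static class CsvToCarbonMapper
        extends Mapper<LongWritable, Text, NullWritable, StringArrayWritable> {
      private final StringArrayWritable record = new StringArrayWritable();

      @Override
      protected void map(LongWritable key, Text line, Context context)
          throws IOException, InterruptedException {
        record.set(line.toString().split(","));  // set(String[]) is assumed
        context.write(NullWritable.get(), record);
      }
    }

    public static void runLoad(CarbonTable table, CarbonLoadModel loadModel,
        String inputDir, String outputDir) throws Exception {
      Configuration conf = new Configuration();
      // Both setters are part of this patch.
      CarbonTableOutputFormat.setCarbonTable(conf, table);
      CarbonTableOutputFormat.setLoadModel(conf, loadModel);

      Job job = Job.getInstance(conf, "carbon-load");
      job.setJarByClass(CarbonLoadJobSketch.class);
      job.setMapperClass(CsvToCarbonMapper.class);
      job.setNumReduceTasks(0);  // map-only: one CarbonRecordWriter per map task
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(StringArrayWritable.class);
      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(CarbonTableOutputFormat.class);
      FileInputFormat.addInputPath(job, new Path(inputDir));
      FileOutputFormat.setOutputPath(job, new Path(outputDir));
      job.waitForCompletion(true);
    }
  }

With setNumReduceTasks(0) each map task owns one CarbonRecordWriter; its close()
closes the iterator wrapper and shuts down the DataLoadExecutor, so the segment
data is fully written before the output committer finalizes the job.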