Skip to content

Commit

Permalink
updated rename command so that table directory is not renamed
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Jul 6, 2018
1 parent aeb2ec4 commit ee37643
Show file tree
Hide file tree
Showing 38 changed files with 234 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.carbondata.core.scan.model.ProjectionDimension;
import org.apache.carbondata.core.scan.model.ProjectionMeasure;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;

import org.apache.commons.lang3.ArrayUtils;

Expand Down Expand Up @@ -378,8 +377,8 @@ private static List<DictionaryColumnUniqueIdentifier> getDictionaryColumnUniqueI
ColumnIdentifier columnIdentifier;
if (null != dimension.getColumnSchema().getParentColumnTableRelations() && !dimension
.getColumnSchema().getParentColumnTableRelations().isEmpty()) {
dictionarySourceAbsoluteTableIdentifier = getTableIdentifierForColumn(dimension,
carbonTable.getAbsoluteTableIdentifier());
dictionarySourceAbsoluteTableIdentifier =
getTableIdentifierForColumn(dimension);
columnIdentifier = new ColumnIdentifier(
dimension.getColumnSchema().getParentColumnTableRelations().get(0).getColumnId(),
dimension.getColumnProperties(), dimension.getDataType());
Expand All @@ -397,18 +396,22 @@ private static List<DictionaryColumnUniqueIdentifier> getDictionaryColumnUniqueI
return dictionaryColumnUniqueIdentifiers;
}

public static AbsoluteTableIdentifier getTableIdentifierForColumn(CarbonDimension carbonDimension,
AbsoluteTableIdentifier identifier) {
public static AbsoluteTableIdentifier getTableIdentifierForColumn(
CarbonDimension carbonDimension) {
RelationIdentifier parentRelationIdentifier =
carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
.getRelationIdentifier();
String parentTablePath = CarbonMetadata.getInstance()
.getCarbonTable(parentRelationIdentifier.getDatabaseName(),
parentRelationIdentifier.getTableName()).getTablePath();
RelationIdentifier relation = carbonDimension.getColumnSchema()
.getParentColumnTableRelations()
.get(0)
.getRelationIdentifier();
String parentTableName = relation.getTableName();
String parentDatabaseName = relation.getDatabaseName();
String parentTableId = relation.getTableId();
String newTablePath =
CarbonTablePath.getNewTablePath(identifier.getTablePath(), parentTableName);
return AbsoluteTableIdentifier.from(newTablePath, parentDatabaseName, parentTableName,
return AbsoluteTableIdentifier.from(parentTablePath, parentDatabaseName, parentTableName,
parentTableId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ public static Dictionary getForwardDictionaryCache(
if (null != carbonDimension.getColumnSchema().getParentColumnTableRelations()
&& carbonDimension.getColumnSchema().getParentColumnTableRelations().size() == 1) {
dictionarySourceAbsoluteTableIdentifier = QueryUtil
.getTableIdentifierForColumn(carbonDimension, carbonTable.getAbsoluteTableIdentifier());
.getTableIdentifierForColumn(carbonDimension);
columnIdentifier = new ColumnIdentifier(
carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0).getColumnId(),
carbonDimension.getColumnProperties(), carbonDimension.getDataType());
Expand Down
12 changes: 0 additions & 12 deletions core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
Expand Down Expand Up @@ -356,17 +355,6 @@ public static void deleteFoldersAndFiles(final CarbonFile... file)
});
}

public static String getBadLogPath(String storeLocation) {
String badLogStoreLocation = CarbonProperties.getInstance()
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
if (null == badLogStoreLocation) {
badLogStoreLocation =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
}
badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
return badLogStoreLocation;
}

public static void deleteFoldersAndFilesSilent(final CarbonFile... file)
throws IOException, InterruptedException {
UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.carbondata.core.locks.LockUsage;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;

import org.apache.hadoop.fs.Path;

/**
* Helps to get Table content paths.
*/
Expand Down Expand Up @@ -385,21 +383,6 @@ public static String getStreamingCheckpointDir(String tablePath) {
return tablePath + File.separator + STREAMING_DIR + File.separator + STREAMING_CHECKPOINT_DIR;
}

/**
* get the parent folder of old table path and returns the new tablePath by appending new
* tableName to the parent
*
* @param tablePath Old tablePath
* @param newTableName new table name
* @return the new table path
*/
public static String getNewTablePath(
String tablePath,
String newTableName) {
Path parentPath = new Path(tablePath).getParent();
return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + newTableName;
}

/**
* Return store path for datamap based on the taskNo,if three tasks get launched during loading,
* then three folders will be created based on the shard name and lucene index file will be
Expand Down Expand Up @@ -772,4 +755,15 @@ public static String getTableStatusHistoryFilePath(String tablePath) {
return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ TABLE_STATUS_HISTORY_FILE;
}

public static String generateBadRecordsPath(String badLogStoreLocation, String segmentId,
String taskNo, boolean isTransactionalTable) {
if (!isTransactionalTable) {
return badLogStoreLocation + File.separator + "SdkWriterBadRecords"
+ CarbonCommonConstants.FILE_SEPARATOR + taskNo;
} else {
return badLogStoreLocation + File.separator + segmentId + CarbonCommonConstants.FILE_SEPARATOR
+ taskNo;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,6 @@ public void testToDeleteFolderWithInterruptedException()
assertTrue(!testDir.exists());
}

@Test public void testToGetBadLogPath() throws InterruptedException {
new MockUp<CarbonProperties>() {
@SuppressWarnings("unused") @Mock public String getProperty(String key) {
return "../unibi-solutions/system/carbon/badRecords";
}
};
String badLogStoreLocation = CarbonUtil.getBadLogPath("badLogPath");
assertEquals(badLogStoreLocation.replace("\\", "/"), "../unibi-solutions/system/carbon/badRecords/badLogPath");
}

@Test public void testToDeleteFoldersAndFilesForCarbonFileSilently()
throws IOException, InterruptedException {
LocalCarbonFile testDir = new LocalCarbonFile("../core/src/test/resources/testDir");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ public void initIndexColumnConverters(CarbonTable carbonTable, List<CarbonColumn
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
dataField.setTimestampFormat(tsFormat);
FieldConverter fieldConverter =
FieldEncoderFactory.getInstance().createFieldEncoder(dataField, absoluteTableIdentifier,
i, nullFormat, null, false, localCaches[i], false);
FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
.createFieldEncoder(dataField, absoluteTableIdentifier, i, nullFormat, null, false,
localCaches[i], false, carbonTable.getTablePath());
this.name2Converters.put(indexedColumn.get(i).getColName(), fieldConverter);
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -279,9 +280,9 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
model.setTableName(CarbonTableOutputFormat.getTableName(conf));
model.setCarbonTransactionalTable(true);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf)));
CarbonTable carbonTable = getCarbonTable(conf);
model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable));
model.setTablePath(getTablePath(conf));

setFileHeader(conf, model);
model.setSerializationNullFormat(conf.get(SERIALIZATION_NULL_FORMAT, "\\N"));
model.setBadRecordsLoggerEnable(
Expand Down Expand Up @@ -345,14 +346,18 @@ public static CarbonLoadModel getLoadModel(Configuration conf) throws IOExceptio
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));

model.setBadRecordsLocation(
conf.get(BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
carbonProperty.getProperty(
CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));

String badRecordsPath = conf.get(BAD_RECORD_PATH);
if (StringUtils.isEmpty(badRecordsPath)) {
badRecordsPath =
carbonTable.getTableInfo().getFactTable().getTableProperties().get("bad_records_path");
if (StringUtils.isEmpty(badRecordsPath)) {
badRecordsPath = carbonProperty
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, carbonProperty
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL));
}
}
model.setBadRecordsLocation(badRecordsPath);
model.setUseOnePass(
conf.getBoolean(IS_ONE_PASS_LOAD,
Boolean.parseBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class TestTimeSeriesUnsupportedSuite extends QueryTest with BeforeAndAfterAll wi
sql("alter table maintable_agg1_minute rename to maintable_agg1_minute_new")
}
assert(e.getMessage.contains(
"Rename operation for pre-aggregate table is not supported."))
"Rename operation for datamaps is not supported."))

// check datamap after alter
checkExistence(sql("SHOW DATAMAP ON TABLE mainTable"), true, "maintable_agg1_minute")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,6 @@ class BadRecordActionTest extends QueryTest {
Seq(Row(2)))
}

test("test bad record REDIRECT but not having location should throw exception") {
sql("drop table if exists sales")
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
val exMessage = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='','timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
}

test("test bad record REDIRECT but not having empty location in option should throw exception") {
sql("drop table if exists sales")
sql(
Expand All @@ -153,7 +140,8 @@ class BadRecordActionTest extends QueryTest {
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
" ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
assert(exMessage.getMessage
.contains("Cannot redirect bad records as bad record location is not provided."))
}
finally {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter

test("test partition redirect") {
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata'""")
s"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp,
actual_price Double, Quantity int, sold_price Decimal(19,2)) partitioned by (country String) STORED BY 'carbondata' TBLPROPERTIES('BAD_RECORDS_PATH'='$warehouse')""")

val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession
Expand All @@ -40,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.CarbonBadRecordUtil
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.streaming.segment.StreamSegment
Expand Down Expand Up @@ -255,6 +257,8 @@ object StreamSinkFactory {
optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala.map(_.getColName).mkString(","))
}
optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(parameters.asJava, carbonTable))
val carbonLoadModel = new CarbonLoadModel()
new CarbonLoadModelBuilder(carbonTable).build(
parameters.asJava,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
Expand All @@ -39,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationId
import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation}
import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CompactionType
Expand Down Expand Up @@ -703,6 +705,11 @@ class TableNewProcessor(cm: TableModel) {
}
// Add table comment to table properties
tablePropertiesMap.put("comment", cm.tableComment.getOrElse(""))
val badRecordsPath = getBadRecordsPath(tablePropertiesMap,
cm.tableName,
tableSchema.getTableId,
cm.databaseNameOp.getOrElse("default"))
tablePropertiesMap.put("bad_records_path", badRecordsPath)
tableSchema.setTableProperties(tablePropertiesMap)
if (cm.bucketFields.isDefined) {
val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
Expand Down Expand Up @@ -748,6 +755,20 @@ class TableNewProcessor(cm: TableModel) {
tableInfo
}

private def getBadRecordsPath(tablePropertiesMap: util.HashMap[String, String],
tableName: String,
tableId: String,
databaseName: String): String = {
val badRecordsPath = tablePropertiesMap.asScala
.getOrElse("bad_records_path", CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
if (badRecordsPath == null || badRecordsPath.isEmpty) {
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL
} else {
badRecordsPath + CarbonCommonConstants.FILE_SEPARATOR + databaseName +
CarbonCommonConstants.FILE_SEPARATOR + s"${tableName}_$tableId"
}
}

/**
* Method to check to get the encoder from parent or not
* @param field column field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.util.SparkTypeConverter
import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.ColumnIdentifier
import org.apache.carbondata.core.metadata.{CarbonMetadata, ColumnIdentifier}
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
Expand Down Expand Up @@ -275,9 +275,13 @@ case class CarbonDictionaryDecoder(
if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
!carbonDimension
.getColumnSchema.getParentColumnTableRelations.isEmpty) {
val parentRelationIdentifier = carbonDimension.getColumnSchema
.getParentColumnTableRelations.get(0).getRelationIdentifier
val parentTablePath = CarbonMetadata.getInstance()
.getCarbonTable(parentRelationIdentifier.getDatabaseName,
parentRelationIdentifier.getTableName).getTablePath
(QueryUtil
.getTableIdentifierForColumn(carbonDimension,
atiMap(tableName).getAbsoluteTableIdentifier),
.getTableIdentifierForColumn(carbonDimension),
new ColumnIdentifier(carbonDimension.getColumnSchema
.getParentColumnTableRelations.get(0).getColumnId,
carbonDimension.getColumnProperties,
Expand Down
Loading

0 comments on commit ee37643

Please sign in to comment.