Merge 5f7d2b9 into 8f0724e
kunal642 committed Aug 9, 2019
2 parents 8f0724e + 5f7d2b9 commit 4e55e86
Showing 31 changed files with 311 additions and 416 deletions.
@@ -18,10 +18,15 @@
package org.apache.carbondata.core.datamap;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

@@ -39,9 +44,51 @@ public class DataMapFilter implements Serializable {
public DataMapFilter(CarbonTable table, Expression expression) {
this.table = table;
this.expression = expression;
if (expression != null) {
checkIfFilterColumnExistsInTable();
}
resolve();
}

private Set<String> extractColumnExpressions(Expression expression) {
Set<String> columnExpressionList = new HashSet<>();
for (Expression child : expression.getChildren()) {
if (child != null && child.getChildren() != null
&& child.getChildren().size() > 0) {
columnExpressionList.addAll(extractColumnExpressions(child));
} else if (child instanceof ColumnExpression) {
columnExpressionList.add(((ColumnExpression) child).getColumnName());
}
}
return columnExpressionList;
}

private void checkIfFilterColumnExistsInTable() {
Set<String> columnExpressionList = extractColumnExpressions(expression);
for (String colExpression : columnExpressionList) {
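// "positionid" is CarbonData's implicit row-position pseudo-column, so it is
// skipped here: it never appears among the table's declared columns.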
if (colExpression.equalsIgnoreCase("positionid")) {
continue;
}
boolean exists = false;
for (CarbonMeasure carbonMeasure : table.getAllMeasures()) {
if (!carbonMeasure.isInvisible() && carbonMeasure.getColName()
.equalsIgnoreCase(colExpression)) {
exists = true;
}
}
for (CarbonDimension carbonDimension : table.getAllDimensions()) {
if (!carbonDimension.isInvisible() && carbonDimension.getColName()
.equalsIgnoreCase(colExpression)) {
exists = true;
}
}
if (!exists) {
throw new RuntimeException(
"Column " + colExpression + " not found in table " + table.getTableUniqueName());
}
}
}

public DataMapFilter(FilterResolverIntf resolver) {
this.resolver = resolver;
}
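The validation added above fails fast in the DataMapFilter constructor rather than erroring deep inside pruning. Below is a minimal, self-contained sketch of the same pattern, walking a filter expression tree, collecting column names, and checking them case-insensitively against the table's visible columns; the Expr, ColumnExpr, AndExpr, and FilterColumnCheck names are simplified stand-ins for illustration, not CarbonData's real Expression classes:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-ins for CarbonData's Expression/ColumnExpression hierarchy.
interface Expr {
  List<Expr> getChildren();
}

final class ColumnExpr implements Expr {
  private final String columnName;
  ColumnExpr(String columnName) { this.columnName = columnName; }
  String getColumnName() { return columnName; }
  @Override public List<Expr> getChildren() { return List.of(); }
}

final class AndExpr implements Expr {
  private final List<Expr> children;
  AndExpr(Expr... children) { this.children = List.of(children); }
  @Override public List<Expr> getChildren() { return children; }
}

public class FilterColumnCheck {
  // Depth-first walk of the expression tree, collecting referenced column names.
  static Set<String> extractColumnNames(Expr expr) {
    Set<String> names = new HashSet<>();
    for (Expr child : expr.getChildren()) {
      if (child instanceof ColumnExpr) {
        names.add(((ColumnExpr) child).getColumnName());
      } else if (child != null && !child.getChildren().isEmpty()) {
        names.addAll(extractColumnNames(child));
      }
    }
    return names;
  }

  // Fail fast, before any pruning work, when the filter references an unknown column.
  static void checkColumnsExist(Expr filter, Set<String> visibleColumns) {
    for (String name : extractColumnNames(filter)) {
      boolean exists = visibleColumns.stream().anyMatch(c -> c.equalsIgnoreCase(name));
      if (!exists) {
        throw new RuntimeException("Column " + name + " not found in table");
      }
    }
  }

  public static void main(String[] args) {
    Expr filter = new AndExpr(new ColumnExpr("imei"), new ColumnExpr("unknowncol"));
    try {
      checkColumnsExist(filter, Set.of("imei", "price"));
    } catch (RuntimeException e) {
      System.out.println(e.getMessage()); // Column unknowncol not found in table
    }
  }
}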
@@ -699,8 +699,13 @@ public static class TableSegmentRefresher {
SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());
segmentRefreshTime.put(updateVO.getSegmentId(),
new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0));
SegmentRefreshInfo segmentRefreshInfo;
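// Use the real created/updated timestamp only when an update delta exists;
// otherwise store a 0L sentinel instead of a null timestamp.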
if (updateVO != null && updateVO.getLatestUpdateTimestamp() != null) {
segmentRefreshInfo = new SegmentRefreshInfo(updateVO.getCreatedOrUpdatedTimeStamp(), 0);
} else {
segmentRefreshInfo = new SegmentRefreshInfo(0L, 0);
}
segmentRefreshTime.put(updateVO.getSegmentId(), segmentRefreshInfo);
}
}

@@ -709,7 +714,7 @@ public boolean isRefreshNeeded(Segment seg, UpdateVO updateVo) throws IOExceptio
seg.getSegmentRefreshInfo(updateVo);
String segmentId = seg.getSegmentNo();
if (segmentRefreshTime.get(segmentId) == null
&& segmentRefreshInfo.getSegmentUpdatedTimestamp() != null) {
&& segmentRefreshInfo.getSegmentUpdatedTimestamp() != 0) {
segmentRefreshTime.put(segmentId, segmentRefreshInfo);
return true;
}
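These two hunks trade a nullable updated-timestamp for a 0L sentinel: segments without an update delta now get SegmentRefreshInfo(0L, 0), and isRefreshNeeded compares against 0 instead of null. A condensed sketch of the idea, with hypothetical helper names that are not part of CarbonData:

// Hypothetical condensation of the sentinel change above (not CarbonData API).
public class RefreshSentinel {
  // 0L stands for "segment never updated", replacing a nullable timestamp.
  static long toRefreshTimestamp(Long latestUpdateTimestamp, long createdOrUpdatedTimestamp) {
    return latestUpdateTimestamp != null ? createdOrUpdatedTimestamp : 0L;
  }

  // Before this change the second operand was compared against null on a boxed Long.
  static boolean needsRefresh(Long cachedTimestamp, long candidateTimestamp) {
    return cachedTimestamp == null && candidateTimestamp != 0;
  }

  public static void main(String[] args) {
    System.out.println(toRefreshTimestamp(null, 12345L)); // 0
    System.out.println(needsRefresh(null, 12345L));       // true
    System.out.println(needsRefresh(null, 0L));           // false
  }
}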
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.metadata;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -180,4 +181,12 @@ private CarbonDimension getCarbonChildDimsBasedOnColIdentifier(String columnIden
}
return null;
}

public List<CarbonTable> getAllTables() {
return new ArrayList<>(tableInfoMap.values());
}

public void clearAll() {
tableInfoMap.clear();
}
}
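The new getAllTables() hands back a defensive copy of the registry's values, and clearAll() resets the registry; the updated CarbonMetadataTest later in this diff calls clearAll() in setUp so cached tables cannot leak between tests. A short usage sketch (the demo class name is invented):

import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

public class MetadataRegistryDemo {
  public static void main(String[] args) {
    CarbonMetadata metadata = CarbonMetadata.getInstance();
    for (CarbonTable table : metadata.getAllTables()) { // snapshot copy, safe to iterate
      System.out.println(table.getTableUniqueName());
    }
    metadata.clearAll(); // e.g. in a test's setUp, for isolation between tests
  }
}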
@@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -286,7 +287,8 @@ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) {
* @return
*/
public static String buildUniqueName(String databaseName, String tableName) {
return databaseName + CarbonCommonConstants.UNDERSCORE + tableName;
return (databaseName + CarbonCommonConstants.UNDERSCORE + tableName).toLowerCase(
Locale.getDefault());
}

/**
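Unique table names are now normalized to lower case, which is what the updated test expectations later in this diff ("carbontesttable", "dbname_tablename") rely on. For example:

// After this change (illustrative call):
String name = CarbonTable.buildUniqueName("carbonTestDatabase", "carbonTestTable");
// name == "carbontestdatabase_carbontesttable"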
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -142,6 +143,9 @@ public String getTableName() {
* @param tableName the tableName to set
*/
public void setTableName(String tableName) {
if (tableName != null) {
tableName = tableName.toLowerCase(Locale.getDefault());
}
this.tableName = tableName;
}

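setTableName applies the same lower-casing at write time, so a name set through any code path is stored normalized. A tiny illustration ("schema" stands for an instance of the class edited in this hunk):

// Illustrative only:
schema.setTableName("carbonTestTable");
assert "carbontesttable".equals(schema.getTableName());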
@@ -774,32 +774,6 @@ public UpdateVO getInvalidTimestampRange(String segmentId) {
return range;
}

/**
* Returns the invalid timestamp range of a segment.
* @return
*/
public List<UpdateVO> getInvalidTimestampRange() {
List<UpdateVO> ranges = new ArrayList<UpdateVO>();
for (LoadMetadataDetails segment : segmentDetails) {
if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus()
|| SegmentStatus.COMPACTED == segment.getSegmentStatus()
|| SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
UpdateVO range = new UpdateVO();
range.setSegmentId(segment.getLoadName());
range.setFactTimestamp(segment.getLoadStartTime());
if (!segment.getUpdateDeltaStartTimestamp().isEmpty() &&
!segment.getUpdateDeltaEndTimestamp().isEmpty()) {
range.setUpdateDeltaStartTimestamp(
CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaStartTimestamp()));
range.setLatestUpdateTimestamp(
CarbonUpdateUtil.getTimeStampAsLong(segment.getUpdateDeltaEndTimestamp()));
}
ranges.add(range);
}
}
return ranges;
}

/**
*
* @param block
@@ -2239,7 +2239,6 @@ public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDa
org.apache.carbondata.format.TableInfo tableInfo =
new org.apache.carbondata.format.TableInfo(thriftFactTable,
new ArrayList<org.apache.carbondata.format.TableSchema>());

tableInfo.setDataMapSchemas(null);
return tableInfo;
}
@@ -47,6 +47,7 @@ public class CarbonMetadataTest {

@BeforeClass public static void setUp() {
carbonMetadata = CarbonMetadata.getInstance();
carbonMetadata.clearAll();
carbonMetadata.loadTableMetadata(getTableInfo(10000));
tableUniqueName = CarbonTable.buildUniqueName("carbonTestDatabase", "carbonTestTable");
}
@@ -77,13 +78,13 @@ public class CarbonMetadataTest {
@Test public void testGetCarbonTableReturingProperTableWithProperDimensionCount() {
int expectedResult = 1;
assertEquals(expectedResult,
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbonTestTable"));
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbontesttable"));
}

@Test public void testGetCarbonTableReturingProperTableWithProperMeasureCount() {
int expectedResult = 1;
assertEquals(expectedResult,
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbonTestTable"));
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbontesttable"));
}

@Test public void testGetCarbonTableReturingProperTableWithProperDatabaseName() {
@@ -92,7 +93,7 @@ public class CarbonMetadataTest {
}

@Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() {
String expectedResult = "carbonTestTable";
String expectedResult = "carbontesttable";
assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName());
}

@@ -1704,7 +1704,7 @@ public org.apache.carbondata.format.ColumnSchema setDefault_value(byte[] default

@Test public void testFromExternalToWrapperTableSchema() {
String tableId = "1";
String tableName = "tableName";
String tableName = "tablename";
TableSchema actualResult =
thriftWrapperSchemaConverter.fromExternalToWrapperTableSchema(tabSchema, "tableName");
assertEquals(tableId, actualResult.getTableId());
@@ -1729,7 +1729,7 @@ public org.apache.carbondata.format.ColumnSchema setDefault_value(byte[] default
TableInfo actualResult = thriftWrapperSchemaConverter
.fromExternalToWrapperTableInfo(externalTableInfo, "dbName", "tableName", "/path");
assertEquals(time, actualResult.getLastUpdatedTime());
assertEquals("dbName_tableName", actualResult.getTableUniqueName());
assertEquals("dbname_tablename", actualResult.getTableUniqueName());
}

}
@@ -44,19 +44,19 @@ public class CarbonTableTest extends TestCase {
}

@Test public void testNumberOfDimensionReturnsProperCount() {
assertEquals(1, carbonTable.getNumberOfDimensions("carbonTestTable"));
assertEquals(1, carbonTable.getNumberOfDimensions("carbontesttable"));
}

@Test public void testNumberOfMeasureReturnsProperCount() {
assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
}

@Test public void testGetDatabaseNameResturnsDatabaseName() {
assertEquals("carbonTestDatabase", carbonTable.getDatabaseName());
}

@Test public void testFactTableNameReturnsProperFactTableName() {
assertEquals("carbonTestTable", carbonTable.getTableName());
assertEquals("carbontesttable", carbonTable.getTableName());
}

@Test public void testTableUniqueNameIsProper() {
@@ -65,7 +65,7 @@ public class CarbonTableTest extends TestCase {

@Test public void testDimensionPresentInTableIsProper() {
CarbonDimension dimension = new CarbonDimension(getColumnarDimensionColumn(), 0, -1, -1);
assertTrue(carbonTable.getDimensionByName("carbonTestTable", "IMEI").equals(dimension));
assertTrue(carbonTable.getDimensionByName("carbontesttable", "IMEI").equals(dimension));
}

static ColumnSchema getColumnarDimensionColumn() {
@@ -43,19 +43,19 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
}

@Test public void testNumberOfDimensionReturnsProperCount() {
assertEquals(2, carbonTable.getNumberOfDimensions("carbonTestTable"));
assertEquals(2, carbonTable.getNumberOfDimensions("carbontesttable"));
}

@Test public void testNumberOfMeasureReturnsProperCount() {
assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
}

@Test public void testGetDatabaseNameResturnsDatabaseName() {
assertEquals("carbonTestDatabase", carbonTable.getDatabaseName());
}

@Test public void testFactTableNameReturnsProperFactTableName() {
assertEquals("carbonTestTable", carbonTable.getTableName());
assertEquals("carbontesttable", carbonTable.getTableName());
}

@Test public void testTableUniqueNameIsProper() {
@@ -261,31 +261,6 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach {
sql("drop table carbontable")
}

test("test mdt file path with configured paths") {
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, "/tmp/carbondata1/carbondata2/")
val (timestampFile, timestampFileType) = getMdtFileAndType()
FileFactory.deleteFile(timestampFile, timestampFileType)
sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
sql("drop table carbontable")
// perform file check
assert(FileFactory.isFileExist(timestampFile, true) ||
CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)

CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
val (timestampFile2, timestampFileType2) = getMdtFileAndType()
FileFactory.deleteFile(timestampFile2, timestampFileType2)
sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
sql("drop table carbontable")
// perform file check
assert(FileFactory.isFileExist(timestampFile, true) ||
CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
}

override def afterEach {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
@@ -71,20 +71,20 @@ object CarbonSessionUtil {
case _ =>
}
isRelationRefreshed =
CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
case _ =>
}
}

rtnRelation match {
case SubqueryAlias(_,
MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
}
case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable) =>
isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
}
@@ -106,18 +106,18 @@ object IndexServer extends ServerInterface {
sparkSession.sparkContext
.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
}
if (!request.getInvalidSegments.isEmpty) {
DistributedRDDUtils
.invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName,
request.getInvalidSegments.asScala)
}
val splits = new DistributedPruneRDD(sparkSession, request).collect()
if (!request.isFallbackJob) {
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
}
if (request.isJobToClearDataMaps) {
DistributedRDDUtils.invalidateTableMapping(request.getCarbonTable.getTableUniqueName)
}
if (!request.getInvalidSegments.isEmpty) {
DistributedRDDUtils
.invalidateSegmentMapping(request.getCarbonTable.getTableUniqueName,
request.getInvalidSegments.asScala)
}
new ExtendedBlockletWrapperContainer(splits.map(_._2), request.isFallbackJob)
}
}
