Skip to content

Commit

Permalink
Merge 0e69ef7 into bbeb974
Browse files Browse the repository at this point in the history
  • Loading branch information
kunal642 committed Aug 9, 2019
2 parents bbeb974 + 0e69ef7 commit a2330e4
Show file tree
Hide file tree
Showing 29 changed files with 295 additions and 386 deletions.
Expand Up @@ -18,10 +18,15 @@
package org.apache.carbondata.core.datamap;

import java.io.Serializable;
import java.util.HashSet;
import java.util.Set;

import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;

Expand All @@ -39,9 +44,51 @@ public class DataMapFilter implements Serializable {
/**
 * Creates a DataMapFilter from a filter expression. Columns referenced by the
 * expression are validated against the table schema before resolution.
 *
 * @param table      table the filter applies to
 * @param expression filter expression; may be null, in which case no
 *                   column validation is performed
 */
public DataMapFilter(CarbonTable table, Expression expression) {
  this.table = table;
  this.expression = expression;
  // Fail fast when the filter references a column missing from the table.
  if (null != expression) {
    checkIfFilterColumnExistsInTable();
  }
  resolve();
}

/**
 * Recursively walks the children of the given expression and collects the
 * column names of every {@link ColumnExpression} leaf encountered.
 *
 * @param expression root expression; only its children are inspected, the
 *                   root node itself is never added
 * @return set of referenced column names (de-duplicated)
 */
private Set<String> extractColumnExpressions(Expression expression) {
  Set<String> columnNames = new HashSet<>();
  for (Expression child : expression.getChildren()) {
    // Descend first: a node with children is treated as an inner node.
    boolean hasChildren =
        child != null && child.getChildren() != null && !child.getChildren().isEmpty();
    if (hasChildren) {
      columnNames.addAll(extractColumnExpressions(child));
    } else if (child instanceof ColumnExpression) {
      columnNames.add(((ColumnExpression) child).getColumnName());
    }
  }
  return columnNames;
}

/**
 * Validates that every column referenced by the filter expression exists as a
 * visible measure or dimension of the table.
 *
 * @throws RuntimeException if a referenced column is not found in the table
 */
private void checkIfFilterColumnExistsInTable() {
  Set<String> columnExpressionList = extractColumnExpressions(expression);
  for (String colExpression : columnExpressionList) {
    // "positionid" is skipped from validation — presumably an implicit
    // column not present in the schema; TODO confirm against usage.
    if (colExpression.equalsIgnoreCase("positionid")) {
      continue;
    }
    if (!isColumnInTable(colExpression)) {
      throw new RuntimeException(
          "Column " + colExpression + " not found in table " + table.getTableUniqueName());
    }
  }
}

/**
 * Returns true if a visible measure or dimension with the given name
 * (case-insensitive) exists in the table.
 */
private boolean isColumnInTable(String colExpression) {
  for (CarbonMeasure carbonMeasure : table.getAllMeasures()) {
    if (!carbonMeasure.isInvisible()
        && carbonMeasure.getColName().equalsIgnoreCase(colExpression)) {
      // Early exit: the original kept scanning all columns after a match.
      return true;
    }
  }
  for (CarbonDimension carbonDimension : table.getAllDimensions()) {
    if (!carbonDimension.isInvisible()
        && carbonDimension.getColName().equalsIgnoreCase(colExpression)) {
      return true;
    }
  }
  return false;
}

/**
 * Creates a DataMapFilter from an already-built filter resolver.
 * No expression validation or resolution is performed on this path.
 *
 * @param resolver pre-resolved filter
 */
public DataMapFilter(FilterResolverIntf resolver) {
this.resolver = resolver;
}
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.metadata;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -180,4 +181,12 @@ private CarbonDimension getCarbonChildDimsBasedOnColIdentifier(String columnIden
}
return null;
}

/**
 * Returns a snapshot of all tables currently held in the metadata cache.
 * The returned list is an independent copy; mutating it does not affect
 * the cache.
 */
public List<CarbonTable> getAllTables() {
  List<CarbonTable> tables = new ArrayList<>(tableInfoMap.size());
  tables.addAll(tableInfoMap.values());
  return tables;
}

/**
 * Removes every entry from the table metadata cache. Used to reset shared
 * singleton state (e.g. by test setUp methods).
 */
public void clearAll() {
tableInfoMap.clear();
}
}
Expand Up @@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
Expand Down Expand Up @@ -286,7 +287,8 @@ public static CarbonTable buildFromTableInfo(TableInfo tableInfo) {
* @return
*/
public static String buildUniqueName(String databaseName, String tableName) {
return databaseName + CarbonCommonConstants.UNDERSCORE + tableName;
return (databaseName + CarbonCommonConstants.UNDERSCORE + tableName).toLowerCase(
Locale.getDefault());
}

/**
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
Expand Down Expand Up @@ -142,6 +143,9 @@ public String getTableName() {
* @param tableName the tableName to set
*/
/**
 * Sets the table name, normalising non-null values to lower case.
 *
 * @param tableName the tableName to set; a null value is stored as-is
 */
public void setTableName(String tableName) {
  // NOTE(review): Locale.getDefault() makes the lower-casing locale-sensitive
  // (e.g. Turkish dotless i). Kept as-is to match the casing behavior used
  // elsewhere in this codebase — confirm before changing to Locale.ROOT.
  this.tableName = (tableName == null) ? null : tableName.toLowerCase(Locale.getDefault());
}

Expand Down
Expand Up @@ -2239,7 +2239,6 @@ public static org.apache.carbondata.format.TableInfo inferSchema(String carbonDa
org.apache.carbondata.format.TableInfo tableInfo =
new org.apache.carbondata.format.TableInfo(thriftFactTable,
new ArrayList<org.apache.carbondata.format.TableSchema>());

tableInfo.setDataMapSchemas(null);
return tableInfo;
}
Expand Down
Expand Up @@ -47,6 +47,7 @@ public class CarbonMetadataTest {

// Reset the shared CarbonMetadata singleton so tables cached by earlier test
// classes cannot leak into this suite, then load a fresh table-info fixture
// and precompute the unique name used by the test cases.
@BeforeClass public static void setUp() {
carbonMetadata = CarbonMetadata.getInstance();
carbonMetadata.clearAll();
carbonMetadata.loadTableMetadata(getTableInfo(10000));
tableUniqueName = CarbonTable.buildUniqueName("carbonTestDatabase", "carbonTestTable");
}
Expand Down Expand Up @@ -77,13 +78,13 @@ public class CarbonMetadataTest {
@Test public void testGetCarbonTableReturingProperTableWithProperDimensionCount() {
int expectedResult = 1;
assertEquals(expectedResult,
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbonTestTable"));
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfDimensions("carbontesttable"));
}

@Test public void testGetCarbonTableReturingProperTableWithProperMeasureCount() {
int expectedResult = 1;
assertEquals(expectedResult,
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbonTestTable"));
carbonMetadata.getCarbonTable(tableUniqueName).getNumberOfMeasures("carbontesttable"));
}

@Test public void testGetCarbonTableReturingProperTableWithProperDatabaseName() {
Expand All @@ -92,7 +93,7 @@ public class CarbonMetadataTest {
}

@Test public void testGetCarbonTableReturingProperTableWithProperFactTableName() {
String expectedResult = "carbonTestTable";
String expectedResult = "carbontesttable";
assertEquals(expectedResult, carbonMetadata.getCarbonTable(tableUniqueName).getTableName());
}

Expand Down
Expand Up @@ -1704,7 +1704,7 @@ public org.apache.carbondata.format.ColumnSchema setDefault_value(byte[] default

@Test public void testFromExternalToWrapperTableSchema() {
String tableId = "1";
String tableName = "tableName";
String tableName = "tablename";
TableSchema actualResult =
thriftWrapperSchemaConverter.fromExternalToWrapperTableSchema(tabSchema, "tableName");
assertEquals(tableId, actualResult.getTableId());
Expand All @@ -1729,7 +1729,7 @@ public org.apache.carbondata.format.ColumnSchema setDefault_value(byte[] default
TableInfo actualResult = thriftWrapperSchemaConverter
.fromExternalToWrapperTableInfo(externalTableInfo, "dbName", "tableName", "/path");
assertEquals(time, actualResult.getLastUpdatedTime());
assertEquals("dbName_tableName", actualResult.getTableUniqueName());
assertEquals("dbname_tablename", actualResult.getTableUniqueName());
}

}
Expand Up @@ -44,19 +44,19 @@ public class CarbonTableTest extends TestCase {
}

@Test public void testNumberOfDimensionReturnsProperCount() {
assertEquals(1, carbonTable.getNumberOfDimensions("carbonTestTable"));
assertEquals(1, carbonTable.getNumberOfDimensions("carbontesttable"));
}

@Test public void testNumberOfMeasureReturnsProperCount() {
assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
}

@Test public void testGetDatabaseNameResturnsDatabaseName() {
assertEquals("carbonTestDatabase", carbonTable.getDatabaseName());
}

@Test public void testFactTableNameReturnsProperFactTableName() {
assertEquals("carbonTestTable", carbonTable.getTableName());
assertEquals("carbontesttable", carbonTable.getTableName());
}

@Test public void testTableUniqueNameIsProper() {
Expand All @@ -65,7 +65,7 @@ public class CarbonTableTest extends TestCase {

@Test public void testDimensionPresentInTableIsProper() {
CarbonDimension dimension = new CarbonDimension(getColumnarDimensionColumn(), 0, -1, -1);
assertTrue(carbonTable.getDimensionByName("carbonTestTable", "IMEI").equals(dimension));
assertTrue(carbonTable.getDimensionByName("carbontesttable", "IMEI").equals(dimension));
}

static ColumnSchema getColumnarDimensionColumn() {
Expand Down
Expand Up @@ -43,19 +43,19 @@ public class CarbonTableWithComplexTypesTest extends TestCase {
}

@Test public void testNumberOfDimensionReturnsProperCount() {
assertEquals(2, carbonTable.getNumberOfDimensions("carbonTestTable"));
assertEquals(2, carbonTable.getNumberOfDimensions("carbontesttable"));
}

@Test public void testNumberOfMeasureReturnsProperCount() {
assertEquals(1, carbonTable.getNumberOfMeasures("carbonTestTable"));
assertEquals(1, carbonTable.getNumberOfMeasures("carbontesttable"));
}

@Test public void testGetDatabaseNameResturnsDatabaseName() {
assertEquals("carbonTestDatabase", carbonTable.getDatabaseName());
}

@Test public void testFactTableNameReturnsProperFactTableName() {
assertEquals("carbonTestTable", carbonTable.getTableName());
assertEquals("carbontesttable", carbonTable.getTableName());
}

@Test public void testTableUniqueNameIsProper() {
Expand Down
Expand Up @@ -130,8 +130,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
"alter table metaCache set tblproperties('column_meta_cache'='c1,c2', 'CACHE_LEVEL'='BLOCKLET')")
wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.getSegmentPropertiesWrapper(segmentPropertyIndex)
// after alter operation cache should not be cleaned as value are unchanged
assert(null != wrapper)
// after alter operation cache should be cleaned as schema file is touched.
assert(null == wrapper)

// alter table to cache no column in column_meta_cache
segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
Expand Down
Expand Up @@ -261,31 +261,6 @@ class DBLocationCarbonTableTestCase extends QueryTest with BeforeAndAfterEach {
sql("drop table carbontable")
}

// Verifies that the mdt (schema-modified timestamp) file is created under the
// configured CARBON_UPDATE_SYNC_FOLDER, for both a custom path and the default.
test("test mdt file path with configured paths") {
  sql(s"create database carbon location '$dblocation'")
  sql("use carbon")
  // Scenario 1: custom sync folder.
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER, "/tmp/carbondata1/carbondata2/")
  val (timestampFile, timestampFileType) = getMdtFileAndType()
  FileFactory.deleteFile(timestampFile, timestampFileType)
  sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
  sql("drop table carbontable")
  // perform file check
  assert(FileFactory.isFileExist(timestampFile, true) ||
    CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)

  // Scenario 2: default sync folder.
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
      CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT)
  val (timestampFile2, timestampFileType2) = getMdtFileAndType()
  FileFactory.deleteFile(timestampFile2, timestampFileType2)
  sql("""create table carbon.carbontable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
  sql("drop table carbontable")
  // perform file check — BUG FIX: the original asserted on timestampFile (the
  // custom-path file from scenario 1) instead of timestampFile2, so the
  // default-folder case was never actually verified.
  assert(FileFactory.isFileExist(timestampFile2, true) ||
    CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore)
}

override def afterEach {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
Expand Down
Expand Up @@ -71,20 +71,20 @@ object CarbonSessionUtil {
case _ =>
}
isRelationRefreshed =
CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
CarbonEnv.isRefreshRequired(catalogTable.identifier)(sparkSession)
case _ =>
}
}

rtnRelation match {
case SubqueryAlias(_,
MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable)) =>
isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
}
case MatchLogicalRelation(_: CarbonDatasourceHadoopRelation, _, catalogTable) =>
isRelationRefreshed = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
isRelationRefreshed = CarbonEnv.isRefreshRequired(name)(sparkSession)
if (catalogTable.isInstanceOf[Option[CatalogTable]]) {
catalogTable.asInstanceOf[Option[CatalogTable]].foreach(setStatsNone)
}
Expand Down
Expand Up @@ -33,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util._
import org.apache.carbondata.datamap.{TextMatchMaxDocUDF, TextMatchUDF}
Expand Down Expand Up @@ -215,38 +215,35 @@ object CarbonEnv {
databaseNameOp: Option[String],
tableName: String)
(sparkSession: SparkSession): CarbonTable = {
refreshRelationFromCache(TableIdentifier(tableName, databaseNameOp))(sparkSession)
val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
val catalog = getInstance(sparkSession).carbonMetaStore
// refresh cache
catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, databaseNameOp))

// try to get it from cache, otherwise look it up in catalog
catalog.getTableFromMetadataCache(databaseName, tableName)
.getOrElse(
catalog
.lookupRelation(databaseNameOp, tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
.carbonTable)
// if the relation is not refreshed or the table does not exist in the cache then
if (isRefreshRequired(TableIdentifier(tableName, databaseNameOp))(sparkSession)) {
catalog
.lookupRelation(databaseNameOp, tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
.carbonTable
} else {
CarbonMetadata.getInstance().getCarbonTable(databaseNameOp.getOrElse(sparkSession
.catalog.currentDatabase), tableName)
}
}

def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
var isRefreshed = false
/**
 * @return true if the relation was changed and was removed from the cache; false if there is
 *         no change in the relation.
 */
def isRefreshRequired(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = {
val carbonEnv = getInstance(sparkSession)
val table = carbonEnv.carbonMetaStore.getTableFromMetadataCache(
identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
identifier.table)
if (carbonEnv.carbonMetaStore
.checkSchemasModifiedTimeAndReloadTable(identifier) && table.isDefined) {
sparkSession.sessionState.catalog.refreshTable(identifier)
val tablePath = table.get.getTablePath
DataMapStoreManager.getInstance().
clearDataMaps(AbsoluteTableIdentifier.from(tablePath,
val databaseName = identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
val table = CarbonMetadata.getInstance().getCarbonTable(databaseName, identifier.table)
if (table == null) {
true
} else {
carbonEnv.carbonMetaStore.isSchemaRefreshed(AbsoluteTableIdentifier.from(table.getTablePath,
identifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
identifier.table, table.get.getTableInfo.getFactTable.getTableId))
isRefreshed = true
identifier.table, table.getTableInfo.getFactTable.getTableId), sparkSession)
}
isRefreshed
}

/**
Expand Down
Expand Up @@ -67,7 +67,6 @@ case class CarbonDropDataMapCommand(
val carbonEnv = CarbonEnv.getInstance(sparkSession)
val catalog = carbonEnv.carbonMetaStore
val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
if (mainTable == null) {
mainTable = try {
CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
Expand Down
Expand Up @@ -105,8 +105,6 @@ case class RefreshCarbonTableCommand(
}
}
}
// update the schema modified time
metaStore.updateAndTouchSchemasUpdatedTime()
Seq.empty
}

Expand Down

0 comments on commit a2330e4

Please sign in to comment.