[CARBONDATA-3218] Fix schema refresh and wrong query result issues in presto.

Problem:
A schema updated in Spark is not reflected in Presto, which results in wrong query results in Presto.

Solution:
Refresh the schema in Presto whenever it changes in Spark, and override the putNulls method in all Presto readers so that null-data scenarios are handled correctly.

This closes #3041
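
The gist of the refresh fix, as a minimal sketch rather than the exact classes touched by this commit: the Presto-side cache records the schema file's last-modified time, marks itself invalid when a newer timestamp is observed (meaning Spark rewrote the schema), and the table metadata is then re-parsed before serving the query. The class and field names below (SchemaCacheEntry, tableMetadata) are illustrative, not from the patch.

// Minimal sketch of the cache-invalidation idea (illustrative names, not the committed code).
public final class SchemaCacheEntry {

  private long lastUpdatedTime;   // modification time of the schema file that was parsed
  private Object tableMetadata;   // cached table metadata (CarbonTable in the real code)
  private boolean valid;

  public SchemaCacheEntry(long lastUpdatedTime, Object tableMetadata) {
    this.lastUpdatedTime = lastUpdatedTime;
    this.tableMetadata = tableMetadata;
    this.valid = true;
  }

  // Called before each query with the schema file's current modification time;
  // a mismatch means Spark updated the schema, so the cached metadata must be re-parsed.
  public void setCurrentSchemaTime(long currentSchemaTime) {
    if (lastUpdatedTime != currentSchemaTime) {
      valid = false;
    }
    this.lastUpdatedTime = currentSchemaTime;
  }

  public boolean isValid() {
    return valid;
  }

  // Replace the cached metadata after a re-parse and mark the entry valid again.
  public void refresh(long newTime, Object newMetadata) {
    this.lastUpdatedTime = newTime;
    this.tableMetadata = newMetadata;
    this.valid = true;
  }
}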
ravipesala authored and kumarvishal09 committed Jan 2, 2019
1 parent 7477527 commit f8697b1
Showing 13 changed files with 215 additions and 113 deletions.
@@ -230,10 +230,11 @@ private CarbonTable getCarbonTable(HiveSplit carbonSplit, Configuration configur
.getCarbonCache(new SchemaTableName(carbonSplit.getDatabase(), carbonSplit.getTable()),
carbonSplit.getSchema().getProperty("tablePath"), configuration);
checkNotNull(tableCacheModel, "tableCacheModel should not be null");
checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
checkNotNull(tableCacheModel.carbonTable.getTableInfo(),
checkNotNull(tableCacheModel.getCarbonTable(),
"tableCacheModel.carbonTable should not be null");
checkNotNull(tableCacheModel.getCarbonTable().getTableInfo(),
"tableCacheModel.carbonTable.tableInfo should not be null");
return tableCacheModel.carbonTable;
return tableCacheModel.getCarbonTable();
}

}
@@ -119,45 +119,40 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
configuration = carbonTableReader.updateS3Properties(configuration);
CarbonTableCacheModel cache =
carbonTableReader.getCarbonCache(schemaTableName, location, configuration);
if (null != cache) {
Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
try {

List<CarbonLocalMultiBlockSplit> splits =
carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);

ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
long index = 0;
for (CarbonLocalMultiBlockSplit split : splits) {
index++;
Properties properties = new Properties();
for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters()
.entrySet()) {
properties.setProperty(entry.getKey(), entry.getValue());
}
properties.setProperty("tablePath", cache.carbonTable.getTablePath());
properties.setProperty("carbonSplit", split.getJsonString());
properties.setProperty("queryId", queryId);
properties.setProperty("index", String.valueOf(index));
cSplits.add(
new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList(),
getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
new HashMap<>(), Optional.empty()));
Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
try {

List<CarbonLocalMultiBlockSplit> splits =
carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);

ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
long index = 0;
for (CarbonLocalMultiBlockSplit split : splits) {
index++;
Properties properties = new Properties();
for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters().entrySet()) {
properties.setProperty(entry.getKey(), entry.getValue());
}
properties.setProperty("tablePath", cache.getCarbonTable().getTablePath());
properties.setProperty("carbonSplit", split.getJsonString());
properties.setProperty("queryId", queryId);
properties.setProperty("index", String.valueOf(index));
cSplits.add(new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList(),
getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
new HashMap<>(), Optional.empty()));
}

statisticRecorder.logStatisticsAsTableDriver();
statisticRecorder.logStatisticsAsTableDriver();

statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
System.currentTimeMillis());
statisticRecorder.recordStatisticsForDriver(statistic, queryId);
statisticRecorder.logStatisticsAsTableDriver();
return new FixedSplitSource(cSplits.build());
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
statistic
.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION, System.currentTimeMillis());
statisticRecorder.recordStatisticsForDriver(statistic, queryId);
statisticRecorder.logStatisticsAsTableDriver();
return new FixedSplitSource(cSplits.build());
} catch (Exception ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
return null;
}

private static List<HostAddress> getHostAddresses(String[] hosts) {
@@ -25,10 +25,35 @@
*/
public class CarbonTableCacheModel {

public CarbonTable carbonTable;
private long lastUpdatedTime;

private boolean isValid;

private CarbonTable carbonTable;

public CarbonTableCacheModel(long lastUpdatedTime, CarbonTable carbonTable) {
this.lastUpdatedTime = lastUpdatedTime;
this.carbonTable = carbonTable;
this.isValid = true;
}

public void setCurrentSchemaTime(long currentSchemaTime) {
if (lastUpdatedTime != currentSchemaTime) {
isValid = false;
}
this.lastUpdatedTime = currentSchemaTime;
}

public CarbonTable getCarbonTable() {
return carbonTable;
}

public boolean isValid() {
return carbonTable != null;
return isValid;
}

public void setCarbonTable(CarbonTable carbonTable) {
this.carbonTable = carbonTable;
this.isValid = true;
}
}
@@ -20,19 +20,19 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -57,13 +57,11 @@
import org.apache.carbondata.presto.PrestoFilterUtil;

import com.facebook.presto.hadoop.$internal.com.google.gson.Gson;
import com.facebook.presto.hadoop.$internal.io.netty.util.internal.ConcurrentSet;
import com.facebook.presto.hadoop.$internal.org.apache.commons.collections.CollectionUtils;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.google.inject.Inject;
import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -96,15 +94,11 @@ public class CarbonTableReader {
}
};
public CarbonTableConfig config;
/**
* The names of the tables under the schema (this.carbonFileList).
*/
private ConcurrentSet<SchemaTableName> tableList;
/**
* A cache for Carbon reader, with this cache,
* metadata of a table is only read from file system once.
*/
private AtomicReference<HashMap<SchemaTableName, CarbonTableCacheModel>> carbonCache;
private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;

private String queryId;

@@ -121,8 +115,7 @@ public class CarbonTableReader {

@Inject public CarbonTableReader(CarbonTableConfig config) {
this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
this.carbonCache = new AtomicReference(new HashMap());
tableList = new ConcurrentSet<>();
this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
populateCarbonProperties();
}

@@ -134,23 +127,12 @@ public class CarbonTableReader {
*/
public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
Configuration config) {
if (!carbonCache.get().containsKey(table) || carbonCache.get().get(table) == null) {
updateSchemaTables(table, config);
parseCarbonMetadata(table, location, config);
}
if (carbonCache.get().containsKey(table)) {
return carbonCache.get().get(table);
} else {
return null;
updateSchemaTables(table, config);
CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
return parseCarbonMetadata(table, location, config);
}
}

private void removeTableFromCache(SchemaTableName table) {
DataMapStoreManager.getInstance()
.clearDataMaps(carbonCache.get().get(table).carbonTable.getAbsoluteTableIdentifier());
carbonCache.get().remove(table);
tableList.remove(table);

return carbonTableCacheModel;
}

/**
@@ -159,22 +141,19 @@ private void removeTableFromCache(SchemaTableName table) {
* is called, it clears this.tableList and populate the list by reading the files.
*/
private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
// update logic determine later
boolean isKeyExists = carbonCache.get().containsKey(schemaTableName);

if (isKeyExists) {
CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
if (carbonTableCacheModel != null && carbonTableCacheModel.carbonTable.getTableInfo() != null
&& carbonTableCacheModel.carbonTable.isTransactionalTable()) {
Long latestTime = FileFactory.getCarbonFile(CarbonTablePath
.getSchemaFilePath(
carbonCache.get().get(schemaTableName).carbonTable.getTablePath()),
config).getLastModifiedTime();
Long oldTime = carbonTableCacheModel.carbonTable.getTableInfo().getLastUpdatedTime();
if (DateUtils.truncate(new Date(latestTime), Calendar.MINUTE)
.after(DateUtils.truncate(new Date(oldTime), Calendar.MINUTE))) {
removeTableFromCache(schemaTableName);
}
CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
if (carbonTableCacheModel != null &&
carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
long latestTime = FileFactory.getCarbonFile(CarbonTablePath
.getSchemaFilePath(
carbonTable.getTablePath()),
config).getLastModifiedTime();
carbonTableCacheModel.setCurrentSchemaTime(latestTime);
if (!carbonTableCacheModel.isValid()) {
// Invalidate datamaps
DataMapStoreManager.getInstance()
.clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
}
}
}
@@ -184,26 +163,22 @@ private void updateSchemaTables(SchemaTableName schemaTableName, Configuration c
* and cache it in this.carbonCache (CarbonTableReader cache).
*
* @param table name of the given table.
* @return the CarbonTable instance which contains all the needed metadata for a table.
* @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
*/
private CarbonTable parseCarbonMetadata(SchemaTableName table, String tablePath,
private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
Configuration config) {
CarbonTable result;
try {
CarbonTableCacheModel cache = carbonCache.get().get(table);
if (cache == null) {
cache = new CarbonTableCacheModel();
if (cache != null && cache.isValid()) {
return cache;
}
if (cache.isValid()) {
return cache.carbonTable;
}
// If table is not previously cached, then:

// Step 1: get store path of the table and cache it.
String metadataPath = CarbonTablePath.getSchemaFilePath(tablePath);
String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
// If metadata folder exists, it is a transactional table
boolean isTransactionalTable = FileFactory.getCarbonFile(metadataPath, config).exists();
CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
boolean isTransactionalTable = schemaFile.exists();
org.apache.carbondata.format.TableInfo tableInfo;
long modifiedTime = System.currentTimeMillis();
if (isTransactionalTable) {
//Step 2: read the metadata (tableInfo) of the table.
ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
@@ -216,10 +191,11 @@ public TBase create() {
}
};
ThriftReader thriftReader =
new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), createTBase, config);
new ThriftReader(schemaFilePath, createTBase, config);
thriftReader.open();
tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
thriftReader.close();
modifiedTime = schemaFile.getLastModifiedTime();
} else {
tableInfo = CarbonUtil
.inferSchema(tablePath, table.getTableName(), false, config);
@@ -234,30 +210,33 @@ public TBase create() {

wrapperTableInfo.setTransactionalTable(isTransactionalTable);

CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
// Step 4: Load metadata info into CarbonMetadata
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);

cache.carbonTable =
CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName());

// cache the table
carbonCache.get().put(table, cache);

result = cache.carbonTable;
CarbonTable carbonTable = Objects.requireNonNull(
CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()),
"carbontable is null");
// If table is not previously cached, then:
if (cache == null) {
cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
// cache the table
carbonCache.get().put(table, cache);
} else {
cache.setCarbonTable(carbonTable);
}
return cache;
} catch (Exception ex) {
throw new RuntimeException(ex);
}

return result;
}

public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
throws IOException {
List<CarbonLocalInputSplit> result = new ArrayList<>();
List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
CarbonTable carbonTable = tableCacheModel.carbonTable;
TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
CarbonTable carbonTable = tableCacheModel.getCarbonTable();
TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -268,7 +247,7 @@ public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel ta
CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());

JobConf jobConf = new JobConf(config);
List<PartitionSpec> filteredPartitions = new ArrayList();
List<PartitionSpec> filteredPartitions = new ArrayList<>();

PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
LoadMetadataDetails[] loadMetadataDetails = null;
Expand Up @@ -82,6 +82,12 @@ public BooleanStreamReader(int batchSize, DataType dataType, Dictionary dictiona
builder.appendNull();
}

@Override public void putNulls(int rowId, int count) {
for (int i = 0; i < count; i++) {
builder.appendNull();
}
}

@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
@@ -89,6 +89,18 @@ public DecimalSliceStreamReader(int batchSize,
decimalBlockWriter(value);
}

@Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
for (int i = 0; i < count; i++) {
putDecimal(rowId++, value, precision);
}
}

@Override public void putNulls(int rowId, int count) {
for (int i = 0; i < count; i++) {
builder.appendNull();
}
}

@Override public void putNull(int rowId) {
builder.appendNull();
}
