Fix double boundary condition and clear datamaps issue
ravipesala committed Aug 28, 2018
1 parent d402633 commit 851d09f
Showing 4 changed files with 165 additions and 17 deletions.
@@ -315,6 +315,13 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
String tableUniqueName =
table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableUniqueName();
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
if (keyUsingTablePath != null) {
tableUniqueName = keyUsingTablePath;
tableIndices = allDataMaps.get(tableUniqueName);
}
}
TableDataMap dataMap = null;
if (tableIndices != null) {
dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
@@ -341,6 +348,18 @@ public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
return dataMap;
}

private String getKeyUsingTablePath(String tablePath) {
if (tablePath != null) {
// Try to get the key using the table path
for (Map.Entry<String, String> entry : tablePathMap.entrySet()) {
if (new Path(entry.getValue()).equals(new Path(tablePath))) {
return entry.getKey();
}
}
}
return null;
}

/**
* Return a new datamap instance and registered in the store manager.
* The datamap is created using datamap name, datamap factory class and table identifier.
@@ -378,6 +397,13 @@ public TableDataMap registerDataMap(CarbonTable table,
// Just update the segmentRefreshMap with the table if not added.
getTableSegmentRefresher(table);
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
if (tableIndices == null) {
String keyUsingTablePath = getKeyUsingTablePath(table.getTablePath());
if (keyUsingTablePath != null) {
tableUniqueName = keyUsingTablePath;
tableIndices = allDataMaps.get(tableUniqueName);
}
}
if (tableIndices == null) {
tableIndices = new ArrayList<>();
}
@@ -434,14 +460,11 @@ public void clearDataMaps(AbsoluteTableIdentifier identifier) {
CarbonTable carbonTable = getCarbonTable(identifier);
String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName();
List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
- if (tableIndices == null && identifier.getTablePath() != null) {
- // Try get using table path
- for (Map.Entry<String, String> entry : tablePathMap.entrySet()) {
- if (new Path(entry.getValue()).equals(new Path(identifier.getTablePath()))) {
- tableIndices = allDataMaps.get(entry.getKey());
- tableUniqueName = entry.getKey();
- break;
- }
- }
- }
+ if (tableIndices == null) {
+ String keyUsingTablePath = getKeyUsingTablePath(identifier.getTablePath());
+ if (keyUsingTablePath != null) {
+ tableUniqueName = keyUsingTablePath;
+ tableIndices = allDataMaps.get(tableUniqueName);
+ }
+ }
if (null != carbonTable && tableIndices != null) {
@@ -473,7 +496,7 @@ public CarbonTable getCarbonTable(AbsoluteTableIdentifier identifier) {
.buildFromTablePath(identifier.getTableName(), identifier.getDatabaseName(),
identifier.getTablePath(), identifier.getCarbonTableIdentifier().getTableId());
} catch (IOException e) {
- LOGGER.error("failed to get carbon table from table Path");
+ LOGGER.warn("failed to get carbon table from table Path" + e.getMessage());
// ignoring exception
}
}
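The DataMapStoreManager hunks above extract the path-matching loop that previously lived only in clearDataMaps into a shared getKeyUsingTablePath helper and apply the same fallback in getDataMap and registerDataMap: when a lookup by table unique name misses, the key is resolved by comparing the caller's table path against the entries of tablePathMap using Hadoop Path equality. Below is a minimal sketch of that matching pattern; the map name, its contents, and the helper name are illustrative, not CarbonData's actual fields.

```scala
import org.apache.hadoop.fs.Path

object TablePathLookupSketch {
  // Illustrative stand-in for the store manager's tablePathMap:
  // table unique name -> registered table path.
  val tablePathMap: Map[String, String] = Map(
    "default_carbon_table" -> "/warehouse/carbon_table",
    "default_carbon_table1" -> "/warehouse/carbon_table1/")

  // Same idea as getKeyUsingTablePath: compare normalized Paths rather than raw
  // strings, so trailing separators or equivalent URI forms still match.
  def keyForTablePath(tablePath: String): Option[String] =
    Option(tablePath).flatMap { p =>
      tablePathMap.collectFirst {
        case (key, registered) if new Path(registered) == new Path(p) => key
      }
    }

  def main(args: Array[String]): Unit = {
    // A caller that only knows the location (as clearDataMaps does when handed
    // an AbsoluteTableIdentifier built from a path) still finds the key.
    assert(keyForTablePath("/warehouse/carbon_table1").contains("default_carbon_table1"))
    assert(keyForTablePath("/warehouse/unknown").isEmpty)
  }
}
```

This path-based fallback is what lets the new "test clearing datamaps" test below clear per-table caches through AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"), even though the datamaps were registered under the table's unique name.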
@@ -222,11 +222,17 @@ public void update(long value) {
* TODO: this operation is costly, optimize for performance
*/
private int getDecimalCount(double value) {
- String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
- int integerPlaces = strValue.indexOf('.');
int decimalPlaces = 0;
- if (-1 != integerPlaces) {
- decimalPlaces = strValue.length() - integerPlaces - 1;
- }
+ try {
+ String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
+ int integerPlaces = strValue.indexOf('.');
+ if (-1 != integerPlaces) {
+ decimalPlaces = strValue.length() - integerPlaces - 1;
+ }
+ } catch (NumberFormatException e) {
+ if (!Double.isInfinite(value)) {
+ throw e;
+ }
+ }
return decimalPlaces;
}
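The getDecimalCount change is the "double boundary" half of the commit. A value just above Double.MAX_VALUE, such as the 1.7986931348623157E308 literal used in the new "test double boundary" test, parses to Infinity, and BigDecimal.valueOf rejects infinite doubles with a NumberFormatException (it goes through Double.toString), so the statistics collector used to fail on such rows. The patch catches that exception, tolerates it only for infinite inputs (treating them as having zero decimal places), and rethrows otherwise. A JDK-only sketch of the behaviour, with the method name chosen here for illustration:

```scala
object DecimalCountBoundarySketch {
  // Mirrors the patched getDecimalCount: infinite inputs are the only case in
  // which the NumberFormatException from BigDecimal.valueOf is swallowed.
  def decimalCount(value: Double): Int =
    try {
      val s = java.math.BigDecimal.valueOf(math.abs(value)).toPlainString
      val dot = s.indexOf('.')
      if (dot == -1) 0 else s.length - dot - 1
    } catch {
      case _: NumberFormatException if value.isInfinity => 0
    }

  def main(args: Array[String]): Unit = {
    // Above Double.MaxValue (~1.7976931348623157E308) the literal overflows to Infinity.
    val overflowed = "1.7986931348623157E308".toDouble
    assert(overflowed.isInfinity)
    assert(decimalCount(overflowed) == 0) // no longer throws
    assert(decimalCount(1.25) == 2)
  }
}
```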
@@ -87,9 +87,14 @@ class SparkCarbonFileFormat extends FileFormat
} else {
defaultFsUrl + CarbonCommonConstants.FILE_SEPARATOR + path
}
- case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
+ case _ if files.nonEmpty =>
+ FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
+ case _ =>
+ return None
}
if (options.get(CarbonCommonConstants.SORT_COLUMNS).isDefined) {
throw new UnsupportedOperationException("Cannot use sort columns during infer schema")
}

val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
val table = CarbonTable.buildFromTableInfo(tableInfo)
var schema = new StructType
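The SparkCarbonFileFormat hunk makes schema inference defensive: when no data files are available the path match now returns None instead of calling files.head on an empty list, and specifying SORT_COLUMNS while the schema is being inferred raises UnsupportedOperationException, since sort columns cannot be applied to already-written carbon files. A rough sketch of that guard pattern follows; the method names, the plain-string options, and the IllegalArgumentException stand-in (the real failure surfaces as Spark's "Unable to infer schema for carbon" AnalysisException, as the new test asserts) are all illustrative:

```scala
object InferSchemaGuardSketch {
  // Resolve a table path from an explicit option or from the parent of the
  // first data file; with neither, give up instead of calling files.head.
  def resolveTablePath(options: Map[String, String], files: Seq[String]): Option[String] =
    options.get("path")
      .orElse(files.headOption.map(f => f.substring(0, f.lastIndexOf('/'))))

  def inferTablePathOrFail(options: Map[String, String], files: Seq[String]): String = {
    if (options.contains("sort_columns")) {
      // Same contract the patch adds for the SORT_COLUMNS option.
      throw new UnsupportedOperationException("Cannot use sort columns during infer schema")
    }
    resolveTablePath(options, files)
      .getOrElse(throw new IllegalArgumentException("Unable to infer schema for carbon"))
  }

  def main(args: Array[String]): Unit = {
    assert(inferTablePathOrFail(Map.empty, Seq("/warehouse/test_folder/part-0.carbondata"))
      == "/warehouse/test_folder")
    try {
      inferTablePathOrFail(Map("sort_columns" -> "shortc,c2"),
        Seq("/warehouse/test_folder/part-0.carbondata"))
      assert(false, "expected UnsupportedOperationException")
    } catch { case _: UnsupportedOperationException => () }
    try {
      inferTablePathOrFail(Map.empty, Nil)
      assert(false, "expected failure when neither path nor files are present")
    } catch { case _: IllegalArgumentException => () }
  }
}
```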
@@ -17,14 +17,15 @@
package org.apache.spark.sql.carbondata.datasource


import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.carbondata.datasource.TestUtil._
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {


test("test write using dataframe") {
@@ -508,12 +509,125 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
spark.sql("drop table if exists c_jin")
}

test("test write and create table with sort columns not allow") {
spark.sql("drop table if exists test123")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", "c" + x, "d" + x, x.toShort, x, x.toLong, x.toDouble, BigDecimal
.apply(x)))
.toDF("c1", "c2", "c3", "c4", "shortc", "intc", "longc", "doublec", "bigdecimalc")

// Saves dataframe to carbon file
df.write.format("carbon").save(s"$warehouse1/test_folder/")
if (!spark.sparkContext.version.startsWith("2.1")) {
intercept[UnsupportedOperationException] {
spark
.sql(s"create table test123 using carbon options('sort_columns'='shortc,c2') location " +
s"'$warehouse1/test_folder/'")
}
}
spark.sql("drop table if exists test123")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
}

test("valdate if path not specified during table creation") {
spark.sql("drop table if exists test123")
val ex = intercept[AnalysisException] {
spark.sql(s"create table test123 using carbon options('sort_columns'='shortc,c2')")
}
assert(ex.getMessage().contains("Unable to infer schema for carbon"))
}

test("test stats with subfolder") {
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
spark.sql("drop table if exists par")
spark.sql("drop table if exists t1_flat")
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")

df.write.format("parquet").saveAsTable("par")

spark.sql("create table t1_flat (c1 string, c2 string, n int) using carbon")
spark.sql("insert into t1_flat select * from par")
spark.sql("insert into t1_flat select * from par")
spark.sql("insert into t1_flat select * from par")

spark.sql("explain cost select * from t1_flat").show(false)

// Saves dataframe to carbon file
df.write.format("carbon").save(warehouse1 + "/test_folder/" + System.nanoTime())
df.write.format("carbon").save(warehouse1 + "/test_folder/" + System.nanoTime())
df.write.format("carbon").save(warehouse1 + "/test_folder/" + System.nanoTime())

val frame = spark.read.format("carbon").load(warehouse1 + "/test_folder")
frame.registerTempTable("t1")
spark.sql("explain cost select * from t1").show(false)

FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
spark.sql("drop table if exists par")
spark.sql("drop table if exists t1_flat")
}

test("test double boundary") {
spark.sql("drop table if exists par")
spark.sql("drop table if exists car")

spark.sql("create table par (c1 string, c2 double, n int) using parquet")
spark.sql("create table car (c1 string, c2 double, n int) using carbon")
spark.sql("insert into par select 'a', 1.7986931348623157E308, 215565665556")
spark.sql("insert into car select 'a', 1.7986931348623157E308, 215565665556")

checkAnswer(spark.sql("select * from car"), spark.sql("select * from par"))
spark.sql("drop table if exists par")
spark.sql("drop table if exists car")
}

test("test clearing datamaps") {
if (!spark.sparkContext.version.startsWith("2.1")) {
import spark.implicits._
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => ("a" + x % 10, "b", x))
.toDF("c1", "c2", "number")
spark.sql("drop table if exists testparquet")
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists carbon_table1")
// Saves dataframe to carbon file
df.write
.format("parquet").saveAsTable("testparquet")
spark.sql("create table carbon_table(c1 string, c2 string, number int) using carbon")
spark.sql("create table carbon_table1(c1 string, c2 string, number int) using carbon")
spark.sql("insert into carbon_table select * from testparquet")
spark.sql("insert into carbon_table1 select * from testparquet")
DataMapStoreManager.getInstance().getAllDataMaps.clear()
spark.sql("select * from carbon_table where c1='a1'").collect()
assert(DataMapStoreManager.getInstance().getAllDataMaps.size() == 1)
spark.sql("select * from carbon_table where c1='a2'").collect()
assert(DataMapStoreManager.getInstance().getAllDataMaps.size() == 1)
spark.sql("select * from carbon_table1 where c1='a1'").collect()
assert(DataMapStoreManager.getInstance().getAllDataMaps.size() == 2)
spark.sql("select * from carbon_table1 where c1='a2'").collect()
assert(DataMapStoreManager.getInstance().getAllDataMaps.size() == 2)
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table"))
assert(DataMapStoreManager.getInstance().getAllDataMaps.size() == 1)
DataMapStoreManager.getInstance()
.clearDataMaps(AbsoluteTableIdentifier.from(warehouse1 + "/carbon_table1"))
assert(DataMapStoreManager.getInstance().getAllDataMaps.size() == 0)
spark.sql("drop table if exists testparquet")
spark.sql("drop table if exists carbon_table")
spark.sql("drop table if exists carbon_table1")
}
}


override protected def beforeAll(): Unit = {
drop
}

- override def afterAll():Unit = {
+ override def afterAll(): Unit = {
drop
}

