Merge ea2b149 into deb08c3
NamanRastogi committed Jan 3, 2019
2 parents deb08c3 + ea2b149 commit b5934c4
Showing 26 changed files with 677 additions and 110 deletions.
@@ -29,6 +29,7 @@
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
@@ -101,6 +102,8 @@ public class TableBlockInfo implements Distributable, Serializable {

private String dataMapWriterPath;

private transient DataFileFooter dataFileFooter;

/**
* comparator to sort by block size in descending order.
* Since each line is not exactly the same, the size of an InputSplit may differ,
@@ -462,6 +465,14 @@ public void setDataMapWriterPath(String dataMapWriterPath) {
this.dataMapWriterPath = dataMapWriterPath;
}

public DataFileFooter getDataFileFooter() {
return dataFileFooter;
}

public void setDataFileFooter(DataFileFooter dataFileFooter) {
this.dataFileFooter = dataFileFooter;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("TableBlockInfo{");
@@ -74,6 +74,11 @@ public class DataFileFooter implements Serializable {
*/
private long schemaUpdatedTimeStamp;

/**
* Whether the data in the file is sorted; null when the footer does not record this information.
*/
private Boolean isSorted = true;

/**
* @return the versionId
*/
@@ -179,4 +184,12 @@ public long getSchemaUpdatedTimeStamp() {
public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
}

public void setSorted(Boolean isSorted) {
this.isSorted = isSorted;
}

public Boolean isSorted() {
return isSorted;
}
}
@@ -212,6 +212,9 @@ private List<AbstractIndex> getDataBlocks(QueryModel queryModel) throws IOExcept
.isUseMinMaxForPruning()) {
blockInfo.setBlockOffset(blockletDetailInfo.getBlockFooterOffset());
DataFileFooter fileFooter = filePathToFileFooterMapping.get(blockInfo.getFilePath());
if (null != blockInfo.getDataFileFooter()) {
fileFooter = blockInfo.getDataFileFooter();
}
if (null == fileFooter) {
blockInfo.setDetailInfo(null);
fileFooter = CarbonUtil.readMetadataFile(blockInfo);
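The check above only consults a footer that is already cached on the block info; that cache is populated through the new setDataFileFooter() setter on TableBlockInfo. A minimal caller-side sketch, assuming the classes shown in this diff and their usual CarbonData core packages (hypothetical helper, not part of this commit), of how a footer that has already been read once can be reused so the pruning path skips the second metadata read:

// Hypothetical helper, not part of this commit; package paths assumed from the core module.
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.util.CarbonUtil;

final class FooterCacheSketch {
  static void attachFooter(TableBlockInfo blockInfo) throws java.io.IOException {
    // Read the footer from the data file once ...
    DataFileFooter footer = CarbonUtil.readMetadataFile(blockInfo);
    // ... and cache it on the block info, so getDataBlocks() above picks it up
    // instead of calling readMetadataFile() again.
    blockInfo.setDataFileFooter(footer);
  }
}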
@@ -73,6 +73,11 @@ public DataFileFooter convertDataFileFooter(FileHeader fileHeader, FileFooter3 f
dataFileFooter.setNumberOfRows(footer.getNum_rows());
dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
dataFileFooter.setSchemaUpdatedTimeStamp(fileHeader.getTime_stamp());
if (footer.isSetIs_sort()) {
dataFileFooter.setSorted(footer.isIs_sort());
} else {
dataFileFooter.setSorted(null);
}
List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
for (int i = 0; i < table_columns.size(); i++) {
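Because the Thrift field is optional, the new flag is effectively three-valued: true for files written sorted, false for files written unsorted, and null when the writer did not record it. A small sketch (hypothetical helper, not part of this commit) of how consuming code might branch on it while treating the unknown case conservatively:

// Hypothetical helper, not part of this commit.
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;

final class SortFlagSketch {
  // True only when the footer explicitly records that the data is sorted;
  // a null flag is treated as "not known to be sorted".
  static boolean isKnownSorted(DataFileFooter footer) {
    return Boolean.TRUE.equals(footer.isSorted());
  }
}

During compaction over a mix of sorted and unsorted loads (the scenario exercised by TestHybridCompaction below), a check of this kind could decide whether a segment's rows can be merged as-is or need to be re-sorted.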
@@ -0,0 +1,262 @@
package org.apache.carbondata.spark.testsuite.compaction

import java.io.{BufferedWriter, File, FileWriter}

import scala.collection.mutable.ListBuffer

import au.com.bytecode.opencsv.CSVWriter
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.test.util.QueryTest
import org.junit.Assert
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties


class TestHybridCompaction extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {

val rootPath = new File(this.getClass.getResource("/").getPath + "../../../..").getCanonicalPath

val csvPath1 =
s"$rootPath/integration/spark-common-test/src/test/resources/compaction/hybridCompaction1.csv"

val csvPath2 =
s"$rootPath/integration/spark-common-test/src/test/resources/compaction/hybridCompaction2.csv"

val tableName = "t1"


override def beforeAll: Unit = {
generateCSVFiles()
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "MM/dd/yyyy")
}


override def afterAll: Unit = {
deleteCSVFiles()
}


override def beforeEach(): Unit = {
dropTable()
createTable()
}


override def afterEach(): Unit = {
dropTable()
}


def generateCSVFiles(): Unit = {
val rows1 = new ListBuffer[Array[String]]
rows1 += Array("seq", "first", "last", "age", "city", "state", "date")
rows1 += Array("1", "Augusta", "Nichols", "20", "Varasdo", "WA", "07/05/2003")
rows1 += Array("2", "Luis", "Barnes", "39", "Oroaklim", "MT", "04/05/2048")
rows1 += Array("3", "Leah", "Guzman", "54", "Culeosa", "KS", "02/23/1983")
rows1 += Array("4", "Ian", "Ford", "61", "Rufado", "AL", "03/02/1995")
rows1 += Array("5", "Fanny", "Horton", "37", "Rorlihbem", "CT", "05/12/1987")
createCSV(rows1, csvPath1)

val rows2 = new ListBuffer[Array[String]]
rows2 += Array("seq", "first", "last", "age", "city", "state", "date")
rows2 += Array("11", "Claudia", "Sullivan", "42", "Dilwuani", "ND", "09/01/2003")
rows2 += Array("12", "Kate", "Adkins", "54", "Fokafrid", "WA", "10/13/2013")
rows2 += Array("13", "Eliza", "Lynch", "23", "Bonpige", "ME", "05/02/2015")
rows2 += Array("14", "Sarah", "Fleming", "60", "Duvugove", "IA", "04/15/2036")
rows2 += Array("15", "Maude", "Bass", "44", "Ukozedka", "CT", "11/08/1988")
createCSV(rows2, csvPath2)
}


def createCSV(rows: ListBuffer[Array[String]], csvPath: String): Unit = {
val out = new BufferedWriter(new FileWriter(csvPath))
val writer: CSVWriter = new CSVWriter(out)

for (row <- rows) {
writer.writeNext(row)
}

writer.close()
out.close()
}


def deleteCSVFiles(): Unit = {
try {
FileUtils.forceDelete(new File(csvPath1))
FileUtils.forceDelete(new File(csvPath2))
}
catch {
case e: Exception =>
e.printStackTrace()
Assert.fail(e.getMessage)
}
}


def createTable(): Unit = {
sql(
s"""
| CREATE TABLE $tableName(seq int, first string, last string,
| age int, city string, state string, date date)
| STORED BY 'carbondata'
| TBLPROPERTIES(
| 'sort_scope'='local_sort',
| 'sort_columns'='state, age',
| 'dateformat'='MM/dd/yyyy')
""".stripMargin)
}


def loadUnsortedData(n: Int = 1): Unit = {
for (_ <- 1 to n) {
sql(
s"""
| LOAD DATA INPATH '$csvPath1' INTO TABLE $tableName
| OPTIONS (
| 'sort_scope'='no_sort')""".stripMargin)
}
}


def loadSortedData(n: Int = 1): Unit = {
for (_ <- 1 to n) {
sql(
s"""
| LOAD DATA INPATH '$csvPath2' INTO TABLE $tableName
| OPTIONS (
| 'sort_scope'='local_sort')""".stripMargin)
}
}


def dropTable(): Unit = {
sql(s"DROP TABLE IF EXISTS $tableName")
}


test("SORTED LOADS") {
loadSortedData(2)
sql(s"ALTER TABLE $tableName COMPACT 'major'")

val out = sql(s"SELECT state, age FROM $tableName").collect()
out.map(_.get(0).toString) should
equal(Array("CT", "CT", "IA", "IA", "ME", "ME", "ND", "ND", "WA", "WA"))
}


test("UNSORTED LOADS") {
loadUnsortedData(2)
sql(s"ALTER TABLE $tableName COMPACT 'major'")

val out = sql(s"SELECT state, age FROM $tableName").collect()
out.map(_.get(0).toString) should
equal(Array("AL", "AL", "CT", "CT", "KS", "KS", "MT", "MT", "WA", "WA"))
}


test("MIXED LOADS") {
loadSortedData()
loadUnsortedData()
sql(s"ALTER TABLE $tableName COMPACT 'major'")
val out = sql(s"SELECT state, age FROM $tableName").collect()
out.map(_.get(0).toString) should
equal(Array("AL", "CT", "CT", "IA", "KS", "ME", "MT", "ND", "WA", "WA"))
out.map(_.get(1).toString) should
equal(Array("61", "37", "44", "60", "54", "23", "39", "42", "20", "54"))
}


test("INSERT") {
loadSortedData()
loadUnsortedData()
sql(
s"""
| INSERT INTO $tableName
| VALUES('20', 'Naman', 'Rastogi', '23', 'Bengaluru', 'ZZ', '12/28/2018')
""".stripMargin)
sql(s"ALTER TABLE $tableName COMPACT 'major'")

val out = sql(s"SELECT state FROM $tableName").collect()
out.map(_.get(0).toString) should equal(
Array("AL", "CT", "CT", "IA", "KS", "ME", "MT", "ND", "WA", "WA", "ZZ"))
}


test("UPDATE") {
loadSortedData()
loadUnsortedData()
sql(s"UPDATE $tableName SET (state)=('CT') WHERE seq='13'").collect()
sql(s"ALTER TABLE $tableName COMPACT 'major'")

val out = sql(s"SELECT state FROM $tableName WHERE seq='13'").collect()
out.map(_.get(0).toString) should equal(Array("CT"))
}

test("DELETE") {
loadSortedData()
loadUnsortedData()
sql(s"DELETE FROM $tableName WHERE seq='13'").collect()
sql(s"ALTER TABLE $tableName COMPACT 'major'")

val out = sql(s"SELECT state FROM $tableName").collect()
out.map(_.get(0).toString) should equal(
Array("AL", "CT", "CT", "IA", "KS", "MT", "ND", "WA", "WA"))
}


test("RESTRUCTURE TABLE REMOVE COLUMN NOT IN SORT_COLUMNS") {
loadSortedData()
loadUnsortedData()
sql(s"ALTER TABLE $tableName DROP COLUMNS(city)")
sql(s"ALTER TABLE $tableName COMPACT 'major'")

val out = sql(s"SELECT age FROM $tableName").collect()
out.map(_.get(0).toString) should equal(
Array("61", "37", "44", "60", "54", "23", "39", "42", "20", "54"))
}


test("RESTRUCTURE TABLE REMOVE COLUMN IN SORT_COLUMNS") {
loadSortedData()
loadUnsortedData()
sql(s"ALTER TABLE $tableName DROP COLUMNS(state)")
sql(s"ALTER TABLE $tableName COMPACT 'major'")

val out = sql(s"SELECT age FROM $tableName").collect()
out.map(_.get(0).toString) should equal(
Array("20", "23", "37", "39", "42", "44", "54", "54", "60", "61"))
}


test("PREAGG") {
loadSortedData()
loadUnsortedData()
val datamapName = "d1"
val tableNameDatamapName = tableName + "_" + datamapName

sql(
s"""
| CREATE DATAMAP $datamapName
| ON TABLE $tableName
| USING 'preaggregate'
| AS
| SELECT AVG(age), state
| FROM $tableName
| GROUP BY state
""".stripMargin)

loadSortedData()
loadUnsortedData()

sql(s"ALTER TABLE $tableName COMPACT 'major'")
val out = sql(s"SELECT * FROM $tableNameDatamapName").collect()
out.map(_.get(2).toString) should equal(
Array("AL", "CT", "IA", "KS", "ME", "MT", "ND", "WA"))
}

}
@@ -32,25 +32,6 @@ class TestCreateTableWithSortScope extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS tableWithBatchSort")
sql("DROP TABLE IF EXISTS tableWithNoSort")
sql("DROP TABLE IF EXISTS tableWithUnsupportSortScope")
sql("DROP TABLE IF EXISTS tableLoadWithSortScope")
}

test("Do not support load data with specify sort scope") {
sql(
s"""
| CREATE TABLE tableLoadWithSortScope(
| intField INT,
| stringField STRING
| )
| STORED BY 'carbondata'
| TBLPROPERTIES('SORT_COLUMNS'='stringField')
""".stripMargin)

val exception_loaddata_sortscope: Exception = intercept[Exception] {
sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE tableLoadWithSortScope " +
"OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
}
assert(exception_loaddata_sortscope.getMessage.contains("Error: Invalid option(s): sort_scope"))
}

test("test create table with sort scope in normal cases") {