[CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table #1605
Conversation
@jackylk @ravipesala Can you please start first-level review.
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2051/
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1661/
Force-pushed from cc58e9d to 0558db5
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2054/
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1667/
Force-pushed from 0558db5 to 5ef3e18
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/417/
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2075/
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1689/
@kunal642 Please add the description of the work done in this PR.
@ravipesala description added
@@ -133,25 +133,25 @@ case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
/**
 *
 * @param carbonTable
 * @param carbonLoadModel
 * @param carbonMergerMapping
Add description of all these parameters
ok
 * @param mergedLoadName
 * @param sQLContext
 */
case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
    carbonLoadModel: CarbonLoadModel,
case class AlterTableCompactionPreEvent(carbonTable: CarbonTable,
Move carbonTable to the next line, same for all events
ok
    mergedLoadName: String,
    sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
/**
 *
 * @param carbonTable
 * @param carbonLoadModel
 * @param carbonMergerMapping
Add description of all these parameters, same for all related events
 * Class for handling operations after data load completion and before final commit of load
 * operation. Example usage: For loading pre-aggregate tables
 */
case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession,
The usage of this is not clear, why is it needed?
This event will be used to perform some task just before updating the carbon table status. For example: when loading data into a parent table, we need to start the load for all child datamaps first, so that if any of those loads fails the parent table status file is not written.
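A minimal sketch of the pattern described in this reply, with entirely illustrative names (this is not CarbonData's actual API): listeners run before the status commit, and any listener failure aborts the commit.

```scala
// Sketch of a pre-status-update event: all names here are hypothetical.
trait Event
case class LoadPreStatusUpdate(tableName: String) extends Event

object EventBus {
  private var listeners = List.empty[Event => Unit]
  def register(listener: Event => Unit): Unit = listeners ::= listener
  // Fire all listeners; an exception from any listener propagates to the caller.
  def fire(event: Event): Unit = listeners.foreach(_(event))
}

def commitLoad(tableName: String, writeStatus: () => Unit): Unit = {
  // Child datamap loads would run inside listeners; if one throws,
  // the parent table's status file is never written.
  EventBus.fire(LoadPreStatusUpdate(tableName))
  writeStatus()
}
```

The design choice is that the parent's commit becomes conditional on every child load succeeding, without the load path knowing about pre-aggregate tables directly.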
    carbonLoadModel,
    mergedLoadName,
    sc)
AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
Please modify line 55: it is not storePath, it is tablePath. Change the CarbonMergerMapping definition also
ok
@@ -492,6 +492,10 @@ object CarbonDataRDDFactory {
throw new Exception("No Data to load")
}
writeDictionary(carbonLoadModel, result, writeAll = false)
val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession,
Why is it required?
Used to do some operations before committing the table status of the parent
@@ -159,7 +161,14 @@ case class CarbonAlterTableCompactionCommand(
// if system level compaction is enabled then only one compaction can run in the system
// if any other request comes at this time then it will create a compaction request file.
// so that this will be taken up by the compaction process which is executing.
if (!isConcurrentCompactionAllowed) {
if (carbonTable.isChildDataMap) {
Please move the comment starting from line 161 to the correct place, and add a comment for this newly added if condition
done
@@ -159,7 +161,14 @@ case class CarbonAlterTableCompactionCommand(
// if system level compaction is enabled then only one compaction can run in the system
// if any other request comes at this time then it will create a compaction request file.
// so that this will be taken up by the compaction process which is executing.
if (!isConcurrentCompactionAllowed) {
if (carbonTable.isChildDataMap) {
carbonLoadModel.setCompactionType(alterTableModel.compactionType.toUpperCase match {
Change setCompactionType to accept an enum instead of a String
done
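The suggested change can be sketched as follows; the setter name matches the one under review, but the surrounding classes are simplified stand-ins, not CarbonData's actual definitions.

```scala
// Hypothetical sketch: an enum-typed setter instead of string matching.
object CompactionType extends Enumeration {
  val MINOR, MAJOR = Value
}

class LoadModel {
  private var compactionType: CompactionType.Value = _
  // Taking the enum directly turns invalid compaction types into
  // compile-time errors instead of runtime string-match failures.
  def setCompactionType(compaction: CompactionType.Value): Unit =
    compactionType = compaction
  def getCompactionType: CompactionType.Value = compactionType
}

val model = new LoadModel
model.setCompactionType(CompactionType.MINOR) // no .toUpperCase match needed
```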
val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
if (segments.nonEmpty) {
CarbonSession
  .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
Do not pass parameters in this way; it is not good for maintenance. Pass the parameter to the function that needs it
This is used to set the segments to access when loading incremental data into the child table. There is no other way to set segments in CarbonScanRDD other than this
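The thread-local scoping this reply relies on can be sketched roughly as below; `SessionProps` and the property key are illustrative stand-ins for `CarbonSession.threadSet` and `CARBON_INPUT_SEGMENTS`, not the real implementation.

```scala
// Hypothetical sketch of thread-local session properties.
object SessionProps {
  private val props = new ThreadLocal[Map[String, String]] {
    override def initialValue(): Map[String, String] = Map.empty
  }
  def threadSet(key: String, value: String): Unit =
    props.set(props.get + (key -> value))
  def threadUnset(key: String): Unit = props.set(props.get - key)
  def get(key: String): Option[String] = props.get.get(key)
}

// Restrict a child-table load to newly added segments, then clean up,
// so concurrent loads on other threads are unaffected.
SessionProps.threadSet("carbon.input.segments.db.maintable", "0,1,2")
try {
  // ... run the scan/load that should see only segments 0,1,2 ...
} finally {
  SessionProps.threadUnset("carbon.input.segments.db.maintable")
}
```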
private def startCompactionForDataMap(carbonLoadModel: CarbonLoadModel,
    sparkSession: SparkSession): Unit = {
  val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
  val loadMetaDataDetails = CarbonDataMergerUtil
Correct the coding style in this function; please follow this coding style in future:
val a = foo(
  paramA,
  paramB)
ok
    carbonLoadModel.getTableName, "false")
val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
  .map(_.getColumnName).mkString(",")
val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser()
What does this do? Can you make it more readable?
ok
  .addPreAggLoadFunction(PreAggregateUtil
    .createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
try {
  CarbonLoadDataCommand(Some(carbonTable.getDatabaseName),
When invoking a command, follow this code style:
CarbonXXXCommand(
  paramA = valueA,
  paramB = valueB,
  ...
).run(sparkSession)
Do add the parameter name for each line
ok
// allows it.
if (!carbonLoadModel.getCompactionType.equals(CompactionType.MAJOR)) {
  CommonUtil.readLoadMetadataDetails(carbonLoadModel)
  startCompactionForDataMap(carbonLoadModel, sparkSession)
We should avoid the recursive call; you can invoke it in the caller of this function.
@@ -130,6 +131,9 @@ case class CarbonLoadDataCommand(
carbonLoadModel.setFactFilePath(factPath)
carbonLoadModel.setAggLoadRequest(internalOptions
  .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
carbonLoadModel
Follow the coding style:
a.foo(
  paramA,
  paramB)
ok
 * @param event
 * @param operationContext
 */
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
What does the listener do? Please add a comment
added comment
}

object AlterPreAggregateTableCompactionPostEvent extends OperationEventListener {
It is a listener, not an event
changed the name
val compactionType = compactionEvent.carbonMergerMapping.campactionType
val sparkSession = compactionEvent.sQLContext.sparkSession
if (carbonTable.hasDataMapSchema) {
  for (dataMapSchema: DataMapSchema <- carbonTable.getTableInfo.getDataMapSchemaList
Use carbonTable.getTableInfo.getDataMapSchemaList.foreach instead
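The suggested rewrite looks roughly like this: iterate the Java list through `asScala` with `foreach` instead of a for-comprehension. The `DataMapSchema` stand-in and the body are illustrative; only the iteration style reflects the review comment.

```scala
// Sketch of the foreach-based iteration the reviewer suggests.
import scala.collection.JavaConverters._

case class DataMapSchema(name: String) // stand-in for the real schema class

val dataMapSchemaList: java.util.List[DataMapSchema] =
  java.util.Arrays.asList(DataMapSchema("agg0"), DataMapSchema("agg1"))

// Java list -> Scala view, then idiomatic foreach over each child datamap.
dataMapSchemaList.asScala.foreach { schema =>
  // ... trigger compaction for this child table ...
  println(schema.name)
}
```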
Force-pushed from 21ae844 to be9c72b
Build Failed with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/531/
Force-pushed from be9c72b to f5c9d09
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/533/
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2158/
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2161/
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2162/
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1791/
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2164/
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.processing.util.CarbonLoaderUtil

abstract class Compactable(carbonLoadModel: CarbonLoadModel,
Name this class Compactor, and the other Compactor class can be moved to CarbonTableCompactor
Changed the class name to Compactor
/**
 * This class is used to perform compaction on carbon table.
 */
class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
Move all the compactor interfaces and impls to spark-common
AggregateDataMapCompactor requires CarbonSession, which is not available in spark-common
Force-pushed from f5c9d09 to 9a6ace9
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/553/
@@ -541,4 +541,20 @@ object PreAggregateUtil {
}
}

def createChildSelectQuery(tableSchema: TableSchema): String = {
  val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
  val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
Please use ArrayBuffer.empty[String]
  }
}
s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
} from ${ tableSchema.getTableName } group by ${ groupingExpressions.mkString(",") }"
Why not require the database name?
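A sketch of the query shape this method builds, extended with the database qualifier the reviewer asks about. The function name and parameters here are illustrative, not the signature under review.

```scala
// Hypothetical helper showing the generated child-select query,
// with the database name qualifying the table as the reviewer suggests.
def childSelectQuery(
    dbName: String,
    tableName: String,
    groupingExpressions: Seq[String],
    aggregateColumns: Seq[String]): String =
  s"select ${groupingExpressions.mkString(",")},${aggregateColumns.mkString(",")}" +
    s" from $dbName.$tableName group by ${groupingExpressions.mkString(",")}"

// e.g. produces: select name,sum(salary) from db.maintable_agg group by name
val query = childSelectQuery("db", "maintable_agg", Seq("name"), Seq("sum(salary)"))
```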
// adding the aggregation load UDF
.addPreAggLoadFunction(PreAggregateUtil
// creating the select query on the bases on table schema
.createChildSelectQuery(carbonTable.getTableInfo.getFactTable))).drop("preAggLoad")
Indentation is incorrect
val testData = s"$resourcesPath/sample.csv"

override def beforeEach(): Unit = {
  sql("DROP TABLE IF EXISTS maintable")
Suggest using a new database, not the default database
Force-pushed from 9a6ace9 to f32cebf
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/560/
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1809/
Retest this please
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/571/
Force-pushed from f32cebf to 205ac43
Build Success with Spark 2.2.0, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/583/
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1817/
retest sdv please
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2189/
LGTM
…gg table This PR will add to compact the pre-aggregate tables. A pre-aggregate table can be compacted using the alter command i.e alter table table_name compact 'minor/major'. If a table with some pre-aggregate table is compacted, then all the pre-aggregate tables are also compacted with the parent table This closes apache#1605
This PR adds support to compact the pre-aggregate tables. When a table with pre-aggregate tables is compacted, all of its pre-aggregate tables are compacted along with the parent table.
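Per the commit message, compaction is triggered with the existing alter command; the table name below is illustrative:

```sql
-- Compacting the parent table also compacts all of its pre-aggregate tables
ALTER TABLE maintable COMPACT 'minor';
ALTER TABLE maintable COMPACT 'major';
```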
Be sure to complete all of the following checklist items to help us incorporate
your contribution quickly and easily:
Any interfaces changed?
Any backward compatibility impacted?
Document update required?
Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.