-
Notifications
You must be signed in to change notification settings - Fork 703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CARBONDATA-1738] [PreAgg] Block direct insert/load on pre-aggregate table #1508
Conversation
@@ -212,6 +212,18 @@ object CarbonSession { | |||
ThreadLocalSessionInfo.setCarbonSessionInfo(currentThreadSessionInfo) | |||
} | |||
|
|||
def threadUnset(key: String): Unit = { | |||
var currentThreadSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo | |||
if (currentThreadSessionInfo == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If null then not required to unset
@@ -51,11 +51,39 @@ object LoadPostAggregateListener extends OperationEventListener { | |||
val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName | |||
val selectQuery = dataMapSchema.getProperties.get("CHILD_SELECT QUERY") | |||
sparkSession.sql(s"insert into $childDatabaseName.$childTableName $selectQuery") | |||
CarbonSession.threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add this unset to finally
val carbonLoadModel = loadEvent.carbonLoadModel | ||
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable | ||
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo | ||
val isInternalLoadCall = carbonSessionInfo.getSessionParams.getAll.keySet().asScala |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use specifc parameter instead of using this VALIDATE_CARBON_INPUT_SEGMENTS
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1182/ |
9d48d35
to
3848d85
Compare
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1210/ |
3848d85
to
d2906e8
Compare
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1215/ |
* Whether load/insert command is fired internally or by the user. | ||
* Used to block load/insert on pre-aggregate if fired by user | ||
*/ | ||
public static final String IS_INTERNAL_LOAD_CALL = "is.internal.load.call"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems no testcase for this option. And the option name should start with carbon
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This option/property will not be exposed to the user. It will be set by the post load listener to know whether the load is fired by the user or is it an internal call.
Test case is added in TestPreAggregateLoad
retest this please |
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1240/ |
d2906e8
to
c5f56c1
Compare
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1288/ |
retest this please |
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1289/ |
955a4a8
to
718a9b1
Compare
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1308/ |
718a9b1
to
137496f
Compare
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1313/ |
137496f
to
70af7d8
Compare
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1324/ |
70af7d8
to
644b0fc
Compare
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1326/ |
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1784/ |
@ravipesala Please review |
retest sdv please |
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1801/ |
retest sdv please |
s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString") | ||
val availableLoads = PreAggregateUtil.checkMainTableLoad(parentTable) | ||
if (availableLoads) { | ||
val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move line done
if (availableLoads) { | ||
val headers = childSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) | ||
.mkString(",") | ||
val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move down after (
val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser() | ||
.addPreAggLoadFunction(s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")) | ||
.drop("preAggLoad") | ||
val headers = dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move down
throw new UnsupportedOperationException( | ||
"Cannot insert/load data directly into pre-aggregate table") | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove line
@@ -253,8 +260,8 @@ object PreAggregateUtil { | |||
carbonTable.getColumnByName(parentTableName, attr.name).getColumnId, | |||
parentTableName, | |||
parentDatabaseName, parentTableId = parentTableId) | |||
case _ => | |||
throw new MalformedCarbonCommandException("Un-Supported Aggregation Type") | |||
case a@_ => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep as case others
attr.name + "_sum")(), | ||
Alias(attrExpression | ||
.copy(aggregateFunction = Count(cast), | ||
resultId = NamedExpression.newExprId), attr.name + "_count")()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please format it properly
filteredExpressions | ||
.exists { | ||
expr => !expr.name.equalsIgnoreCase("PreAgg") && | ||
expr.name.equalsIgnoreCase("preAggLoad") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format properly
def addPreAggLoadFunction(sql: String): String = { | ||
addPreAggLoad(new lexical.Scanner(sql.toLowerCase)) match { | ||
case Success(query, _) => query | ||
case _ => throw new MalformedCarbonCommandException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move down line
492575b
to
fa231e8
Compare
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1639/ |
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1641/ |
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2031/ |
@ravipesala Handled all the review comments |
fa231e8
to
b028824
Compare
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2053/ |
b028824
to
c1a8949
Compare
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1664/ |
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2055/ |
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1668/ |
c1a8949
to
6e135d6
Compare
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/2057/ |
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1670/ |
Build Success with Spark 2.2, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/400/ |
LGTM |
Block load/insert on pre-aggregate table
No
No
No
Added test case