New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CARBONDATA-3337] Implemented Hadoop RPC framework for index server #3171
[CARBONDATA-3337] Implemented Hadoop RPC framework for index server #3171
Conversation
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2838/ |
Build Failed with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/11098/ |
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/3069/ |
48c9cc3
to
c8c79b9
Compare
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2839/ |
c8c79b9
to
d3edfd7
Compare
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/2840/ |
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/3071/ |
Build Success with Spark 2.3.2, Please check CI http://136.243.101.176:8080/job/carbondataprbuilder2.3/11100/ |
.setBindAddress(serverIp) | ||
.setPort(serverPort) | ||
.setProtocol(classOf[ServerInterface]).build | ||
server.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does start function have return code to judge the status of startting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, return type is void
@@ -2104,4 +2104,17 @@ private CarbonCommonConstants() { | |||
*/ | |||
public static final String CARBON_QUERY_DATAMAP_BLOOM_CACHE_SIZE_DEFAULT_VAL = "512"; | |||
|
|||
} | |||
public static final String CARBON_INDEX_SERVER_POLICY = "carbon.index.server.policy"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please write description for this config in comment, like in what scenario this config is used, why user would config it, what valid values are, whether it is dynamic configurable, etc
|
||
public static final String CARBON_DRIVER_PRUNE_POLICY = "driver"; | ||
|
||
public static final String CARBON_INDEX_SERVER_IP = "carbon.index.server.ip"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please write comment for all new configuration. Same comment we should put to document
@@ -1601,4 +1601,47 @@ private void validateDetailQueryBatchSize() { | |||
} | |||
} | |||
} | |||
} | |||
|
|||
public String getIndexServerPolicy() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add comment for all public function
|
||
public static final String CARBON_DISTRIBUTED_PRUNE_POLICY = "distributed"; | ||
|
||
public static final String CARBON_EMBEDDED_PRUNE_POLICY = "embedded"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
create a "CARBON_INDEX_SERVER_POLICY_DEFAULT" constant
LOGGER.info("The configured value for " + CarbonCommonConstants.CARBON_INDEX_SERVER_POLICY | ||
+ " is not valid, therefore taking " + CarbonCommonConstants.CARBON_EMBEDDED_PRUNE_POLICY | ||
+ " as the index server prune policy"); | ||
return CarbonCommonConstants.CARBON_EMBEDDED_PRUNE_POLICY; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use CARBON_INDEX_SERVER_POLICY_DEFAULT
import org.apache.carbondata.core.indexstore.ExtendedBlocklet | ||
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf | ||
|
||
class DistributedDataMapJob extends AbstractDataMapJob { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this class in spark2 integration module? Can't put in carbondata-core?
import org.apache.carbondata.spark.rdd.CarbonRDD | ||
|
||
|
||
class DistributedPruneRDD(@transient private val ss: SparkSession, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
write comment please
|
||
override def internalCompute(split: Partition, | ||
context: TaskContext): Iterator[(String, ExtendedBlocklet)] = { | ||
Nil.iterator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No implementation?
throw new InvalidConfigurationException("Spark master URL is not set.") | ||
} | ||
val spark = SparkSession | ||
.builder().config(new SparkConf()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move config together
} | ||
|
||
object IndexServer extends ServerInterface { | ||
val prunePolicy: String = CarbonProperties.getInstance().getIndexServerPolicy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use these variables in main directly, no need to declare local variables
} | ||
|
||
def getClient: ServerInterface = { | ||
import org.apache.hadoop.ipc.RPC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to head
|
||
private val serverPort: Int = CarbonProperties.getInstance().getIndexServerPort | ||
|
||
def getSplits(request: DistributableDataMapFormat): Array[(String, ExtendedBlocklet)] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no override
?
Be sure to do all of the following checklist to help us incorporate
your contribution quickly and easily:
Any interfaces changed?
Any backward compatibility impacted?
Document update required?
Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.