[LIVY-622][LIVY-623][LIVY-624][LIVY-625][Thrift] Support GetFunctions, GetSchemas, GetTables, GetColumns in Livy thrift server #194
Conversation
@@ -427,8 +427,8 @@ abstract class ThriftCLIService(val cliService: LivyCLIService, val serviceName:
   override def GetSchemas(req: TGetSchemasReq): TGetSchemasResp = {
     val resp = new TGetSchemasResp
     try {
-      val opHandle = cliService.getSchemas(
-        new SessionHandle(req.getSessionHandle), req.getCatalogName, req.getSchemaName)
+      val opHandle = cliService.getSchemas(createSessionHandle(req.getSessionHandle),
Create a session handle with the real protocol version of this session. The original code defaulted to protocol version V1, which fails the version requirement when the Thrift result set is generated; see here.
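The fix can be illustrated with a small sketch. The `ProtocolVersion` enum and the `fromRequest*` helpers below are hypothetical stand-ins for Thrift's `TProtocolVersion` and Livy's `createSessionHandle`; the point is only that the handle must carry the version the client negotiated, not a V1 default:

```java
import java.util.HashMap;
import java.util.Map;

public class SessionHandleSketch {
    // Hypothetical stand-in for Thrift's TProtocolVersion.
    enum ProtocolVersion { V1, V6 }

    static class SessionHandle {
        final String id;
        final ProtocolVersion version;
        SessionHandle(String id, ProtocolVersion version) {
            this.id = id;
            this.version = version;
        }
    }

    // Registry of sessions and the protocol version each client negotiated.
    static final Map<String, ProtocolVersion> negotiated = new HashMap<>();

    // Buggy variant: always defaults to V1, losing the negotiated version.
    static SessionHandle fromRequestBuggy(String sessionId) {
        return new SessionHandle(sessionId, ProtocolVersion.V1);
    }

    // Fixed variant (what createSessionHandle does conceptually): look up
    // the real protocol version of the session before building the handle.
    static SessionHandle fromRequestFixed(String sessionId) {
        return new SessionHandle(sessionId, negotiated.get(sessionId));
    }

    public static void main(String[] args) {
        negotiated.put("s1", ProtocolVersion.V6);
        System.out.println(fromRequestBuggy("s1").version); // V1: result-set builder would reject this
        System.out.println(fromRequestFixed("s1").version); // V6
    }
}
```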
@@ -44,6 +45,40 @@ abstract class MetadataOperation(sessionHandle: SessionHandle, opType: Operation
     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
       rowSet.setRowOffset(0)
     }
-    rowSet
+    rowSet.extractSubset(maxRows)
Fixes the issue where the metadata result set is fetched infinitely. A new rowSet containing the subset data is generated, and the offset of the original rowSet is advanced.
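The behavior of `extractSubset` can be sketched roughly as follows. This is a simplified model, not Livy's actual `ThriftResultSet`; it only shows why returning a bounded subset and advancing the offset makes the client's fetch loop terminate:

```java
import java.util.ArrayList;
import java.util.List;

public class RowSetSketch {
    private final List<Object[]> rows = new ArrayList<>();
    private long rowOffset = 0;  // start offset of the rows still buffered

    void addRow(Object[] row) { rows.add(row); }

    long getRowOffset() { return rowOffset; }

    int numRows() { return rows.size(); }

    // Conceptual extractSubset: move up to maxRows into a new row set and
    // advance the offset, so repeated FetchResults calls drain the buffer.
    RowSetSketch extractSubset(int maxRows) {
        int n = Math.min(maxRows, rows.size());
        RowSetSketch subset = new RowSetSketch();
        subset.rowOffset = rowOffset;
        subset.rows.addAll(rows.subList(0, n));
        rows.subList(0, n).clear();
        rowOffset += n;
        return subset;
    }

    public static void main(String[] args) {
        RowSetSketch rs = new RowSetSketch();
        for (int i = 0; i < 5; i++) rs.addRow(new Object[]{i});
        // Returning the rowSet itself never shrinks it, so a client fetching
        // in a loop would see rows forever; extractSubset makes progress:
        System.out.println(rs.extractSubset(3).numRows()); // 3
        System.out.println(rs.extractSubset(3).numRows()); // 2
        System.out.println(rs.extractSubset(3).numRows()); // 0
    }
}
```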
Unlike the Spark thrift server, we use the Spark catalog to fetch the metadata instead of a Hive client, to avoid too strong a binding between Livy and Hive. @mgaido91
Could you please also test your patch using SQuirreL or a similar tool, to ensure the metadata is retrieved correctly? It would be great to include screenshots showing that it works.
Thanks for your contribution!
for (Row r : rows) {
  schemas.add(new Object[]{
    r.getString(0),
    ""
Why do we need this? If it is always empty, it makes little sense to return it, doesn't it?
  }
}

public static Integer getColumnSize(org.apache.spark.sql.types.DataType type) {
where did you take this and the following methods from?
They're from the Spark thrift server. Please find the related code here.
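As a rough illustration of what such a helper computes: JDBC metadata reports a `COLUMN_SIZE` per column, typically the number of decimal digits a numeric type can hold. The values below are illustrative, not the exact ones in the Spark or Livy code:

```java
public class ColumnSizeSketch {
    // Simplified illustration of a JDBC COLUMN_SIZE mapping in the spirit of
    // the Spark thrift server helpers; the real code switches on Spark
    // DataType objects and the exact values may differ.
    // Returns null when a size is not applicable.
    public static Integer getColumnSize(String typeName) {
        switch (typeName) {
            case "boolean":   return 1;
            case "tinyint":   return 3;   // max decimal digits of a signed byte
            case "smallint":  return 5;
            case "int":       return 10;
            case "bigint":    return 19;
            case "float":     return 7;   // significant decimal digits
            case "double":    return 15;
            case "date":      return 10;  // yyyy-MM-dd
            case "timestamp": return 29;
            default:          return null; // string, binary, complex types...
        }
    }

    public static void main(String[] args) {
        System.out.println(getColumnSize("int"));    // 10
        System.out.println(getColumnSize("string")); // null
    }
}
```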
I have addressed the review comments. @mgaido91, could you start another round of code review? Thanks!
Codecov Report
@@ Coverage Diff @@
## master #194 +/- ##
=============================================
+ Coverage 28.33% 68.62% +40.28%
- Complexity 343 912 +569
=============================================
Files 100 100
Lines 5679 5679
Branches 855 855
=============================================
+ Hits 1609 3897 +2288
+ Misses 3739 1224 -2515
- Partials 331 558 +227
Continue to review full report at Codecov.
// The initialization needs to be lazy in order not to block when the instance is created
protected lazy val rscClient = {
  // This call is blocking: we are waiting for the session to be ready.
  sessionManager.getLivySession(sessionHandle).client.get
Shall we check that the client is not null, e.g. require(client != null)?
From this code, it seems that if we cannot get a session, an error will be thrown in the Livy session manager.
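The `lazy val` pattern above can be mirrored in Java with a memoizing supplier. A minimal sketch with illustrative names, showing that the blocking lookup is deferred until first use and runs exactly once:

```java
import java.util.function.Supplier;

public class LazyClientSketch {
    // Java analogue of Scala's `lazy val`: defer a blocking lookup until the
    // value is first needed, then cache it.
    static class Lazy<T> {
        private final Supplier<T> init;
        private volatile T value;
        Lazy(Supplier<T> init) { this.init = init; }
        synchronized T get() {
            if (value == null) {
                value = init.get();  // the blocking call happens here, once
            }
            return value;
        }
    }

    static int initCalls = 0;

    public static void main(String[] args) {
        Lazy<String> client = new Lazy<>(() -> {
            initCalls++;             // stands in for the blocking
            return "rsc-client";     // getLivySession(...).client.get call
        });
        System.out.println(initCalls);   // 0: creating the operation did not block
        System.out.println(client.get());
        System.out.println(client.get());
        System.out.println(initCalls);   // 1: initialized exactly once
    }
}
```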
LGTM, just some small issues.
LGTM, merging to master branch, thanks for the contribution.
I think there are still some critical parts in this PR even though it was merged. I'd suggest continuing the discussion and creating a followup fixing the remaining issues.
  GetFunctionsOperation.SCHEMA
}

private def convertFunctionName(name: String): String = {
Sorry, I still don't understand why we need this method. Could you explain?
/**
 * MetadataOperation is the base class for operations which do not perform any call on Spark side
 *
 * @param sessionHandle
What does this mean? There is no description at all, and the names of the parameters can already be read from the method signature...
Let me remove it.
val maxRows = maxRowsL.toInt
val results = rscClient.submit(new FetchCatalogResultJob(sessionId, jobId, maxRows)).get()

val rowSet = ThriftResultSet.apply(getResultSetSchema, protocolVersion)
Why not ThriftResultSet.apply(results)?
results is a List<Object[]>.
import org.apache.livy.Job;
import org.apache.livy.JobContext;

public class CleanupCatalogResultJob implements Job<Boolean> {
I don't understand why we need this and the new state, instead of reusing the existing one for statements?
Currently, I fetch metadata objects from the Spark SessionCatalog API and construct the result set on the Livy server side. So the schema and types of the StatementState are useless in this case, and Iterator<Row> does not quite fit the data to send. Maybe we can change to construct the ResultSet in SparkCatalogJob and return it directly to the client on the Livy server. This can reduce the number of SparkCatalogOperations. Is this what you mean?
> Maybe we can change to construct the ResultSet on SparkCatalogJob

This should definitely be done. We should always transfer a ResultSet on the wire, since it is a compressed representation of the data compared to plain arrays of objects.
OK, I will submit a PR to refactor the code.
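A small sketch of why this matters for wire size, using plain Java serialization as a stand-in for the Thrift encoding. The real `ResultSet` format differs, but the row-versus-column contrast is the same idea: a list of `Object[]` rows boxes every value and repeats per-row overhead, while a columnar container can hold a primitive array per column:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

public class WireSizeSketch {
    // Measure how many bytes an object occupies when serialized.
    static int serializedSize(Serializable obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.size();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        int n = 1000;
        // Row-oriented: one Object[] per row, every int boxed.
        ArrayList<Object[]> rows = new ArrayList<>();
        for (int i = 0; i < n; i++) rows.add(new Object[]{i});
        // Column-oriented: one primitive array for the whole column, the way
        // a columnar result-set container would hold it.
        int[] column = new int[n];
        for (int i = 0; i < n; i++) column[i] = i;

        System.out.println("rows:   " + serializedSize(rows) + " bytes");
        System.out.println("column: " + serializedSize(column) + " bytes");
    }
}
```

On a single int column the columnar form serializes to a fraction of the row form's size; the gap grows with row count.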
try {
  rscClient.submit(new GetTablesJob(
    convertSchemaPattern(schemaName),
    convertIdentifierPattern(tableName, datanucleusFormat = true),
I am not sure how you decided to set datanucleusFormat to true or false in the various calls; could you explain?
When passing the pattern to a Spark SessionCatalog API, e.g. listing tables, datanucleusFormat is set to true. convertPattern then replaces % with *. The SessionCatalog requires the * wildcard and converts it to .* internally, please see here.
When filtering the objects by the pattern in the Livy code, e.g. listing columns, datanucleusFormat is set to false, and % is replaced by .* since we use a regex to filter the names. Please see here.
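The two conversions described above can be sketched as follows. This is a simplified helper; the real `convertIdentifierPattern`/`convertSchemaPattern` also handle `_` wildcards and escape sequences:

```java
public class PatternSketch {
    // datanucleusFormat = true:  SQL '%' -> '*' for Spark's SessionCatalog,
    //                            which rewrites '*' into '.*' internally.
    // datanucleusFormat = false: SQL '%' -> '.*' for local regex filtering
    //                            in the Livy code.
    public static String convertPattern(String pattern, boolean datanucleusFormat) {
        String wildcard = datanucleusFormat ? "*" : ".*";
        return pattern.replace("%", wildcard);
    }

    public static void main(String[] args) {
        System.out.println(convertPattern("def%", true));  // def*  -> passed to SessionCatalog
        System.out.println(convertPattern("def%", false)); // def.* -> used as a regex
        System.out.println("default".matches(convertPattern("def%", false))); // true
    }
}
```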
…Set in catalog operations

What changes were proposed in this pull request?
This is a followup of #194 which addresses all the remaining concerns. The main changes are:
- reverting the introduction of a state specific for catalog operations;
- usage of `ResultSet` to send over the wire the data for catalog operations too.

How was this patch tested?
existing modified UTs

Author: Marco Gaido <mgaido@apache.org>
Closes #217 from mgaido91/LIVY-622_followup.
What changes were proposed in this pull request?
In this patch, we add the implementations of GetSchemas, GetFunctions, GetTables, and GetColumns in Livy Thrift server.
https://issues.apache.org/jira/browse/LIVY-622
https://issues.apache.org/jira/browse/LIVY-623
https://issues.apache.org/jira/browse/LIVY-624
https://issues.apache.org/jira/browse/LIVY-625
How was this patch tested?
Added new unit tests and an integration test, and ran them together with the existing tests.