BigQuery: Use Unmarshalling API to handle parsing #2533
Conversation
I like the idea. I'm afraid the use of type classes might make the API a little more cumbersome for new users, especially since multiple unmarshallers are required to support the intermediate need to deserialize results to access `pageToken` and `jobIds` (as you mention here: #2353 (comment)), but if the need for this is explained clearly in the docs then I don't see a problem.
We're planning an Alpakka 3.0 soon, so breaking API changes may be alright. This is also a new connector with limited users. @ennru WDYT of rewriting this API? Do we need to support both for a time?
Thanks for your review and support! Alpakka BigQuery is currently marked as "API may change" so I was under the impression that this would be okay. However, it is straightforward to continue to support the current API that uses the […]. If new users are encouraged to use the default Spray API, then an import from […].

I agree that Java users will find this new API much more cumbersome.
That's true. There should be no problem, but I'm leaning towards supporting both.
I think it would be a good idea to support both APIs. We can keep the docs for this connector the same, but add a new section that discusses using the new API for custom serdes.
Makes sense, I restored the old API. However, I still think the docs should be updated with the new API. The new API is not just for custom serdes: it reduces boilerplate code (at least for Scala users). It is also more in line with the Akka HTTP API, which is entirely based on marshalling. Consider the following example from the docs.

Old API:

```scala
case class User(uid: String, name: String)
implicit val userFormatter = jsonFormat2(User)
def parserFn(result: JsObject): Try[User] = Try(result.convertTo[User])

val userStream =
  GoogleBigQuerySource
    .runQuery[User]("SELECT uid, name FROM bigQueryDatasetName.myTable", parserFn, BigQueryCallbacks.ignore, config)
```

New API:

```scala
import akka.stream.alpakka.googlecloud.bigquery.scaladsl.SprayJsonSupport._

case class User(uid: String, name: String)
implicit val userFormatter = jsonFormat2(User)

val userStream =
  GoogleBigQuerySource[User]
    .runQuery("SELECT uid, name FROM bigQueryDatasetName.myTable", BigQueryCallbacks.ignore, config)
```

Thanks again for your feedback!
That's true. Ok, let's use it as the default example for Scala, but use the old API for Java. I think we should still have a custom serdes section where you can show using something custom, like your Circe use case.
Thanks, I'm working on updating the docs now. However, while doing so I realized that the current docs are wrong/misleading. Consider this example from the current documentation.

```scala
case class User(uid: String, name: String)
implicit val userFormatter = jsonFormat2(User)
def parserFn(result: JsObject): Try[User] = Try(result.convertTo[User])

val userStream =
  GoogleBigQuerySource
    .runQuery[User]("SELECT uid, name FROM bigQueryDatasetName.myTable", parserFn, BigQueryCallbacks.ignore, config)
```

The documentation implies that by simply providing a parser for […]. I.e., the […].

Actually, I prefer the API described in the docs: in most cases, I just want a […]. Not sure whether fixing any of this is in scope for my PR, especially since it warrants a bigger discussion. I was planning to demonstrate the custom serdes by reusing the […].
I agree that parsing […]. If it's not too much trouble to update the existing docs to match the existing behaviour so that you may add your new section, then that's probably the best path forward for now. You can make a follow-up PR if you wish.

This problem is a consequence of having docs examples that don't actually run. Ideally the snippet would run as part of the test suite.
Thanks as always for your fast responses!

I ended up making this change as it was closely intertwined with my parsing implementation. I've now updated the docs with an expanded section on parsing and an example using Circe. For this purpose I added Circe as a test dependency; hope this is okay.

I agree, it should be possible to set up with the Hoverfly API simulator like the end-to-end spec. Hopefully there are no further major changes/additions to my PR and only minor fixes from here. Thanks again for all of your helpful feedback!
I take it back, there is one more thing that I think should be fixed: the fact that the user-provided parser has to be "bulletproof", or else risk creating a Source that is stuck in an infinite loop, makes for a poor API experience that is difficult to debug. Even with parsers 100% automatically derived from case classes, there can still be simple user errors if the case class doesn't perfectly match the query. @tg44, I see that you authored the commit that first mentions this. Do you mind clarifying why this is necessary: is it in fact true that the Google BigQuery API can return a blank response with no error code/message? Is this documented anywhere? Thanks in advance.
It was more than 2.5 years ago when I wrote this, so my memories and the underlying infra could have changed since then. As I remember, BQ returns pages of data, and it is doing some map-reduce-like thing in the background. So you start a query, BQ starts the inner data gathering, you ask for a page, and you only get rows back for that page if a full page of rows has been gathered already. This means that if your data is big enough, your query is slow enough, your data is not evenly distributed, and you consume returned data fast enough, you can get back "empty" responses, which translate to "data is not ready, pls come back later". In most cases, you won't see this with small databases. (I think a suitable test case would be millions of lines where only a really-really small percentage matches the "where" clause. But probably we should reread the docs to be sure.)

I had hoped we had tests for the observed behavior, but I only found these tests, and on the usage side a fully described QueryResponse hierarchy. The catch in the "original" repository was that we hadn't done any serious deserialization, because our use case was to transform multiple types of databases to a unified API. Also, we needed the metadata from the requests for whatever reasons that I totally forget :D (BTW, as far as I know the linked "original" repo has been in production for the last 2 years or so, so at least it works!) As I remember, I refactored some code when it moved here, but the two implementations should be mostly identical.

If you have any other questions, I will try to remember harder, and I hope my WOT helped at least a bit.
Thanks so much for the fast response @tg44! This was really, really helpful; I appreciate you thinking back a couple of years. This makes a lot of sense. The main clarification I'm looking for is what exactly "empty" means: literally an empty response with no body, or e.g. a JSON object with no "rows" key? Thanks!
Based on the code and my memories, I think the rows are not present in this case. This is somewhat in line with the documentation, where we have a note, namely: […]
Ok, thanks, that was my understanding too. So then it is totally possible to check/guard for this without relying on the user-provided parser to be "bulletproof".
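To illustrate the point above: a missing "rows" key can be detected structurally before any user-provided parser runs. Here is a minimal sketch of such a guard, using a simplified stand-in response model (`QueryResponse`, `NotReady`, and `Ready` are illustrative names, not the connector's actual types):

```scala
// Simplified stand-in for a BigQuery query response: per the discussion,
// the "rows" key may be absent while the backend is still gathering data.
case class QueryResponse(jobComplete: Boolean, rows: Option[List[String]])

sealed trait PollResult
case object NotReady extends PollResult                  // "data is not ready, come back later"
case class Ready(rows: List[String]) extends PollResult  // safe to hand rows to the user's parser

// Classify the response before invoking any user-provided parser, so an
// "empty" response never surfaces as a parse failure or an infinite loop.
def classify(resp: QueryResponse): PollResult =
  resp.rows match {
    case Some(rs) if resp.jobComplete => Ready(rs)
    case _                            => NotReady
  }
```

With a guard like this, the user's parser only ever sees actual row data, and the retry logic stays inside the connector.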
I just checked the PR. I think it's mostly good; only one thing bothers me, which was not documented before, and now I understand why it was not easy to use for others :) "What is the json/string that we need to provide a parser to?"

From now on I could be totally wrong, and if that happened, sorry for that; I still haven't tried the connector's current state against a real endpoint. Also, I have old memories and I just browsed the code and the BQ documentation to validate my beliefs.

I think the old implementation wanted a parser for the whole ResponseBody. I think your implementation does this too (based on the Java tests), but the Scala example is not really doing this (it uses the response rows, as I understand the code, which is a […]). BUT my main concern is: […]

Which means none of the provided auto-derived parsers will actually parse any of the data from the real service (or at least I didn't find code that flattens the schema+rows to a case class). I found an example response which exactly describes why we let the user parse the data without implementing a handy parser :D That response is ugly as hell... Also it uses strings everywhere... So for correctly parsing this shitstorm, you need to properly parse the schema, then parse the rows one by one and, based on the schema, convert the string values to the correct primitives. At this point you have two options: generate a new JSON where the auto-derived decoder will work (a normal one with actual key-values :D ), or you somehow force the user to give you a member of a new typeclass which can detect the fields, check the types, and wrap them into a requested type... (We chose the second when we wanted a […].)

I think we should add an e2e-like test with real requests and responses to mock the whole flow. Probably the documentation was unusable before, but my spider-sense screams that we will break hard-to-use but working code with this modification.

EDIT: I found a post in my opened tabs about nested results, just to push this even further :D
Thanks for your review! Appreciate your feedback on this.

This is true.

Yes and no. See my comment above and @seglo's response. Basically, what we decided was that […].

Is this the flattening code you are looking for?

There is already an end-to-end test present that appears to be based on a real request and response flow. Hope this all makes sense!
Nope; if you check the example response, the rows have a structure of:

```json
"rows": [
  {
    "f": [
      { "v": "3290275" },
      { "v": "5l" },
      { "v": "5l" },
      { "v": "1322575099" },
      { "v": "1.322575099E9" },
      ...
    ]
  },
  ...
]
```

I tried to find the code which transforms this f/v thing to a JSON which can be parsed by the auto-derived decoder. Also, it is possible that the BQ documentation is somewhat outdated and you can get back the results as a

```json
"rows": [
  {"id": 3290275, "by": "5l", "author": "5l", "time": 1322575099, "time_ts": 1.322575099E9, ...},
  ...
]
```

format with a flag/param, but I haven't found that either.
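The f/v encoding above can be flattened mechanically by zipping the schema's field names with each row's values, producing an ordinary object that an auto-derived decoder could handle. A minimal sketch of that idea using a toy JSON model (the real connector works on spray-json `JsValue`s; all names here are illustrative, not the connector's API):

```scala
// Toy JSON model standing in for spray-json's JsValue hierarchy.
sealed trait Js
case class JsStr(value: String) extends Js
case class JsArr(elems: List[Js]) extends Js
case class JsObj(fields: Map[String, Js]) extends Js

// Flatten one BigQuery row, encoded as {"f": [{"v": ...}, ...]}, into an
// ordinary object keyed by the schema's field names (taken in schema order).
def flattenRow(schemaFields: List[String], row: JsObj): JsObj = {
  val values = row.fields.get("f") match {
    case Some(JsArr(cells)) => cells.collect { case JsObj(m) => m("v") }
    case _                  => Nil
  }
  JsObj(schemaFields.zip(values).toMap)
}
```

Note this only re-keys the values; converting the string values to the correct primitives still requires consulting the schema's type information, as described above.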
Aha, thanks for clarifying, totally my bad ... I didn't realize the rows were encoded like this. That is ugly! Good catch, thank you. As it stands, this doesn't break my PR, but it does mean we can't take advantage of Spray's automatic JSON-parser-from-case-class derivation. Users would have to implement their own parsers. You are right that the examples would not work. However, I believe this is very fixable. Currently, we use […]. Thanks again for catching this! Really appreciate your review.
Just pushed a fix for this: I've added an additional test to the e2e spec that demonstrates correct unmarshalling of a real API response into a case class with an automatically derived parser. All that remains is to update the docs to explain this new API. Thanks again for your help with this @tg44!
```java
String uid = ((JsString) f.elements().apply(0).asJsObject().fields().apply("v")).value();
String name = ((JsString) f.elements().apply(1).asJsObject().fields().apply("v")).value();
return new User(uid, name);
});
```
I think we should consider providing a different default JSON implementation for the Java API, e.g. Jackson. Spray does not have a Java API and is painful to use as this example illustrates.
I would recommend doing this in a follow up PR, if you're interested.
I think it's heroic work! Some minor concerns below.
(And I totally agree that this response that google gives back is ugly...)
When calling the `GoogleBigQuerySource.raw` method, your parser is passed the [entire response body](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults#response-body).
When calling the `GoogleBigQuerySource.runQuery` method, your parser is passed one-by-one each entry in the `rows` key of the response.
Note that these entries are represented as "a series of JSON f,v objects for indicating fields and values" ([reference documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/getQueryResults#body.GetQueryResultsResponse.FIELDS.rows)).
@scala[To correctly generate a Spray `JsonFormat[T]` for your case class `T`, you must `import akka.stream.alpakka.googlecloud.bigquery.scaladsl.BigQueryJsonProtocol._` and use the provided `bigQueryJsonFormatN(T)` methods as a drop-in replacement for the usual `jsonFormatN(T)`. Furthermore, the order and presence of parameters in `T` is required to strictly match the query.]
I'm not sure that BQ can't mess up the param order. I hope they didn't go that far, but it would be good to check...
You raise an important point; I'll look into it.
This stackoverflow answer suggests that it is "pretty safe" to assume this.
I hope so. My only concern is that there is a "schema" field that returns the explicit return order. I hope that they don't mess things up for sport :D
Thanks a lot for jumping in @tg44. Do you think there's anything else that needs to be addressed?

@seglo I didn't find anything else. I think it is great work overall, and I'm glad that others could build something more useful/user-friendly on top of our "minimal" code :D You can always mention me under BQ PRs or issues, I'm happy to help if I can!

Thanks, I appreciate the kind words! FYI I am planning to follow up with another PR implementing Sinks for streaming/loading data into BigQuery, if you have any experience with that.

I think we never needed to load data into BQ. When we needed that feature (for tests), we simply inserted it with SQL, like `insert into`.

Great. Thanks for the offer @tg44 :)
LGTM. Thanks for all the work @armanbilge and thank you again @tg44 for your reviews and guidance.
Awesome! Thanks for merging and to everyone for all of your reviews and feedback.
References #2353

My attempt to support arbitrary JSON parsers by using the Akka HTTP unmarshalling API for `GoogleBigQuerySource`, as discussed in #2353 (cc @ennru, @mrooding). To implement this, the major changes I made are:

- Remove the `parserFn: JsObject => Try[T]` argument from `GoogleBigQuerySource`, add a type parameter `J` for the intermediate JSON representation (e.g. `JsValue` in Spray), and require three implicit unmarshallers:
  - `jsonUnmarshaller: FromEntityUnmarshaller[J]`
  - `responseUnmarshaller: Unmarshaller[J, BigQueryJsonProtocol.Response]`
  - `unmarshaller: Unmarshaller[J, T]`
- Expose `impl.parser.Parser.ParserJsonProtocol` as the public `BigQueryJsonProtocol`. This is so the user can provide an unmarshaller for `BigQueryJsonProtocol.Response`, which is used internally by the `GoogleBigQuerySource` implementation.
- […] `scaladsl.SprayJsonSupport._`. Similarly, a utility class `javadsl.SprayJsonSupport` provides Java users with the necessary unmarshallers.

I've updated the tests/examples to demonstrate my new API. For the Scala API, it reduces boilerplate code by removing the need to manually implement the `parserFn`, instead implicitly deriving the necessary unmarshaller from the in-scope Spray `JsonFormat`.

Remaining work

- […] `Future`s and `mapAsync` stages to the graph in `impl.parser.Parser`. The overhead can probably be reduced by judicious use of `ExecutionContext.parasitic` and `FastFuture`.

Thanks for your consideration and looking forward to your feedback.
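On the overhead mentioned under "Remaining work": `ExecutionContext.parasitic` (available since Scala 2.13) runs a continuation on the thread that completed the future, so cheap transformations avoid a dispatcher round trip. A standard-library-only sketch of the idea (the `label` step is a hypothetical stand-in for a trivial post-processing transformation):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A trivial post-processing step. Passing ExecutionContext.parasitic runs
// the map on whichever thread completed the future, rather than scheduling
// it on a dispatcher, so cheap work incurs no extra thread hop.
def label(n: Future[Int]): Future[String] =
  n.map(i => s"row-$i")(ExecutionContext.parasitic)

val out = Await.result(label(Future.successful(42)), 1.second)
```

This is only appropriate for non-blocking, inexpensive continuations; anything heavyweight should stay on a proper dispatcher.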