This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Proof of concept of a typed BigQuery reader #41

Closed

alexvanboxel wants to merge 1 commit into GoogleCloudPlatform:master from alexvanboxel:feature/bq

Conversation

@alexvanboxel
Contributor

This is not a Pull Request intended for merging, but a Request For Comment on a feature I'm developing. I'd like to know whether this is a feature that would be considered for merging. It needs some more development, but if I know it won't be accepted anyway I will abandon this feature as a "core" merge.

What is the feature: an object mapper for BigQuery. My experience with Dataflow tells me to stop using loosely typed objects as soon as possible, so it's best to go typed at the border: the reader. Having a built-in mapper therefore helps Dataflow users (this will come in handy especially for ETL-type flows). Example usage:

    PCollection<Match> pcMatch = pipeline
      .apply(BigQueryIO.Read
        .withType(Match.class)
        .named("Foo")
        .from("test.match"));
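
For reference, the Match class used in this example could look like the following; this particular class and its fields are hypothetical, chosen only to match the three types the proof of concept currently supports:

public class Match implements java.io.Serializable {
  // Hypothetical fields; names are assumed to match the column names
  // of the test.match table used in the example above.
  public String player;    // STRING column
  public Integer score;    // INTEGER column
  public Float accuracy;   // FLOAT column
}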

Note: this is a Proof Of Concept and needs development. What needs work:

  • Only Integer, Float and String are supported; more types are needed
  • Maybe use the Jackson ObjectMapper
  • Can't figure out how to get the Coder working on the Dataflow service; could use some pointers
  • More tests are needed

@alexvanboxel
Contributor Author

Additional remarks on why I implemented this as a core feature:

  • Access to the full BigQuery schema
  • No loss of precision, especially in dates. For example, once we support Java 8 dates we will not lose any precision (microseconds instead of the current milliseconds)

@bjchambers
Contributor

Hi Alex! Thanks for the pull request! Sorry we didn't get back to you sooner.

This is a feature we'd like to have as well. I need to look at the code and have a few conversations with people here before I can comment on the approach, but I'm hoping I can respond with more details and next steps by the end of the week.

@alexvanboxel
Contributor Author

No problem. Note that it's a proof of concept and I welcome any feedback. I'll be happy to incorporate it into my next iteration.

@bjchambers
Contributor

Hi Alex!

As I said earlier, we definitely want to support a typed BigQuery API. We have a few suggestions for the general direction and some comments on the details, but we'd like to help you move forward with this and get it checked in.

Our biggest concern is that this is hard-coding a single way of converting a TableRow to a type. We’d suggest adding an interface like the following and allowing the user to specify that:

interface BigQueryRowParser<T> extends Serializable {
  T parse(Schema schema, TableRow row);
  TypeDescriptor<T> getTypeDescriptor();
}

There would be a default BigQueryRowParser<TableRow> which is essentially the identity function and corresponds to the existing behavior. Your proposed reflective mapping could be a predefined BigQueryRowParser that takes the Class<T> in its constructor. The parser could be set on the Bound class using a withParser(BigQueryRowParser<T>) method.

The specific parser will be serialized as part of the Bound instance, and will specify how to do the type conversion.
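
A rough sketch of what the default identity parser and the reflective parser could look like under that interface. Everything below is illustrative, not code from this patch: the class names are made up, TableSchema is assumed as the concrete type behind the Schema parameter, and the reflection logic is simplified (no per-type conversion).

import java.io.Serializable;
import java.lang.reflect.Field;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

// The proposed interface, with TableSchema standing in for "Schema".
interface BigQueryRowParser<T> extends Serializable {
  T parse(TableSchema schema, TableRow row);
  TypeDescriptor<T> getTypeDescriptor();
}

// Default parser: essentially the identity function, preserving today's behavior.
class IdentityRowParser implements BigQueryRowParser<TableRow> {
  @Override
  public TableRow parse(TableSchema schema, TableRow row) {
    return row;
  }

  @Override
  public TypeDescriptor<TableRow> getTypeDescriptor() {
    return TypeDescriptor.of(TableRow.class);
  }
}

// Reflective parser: takes the Class<T> in its constructor and fills public
// fields whose names match the TableRow keys.
class ReflectiveRowParser<T> implements BigQueryRowParser<T> {
  private final Class<T> type;

  ReflectiveRowParser(Class<T> type) {
    this.type = type;
  }

  @Override
  public T parse(TableSchema schema, TableRow row) {
    try {
      T instance = type.newInstance();
      for (Field field : type.getFields()) {
        Object value = row.get(field.getName());
        if (value != null) {
          // A real implementation would convert per the BigQuery field type
          // (INTEGER, FLOAT, STRING, ...) instead of assigning directly.
          field.set(instance, value);
        }
      }
      return instance;
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public TypeDescriptor<T> getTypeDescriptor() {
    return TypeDescriptor.of(type);
  }
}

The read would then be configured with something like .withParser(new ReflectiveRowParser<>(Match.class)).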

Some additional suggestions:

  1. Rather than defaulting to SerializableCoder<T>, this should use the standard approach from elsewhere in the SDK. It can use the CoderRegistry to attempt to find the coder from BigQueryRowParser#getTypeDescriptor() (see the sketch after this list). When that doesn't work, the user can still call setCoder on the PCollection after applying the read. You can see an example in ParDo.
  2. You may want to look at using Jackson and associated annotations to automate the TableRow to T conversion for the reflective case. This would allow for more control over how the TableRow is decoded without needing to implement a specific BigQueryRowParser.
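
A minimal sketch of both suggestions, using the BigQueryRowParser interface proposed above. It assumes the 1.x SDK's CoderRegistry.getDefaultCoder(TypeDescriptor) lookup and Jackson's ObjectMapper.convertValue; the helper class and method names are made up for illustration:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.values.PCollection;

class TypedReadHelpers {
  // Suggestion 1: ask the CoderRegistry for a coder based on the parser's
  // TypeDescriptor rather than defaulting to SerializableCoder<T>.
  static <T> void trySetCoder(Pipeline p, BigQueryRowParser<T> parser, PCollection<T> output) {
    try {
      Coder<T> coder = p.getCoderRegistry().getDefaultCoder(parser.getTypeDescriptor());
      output.setCoder(coder);
    } catch (CannotProvideCoderException e) {
      // No coder registered for T: the user can still call setCoder(...)
      // on the PCollection after applying the read.
    }
  }

  // Suggestion 2: use Jackson for the reflective TableRow-to-T conversion.
  // TableRow is a Map, so convertValue binds its entries to POJO fields by
  // name, and Jackson annotations on T can customize the mapping.
  static <T> T convert(TableRow row, Class<T> type) {
    return new ObjectMapper().convertValue(row, type);
  }
}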

There will likely be more comments on the actual implementation, but hopefully the above can get you moving forward.

@alexvanboxel
Contributor Author

Thanks, I will certainly try to incorporate the feedback; I needed it to continue working on the implementation. A question though: for every new round of feedback, do you want me to squash the commits into one, or keep the original commit and work on top of it?

@bjchambers
Contributor

Hi Alex. As far as squashing goes, it's partly whatever is easiest for you. From a review perspective, it may make sense for you to incorporate the feedback and squash it all into a single commit that we can review, and then make each subsequent round of review a separate commit. This would allow us to review the incremental changes between rounds.

@dhalperi
Contributor

Hi Alex,

Any new thoughts or updates on the revised implementation?

Thanks

@alexvanboxel
Contributor Author

Not yet; as I'm writing this in my free time, I currently have no time. I'm focusing on preparing for the Devoxx conference (second week of November), as I'm part of the steering committee. After that I'll pick this up again.

I tried rebasing my branch, though, and the merge conflicts were so extensive that it's probably better to start from scratch, incorporating the comments.

@dhalperi
Contributor

dhalperi commented Nov 5, 2015

Great, no problem. Take your time!

@alexvanboxel
Contributor Author

Note: because the SDK has deviated so far and there were a lot of changes requested in the review, I started a new branch.

I'm currently far along with implementing the feature, but I stumbled on what I think is a blocker. I did implement a separate input parser, and the types are inferred by the default registry (all as suggested by @bjchambers).

But when the time came to do a test run on the Dataflow service, I got the following error (and indeed I added a new property, input_parser, to provide a hint to the service about which parser is used):

Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "(382397c2ce191a97): Error processing workflow. Causes: (382397c2ce191eb8): Unrecognized input input_parser for step s1.",
    "reason" : "badRequest"
  } ],
  "message" : "(382397c2ce191a97): Error processing workflow. Causes: (382397c2ce191eb8): Unrecognized input input_parser for step s1.",
  "status" : "INVALID_ARGUMENT"
}

In the BigQueryReaderFactory the input_parser property is again picked up to de-serialize the parser.

But I assume that it's not allowed to create new properties... am I right?

@davorbonaci
Contributor

Indeed, the Dataflow Service performs a strict check on the properties passed in. It is not allowed to create new properties on the fly.

Normally, this is not an issue because such properties can be passed through as part of the serialized object. In this specific case, however, since BigQueryIO is currently implemented as a "native source", it doesn't have a corresponding serialized object to piggyback on.

A way to solve this problem is to generate two steps under the hood. BigQueryIO translation can generate a standard BigQuery read operation, which will return PCollection<TableRow>, and optionally a ParDo that does the conversion from TableRow to the specific type if .withParser(BigQueryRowParser<T>) is specified. That ParDo should be able to carry all the information that is needed, as sketched below.
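
A rough sketch of that two-step expansion, written as a user-level composite using the 1.x SDK's DoFn/ParDo API; the class names here (TypedBigQueryRead, ParseFn) are illustrative, not the actual translation code:

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Hypothetical composite: a standard BigQuery read producing TableRow,
// followed by a ParDo that applies the user's parser.
class TypedBigQueryRead<T> extends PTransform<PBegin, PCollection<T>> {
  private final String table;
  private final BigQueryRowParser<T> parser;

  TypedBigQueryRead(String table, BigQueryRowParser<T> parser) {
    this.table = table;
    this.parser = parser;
  }

  @Override
  public PCollection<T> apply(PBegin input) {
    PCollection<TableRow> rows =
        input.apply(BigQueryIO.Read.named("ReadRows").from(table));
    // The serialized parser travels inside the DoFn below, so no extra
    // translation property like input_parser is needed.
    return rows.apply(ParDo.named("ParseRows").of(new ParseFn<>(parser)));
  }

  private static class ParseFn<T> extends DoFn<TableRow, T> {
    private final BigQueryRowParser<T> parser;

    ParseFn(BigQueryRowParser<T> parser) {
      this.parser = parser;
    }

    @Override
    public void processElement(ProcessContext c) {
      // The schema is not available in this simplified sketch.
      c.output(parser.parse(null, c.element()));
    }
  }
}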

That said, adding new translation fields is not out of the question when there's a specific need. It is just a process that takes a while to complete.

@alexvanboxel
Contributor Author

I was afraid of that. I will implement the workaround with the ParDo step so I can continue and test it on the service, but keep the implementation with the parser pushed right down to the BigQueryReader (for review).

Thanks.

lukecwik pushed a commit that referenced this pull request Jan 15, 2016
This is a partial revert of commits f5e3b8e and 18c82ad.

When running a batch Dataflow job on Cloud Dataflow service, the data
are produced by running a BigQuery export job and then reading all the
files in parallel. When run in the DirectPipelineRunner, BigQuery's JSON
API is used directly. These data come back in different formats.

To compensate, we use BigQueryTableRowIterator to normalize the behavior in
DirectPipelineRunner to the behavior seen when running on the service.
  (We cannot change this decision without a major breaking change.)

This patch fixes some discrepancies in the way that BigQueryTableRowIterator is
implemented. Specifically,

*) In commit 18c82ad (response to issue #20) we updated the format of
timestamps to be printed as strings. However, we did not correctly match the
behavior of BigQuery export. Here is a sample set of times from the export job
vs the JSON API.

2016-01-06 06:38:00 UTC		1.45206228E9
2016-01-06 06:38:11 UTC		1.452062291E9
2016-01-06 06:38:11.1 UTC	1.4520622911E9
2016-01-06 06:38:11.12 UTC	1.45206229112E9
2016-01-06 06:38:11.123 UTC	1.452062291123E9   *
2016-01-06 06:38:11.1234 UTC	1.4520622911234E9
2016-01-06 06:38:11.12345 UTC	1.45206229112345E9
2016-01-06 06:38:11.123456 UTC	1.452062291123456E9

Before, only the * test would have passed.

*) In commit f5e3b8e we updated TableRow iterator to preserve the
usual TableRow field `f` corresponding to getF(), which returns a
list of fields in Schema order. This was my mistaken attempt to better support
users who have prior experience with BigQuery's API and expect to use
getF()/getV(). However, there were two issues:
  1. this change did not affect the behavior in the DataflowPipelineRunner.
  2. this was actually a breaking backwards-incompatible change, because common
     downstream DoFns may iterate over the keys of the TableRow, and it added
     the field "f".
So we should not propagate the change to DataflowPipelineRunner, but instead we
should revert the change to BigQueryTableRowIterator.
  (Note this is also a slightly-backwards-incompatible change, but it's
  reverting to old behavior and users are more likely to be depending on
  DataflowPipelineRunner rather than DirectPipelineRunner.)

Fix both these issues and add tests.

This is still ugly for now. The long-term fix here is to support a parser that
lets users skip TableRow altogether and goes straight to POJOs of their
choosing (See #41). That would also eliminate our performance and typing issues
using TableRow as an inner type in pipelines (See e.g.
http://stackoverflow.com/questions/33622227/dataflow-mixing-integer-long-types).

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=111746236
robertwb pushed a commit to robertwb/DataflowJavaSDK that referenced this pull request Feb 19, 2016
@dhalperi
Contributor

Closing as obsolete. A cool idea that I hope will make it into Dataflow Java SDK 2.0, based on Apache Beam.

@dhalperi closed this Apr 10, 2017