
Beam sink to BigQuery failure: Too many sources provided #199

Closed
samanvp opened this issue Apr 20, 2018 · 5 comments · Fixed by #226 or #232

Comments

@samanvp
Member

samanvp commented Apr 20, 2018

We observed during multiple runs of the VT pipeline that whenever we set --num_workers to a high number (256 or higher), it fails with the following error message:

BigQuery execution failed., Error:
Message: Too many sources provided: xxxxx. Limit is 10000

After communicating with the Beam team we were pointed to this temporary solution and were told:

BQ sink failure due to producing too many files (shards) is a known issue ... hoping to add a BQ custom sink to Python SDK which will fix this. Relevant JIRA is https://issues.apache.org/jira/browse/BEAM-2801.
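
For context, here is a minimal sketch of the kind of shard-before-write workaround being referred to. This is an assumption based on the num_bigquery_write_shards hack mentioned in the commits below, not the actual Variant Transforms code; the pipeline, table, and schema are placeholders:

```python
import random

import apache_beam as beam

NUM_SHARDS = 100  # illustrative stand-in for a --num_bigquery_write_shards value


def key_by_random_shard(record):
  # Assign each record one of a bounded number of shard keys so the write
  # step only sees NUM_SHARDS groups instead of one file per bundle.
  return (random.randint(0, NUM_SHARDS - 1), record)


with beam.Pipeline() as p:
  variants = p | 'CreateTestRows' >> beam.Create(
      [{'reference_name': 'chr1', 'start': 100}])
  _ = (
      variants
      | 'KeyByShard' >> beam.Map(key_by_random_shard)
      | 'GroupByShard' >> beam.GroupByKey()
      | 'Ungroup' >> beam.FlatMap(lambda kv: kv[1])
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
          'my-project:my_dataset.variants',  # placeholder table
          schema='reference_name:STRING,start:INTEGER',
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```

The GroupByKey caps the number of groups reaching the sink at NUM_SHARDS, which is what is supposed to keep the number of staged files (and hence "sources") small.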

@arostamianfar
Contributor

While the workaround works for some cases, it has two issues:

  • It still fails for very large datasets (or when using a large number of workers). I tried with 30B records and 500 workers (7TB of data) and it ended up with 17,000 sources.
  • It's super slow when there are a large number of records (we'd hit the same issue of merging across many samples).

We should investigate this in more detail. I wonder if we can get around this by partitioning the input PCollection again and then using append when writing to BQ (see the sketch below). That way, every partition would (hopefully) have a small number of sources, but they would all end up appending to the same table. We don't care about order, so it should be fine. Not sure if it would work in practice though. Should also ask the Dataflow team.
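
A rough sketch of that partition-then-append idea, assuming the Beam Python SDK's beam.Partition and beam.io.WriteToBigQuery; the partition function, table name, and schema are hypothetical placeholders:

```python
import apache_beam as beam

NUM_PARTITIONS = 10  # illustrative


def write_partitioned(variants):
  # Split the PCollection into a fixed number of partitions; each partition
  # gets its own BigQuery write that appends to the same table, so each
  # individual load should stay well under the per-load source limit.
  partitions = variants | 'Partition' >> beam.Partition(
      lambda record, n: hash(record['reference_name']) % n, NUM_PARTITIONS)
  for i, partition in enumerate(partitions):
    _ = partition | 'WriteToBigQuery_%d' % i >> beam.io.WriteToBigQuery(
        'my-project:my_dataset.variants',  # placeholder table
        schema='reference_name:STRING,start:INTEGER',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
```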

@bashir2
Member

bashir2 commented Apr 26, 2018

Out of curiosity, does this also happen if we set --max_num_workers to something that is not too high, such that the total number of workers is bounded by, say, a few hundred (by worker here I mean the number of cores, not just machines)?

I think the source of confusion is that we don't know where these "sources" come from, correct? I assumed each worker would be one source, but it seems that is not the case.

If we can figure out what the "fix" the Beam team is planning for this "custom sink" in the Python SDK looks like, maybe we can prioritize doing it ourselves and send a Pull Request.

@samanvp
Member Author

samanvp commented Apr 27, 2018

It seems to be independent of the number of workers. Today Allie had a run with only 50 workers and it failed for the same reason.
Yes, you are right about the source of confusion. However, it seems even the Dataflow team does not fully understand the reason; this is a comment on the submitted bug:
"I suspect this is due to Dataflow liquid sharding creating empty splits for shuffle (even though it can only create 1000 files with valid records). So probably we'll need a custom sink to fully fix the "too many sources" error."

Their fix is to write a custom sink for BQ; here is another comment from the bug report:
"to add a BQ custom sink to Python SDK which will fix this. Relevant JIRA is https://issues.apache.org/jira/browse/BEAM-2801."
There is not much info about this fix, so I am not really sure what the best course of action is.

@bashir2
Member

bashir2 commented Apr 27, 2018

Hmm, okay. I think Asha mentioned that this is "fixed" in the Java SDK; @arostamianfar, can you provide more context here?

@samanvp, given your last comment, do you think we should either revert PR #197 or reduce its constant (for the number of keys)? My concern is that we are adding an expensive GroupByKey step, turned on by default for "large inputs", without really fixing anything (Allie is saying that her case fails even with optimize_for_large_inputs).

@arostamianfar
Contributor

arostamianfar commented Apr 28, 2018

I looked into this a bit more and I think I have a better idea about what's going on. The BQ sink writes data to GCS files as JSON and not directly to BQ (the JSON files are stored under a directory like gs://temp_dir/job-name.1524897592.832255/10033804958436819552/dax-tmp-.../...json).
At the end of the pipeline, there is another process (somewhere?) that runs a bq import job to load these JSON files into BQ. I don't actually know how these JSON files are generated or how they're split, but the LimitWrite solution forces exactly n of them to be generated with data. However, for some reason, you still end up with a lot of empty JSON files. I think an easy workaround for the Dataflow team is to just ignore these empty JSON files when loading them into BQ until BEAM-2801 is fixed. Saman, could you please ask them whether this is an easy change to implement for now?
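
For illustration only, the final load step described above is roughly what the following google-cloud-bigquery call does. This is not the Dataflow service's internal code; the URI pattern, table, and write disposition are hypothetical:

```python
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # illustrative
)

# Every staged JSON file matched here counts as one "source" of the load job,
# which is presumably where the "Limit is 10000" in the error comes from.
load_job = client.load_table_from_uri(
    'gs://temp_dir/job-name.1524897592.832255/staged-json/*',  # placeholder
    'my-project.my_dataset.variants',                          # placeholder
    job_config=job_config,
)
load_job.result()  # block until the load job finishes
```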

Partitioning the data just before the write, doing LimitWrite on each partition, and then always appending data to the BQ table could also help with both performance and limiting the number of sources (you'd have n separate BQ write requests rather than a single large one).

RE Java fix: I recall seeing it on some thread. Need to dig it up.

tneymanov added a commit to tneymanov/gcp-variant-transforms that referenced this issue Jun 25, 2019
…form, and remove num_bigquery_write_shards flag.

Previously, issue googlegenomics#199 forced us to use a hack to shard the variants before they are written to BigQuery, which negatively affected the speed of the tool. With the implementation of the new sink, the flag is no longer needed.