
[Question] Does emulator support streaming data with BigQuery Storage Write API? #29

Closed · OfirCohen29 opened this issue Aug 28, 2022 · 11 comments · Fixed by #89
Labels: enhancement (New feature or request)

Comments

OfirCohen29 commented Aug 28, 2022

Hey,
Thanks for creating the bigquery emulator.

Can I use the BigQuery Storage Write API?

https://cloud.google.com/python/docs/reference/bigquerystorage/latest/google.cloud.bigquery_storage_v1.services.big_query_write.BigQueryWriteClient#google_cloud_bigquery_storage_v1_services_big_query_write_BigQueryWriteClient_create_write_stream

I tried to create a write stream in Python against the emulator, but the execution gets stuck when creating it.

The emulator is deployed in a Docker container.

Thanks

goccy (Owner) commented Aug 28, 2022

We don't currently support the streaming API, but we plan to.
However, as noted in the README, I think there is a way to avoid using the streaming API explicitly, so I haven't given it high priority.
If there is a reason you cannot avoid the streaming API, please let me know!
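
For reference, the non-streaming path looks roughly like this — a minimal sketch, assuming the emulator's default REST port 9050 and the table names used in this thread:

from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery

# REST client pointed at the emulator's HTTP endpoint
client = bigquery.Client(
    project="testing",
    credentials=AnonymousCredentials(),
    client_options=ClientOptions(api_endpoint="http://localhost:9050"),
)

# Uses tabledata.insertAll under the hood -- no write stream involved
errors = client.insert_rows_json("testing.dataset1.tests", [{"id": 3, "myname": "carol"}])
print(errors)  # [] on success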

OfirCohen29 (Author) commented Aug 28, 2022

Thanks for answering.

We cannot avoid using the streaming API, so it would help a lot if the emulator supported it.

Thanks again

goccy (Owner) commented Aug 28, 2022

OK, I see. Could you please provide a simple, reproducible code example here?
I would like to use it as a reference.

OfirCohen29 (Author) commented Aug 28, 2022

Yes.

Dockerfile (the port is set in docker-compose):

FROM ghcr.io/goccy/bigquery-emulator:latest
COPY data/data.yaml ./server/testdata/data.yaml
CMD bigquery-emulator --project=testing --dataset=dataset1 --data-from-yaml=./server/testdata/data.yaml
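
A hypothetical docker-compose.yml matching this setup (the actual file wasn't shared; mapping host port 5042 to the emulator's default gRPC port 9060 is an assumption):

version: "3"
services:
  bigquery-emulator:
    build: .
    ports:
      - "5042:9060"  # assumed: host 5042 -> emulator gRPC port 9060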

Python code (cleaned up: gRPC endpoints take host:port without a scheme, field modes use TableFieldSchema.Mode rather than TableFieldSchema.Type, and enum members replace raw values):


from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud.bigquery_storage import BigQueryWriteClient
from google.cloud.bigquery_storage import WriteStream
from google.cloud.bigquery_storage import CreateWriteStreamRequest
from google.cloud.bigquery_storage_v1.types import TableSchema, TableFieldSchema

# Connect to the container (gRPC endpoint: host:port, no http:// scheme)
client_options = ClientOptions(api_endpoint="localhost:5042")
bigquery_write = BigQueryWriteClient(credentials=AnonymousCredentials(), client_options=client_options)

# Configuring the stream

id_field = TableFieldSchema({
    "name": "id",
    "type_": TableFieldSchema.Type.INT64,      # INTEGER
    "mode": TableFieldSchema.Mode.REQUIRED
})

name_field = TableFieldSchema({
    "name": "myname",
    "type_": TableFieldSchema.Type.STRING,
    "mode": TableFieldSchema.Mode.REQUIRED
})

table_schema = TableSchema({
    "fields": [id_field, name_field]
})

write_stream = WriteStream({
        "name": "projects/testing/datasets/dataset1/tables/tests/streams/mystream",
        "type_": WriteStream.Type.COMMITTED,
        "table_schema": table_schema,
        "write_mode": WriteStream.WriteMode.INSERT
})

request = CreateWriteStreamRequest({
    "parent": "projects/testing/datasets/dataset1/tables/tests",
    "write_stream": write_stream
})


bigquery_write.create_write_stream(request=request)

data.yaml:

projects:
- id: testing
  datasets:
    - id: dataset1
      tables:
        - id: tests
          columns:
            - name: id
              type: INTEGER
            - name: myname
              type: STRING
          data:
            - id: 1
              myname: alice
            - id: 2
              myname: bob

Please let me know if anything else is needed.

Thanks

goccy (Owner) commented Aug 30, 2022

Thank you for providing the example.
I was thinking about cases where the streaming API is used implicitly, but in your use case you want to use it explicitly.
I understand now.

goccy added the enhancement (New feature or request) label on Aug 30, 2022
andreas-lindfalk commented
I'm using the InsertAll API to stream things into BigQuery (by way of https://github.com/OTA-Insight/bqwriter), and it mostly works against the emulator... but at least two things do not work well:

  1. the emulator hangs for around 15 seconds when a batch is flushed to it
  2. if I have a record field with a repeated record field inside it, the data does not make it into the emulator (the operation eventually times out)

Really amazing work with the emulator, btw!

goccy (Owner) commented Nov 9, 2022

I've added support for the Read API for the time being. Please wait a little longer for the Write API. I'm also looking for sponsors. Please consider sponsoring me :)
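
A minimal sketch of exercising the Read API against the emulator — the insecure gRPC channel and port 9060 are assumptions based on the emulator's default gRPC port:

import grpc
from google.cloud.bigquery_storage import BigQueryReadClient, ReadSession, DataFormat
from google.cloud.bigquery_storage_v1.services.big_query_read.transports import (
    BigQueryReadGrpcTransport,
)

# Plaintext channel to the emulator's gRPC port (no TLS)
channel = grpc.insecure_channel("localhost:9060")
client = BigQueryReadClient(transport=BigQueryReadGrpcTransport(channel=channel))

session = client.create_read_session(
    parent="projects/testing",
    read_session=ReadSession(
        table="projects/testing/datasets/dataset1/tables/tests",
        data_format=DataFormat.AVRO,  # AVRO decoding requires fastavro
    ),
    max_stream_count=1,
)

# Read every row from the single stream
reader = client.read_rows(session.streams[0].name)
for row in reader.rows(session):
    print(row)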

goccy (Owner) commented Nov 18, 2022

@OfirCohen29 @andreas-dentech @adamszadkowski
Hi, I added support for the BigQuery Storage API in v0.1.27. It was a lot of work, and I hope you will take advantage of it. I'd also be happy to receive feedback on how it works!

OfirCohen29 (Author) commented

Thanks a lot

adamszadkowski commented
@goccy thank you very much for making this emulator and providing such good support :) I have been able to test the streaming API with the Spark integration, using the Java BigQuery libraries. Unfortunately, I have found some other issues which I would like to share with you. I will note them here, but let me know if you would rather have them in separate issues.

This time, too, everything that is not working is "documented" in the repository https://github.com/adamszadkowski/bigquery-emulator-issue

Handling of multiple read streams

Unfortunately, the Spark integration with BigQuery requires reading multiple streams.

val rows = sparkSession.read
  .format("bigquery")
  .load(s"$projectId.$datasetId.$tableId")
  .collectAsList()

The code above doesn't work. There is a workaround of setting parallelism to 1, but it would require a change in production code:

val rows = sparkSession.read
  .format("bigquery")
  .option("parallelism", 1) // required by bigquery-emulator
  .load(s"$projectId.$datasetId.$tableId")
  .collectAsList()

Even though it is technically possible to change this value, in practice it is very hard to make that change in every possible place. Additionally, it might cause other issues.

Support for partitioned tables

It looks like bigquery-emulator is not adding the _PARTITIONDATE and _PARTITIONTIME pseudo-columns that real BigQuery exposes on ingestion-time partitioned tables.
When a table is created like this:

service.create(TableInfo.of(
  TableId.of(projectId, datasetId, tableId),
  StandardTableDefinition.newBuilder()
    .setSchema(schema)
    .setTimePartitioning(TimePartitioning.of(DAY))
    .build()))

Spark tries to read the additional columns, which can be spotted in the bigquery-emulator logs:

2022-12-14T11:08:47.941+0100	INFO	contentdata/repository.go:135		{"query": "SELECT `id`,`otherProp`,`_PARTITIONTIME`,`_PARTITIONDATE` FROM `mytablename` ", "values": []}

On the Spark side, there is an error passed back from bigquery-emulator:

Caused by: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: UNKNOWN: failed to analyze: INVALID_ARGUMENT: Unrecognized name: _PARTITIONTIME [at 1:25]
	at com.google.cloud.spark.bigquery.repackaged.io.grpc.Status.asRuntimeException(Status.java:535)
	... 14 more

Problems with streaming writes

It looks like there should be a default stream for writing. Currently, an error is returned:

com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: failed to append rows: failed to get stream from projects/test/datasets/testingbq/tables/mytablename/_default
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:119)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
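
For context, in the real Storage Write API every table exposes an implicit stream named .../_default that accepts appends without a prior CreateWriteStream call. A minimal sketch of how that stream is addressed (names reused from this report; row payloads omitted):

from google.cloud.bigquery_storage import AppendRowsRequest

# The implicit per-table default stream -- never created explicitly
default_stream = "projects/test/datasets/testingbq/tables/mytablename/_default"
request = AppendRowsRequest(write_stream=default_stream)
# proto-encoded rows would be attached here before sending via
# BigQueryWriteClient.append_rows(iter([request]))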

Creating a stream before writing, with the code below, doesn't work either:

val createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
	.setParent(TableName.of(projectId, datasetId, tableId).toString())
	.setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
	.build()

val writeStream = client.createWriteStream(createWriteStreamRequest)

Executing the create-stream request for the first time after bigquery-emulator has started causes this error:

com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Project test is not found. Make sure it references valid GCP project that hasn't been deleted.; Project id: test
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:90)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)

Consecutive executions cause the test code to hang until a timeout, after which there is another error:

com.google.api.gax.rpc.DeadlineExceededException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 1199.955284179s. [closed=[], open=[[buffered_nanos=233286500, buffered_nanos=7866891, remote_addr=localhost/127.0.0.1:9060]]]

	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:94)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)

After that, it is impossible to shut down bigquery-emulator gracefully. Ctrl+C in the console causes a hang:

^C[bigquery-emulator] receive interrupt. shutdown gracefully

Only kill -9 <pid> helps.

goccy (Owner) commented Dec 16, 2022

@adamszadkowski Thank you for your report. However, since this issue is already closed, please create a new issue for this as a new topic and paste the problem there.
