Commit
1 parent 917e8ae · commit f142827
Showing 4 changed files with 463 additions and 0 deletions.
187 changes: 187 additions & 0 deletions
samples/snippets/src/main/java/com/example/bigquerystorage/StorageArrowSample.java
@@ -0,0 +1,187 @@
/*
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.bigquerystorage;

// [START bigquerystorage_arrow_quickstart]

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

public class StorageArrowSample {

  /*
   * SimpleRowReader handles deserialization of the Apache Arrow-encoded row batches transmitted
   * from the storage API.
   */
  private static class SimpleRowReader implements AutoCloseable {

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    // The VectorSchemaRoot and VectorLoader are reused across batches to avoid re-allocation
    // and excessive garbage collection.
    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
      Schema schema =
          MessageSerializer.deserializeSchema(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      arrowSchema.getSerializedSchema().toByteArray())));
      Preconditions.checkNotNull(schema);
      List<FieldVector> vectors = new ArrayList<>();
      for (Field field : schema.getFields()) {
        vectors.add(field.createVector(allocator));
      }
      root = new VectorSchemaRoot(vectors);
      loader = new VectorLoader(root);
    }

    /**
     * Sample method for processing Arrow data which only validates decoding.
     *
     * @param batch object returned from the ReadRowsResponse.
     */
    public void processRows(ArrowRecordBatch batch) throws IOException {
      org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
          MessageSerializer.deserializeRecordBatch(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      batch.getSerializedRecordBatch().toByteArray())),
              allocator);

      loader.load(deserializedBatch);
      // Release buffers from batch (they are still held in the vectors in root).
      deserializedBatch.close();
      System.out.println(root.contentToTSVString());
      // Release buffers from vectors in root.
      root.clear();
    }

    @Override
    public void close() {
      root.close();
      allocator.close();
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    // String projectId = "YOUR_PROJECT_ID";
    String projectId = args[0];
    // Optional snapshot time, in milliseconds since the epoch; a long is required
    // because current epoch-millisecond values overflow an Integer.
    Long snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Long.parseLong(args[1]);
    }

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "usa_names", "usa_1910_current");

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options =
          TableReadOptions.newBuilder()
              .addSelectedFields("name")
              .addSelectedFields("number")
              .addSelectedFields("state")
              .setRowRestriction("state = \"WA\"")
              .build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Avro format.
              // This example leverages Apache Arrow.
              .setDataFormat(DataFormat.ARROW)
              .setReadOptions(options);

      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

      ReadSession session = client.createReadSession(builder.build());
      // Set up a simple reader to consume the session's Arrow data.
      try (SimpleRowReader reader = new SimpleRowReader(session.getArrowSchema())) {

        // Assert that there are streams available in the session. An empty table may not have
        // data available. If no streams are available for an anonymous (cached) table, consider
        // writing results of a query to a named table rather than consuming cached results
        // directly.
        Preconditions.checkState(session.getStreamsCount() > 0);

        // Use the first stream to perform reading.
        String streamName = session.getStreams(0).getName();

        ReadRowsRequest readRowsRequest =
            ReadRowsRequest.newBuilder().setReadStream(streamName).build();

        // Process each block of rows as they arrive and decode using our simple row reader.
        ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
        for (ReadRowsResponse response : stream) {
          Preconditions.checkState(response.hasArrowRecordBatch());
          reader.processRows(response.getArrowRecordBatch());
        }
      }
    }
  }
}

// [END bigquerystorage_arrow_quickstart]
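As a side note, here is a minimal sketch (not part of this commit) of how the decoded batch could be consumed beyond the TSV dump: once loader.load(deserializedBatch) has populated root inside processRows, values can be read row by row through the standard Arrow vector API, where getObject returns the boxed value or null for a null cell.

      // Hypothetical alternative to contentToTSVString() inside processRows (sketch only):
      // iterate the loaded VectorSchemaRoot row by row and read each column's value.
      for (int i = 0; i < root.getRowCount(); i++) {
        for (FieldVector vector : root.getFieldVectors()) {
          // getObject(i) boxes the value at row i; it returns null for null cells.
          System.out.printf("%s=%s ", vector.getName(), vector.getObject(i));
        }
        System.out.println();
      }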
160 changes: 160 additions & 0 deletions
samples/snippets/src/main/java/com/example/bigquerystorage/StorageSample.java
@@ -0,0 +1,160 @@
/*
 * Copyright 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.bigquerystorage;

// [START bigquerystorage_quickstart]
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

public class StorageSample {

  /*
   * SimpleRowReader handles deserialization of the Avro-encoded row blocks transmitted
   * from the storage API using a generic datum decoder.
   */
  private static class SimpleRowReader {

    private final DatumReader<GenericRecord> datumReader;

    // Decoder object will be reused to avoid re-allocation and excessive garbage collection.
    private BinaryDecoder decoder = null;

    // GenericRecord object will be reused.
    private GenericRecord row = null;

    public SimpleRowReader(Schema schema) {
      Preconditions.checkNotNull(schema);
      datumReader = new GenericDatumReader<>(schema);
    }

    /**
     * Sample method for processing Avro rows which only validates decoding.
     *
     * @param avroRows object returned from the ReadRowsResponse.
     */
    public void processRows(AvroRows avroRows) throws IOException {
      decoder =
          DecoderFactory.get()
              .binaryDecoder(avroRows.getSerializedBinaryRows().toByteArray(), decoder);

      while (!decoder.isEnd()) {
        // Reusing object row
        row = datumReader.read(row, decoder);
        System.out.println(row.toString());
      }
    }
  }

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    // String projectId = "YOUR_PROJECT_ID";
    String projectId = args[0];
    // Optional snapshot time, in milliseconds since the epoch; a long is required
    // because current epoch-millisecond values overflow an Integer.
    Long snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Long.parseLong(args[1]);
    }

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "usa_names", "usa_1910_current");

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options =
          TableReadOptions.newBuilder()
              .addSelectedFields("name")
              .addSelectedFields("number")
              .addSelectedFields("state")
              .setRowRestriction("state = \"WA\"")
              .build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Arrow format.
              // This example leverages Apache Avro.
              .setDataFormat(DataFormat.AVRO)
              .setReadOptions(options);

      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t =
            Timestamp.newBuilder()
                .setSeconds(snapshotMillis / 1000)
                .setNanos((int) ((snapshotMillis % 1000) * 1000000))
                .build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

      // Request the session creation.
      ReadSession session = client.createReadSession(builder.build());

      SimpleRowReader reader =
          new SimpleRowReader(new Schema.Parser().parse(session.getAvroSchema().getSchema()));

      // Assert that there are streams available in the session. An empty table may not have
      // data available. If no streams are available for an anonymous (cached) table, consider
      // writing results of a query to a named table rather than consuming cached results directly.
      Preconditions.checkState(session.getStreamsCount() > 0);

      // Use the first stream to perform reading.
      String streamName = session.getStreams(0).getName();

      ReadRowsRequest readRowsRequest =
          ReadRowsRequest.newBuilder().setReadStream(streamName).build();

      // Process each block of rows as they arrive and decode using our simple row reader.
      ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
      for (ReadRowsResponse response : stream) {
        Preconditions.checkState(response.hasAvroRows());
        reader.processRows(response.getAvroRows());
      }
    }
  }
}
// [END bigquerystorage_quickstart]
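Similarly, a hedged sketch (not in this commit) of extracting typed fields from the reused GenericRecord in processRows rather than printing row.toString(). It assumes the name/number/state projection above; Avro represents BigQuery STRING values as org.apache.avro.util.Utf8 (hence the toString() calls), and the INTEGER column is assumed to decode as a Long.

      // Hypothetical field access inside the decode loop of processRows (sketch only).
      // Assumes the selected fields name, number, and state from the read options above.
      String name = row.get("name").toString(); // Utf8 -> String
      long number = (Long) row.get("number"); // BigQuery INTEGER assumed to decode as Long
      String state = row.get("state").toString();
      System.out.printf("%s,%d,%s%n", name, number, state);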