Skip to content
Permalink
Browse files
feat: writeapi v1 manual client lib (#1323)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> ☕️
  • Loading branch information
yirutang committed Sep 24, 2021
1 parent e33f388 commit baf8fb3adc2e5135b71dd918ab30b619493a1b83
Showing with 6,086 additions and 0 deletions.
  1. +152 −0 ...erystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptor.java
  2. +86 −0 ...gquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigDecimalByteStringEncoder.java
  3. +314 −0 ...le-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/CivilTimeEncoder.java
  4. +334 −0 ...le-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
  5. +349 −0 ...-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java
  6. +118 −0 ...loud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ProtoSchemaConverter.java
  7. +104 −0 ...le-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamConnection.java
  8. +621 −0 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
  9. +400 −0 ...torage/src/test/java/com/google/cloud/bigquery/storage/v1/BQTableSchemaToProtoDescriptorTest.java
  10. +93 −0 ...e-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java
  11. +234 −0 ...oud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java
  12. +41 −0 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeClock.java
  13. +346 −0 ...querystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeScheduledExecutorService.java
  14. +538 −0 ...loud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
  15. +754 −0 ...ud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java
  16. +192 −0 ...-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ProtoSchemaConverterTest.java
  17. +507 −0 ...le-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
  18. +182 −0 .../test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryBigDecimalByteStringEncoderTest.java
  19. +190 −0 ...querystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryTimeEncoderTest.java
  20. +531 −0 ...torage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -0,0 +1,152 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
* Converts a BQ table schema to protobuf descriptor. All field names will be converted to lowercase
* when constructing the protobuf descriptor. The mapping between field types and field modes are
* shown in the ImmutableMaps below.
*/
public class BQTableSchemaToProtoDescriptor {
private static ImmutableMap<TableFieldSchema.Mode, FieldDescriptorProto.Label>
BQTableSchemaModeMap =
ImmutableMap.of(
TableFieldSchema.Mode.NULLABLE, FieldDescriptorProto.Label.LABEL_OPTIONAL,
TableFieldSchema.Mode.REPEATED, FieldDescriptorProto.Label.LABEL_REPEATED,
TableFieldSchema.Mode.REQUIRED, FieldDescriptorProto.Label.LABEL_REQUIRED);

private static ImmutableMap<TableFieldSchema.Type, FieldDescriptorProto.Type>
BQTableSchemaTypeMap =
new ImmutableMap.Builder<TableFieldSchema.Type, FieldDescriptorProto.Type>()
.put(TableFieldSchema.Type.BOOL, FieldDescriptorProto.Type.TYPE_BOOL)
.put(TableFieldSchema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES)
.put(TableFieldSchema.Type.DATE, FieldDescriptorProto.Type.TYPE_INT32)
.put(TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(TableFieldSchema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE)
.put(TableFieldSchema.Type.GEOGRAPHY, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.INT64, FieldDescriptorProto.Type.TYPE_INT64)
.put(TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_BYTES)
.put(TableFieldSchema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.STRUCT, FieldDescriptorProto.Type.TYPE_MESSAGE)
.put(TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(TableFieldSchema.Type.TIMESTAMP, FieldDescriptorProto.Type.TYPE_INT64)
.build();

/**
* Converts TableFieldSchema to a Descriptors.Descriptor object.
*
* @param BQTableSchema
* @throws Descriptors.DescriptorValidationException
*/
public static Descriptor convertBQTableSchemaToProtoDescriptor(TableSchema BQTableSchema)
throws Descriptors.DescriptorValidationException {
Preconditions.checkNotNull(BQTableSchema, "BQTableSchema is null.");
return convertBQTableSchemaToProtoDescriptorImpl(
BQTableSchema, "root", new HashMap<ImmutableList<TableFieldSchema>, Descriptor>());
}

/**
* Converts a TableFieldSchema to a Descriptors.Descriptor object.
*
* @param BQTableSchema
* @param scope Keeps track of current scope to prevent repeated naming while constructing
* descriptor.
* @param dependencyMap Stores already constructed descriptors to prevent reconstruction
* @throws Descriptors.DescriptorValidationException
*/
private static Descriptor convertBQTableSchemaToProtoDescriptorImpl(
TableSchema BQTableSchema,
String scope,
HashMap<ImmutableList<TableFieldSchema>, Descriptor> dependencyMap)
throws Descriptors.DescriptorValidationException {
List<FileDescriptor> dependenciesList = new ArrayList<FileDescriptor>();
List<FieldDescriptorProto> fields = new ArrayList<FieldDescriptorProto>();
int index = 1;
for (TableFieldSchema BQTableField : BQTableSchema.getFieldsList()) {
String currentScope = scope + "__" + BQTableField.getName();
if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
ImmutableList<TableFieldSchema> fieldList =
ImmutableList.copyOf(BQTableField.getFieldsList());
if (dependencyMap.containsKey(fieldList)) {
Descriptor descriptor = dependencyMap.get(fieldList);
dependenciesList.add(descriptor.getFile());
fields.add(convertBQTableFieldToProtoField(BQTableField, index++, descriptor.getName()));
} else {
Descriptor descriptor =
convertBQTableSchemaToProtoDescriptorImpl(
TableSchema.newBuilder().addAllFields(fieldList).build(),
currentScope,
dependencyMap);
dependenciesList.add(descriptor.getFile());
dependencyMap.put(fieldList, descriptor);
fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
}
} else {
fields.add(convertBQTableFieldToProtoField(BQTableField, index++, currentScope));
}
}
FileDescriptor[] dependenciesArray = new FileDescriptor[dependenciesList.size()];
dependenciesArray = dependenciesList.toArray(dependenciesArray);
DescriptorProto descriptorProto =
DescriptorProto.newBuilder().setName(scope).addAllField(fields).build();
FileDescriptorProto fileDescriptorProto =
FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
FileDescriptor fileDescriptor =
FileDescriptor.buildFrom(fileDescriptorProto, dependenciesArray);
Descriptor descriptor = fileDescriptor.findMessageTypeByName(scope);
return descriptor;
}

/**
* Converts a BQTableField to ProtoField
*
* @param BQTableField BQ Field used to construct a FieldDescriptorProto
* @param index Index for protobuf fields.
* @param scope used to name descriptors
*/
private static FieldDescriptorProto convertBQTableFieldToProtoField(
TableFieldSchema BQTableField, int index, String scope) {
TableFieldSchema.Mode mode = BQTableField.getMode();
String fieldName = BQTableField.getName().toLowerCase();
if (BQTableField.getType() == TableFieldSchema.Type.STRUCT) {
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setTypeName(scope)
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
}
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setType((FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()))
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
}
}
@@ -0,0 +1,86 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* This code was ported from ZetaSQL and can be found here:
* https://github.com/google/zetasql/blob/c55f967a5ae35b476437210c529691d8a73f5507/java/com/google/zetasql/Value.java
*/

package com.google.cloud.bigquery.storage.v1;

import com.google.common.primitives.Bytes;
import com.google.protobuf.ByteString;
import java.math.BigDecimal;
import java.math.BigInteger;

public class BigDecimalByteStringEncoder {
private static int NumericScale = 9;
private static final BigDecimal MAX_NUMERIC_VALUE =
new BigDecimal("99999999999999999999999999999.999999999");
private static final BigDecimal MIN_NUMERIC_VALUE =
new BigDecimal("-99999999999999999999999999999.999999999");

public static ByteString encodeToNumericByteString(BigDecimal bigDecimal) {
ByteString byteString =
serializeBigDecimal(
bigDecimal, NumericScale, MAX_NUMERIC_VALUE, MIN_NUMERIC_VALUE, "ByteString");
return byteString;
}

public static BigDecimal decodeNumericByteString(ByteString byteString) {
BigDecimal bigDecimal =
deserializeBigDecimal(
byteString, NumericScale, MAX_NUMERIC_VALUE, MIN_NUMERIC_VALUE, "BigDecimal");
return bigDecimal;
}
// Make these private and make public wrapper that internalizes these min/max/scale/type
private static BigDecimal deserializeBigDecimal(
ByteString serializedValue,
int scale,
BigDecimal maxValue,
BigDecimal minValue,
String typeName) {
byte[] bytes = serializedValue.toByteArray();
// NUMERIC/BIGNUMERIC values are serialized as scaled integers in two's complement form in
// little endian order. BigInteger requires the same encoding but in big endian order,
// therefore we must reverse the bytes that come from the proto.
Bytes.reverse(bytes);
BigInteger scaledValue = new BigInteger(bytes);
BigDecimal decimalValue = new BigDecimal(scaledValue, scale);
if (decimalValue.compareTo(maxValue) > 0 || decimalValue.compareTo(minValue) < 0) {
throw new IllegalArgumentException(typeName + " overflow: " + decimalValue.toPlainString());
}
return decimalValue;
}
/** Returns a numeric Value that equals to {@code v}. */
private static ByteString serializeBigDecimal(
BigDecimal v, int scale, BigDecimal maxValue, BigDecimal minValue, String typeName) {
if (v.scale() > scale) {
throw new IllegalArgumentException(
typeName + " scale cannot exceed " + scale + ": " + v.toPlainString());
}
if (v.compareTo(maxValue) > 0 || v.compareTo(minValue) < 0) {
throw new IllegalArgumentException(typeName + " overflow: " + v.toPlainString());
}
byte[] bytes = v.setScale(scale).unscaledValue().toByteArray();
// NUMERIC/BIGNUMERIC values are serialized as scaled integers in two's complement form in
// little endian
// order. BigInteger requires the same encoding but in big endian order, therefore we must
// reverse the bytes that come from the proto.
Bytes.reverse(bytes);
return ByteString.copyFrom(bytes);
}
}

0 comments on commit baf8fb3

Please sign in to comment.