Merge pull request #13771: [BEAM-11648] First step in creation of Vortex sink

reuvenlax committed Feb 25, 2021
2 parents b120479 + 20ca1d1, commit d967389
Showing 7 changed files with 688 additions and 110 deletions.
@@ -411,6 +411,21 @@ public static TableReference parseTableSpec(String tableSpec) {
return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
}

public static TableReference parseTableUrn(String tableUrn) {
Matcher match = BigQueryIO.TABLE_URN_SPEC.matcher(tableUrn);
if (!match.matches()) {
throw new IllegalArgumentException(
"Table reference is not in projects/[project_id]/datasets/[dataset_id]/tables/[table_id] "
+ "format: "
+ tableUrn);
}

TableReference ref = new TableReference();
ref.setProjectId(match.group("PROJECT"));

return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
}
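
For illustration only (not part of this commit), a minimal usage sketch of the new parser; the enclosing class name BigQueryHelpers is an assumption based on the surrounding methods:

// Hypothetical usage sketch: parse a fully qualified table URN.
TableReference ref =
    BigQueryHelpers.parseTableUrn(
        "projects/my-project/datasets/my_dataset/tables/my_table");
// ref.getProjectId() -> "my-project"
// ref.getDatasetId() -> "my_dataset"
// ref.getTableId()   -> "my_table"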

/** Strip off any partition decorator information from a tablespec. */
public static String stripPartitionDecorator(String tableSpec) {
int index = tableSpec.lastIndexOf('$');
@@ -504,6 +504,16 @@ public class BigQueryIO {

static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);

/**
* Matches table specifications in the form {@code "projects/[project_id]/datasets/[dataset_id]/tables/[table_id]"}.
*/
private static final String TABLE_URN_REGEXP =
String.format(
"projects/(?<PROJECT>%s)/datasets/(?<DATASET>%s)/tables/(?<TABLE>%s)",
PROJECT_ID_REGEXP, DATASET_REGEXP, TABLE_REGEXP);

static final Pattern TABLE_URN_SPEC = Pattern.compile(TABLE_URN_REGEXP);

/**
* A formatting function that maps a TableRow to itself. This allows sending a {@code
* PCollection<TableRow>} directly to BigQueryIO.Write.
@@ -25,17 +25,31 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.avro.Conversions;
@@ -74,6 +88,21 @@
})
public class BigQueryUtils {

/**
* Given a BigQuery TableSchema, returns a protocol-buffer Descriptor that can be used to write
* data using the Vortex streaming API.
*/
public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)
throws DescriptorValidationException {
DescriptorProto descriptorProto = descriptorSchemaFromTableSchema(jsonSchema);
FileDescriptorProto fileDescriptorProto =
FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
FileDescriptor fileDescriptor =
FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);

return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
}
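
A hedged sketch of how this entry point could be exercised; the schema fields below are illustrative, not taken from this commit:

// Illustrative only: derive a proto Descriptor from a simple TableSchema.
TableSchema schema =
    new TableSchema()
        .setFields(
            ImmutableList.of(
                new TableFieldSchema().setName("name").setType("STRING"),
                new TableFieldSchema().setName("number").setType("INT64")));
Descriptor descriptor = BigQueryUtils.getDescriptorFromTableSchema(schema);
// The resulting descriptor has fields "name" (TYPE_STRING) and "number" (TYPE_INT64).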

/** Options for how to convert BigQuery data to Beam data. */
@AutoValue
public abstract static class ConversionOptions implements Serializable {
@@ -405,6 +434,193 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
return fromTableFieldSchema(tableSchema.getFields(), options);
}

static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
}

static DescriptorProto descriptorSchemaFromTableFieldSchemas(
Iterable<TableFieldSchema> tableFieldSchemas) {
DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
// Create a unique name for the descriptor ('-' characters cannot be used).
descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
int i = 1;
for (TableFieldSchema fieldSchema : tableFieldSchemas) {
fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
}
return descriptorBuilder.build();
}

static void fieldDescriptorFromTableField(
TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName());
fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
switch (fieldSchema.getType()) {
case "STRING":
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING);
break;
case "BYTES":
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BYTES);
break;
case "INT64":
case "INTEGER":
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
break;
case "FLOAT64":
case "FLOAT":
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_FLOAT);
break;
case "BOOL":
case "BOOLEAN":
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BOOL);
break;
case "TIMESTAMP":
case "TIME":
case "DATETIME":
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
break;
case "DATE":
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT32);
break;
case "STRUCT":
case "RECORD":
DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
descriptorBuilder.addNestedType(nested);
fieldDescriptorBuilder =
fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
break;
default:
throw new UnsupportedOperationException(
"Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported");
}

Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
} else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
} else {
fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
}
descriptorBuilder.addField(fieldDescriptorBuilder.build());
}

/**
* Given a BigQuery TableRow, returns a protocol-buffer message that can be used to write data
* using the Vortex streaming API.
*/
public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
Object value =
messageValueFromFieldValue(fieldDescriptor, tableRow.get(fieldDescriptor.getName()));
if (value != null) {
builder.setField(fieldDescriptor, value);
}
}
return builder.build();
}
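
Continuing the illustrative sketch above, a TableRow can then be turned into a DynamicMessage suitable for the write API; the row values here are assumptions, not fixtures from this commit:

// Illustrative only: string values are parsed according to each field's
// proto type; null values on NULLABLE fields are simply skipped.
TableRow row = new TableRow().set("name", "foo").set("number", "42");
DynamicMessage message = BigQueryUtils.messageFromTableRow(descriptor, row);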

static Object messageValueFromFieldValue(FieldDescriptor fieldDescriptor, Object bqValue) {
if (bqValue == null) {
if (fieldDescriptor.isOptional()) {
return null;
} else {
throw new IllegalArgumentException(
"Received null value for non-nullable field " + fieldDescriptor.getName());
}
}
return toProtoValue(fieldDescriptor, bqValue);
}

private static final Map<FieldDescriptor.Type, Function<String, Object>> JSON_PROTO_PARSERS =
ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
.put(FieldDescriptor.Type.INT32, Integer::valueOf)
.put(FieldDescriptor.Type.INT64, Long::valueOf)
.put(FieldDescriptor.Type.FLOAT, Float::valueOf)
.put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
.put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
.put(FieldDescriptor.Type.STRING, str -> str)
.put(
FieldDescriptor.Type.BYTES,
b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
.build();

private static Object toProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
if (jsonBQValue instanceof String) {
Function<String, Object> mapper = JSON_PROTO_PARSERS.get(fieldDescriptor.getType());
if (mapper != null) {
return mapper.apply((String) jsonBQValue);
}
} else if (jsonBQValue instanceof Integer) {
switch (fieldDescriptor.getJavaType()) {
case INT:
return Integer.valueOf((int) jsonBQValue);
case LONG:
return Long.valueOf((int) jsonBQValue);
default:
throw new RuntimeException(
"Unexpectecd java type "
+ jsonBQValue.getClass()
+ " for field descriptor "
+ fieldDescriptor);
}
} else if (jsonBQValue instanceof List) {
return ((List<Object>) jsonBQValue)
.stream()
.map(v -> ((Map<String, Object>) v).get("v"))
.map(v -> toProtoValue(fieldDescriptor, v))
.collect(toList());
} else if (jsonBQValue instanceof AbstractMap) {
// This will handle nested rows.
TableRow tr = new TableRow();
tr.putAll((AbstractMap<String, Object>) jsonBQValue);
return messageFromTableRow(fieldDescriptor.getMessageType(), tr);
} else {
return toProtoValue(fieldDescriptor, jsonBQValue.toString());
}

throw new UnsupportedOperationException(
"Converting BigQuery type '"
+ jsonBQValue.getClass()
+ "' to '"
+ fieldDescriptor
+ "' is not supported");
}

public static TableRow tableRowFromMessage(Message message) {
TableRow tableRow = new TableRow();
for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
FieldDescriptor fieldDescriptor = field.getKey();
Object fieldValue = field.getValue();
tableRow.putIfAbsent(
fieldDescriptor.getName(), jsonValueFromMessageValue(fieldDescriptor, fieldValue, true));
}
return tableRow;
}
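
The reverse direction (again a sketch, reusing the hypothetical message from above) recovers a JSON-friendly TableRow, with BYTES fields re-encoded as base64 strings:

// Illustrative only: round-trip the proto message back into a TableRow.
TableRow roundTripped = BigQueryUtils.tableRowFromMessage(message);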

public static Object jsonValueFromMessageValue(
FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated) {
if (expandRepeated && fieldDescriptor.isRepeated()) {
List<Object> valueList = (List<Object>) fieldValue;
return valueList.stream()
.map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false))
.collect(toList());
}

switch (fieldDescriptor.getType()) {
case GROUP:
case MESSAGE:
return tableRowFromMessage((Message) fieldValue);
case BYTES:
return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
case ENUM:
throw new RuntimeException("Enumerations not supported");
default:
return fieldValue;
}
}

/** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
@Experimental(Kind.SCHEMAS)
public static org.apache.avro.Schema toGenericAvroSchema(