Skip to content

Commit

Permalink
Spanner Dataflow export performance improvements: look up the index o…
Browse files Browse the repository at this point in the history
…f each field by name only once per job instead of once per row.

PiperOrigin-RevId: 590252399
  • Loading branch information
cloud-teleport committed Dec 12, 2023
1 parent 65eb2db commit b7e51fd
Showing 1 changed file with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class SpannerRecordConverter {
private final Dialect dialect;
private final List<FieldInfo> fields;
private static final char ZERO_DIGIT = (new DecimalFormatSymbols()).getZeroDigit();
private boolean fieldsColumnIndicesInitialized = false;

private static class FieldInfo {
private final Schema.Field field;
Expand All @@ -60,6 +61,8 @@ private static class FieldInfo {
private final boolean matchesVarchar;
private final boolean matchesVarcharArray;

private int rowColumnIndex = -1;

public FieldInfo(Schema.Field field) {
this.field = field;
this.generated = field.getProp("generationExpression") != null;
Expand Down Expand Up @@ -120,6 +123,14 @@ public String getSpannerType() {
return spannerType;
}

public int getColumnIndex() {
return rowColumnIndex;
}

public void setColumnIndex(Struct row) {
rowColumnIndex = row.getColumnIndex(getName());
}

public boolean matchesStringPattern() {
return matchesString;
}
Expand Down Expand Up @@ -153,6 +164,13 @@ private List<FieldInfo> processFields() {
}

public GenericRecord convert(Struct row) {
synchronized (this.fields) {
if (!fieldsColumnIndicesInitialized) {
this.fields.stream().forEach(fieldInfo -> fieldInfo.setColumnIndex(row));
fieldsColumnIndicesInitialized = true;
}
}

GenericRecordBuilder builder = new GenericRecordBuilder(schema);
for (FieldInfo fieldInfo : this.fields) {
if (fieldInfo.isGenerated()) {
Expand All @@ -167,7 +185,7 @@ public GenericRecord convert(Struct row) {
Schema type = fieldInfo.getType();
String spannerType = fieldInfo.getSpannerType();

int fieldIndex = row.getColumnIndex(fieldName);
int fieldIndex = fieldInfo.getColumnIndex();

boolean nullValue = row.isNull(fieldIndex);
if (nullValue && !fieldInfo.isNullable()) {
Expand Down

0 comments on commit b7e51fd

Please sign in to comment.