Fix BigQueryIO.Read to work the same in Direct and Dataflow runners
This is a partial revert of commits f5e3b8e and 18c82ad.

When a batch Dataflow job runs on the Cloud Dataflow service, the data
are produced by running a BigQuery export job and then reading all of the
exported files in parallel. When the job runs in the DirectPipelineRunner,
BigQuery's JSON API is used directly. The two paths return data in
different formats.

To compensate, we use BigQueryTableRowIterator to normalize the behavior in
DirectPipelineRunner to the behavior seen when running on the service.
  (We cannot change this decision without a major breaking change.)

This patch fixes some discrepancies in the way that BigQueryTableRowIterator is
implemented. Specifically,

*) In commit 18c82ad (in response to issue apache#20) we changed timestamps to
be printed as formatted strings. However, the format did not correctly match
the output of BigQuery export. Here is a sample set of timestamps from the
export job vs. the JSON API:

Export job                        JSON API
2016-01-06 06:38:00 UTC           1.45206228E9
2016-01-06 06:38:11 UTC           1.452062291E9
2016-01-06 06:38:11.1 UTC         1.4520622911E9
2016-01-06 06:38:11.12 UTC        1.45206229112E9
2016-01-06 06:38:11.123 UTC       1.452062291123E9   *
2016-01-06 06:38:11.1234 UTC      1.4520622911234E9
2016-01-06 06:38:11.12345 UTC     1.45206229112345E9
2016-01-06 06:38:11.123456 UTC    1.452062291123456E9

Before this change, only the case marked with * would have passed.
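
For reference, here is a minimal, self-contained sketch of the normalization the
patch implements. It mirrors the new formatTimestamp logic but uses java.time
instead of the SDK's Joda-Time formatter; the class and method names are
illustrative only, not part of the SDK.

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    class TimestampSketch {
      // Seconds-only part of the export format, rendered in UTC.
      private static final DateTimeFormatter SECONDS_FORMAT =
          DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

      // Converts the JSON API value (seconds since epoch, in scientific notation,
      // e.g. "1.452062291123456E9") into the export-style string shown above.
      static String normalize(String secondsSinceEpoch) {
        long micros = (long) (Double.parseDouble(secondsSinceEpoch) * 1_000_000);
        long seconds = micros / 1_000_000;
        int subSecondMicros = (int) (micros % 1_000_000);

        String dayAndTime = SECONDS_FORMAT.format(Instant.ofEpochSecond(seconds));
        if (subSecondMicros == 0) {
          return dayAndTime + " UTC";
        }
        // Up to six fractional digits, with trailing zeros dropped.
        String fraction =
            String.format("%06d", subSecondMicros).replaceFirst("0+$", "");
        return dayAndTime + "." + fraction + " UTC";
      }

      public static void main(String[] args) {
        System.out.println(normalize("1.45206228E9"));        // 2016-01-06 06:38:00 UTC
        System.out.println(normalize("1.452062291123456E9")); // 2016-01-06 06:38:11.123456 UTC
      }
    }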

*) In commit f5e3b8e we updated the TableRow iterator to preserve the
usual TableRow field `f` corresponding to getF(), which returns the
list of fields in schema order. This was my mistaken attempt to better support
users who have prior experience with BigQuery's API and expect to use
getF()/getV(). However, there were two issues:
  1. The change did not affect the behavior in the DataflowPipelineRunner.
  2. It was actually a breaking, backwards-incompatible change, because common
     downstream DoFns iterate over the keys of the TableRow (as sketched below),
     and the change added an extra key, "f".
So we should not propagate the change to DataflowPipelineRunner, but instead we
should revert the change to BigQueryTableRowIterator.
  (Note this is also a slightly backwards-incompatible change, but it reverts
  to the old behavior, and users are more likely to be depending on the
  DataflowPipelineRunner behavior than on the DirectPipelineRunner behavior.)
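
To make issue 2 concrete, here is a hypothetical downstream DoFn of the kind
described above; the class name and CSV logic are made up for illustration. It
treats every key of the TableRow as a data column, so an injected "f" entry
silently changes its output.

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import java.util.Map;

    // TableRow implements Map<String, Object>, so entrySet() iterates its columns.
    class ColumnsToCsvFn extends DoFn<TableRow, String> {
      @Override
      public void processElement(ProcessContext c) {
        StringBuilder line = new StringBuilder();
        for (Map.Entry<String, Object> column : c.element().entrySet()) {
          if (line.length() > 0) {
            line.append(",");
          }
          // Assumes every entry is a column value; an extra "f" entry holding the
          // raw cell list would corrupt the emitted line.
          line.append(column.getValue());
        }
        c.output(line.toString());
      }
    }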

Fix both these issues and add tests.

This is still ugly for now. The long-term fix is to support a parser that
lets users skip TableRow altogether and go straight to POJOs of their choosing
(see apache#41). That would also eliminate the performance and typing issues
caused by using TableRow as an intermediate type in pipelines (see, e.g.,
http://stackoverflow.com/questions/33622227/dataflow-mixing-integer-long-types).
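
To give a rough sense of direction, the API for such a parser might look
something like the sketch below. This is purely hypothetical: withParser,
RowParser, and the column names are invented for illustration and are not part
of the SDK or of any committed design.

    import java.util.Map;

    // Hypothetical: a user-defined POJO plus a parser that bypasses TableRow.
    class StationReading {
      long stationId;
      double meanTemp;
    }

    interface RowParser<T> extends java.io.Serializable {
      T parse(Map<String, Object> record);
    }

    // A read might then look roughly like:
    //   PCollection<StationReading> readings = pipeline.apply(
    //       BigQueryIO.Read.from("project:dataset.table")
    //           .withParser((RowParser<StationReading>) record -> {
    //             StationReading r = new StationReading();
    //             r.stationId = Long.parseLong((String) record.get("station_id"));
    //             r.meanTemp = (Double) record.get("mean_temp");
    //             return r;
    //           }));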

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=111746236
dhalperi authored and lukecwik committed Jan 15, 2016
1 parent a918a31 commit 6a11a72
Showing 3 changed files with 130 additions and 98 deletions.
BigQueryTableRowIterator.java
@@ -16,12 +16,14 @@

package com.google.cloud.dataflow.sdk.util;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ClassInfo;
import com.google.api.client.util.Data;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
@@ -49,6 +51,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -133,8 +136,44 @@ public boolean hasNext() {
}

/**
* Adjusts a field returned from the BigQuery API to match the type that will be seen when
* run on the backend service. The end result is:
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
* immutable.
*/
private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
private static String formatTimestamp(String timestamp) {
// timestamp is in "seconds since epoch" format, with scientific notation.
// e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
// Separate into seconds and microseconds.
double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
long timestampMicros = (long) timestampDoubleMicros;
long seconds = timestampMicros / 1000000;
int micros = (int) (timestampMicros % 1000000);
String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);

// No sub-second component.
if (micros == 0) {
return String.format("%s UTC", dayAndTime);
}

// Sub-second component.
int digits = 6;
int subsecond = micros;
while (subsecond % 10 == 0) {
digits--;
subsecond /= 10;
}
String formatString = String.format("%%0%dd", digits);
String fractionalSeconds = String.format(formatString, subsecond);
return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
}

/**
* Adjusts a field returned from the BigQuery API to match what we will receive when running
* BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
* used for batch jobs executed on the Cloud Dataflow service.
*
* <p>The following is the relationship between BigQuery schema and Java types:
*
* <ul>
* <li>Nulls are {@code null}.
@@ -143,18 +182,17 @@ public boolean hasNext() {
* <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
* <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
* <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
* {@code yyyy-MM-dd HH:mm:ss.SSS UTC}.
* {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
* zeros and can be 1 to 6 digits long.
* <li>Every other atomic type is a {@code String}.
* </ul>
*
* <p>Note that currently integers are encoded as strings to match
* the behavior of the backend service.
* <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
*
* <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
* and are not accessible through the {@link TableRow#getF} function.
*/
private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
// In the input from the BQ API, atomic types all come in as
// strings, while on the Dataflow service they have more precise
// types.

@Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
if (Data.isNull(v)) {
return null;
}
@@ -185,16 +223,22 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
}

if (fieldSchema.getType().equals("TIMESTAMP")) {
// Seconds to milliseconds
long milliSecs = (new Double(Double.parseDouble((String) v) * 1000)).longValue();
DateTimeFormatter formatter =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").withZoneUTC();
return formatter.print(milliSecs) + " UTC";
return formatTimestamp((String) v);
}

return v;
}

/**
* A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
* because they are reserved keywords in {@link TableRow}.
*/
// TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
// not indirect through our broken use of {@link TableRow}.
// See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
private static final Collection<String> RESERVED_FIELD_NAMES =
ClassInfo.of(TableRow.class).getNames();

/**
* Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
* Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
@@ -206,62 +250,46 @@ private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
// If rawRow is a TableRow, use it. If not, create a new one.
TableRow row;
List<? extends Map<String, Object>> cells;
if (rawRow instanceof TableRow) {
// Since rawRow is a TableRow, we also know that rawRow.getF() returns a List<TableCell>.
// We do not need to do any type conversion.
// Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
// any type conversion, but extract the cells for cell-wise processing below.
row = (TableRow) rawRow;
cells = row.getF();
// Clear the cells from the row, so that row.getF() will return null. This matches the
// behavior of rows produced by the BigQuery export API used on the service.
row.setF(null);
} else {
row = new TableRow();

// Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
// get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
// we will use Map.get("v") instead of TableCell.getV() get its value.
@SuppressWarnings("unchecked")
List<Map<String, Object>> rawCells = (List<Map<String, Object>>) rawRow.get("f");

ImmutableList.Builder<TableCell> builder = ImmutableList.builder();
for (Map<String, Object> rawCell : rawCells) {
// If rawCell is a TableCell, use it. If not, create a new one.
if (rawCell instanceof TableCell) {
builder.add((TableCell) rawCell);
} else {
builder.add(new TableCell().setV(rawCell.get("v")));
}
}
row.setF(builder.build());
List<? extends Map<String, Object>> rawCells =
(List<? extends Map<String, Object>>) rawRow.get("f");
cells = rawCells;
}

// From here, everything is a TableRow/TableCell, no need to interpret as Map<String,Object>.
List<TableCell> cells = row.getF();
checkState(cells.size() == fields.size(),
"Expected that the row has the same number of cells %s as fields in the schema %s",
cells.size(), fields.size());

// Loop through all the fields in the row, normalizing their types with the TableFieldSchema
// and also storing the normalized values by field name in the Map<String, Object> that
// and storing the normalized values by field name in the Map<String, Object> that
// underlies the TableRow.
Iterator<TableCell> cellIt = cells.iterator();
Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
Iterator<TableFieldSchema> fieldIt = fields.iterator();
while (cellIt.hasNext()) {
TableCell cell = cellIt.next();
Map<String, Object> cell = cellIt.next();
TableFieldSchema fieldSchema = fieldIt.next();

// Convert the object in this cell to the Java type corresponding to its type in the schema.
Object convertedValue = getTypedCellValue(fieldSchema, cell.getV());
cell.setV(convertedValue);
Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));

String fieldName = fieldSchema.getName();
if (fieldName.equals("f")) {
// This is a workaround for a crash when the schema has a field named "f". Specifically,
// tableRow.set("f", value) is equivalent to tableRow.setF(value), and value must be a
// List<TableCell> or a ClassCastException will be thrown. To avoid the crash, we simply
// do not set the Map property named "f".
//
// The value for a field named "f" can instead be retrieved by calling tableRow.getF()
// to get the list of cells, and accessing the positional entry that corresponds to the
// position of the "f" field in the TableSchema.
continue;
}
checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
"BigQueryIO does not support records with columns named %s", fieldName);
row.set(fieldName, convertedValue);
}
return row;
BigQueryReaderTest.java
@@ -18,8 +18,8 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.endsWith;
import static org.mockito.Matchers.eq;
@@ -35,16 +35,16 @@
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
@@ -63,6 +63,8 @@
*/
@RunWith(JUnit4.class)
public class BigQueryReaderTest {
@Rule public final ExpectedException thrown = ExpectedException.none();

private static final String PROJECT_ID = "project";
private static final String DATASET = "dataset";
private static final String TABLE = "table";
@@ -73,14 +75,6 @@ public class BigQueryReaderTest {
private static final String GET_TABLE_REQUEST_PATH =
String.format("projects/%s/datasets/%s/tables/%s", PROJECT_ID, DATASET, TABLE);

private static final List<TableCell> makeCellList(Object... fields) {
ImmutableList.Builder<TableCell> cells = ImmutableList.builder();
for (Object o : fields) {
cells.add(new TableCell().setV(o));
}
return cells.build();
}

// This is a real response (with some unused fields removed) for the table created from this
// schema:
// [
@@ -578,12 +572,12 @@ public void testReadQuery() throws Exception {

assertEquals("Arthur", row.get("name"));
assertEquals("42", row.get("integer"));
assertEquals(makeCellList("Arthur", "42"), row.getF());
assertNull(row.getF());

row = iterator.next().getValue();
assertEquals("Allison", row.get("name"));
assertEquals("79", row.get("integer"));
assertEquals(makeCellList("Allison", "79"), row.getF());
assertNull(row.getF());

iterator.close();

@@ -784,7 +778,7 @@ public void testReadTable() throws Exception {
TableRow nested = (TableRow) row.get("record");
assertEquals("43", nested.get("nestedInt"));
assertEquals(4.14159, nested.get("nestedFloat"));
assertEquals(makeCellList("43", 4.14159), nested.getF());
assertNull(nested.getF());

assertEquals(Lists.newArrayList("42", "43", "79"), row.get("repeatedInt"));
assertTrue(((List<?>) row.get("repeatedFloat")).isEmpty());
@@ -800,7 +794,7 @@ public void testReadTable() throws Exception {
nested = (TableRow) row.get("record");
assertEquals("80", nested.get("nestedInt"));
assertEquals(3.71828, nested.get("nestedFloat"));
assertEquals(makeCellList("80", 3.71828), nested.getF());
assertNull(nested.getF());

assertTrue(((List<?>) row.get("repeatedInt")).isEmpty());
assertEquals(Lists.newArrayList(3.14159, 2.71828), row.get("repeatedFloat"));
@@ -810,10 +804,10 @@ public void testReadTable() throws Exception {
assertEquals(2, nestedRecords.size());
assertEquals("hello", nestedRecords.get(0).get("string"));
assertEquals(true, nestedRecords.get(0).get("bool"));
assertEquals(makeCellList(true, "hello"), nestedRecords.get(0).getF());
assertNull(nestedRecords.get(0).getF());
assertEquals("world", nestedRecords.get(1).get("string"));
assertEquals(false, nestedRecords.get(1).get("bool"));
assertEquals(makeCellList(false, "world"), nestedRecords.get(1).getF());
assertNull(nestedRecords.get(1).getF());

assertFalse(iterator.hasNext());

@@ -909,12 +903,9 @@ public void testReadTableWithFieldF() throws Exception {
Reader.ReaderIterator<WindowedValue<TableRow>> iterator = reader.iterator();
assertTrue(iterator.hasNext());

TableRow row = iterator.next().getValue();
assertEquals(makeCellList("5", "Arthur"), row.getF());
assertEquals("Arthur", row.getF().get(1).getV());
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("BigQueryIO does not support records with columns named f");

row = iterator.next().getValue();
assertEquals(makeCellList("42", "Allison"), row.getF());
assertEquals("Allison", row.getF().get(1).getV());
iterator.next().getValue();
}
}