What happened?
When writing Beam Rows to BigQuery via the Storage Write API, my timestamp elements arrive in the table with only second precision. Note: this happens specifically when using a Beam schema; setting a TableSchema via .withSchema() works fine (a sketch of that variant follows the observed output below).
The pipeline below reproduces this:
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

public static void main(String[] args) {
  // Instants with microsecond components (the second argument is a nanosecond adjustment).
  List<Instant> times = Arrays.asList(
      Instant.ofEpochSecond(100, 10 * 1000),
      Instant.ofEpochSecond(200, 20 * 1000),
      Instant.ofEpochSecond(300, 30 * 1000));
  Schema schema = Schema.of(Field.of("time", FieldType.logicalType(SqlTypes.TIMESTAMP)));

  Pipeline p = Pipeline.create();
  String table = "<project>:<dataset>.<table>";
  p.apply(Create.of(times))
      .apply(MapElements.into(TypeDescriptor.of(Row.class))
          .via(time -> Row.withSchema(schema).withFieldValue("time", time).build()))
      .setRowSchema(schema)
      .apply(BigQueryIO.<Row>write().to(table)
          .useBeamSchema()
          .withMethod(Write.Method.STORAGE_WRITE_API)
          .withFormatFunction(BigQueryUtils.toTableRow()));
  p.run().waitUntilFinish();
}
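(As a sanity check on the inputs: the second argument to Instant.ofEpochSecond is a nanosecond adjustment, so 10 * 1000 is 10 microseconds. A standalone java.time snippet like the following, independent of Beam, prints the values I expect below; the class name is just for illustration.)

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class ExpectedTimestamps {
  public static void main(String[] args) {
    long[][] inputs = {{300, 30_000}, {200, 20_000}, {100, 10_000}};
    for (long[] in : inputs) {
      Instant instant = Instant.ofEpochSecond(in[0], in[1]);
      // LocalDateTime.toString() keeps the microsecond digits, e.g. 1970-01-01T00:05:00.000030
      System.out.println(LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
    }
  }
}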
I expect the following values to be written:
1970-01-01T00:05:00.000030
1970-01-01T00:03:20.000020
1970-01-01T00:01:40.000010
But instead I get these:
1970-01-01 00:05:00
1970-01-01 00:03:20
1970-01-01 00:01:40
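For reference, a minimal sketch of the withSchema() variant that preserves the microseconds for me. It assumes the same pipeline as the repro, with rows standing in for the PCollection&lt;Row&gt; produced by setRowSchema(schema) above; TableSchema and TableFieldSchema come from com.google.api.services.bigquery.model:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;

// Explicit BigQuery schema for the single TIMESTAMP column.
TableSchema tableSchema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("time").setType("TIMESTAMP")));

// Identical write, but with an explicit table schema instead of useBeamSchema().
rows.apply(BigQueryIO.<Row>write().to(table)
    .withSchema(tableSchema)
    .withMethod(Write.Method.STORAGE_WRITE_API)
    .withFormatFunction(BigQueryUtils.toTableRow()));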
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner