Skip to content

Commit

Permalink
Fallback to parsing datetime and time strings w/ and w/o timezones in…
Browse files Browse the repository at this point in the history
… case DateTimeParseException is thrown (#13745)

* Fall back to parsing w/ or w/o TZ if parsing a date or a time string fails
* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
grishick and octavia-squidington-iii committed Jun 14, 2022
1 parent fe6eda5 commit 2cd6200
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.22
dockerImageTag: 0.4.23
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6719,7 +6719,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.22"
- dockerImage: "airbyte/source-postgres:0.4.23"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.22
LABEL io.airbyte.version=0.4.23
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -108,22 +109,42 @@ public void setStatementField(final PreparedStatement preparedStatement,
}
}

private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
private void setTimeWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the time w/o timezone. This can be caused by schema created with a different version of the connector
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
}
}

private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the datetime w/o timezone. This can be caused by schema created with a different version of the connector
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
}
}

@Override
protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
}
}

@Override
protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
}
}

@Override
Expand Down Expand Up @@ -170,21 +191,21 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
}

@Override
protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
node.put(columnName, resolveEra(date, date.toString()));
}

@Override
protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
node.put(columnName, time.toString());
}

@Override
protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
LocalDate date = timestamp.toLocalDate();
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();
node.put(columnName, resolveEra(date, timestamp.toString()));
}

Expand Down Expand Up @@ -214,7 +235,7 @@ public JDBCType getFieldType(final JsonNode field) {
}

@Override
public JsonSchemaType getJsonType(JDBCType jdbcType) {
public JsonSchemaType getJsonType(final JDBCType jdbcType) {
return switch (jdbcType) {
case BOOLEAN -> JsonSchemaType.BOOLEAN;
case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER;
Expand Down Expand Up @@ -264,7 +285,7 @@ private void putHstoreAsJson(final ObjectNode node, final String columnName, fin
final var data = resultSet.getObject(index);
try {
node.put(columnName, OBJECT_MAPPER.writeValueAsString(data));
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException("Could not parse 'hstore' value:" + e);
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 0.4.23 | 2022-06-13 | [13655](https://github.com/airbytehq/airbyte/pull/13745) | Fixed handling datetime cursors when upgrading from older versions of the connector |
| 0.4.22 | 2022-06-09 | [13655](https://github.com/airbytehq/airbyte/pull/13655) | Fixed bug with unsupported date-time datatypes during incremental sync |
| 0.4.21 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size |
| 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format |
Expand Down

0 comments on commit 2cd6200

Please sign in to comment.