Skip to content

Commit

Permalink
Common Jsons: add flag to apply flatten to arrays (#20993)
Browse files Browse the repository at this point in the history
* add flag to apply flatten to arrays

* add additional unit test cases for array flattening

* add backward compatibility function

* bump dest-redshift version and add changelog

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Adam Bloom <adam-bloom@users.noreply.github.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Davin Chia <davinchia@gmail.com>
  • Loading branch information
5 people committed Jan 5, 2023
1 parent cb84383 commit 606e59e
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 17 deletions.
27 changes: 24 additions & 3 deletions airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,29 @@ public static int getIntOrZero(final JsonNode json, final List<String> keys) {
}

/**
* Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object.
* Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. When
* applyFlattenToArray is true, each element in the array will be one entry in the returned map.
* This behavior is used in the Redshift SUPER type. When it is false, the whole array will be one
* entry. This is used in the JobTracker.
*/
@SuppressWarnings("PMD.ForLoopCanBeForeach")
public static Map<String, Object> flatten(final JsonNode node) {
public static Map<String, Object> flatten(final JsonNode node, final Boolean applyFlattenToArray) {
if (node.isObject()) {
final Map<String, Object> output = new HashMap<>();
for (final Iterator<Entry<String, JsonNode>> it = node.fields(); it.hasNext();) {
final Entry<String, JsonNode> entry = it.next();
final String field = entry.getKey();
final JsonNode value = entry.getValue();
mergeMaps(output, field, flatten(value));
mergeMaps(output, field, flatten(value, applyFlattenToArray));
}
return output;
} else if (node.isArray() && applyFlattenToArray) {
final Map<String, Object> output = new HashMap<>();
final int arrayLen = node.size();
for (int i = 0; i < arrayLen; i++) {
final String field = String.format("[%d]", i);
final JsonNode value = node.get(i);
mergeMaps(output, field, flatten(value, applyFlattenToArray));
}
return output;
} else {
Expand All @@ -286,6 +298,15 @@ public static Map<String, Object> flatten(final JsonNode node) {
}
}

/**
* Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. New usage of
* this function is best to explicitly declare the intended array mode. This version is provided for
* backward compatibility.
*/
public static Map<String, Object> flatten(final JsonNode node) {
return flatten(node, false);
}

/**
* Prepend all keys in subMap with prefix, then merge that map into originalMap.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;

class JsonsTest {
Expand All @@ -31,6 +34,11 @@ class JsonsTest {
private static final String SERIALIZED_JSON2 = "{\"str\":\"abc\"}";
private static final String ABC = "abc";
private static final String DEF = "def";
private static final String GHI = "ghi";
private static final String JKL = "jkl";
private static final String MNO = "mno";
private static final String PQR = "pqr";
private static final String STU = "stu";
private static final String TEST = "test";
private static final String TEST2 = "test2";
private static final String XYZ = "xyz";
Expand Down Expand Up @@ -60,7 +68,8 @@ void testSerializeJsonNode() {
Jsons.serialize(Jsons.jsonNode(ImmutableMap.of(
TEST, ABC,
TEST2, DEF))));
// issue: 5878 add test for binary node serialization, binary data are serialized into base64
// issue: 5878 add test for binary node serialization, binary data are
// serialized into base64
assertEquals(
"{\"test\":\"dGVzdA==\"}",
Jsons.serialize(Jsons.jsonNode(ImmutableMap.of(
Expand All @@ -83,7 +92,8 @@ void testDeserializeToJsonNode() {
assertEquals(
"[{\"str\":\"abc\"},{\"str\":\"abc\"}]",
Jsons.deserialize("[{\"str\":\"abc\"},{\"str\":\"abc\"}]").toString());
// issue: 5878 add test for binary node deserialization, for now should be base64 string
// issue: 5878 add test for binary node deserialization, for now should be
// base64 string
assertEquals(
"{\"test\":\"dGVzdA==\"}",
Jsons.deserialize("{\"test\":\"dGVzdA==\"}").toString());
Expand Down Expand Up @@ -230,26 +240,27 @@ void testToPrettyString() {

@Test
void testGetOptional() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }");
final JsonNode json = Jsons
.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": {}, \"mno\": \"pqr\", \"stu\": null }");

assertEquals(Optional.of(Jsons.jsonNode("ghi")), Jsons.getOptional(json, "abc", "def"));
assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, "jkl"));
assertEquals(Optional.of(Jsons.jsonNode("pqr")), Jsons.getOptional(json, "mno"));
assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, "stu"));
assertEquals(Optional.of(Jsons.jsonNode(GHI)), Jsons.getOptional(json, ABC, DEF));
assertEquals(Optional.of(Jsons.emptyObject()), Jsons.getOptional(json, JKL));
assertEquals(Optional.of(Jsons.jsonNode(PQR)), Jsons.getOptional(json, MNO));
assertEquals(Optional.of(Jsons.jsonNode(null)), Jsons.getOptional(json, STU));
assertEquals(Optional.empty(), Jsons.getOptional(json, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, DEF, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, "jkl", XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, "stu", XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, ABC, JKL, XYZ));
assertEquals(Optional.empty(), Jsons.getOptional(json, STU, XYZ));
}

@Test
void testGetStringOrNull() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": \"mno\", \"pqr\": 1 }");

assertEquals("ghi", Jsons.getStringOrNull(json, ABC, DEF));
assertEquals("mno", Jsons.getStringOrNull(json, "jkl"));
assertEquals("1", Jsons.getStringOrNull(json, "pqr"));
assertEquals(GHI, Jsons.getStringOrNull(json, ABC, DEF));
assertEquals(MNO, Jsons.getStringOrNull(json, JKL));
assertEquals("1", Jsons.getStringOrNull(json, PQR));
assertNull(Jsons.getStringOrNull(json, ABC, DEF, XYZ));
assertNull(Jsons.getStringOrNull(json, XYZ));
}
Expand All @@ -260,6 +271,68 @@ void testGetEstimatedByteSize() {
assertEquals(Jsons.toBytes(json).length, Jsons.getEstimatedByteSize(json));
}

@Test
void testFlatten__noArrays() {
final JsonNode json = Jsons.deserialize("{ \"abc\": { \"def\": \"ghi\" }, \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{"abc.def", GHI},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, false));
}

@Test
void testFlatten__withArraysNoApplyFlatten() {
final JsonNode json = Jsons
.deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{ABC, "[{\"def\":\"ghi\"},{\"fed\":\"ihg\"}]"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, false));
}

@Test
void testFlatten__checkBackwardCompatiblity() {
final JsonNode json = Jsons
.deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{ABC, "[{\"def\":\"ghi\"},{\"fed\":\"ihg\"}]"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json));
}

@Test
void testFlatten__withArraysApplyFlatten() {
final JsonNode json = Jsons
.deserialize("{ \"abc\": [{ \"def\": \"ghi\" }, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{"abc.[0].def", "ghi"},
{"abc.[1].fed", "ihg"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, true));
}

@Test
void testFlatten__withArraysApplyFlattenNested() {
final JsonNode json = Jsons
.deserialize(
"{ \"abc\": [{ \"def\": {\"ghi\": [\"xyz\"] }}, { \"fed\": \"ihg\" }], \"jkl\": true, \"pqr\": 1 }");
Map<String, Object> expected = Stream.of(new Object[][] {
{"abc.[0].def.ghi.[0]", "xyz"},
{"abc.[1].fed", "ihg"},
{JKL, true},
{PQR, 1},
}).collect(Collectors.toMap(data -> (String) data[0], data -> data[1]));
assertEquals(expected, Jsons.flatten(json, true));
}

private static class ToClass {

@JsonProperty("str")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.52
LABEL io.airbyte.version=0.3.53
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public boolean isValidData(final JsonNode data) {

// check VARCHAR limits for VARCHAR fields within the SUPER object, if overall object is valid
if (isValid) {
final Map<String, Object> dataMap = Jsons.flatten(data);
final Map<String, Object> dataMap = Jsons.flatten(data, true);
for (final Object value : dataMap.values()) {
if (value instanceof String stringValue) {
final int stringDataSize = stringValue.getBytes(StandardCharsets.UTF_8).length;
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.53 | 2023-01-03 | [\#17273](https://github.com/airbytehq/airbyte/pull/17273) | Fixed handling of arrays in SUPER maximum size check |
| 0.3.52 | 2022-12-30 | [\#20879](https://github.com/airbytehq/airbyte/pull/20879) | Added configurable parameter for number of file buffers |
| 0.3.51 | 2022-10-26 | [\#18434](https://github.com/airbytehq/airbyte/pull/18434) | Fix empty S3 bucket path handling |
| 0.3.50 | 2022-09-14 | [\#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
Expand Down

0 comments on commit 606e59e

Please sign in to comment.