Skip to content

Commit

Permalink
[Bug][format][json] Fix jackson package conflict with spark (#2934)
Browse files Browse the repository at this point in the history
  • Loading branch information
hailin0 committed Sep 29, 2022
1 parent f3ff39b commit 1a92b83
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import lombok.AllArgsConstructor;

import java.io.IOException;
Expand All @@ -35,26 +33,10 @@ public class DeserializationCollector {

/**
 * Deserializes one raw message and forwards the resulting row(s) to {@code out}.
 *
 * <p>When the configured schema is a {@link JsonDeserializationSchema}, emission is
 * delegated to JSON-specific logic; for every other schema the message is
 * deserialized into a single row which is then collected.
 *
 * @param message raw message bytes
 * @param out collector that receives the deserialized row(s)
 * @throws IOException declared by the deserialization calls below
 */
public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
if (deserializationSchema instanceof JsonDeserializationSchema) {
// NOTE(review): two JSON paths are visible here -- the local collectJson helper and
// JsonDeserializationSchema#collect. They look like the before/after of one change;
// if both really execute, every JSON message is emitted twice. Confirm only one remains.
collectJson(message, (JsonDeserializationSchema) deserializationSchema, out);
((JsonDeserializationSchema) deserializationSchema).collect(message, out);
} else {
// Non-JSON schemas: one message in, one row out.
SeaTunnelRow deserialize = deserializationSchema.deserialize(message);
out.collect(deserialize);
}
}

/**
 * Parses {@code message} with the given JSON schema and emits the resulting row(s).
 *
 * <p>A top-level JSON array produces one row per element, in array order; any other
 * top-level value is converted and emitted as a single row.
 *
 * @param message raw JSON bytes
 * @param jsonDeserializationSchema schema used to parse the bytes and convert nodes to rows
 * @param out collector that receives each converted row
 * @throws IOException declared by the schema's parse and convert calls
 */
private void collectJson(byte[] message,
                         JsonDeserializationSchema jsonDeserializationSchema,
                         Collector<SeaTunnelRow> out) throws IOException {
    final JsonNode root = jsonDeserializationSchema.convertBytes(message);

    // Single document: convert it as-is and finish.
    if (!root.isArray()) {
        out.collect(jsonDeserializationSchema.convertJsonNode(root));
        return;
    }

    // Array document: every element becomes its own row.
    final ArrayNode elements = (ArrayNode) root;
    for (JsonNode element : elements) {
        out.collect(jsonDeserializationSchema.convertJsonNode(element));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -57,7 +56,6 @@

import scala.Tuple2;

@DisabledOnContainer(value = "spark:2.4.3", disabledReason = "json-format conflicts with the Jackson version of Spark-2.4.3, see:https://github.com/apache/incubator-seatunnel/issues/2929")
@Slf4j
public class RedisIT extends TestSuiteBase implements TestResource {
private static final String IMAGE = "redis:latest";
Expand Down Expand Up @@ -162,6 +160,7 @@ private static Tuple2<SeaTunnelRowType, List<SeaTunnelRow>> generateTestDataSet(
/**
 * Opens a Jedis client against the Redis test container, authenticates with
 * {@code PASSWORD}, issues a ping, and only then stores the client in the
 * {@code jedis} field.
 */
private void initJedis() {
    final Jedis client = new Jedis(
        redisContainer.getHost(),
        redisContainer.getFirstMappedPort());
    client.auth(PASSWORD);
    // Round-trip once before publishing the client to the field.
    client.ping();
    this.jedis = client;
}

Expand Down
43 changes: 43 additions & 0 deletions seatunnel-formats/seatunnel-format-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,50 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com.fasterxml.jackson</pattern>
<shadedPattern>${seatunnel.shade.package}.com.fasterxml.jackson</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
<!-- make sure that flatten runs after maven-shade-plugin -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.lang.String.format;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.CompositeType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -32,6 +33,7 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import java.io.IOException;

Expand Down Expand Up @@ -107,7 +109,18 @@ public SeaTunnelRow deserialize(byte[] message) throws IOException {
return convertJsonNode(convertBytes(message));
}

public SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
/**
 * Parses a raw JSON message and emits the resulting row(s) to {@code out}.
 *
 * <p>A top-level JSON array produces one row per element, in array order. Any other
 * top-level value (typically a single JSON object) is converted and emitted as one row.
 *
 * @param message raw JSON bytes
 * @param out collector that receives each converted row
 * @throws IOException propagated from {@code convertBytes} / {@code convertJsonNode}
 */
public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
    JsonNode jsonNode = convertBytes(message);
    if (jsonNode.isArray()) {
        ArrayNode arrayNode = (ArrayNode) jsonNode;
        for (int i = 0; i < arrayNode.size(); i++) {
            SeaTunnelRow deserialize = convertJsonNode(arrayNode.get(i));
            out.collect(deserialize);
        }
    } else {
        // A non-array document is a single record. Without this branch such messages
        // were parsed and then silently discarded.
        SeaTunnelRow deserialize = convertJsonNode(jsonNode);
        out.collect(deserialize);
    }
}

private SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
if (jsonNode == null) {
return null;
}
Expand All @@ -122,7 +135,7 @@ public SeaTunnelRow convertJsonNode(JsonNode jsonNode) throws IOException {
}
}

public JsonNode convertBytes(byte[] message) throws IOException {
private JsonNode convertBytes(byte[] message) throws IOException {
try {
return objectMapper.readTree(message);
} catch (Throwable t) {
Expand Down
Loading

0 comments on commit 1a92b83

Please sign in to comment.