Skip to content

Commit

Permalink
[CLIENT][JAVA] Added a loadOpenLineageJson(InputStream) method, and r…
Browse files Browse the repository at this point in the history
…efactored the existing loadOpenLineageYaml(InputStream) method (#2490)

* Added a loadOpenLineageJson(InputStream) method, and refactored the loadOpenLineageYaml(InputStream) method
* Improved the error handling for loadOpenLineageYaml(ConfigPathProvider)
* Explicitly state the exceptions, despite being unchecked exceptions

Signed-off-by: Damien Hawes <d-m-h@users.noreply.github.com>
  • Loading branch information
d-m-h committed Mar 6, 2024
1 parent bfbcdc1 commit 3f52111
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
* **Flink: bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18** (https://github.com/OpenLineage/OpenLineage/pull/2472) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18.*

### Changed
* **Java Client: Added a `OpenLineageClientUtils#loadOpenLineageJson(InputStream)` and changed `OpenLineageClientUtils#loadOpenLineageYaml(InputStream)` methods.** (https://github.com/OpenLineage/OpenLineage/pull/2490) [@d-m-h](https://github.com/d-m-h)
*This improves the explicitness of the methods. Previously, `loadOpenLineageYaml(InputStream)` wanted the `InputStream` to contain bytes that represented JSON.*

### Fixed
* **Spark: fix `HttpTransport` timeout.** [`#2475`](https://github.com/OpenLineage/OpenLineage/pull/2475) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Existing `timeout` config parameter is ambiguous: implementation treats value as double in seconds, although documentation claims it's milliseconds. New config param `timeoutInMillis` is added. Existing `timeout` gets removed from docs and will be deprecated in 1.13.*
Expand Down
1 change: 1 addition & 0 deletions client/java/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ bin

src/main/java/io/openlineage/client/OpenLineage.java
src/main/java/io/openlineage/server/*
.sdkmanrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.openlineage.client.OpenLineage.RunEvent;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

/** Utilities class for {@link OpenLineageClient}. */
/**
* Utility class for {@link OpenLineageClient} that provides common functionalities for object
* mapping, JSON and YAML parsing, and URI manipulation.
*/
@Slf4j
public final class OpenLineageClientUtils {
private OpenLineageClientUtils() {}

Expand All @@ -42,7 +51,12 @@ private OpenLineageClientUtils() {}
@JsonFilter("disabledFacets")
public class DisabledFacetsMixin {}

/** Returns a new {@link ObjectMapper} instance. */
/**
* Creates a new {@link ObjectMapper} instance configured with modules for JDK8 and JavaTime,
* including settings to ignore unknown properties and to not write dates as timestamps.
*
* @return A configured {@link ObjectMapper} instance.
*/
public static ObjectMapper newObjectMapper() {
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
Expand All @@ -54,9 +68,9 @@ public static ObjectMapper newObjectMapper() {
}

/**
* Configure object mapper to exclude specified facets from being serialized
* Configures the object mapper to exclude specified facets from being serialized.
*
* @param disableFacets
* @param disableFacets Array of facet names to be excluded from serialization.
*/
public static void configureObjectMapper(String... disableFacets) {
if (disableFacets == null) {
Expand All @@ -70,17 +84,32 @@ public static void configureObjectMapper(String... disableFacets) {
MAPPER.addMixIn(Object.class, DisabledFacetsMixin.class);
}

/** Converts the provided {@code value} to a Json {@code string}. */
public static String toJson(@NonNull final Object value) {
/**
* Converts the provided value to a JSON string.
*
* @param value The object to be converted to JSON.
* @return A JSON string representation of the object.
* @throws UncheckedIOException If an I/O error occurs during conversion.
*/
public static String toJson(@NonNull final Object value) throws UncheckedIOException {
try {
return MAPPER.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

/** Converts the provided Json {@code string} to the specified {@code type}. */
public static <T> T fromJson(@NonNull final String json, @NonNull final TypeReference<T> type) {
/**
* Converts the provided JSON string to an instance of the specified type.
*
* @param json The JSON string to be converted.
* @param type The type to convert the JSON string into.
* @param <T> The generic type of the return value.
* @return An instance of the specified type.
* @throws UncheckedIOException If an I/O error occurs during conversion.
*/
public static <T> T fromJson(@NonNull final String json, @NonNull final TypeReference<T> type)
throws UncheckedIOException {
try {
return MAPPER.readValue(json, type);
} catch (IOException e) {
Expand All @@ -89,20 +118,38 @@ public static <T> T fromJson(@NonNull final String json, @NonNull final TypeRefe
}

/**
* Converts the provided Json {@code string} to {@link RunEvent}. Useful as other `fromJson`
* method gets shaded.
* Convenience method to convert a JSON string directly into a {@link RunEvent} instance.
*
* @param json The JSON string representing a {@link RunEvent}.
* @return An instance of {@link RunEvent}.
* @throws UncheckedIOException If an I/O error occurs during conversion.
*/
public static RunEvent runEventFromJson(@NonNull final String json) {
public static RunEvent runEventFromJson(@NonNull final String json) throws UncheckedIOException {
return fromJson(json, new TypeReference<RunEvent>() {});
}

/**
* Converts the value of an object from one type to another.
*
* @param fromValue The object whose value is to be converted.
* @param toValueType The target type for the conversion.
* @param <T> The generic type of the target type.
* @return An object of the target type with the value converted from the original object.
*/
public static <T> T convertValue(Object fromValue, Class<T> toValueType) {
return MAPPER.convertValue(fromValue, toValueType);
}

/**
* Create a new instance of the facets container with all values merged from the original
* facetsContainer and the given facets Map, with precedence given to the facets Map.
* Merges the given facets map with an existing facets container, giving precedence to the values
* in the facets map.
*
* @param facetsMap A map containing facets to be merged.
* @param facetsContainer The existing container of facets.
* @param klass The class of the facets container.
* @param <T> The type of the facets container.
* @param <F> The type of facets in the map.
* @return A new instance of the facets container with merged values.
*/
public static <T, F> T mergeFacets(Map<String, F> facetsMap, T facetsContainer, Class<T> klass) {
if (facetsContainer == null) {
Expand All @@ -115,38 +162,110 @@ public static <T, F> T mergeFacets(Map<String, F> facetsMap, T facetsContainer,
return MAPPER.convertValue(targetMap, klass);
}

/** Converts the provided {@code urlString} to an {@link URI} object. */
public static URI toUri(@NonNull final String urlString) {
/**
* Converts a string URL into an {@link URI} object.
*
* @param urlString The string URL to be converted.
* @return An {@link URI} object.
* @throws OpenLineageClientException If the given string does not conform to the URI
* specification.
*/
public static URI toUri(@NonNull final String urlString) throws OpenLineageClientException {
try {
final String urlStringWithNoTrailingSlash =
(urlString.endsWith("/") ? urlString.substring(0, urlString.length() - 1) : urlString);
return new URI(urlStringWithNoTrailingSlash);
} catch (URISyntaxException e) {
final OpenLineageClientException error =
new OpenLineageClientException("Malformed URI: " + urlString);
error.initCause(e);
throw error;
throw new OpenLineageClientException("Malformed URI: " + urlString, e);
}
}

public static OpenLineageYaml loadOpenLineageYaml(ConfigPathProvider configPathProvider) {
/**
* Loads and parses OpenLineage configuration from the provided paths. Throws an {@link
* OpenLineageClientException} if one of the following conditions are met:
*
* <ol>
* <li>The provided configPathProvider is null
* <li>No configuration file could be found at any of the provided paths
* <li>Load the default configuration from the classpath if no file is found
* </ol>
*
* @param configPathProvider Provides the paths where the configuration files can be found.
* @return An instance of {@link OpenLineageYaml} containing the parsed configuration.
* @throws OpenLineageClientException According to the rules defined above.
*/
public static OpenLineageYaml loadOpenLineageYaml(ConfigPathProvider configPathProvider)
throws OpenLineageClientException {
try {
for (final Path path : configPathProvider.getPaths()) {
Objects.requireNonNull(configPathProvider);
List<Path> paths = configPathProvider.getPaths();
for (final Path path : paths) {
if (Files.exists(path)) {
return YML.readValue(path.toFile(), OpenLineageYaml.class);
}
}
throw new IllegalArgumentException();
String concatenatedPaths =
paths.stream().map(Path::toString).collect(Collectors.joining(";", "[", "]"));
throw new FileNotFoundException(
"No OpenLineage configuration file found at provided paths, looked in: "
+ concatenatedPaths);
} catch (NullPointerException e) {
throw new OpenLineageClientException("ConfigPathProvider was null");
} catch (FileNotFoundException e) {
throw new OpenLineageClientException("No OpenLineage configuration file found");
} catch (IOException e) {
throw new OpenLineageClientException(e);
}
}

public static OpenLineageYaml loadOpenLineageYaml(InputStream inputStream) {
/**
* Loads and parses OpenLineage YAML configuration from an {@link InputStream}.
*
* @param inputStream The {@link InputStream} from which to load the configuration.
* @return An instance of {@link OpenLineageYaml} containing the parsed YAML configuration.
* @throws OpenLineageClientException If an error occurs while reading or parsing the
* configuration.
*/
public static OpenLineageYaml loadOpenLineageYaml(InputStream inputStream)
throws OpenLineageClientException {
try {
return deserializeInputStream(YML, inputStream);
} catch (IOException e) {
log.warn("Error deserializing OpenLineage YAML, falling back to JSON", e);
// Some clients may actually pass in a JSON, because of how this method used to operate
// So, we'll use OpenLineageClientUtils#loadOpenLineageJson(InputStream) as a fallback
return loadOpenLineageJson(inputStream);
}
}

/**
* Loads and parses OpenLineage JSON configuration from an {@link InputStream}.
*
* @param inputStream The {@link InputStream} from which to load the configuration.
* @return An instance of {@link OpenLineageYaml} containing the parsed JSON configuration.
* @throws OpenLineageClientException If an error occurs while reading or parsing the
* configuration.
*/
public static OpenLineageYaml loadOpenLineageJson(InputStream inputStream)
throws OpenLineageClientException {
try {
return JSON.readValue(inputStream, OpenLineageYaml.class);
return deserializeInputStream(JSON, inputStream);
} catch (IOException e) {
throw new OpenLineageClientException(e);
}
}

/**
* Internal method to deserialize an {@link InputStream} into an {@link OpenLineageYaml} instance,
* using the specified {@link ObjectMapper} for either JSON or YAML.
*
* @param deserializer The {@link ObjectMapper} to use for deserialization.
* @param inputStream The {@link InputStream} containing the configuration data.
* @return An instance of {@link OpenLineageYaml}.
* @throws IOException If an error occurs while reading or parsing the configuration.
*/
private static OpenLineageYaml deserializeInputStream(
ObjectMapper deserializer, InputStream inputStream) throws IOException {
return deserializer.readValue(inputStream, OpenLineageYaml.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.TransportConfig;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -106,4 +110,55 @@ static final class ObjectWithDisabledFacets {
this.notExcludedValue = notExcludedValue;
}
}

@Test
void loadOpenLineageYaml_shouldDeserialiseYamlEncodedInputStreams() {
String yamlString =
"transport:\n"
+ " type: http\n"
+ " url: http://localhost:5050\n"
+ " endpoint: api/v1/lineage\n";
System.out.println(yamlString);

byte[] bytes = yamlString.getBytes(StandardCharsets.UTF_8);

OpenLineageYaml yaml =
OpenLineageClientUtils.loadOpenLineageYaml(new ByteArrayInputStream(bytes));
TransportConfig transportConfig = yaml.getTransportConfig();
assertThat(transportConfig).isNotNull();
assertThat(transportConfig).isInstanceOf(HttpConfig.class);
HttpConfig httpConfig = (HttpConfig) transportConfig;
assertThat(httpConfig.getUrl()).isEqualTo(URI.create("http://localhost:5050"));
assertThat(httpConfig.getEndpoint()).isEqualTo("api/v1/lineage");
}

@Test
void loadOpenLineageYaml_shouldFallbackAndDeserialiseJsonEncodedInputStreams() {
byte[] bytes =
"{\"transport\":{\"type\":\"http\",\"url\":\"https://localhost:1234/api/v1/lineage\"}}"
.getBytes(StandardCharsets.UTF_8);

OpenLineageYaml yaml =
OpenLineageClientUtils.loadOpenLineageYaml(new ByteArrayInputStream(bytes));
TransportConfig transportConfig = yaml.getTransportConfig();
assertThat(transportConfig).isNotNull();
assertThat(transportConfig).isInstanceOf(HttpConfig.class);
HttpConfig httpConfig = (HttpConfig) transportConfig;
assertThat(httpConfig.getUrl()).isEqualTo(URI.create("https://localhost:1234/api/v1/lineage"));
}

@Test
void loadOpenLineageJson_ShouldDeserialiseJsonEncodedInputStreams() {
byte[] bytes =
"{\"transport\":{\"type\":\"http\",\"url\":\"https://localhost:1234/api/v1/lineage\"}}"
.getBytes(StandardCharsets.UTF_8);

OpenLineageYaml yaml =
OpenLineageClientUtils.loadOpenLineageJson(new ByteArrayInputStream(bytes));
TransportConfig transportConfig = yaml.getTransportConfig();
assertThat(transportConfig).isNotNull();
assertThat(transportConfig).isInstanceOf(HttpConfig.class);
HttpConfig httpConfig = (HttpConfig) transportConfig;
assertThat(httpConfig.getUrl()).isEqualTo(URI.create("https://localhost:1234/api/v1/lineage"));
}
}

0 comments on commit 3f52111

Please sign in to comment.