Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLIENT][JAVA] Added a loadOpenLineageJson(InputStream) method, and refactored the existing loadOpenLineageYaml(InputStream) method #2490

Merged
merged 5 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
* **Flink: bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18** (https://github.com/OpenLineage/OpenLineage/pull/2472) [@HuangZhenQiu](https://github.com/HuangZhenQiu)
*Bump Flink JDBC connector version to 3.1.2-1.18 for Flink 1.18.*

### Changed
* **Java Client: Added a `OpenLineageClientUtils#loadOpenLineageJson(InputStream)` and changed `OpenLineageClientUtils#loadOpenLineageYaml(InputStream)` methods.** (https://github.com/OpenLineage/OpenLineage/pull/2490) [@d-m-h](https://github.com/d-m-h)
*This improves the explicitness of the methods. Previously, `loadOpenLineageYaml(InputStream)` wanted the `InputStream` to contain bytes that represented JSON.*

### Fixed
* **Spark: fix `HttpTransport` timeout.** [`#2475`](https://github.com/OpenLineage/OpenLineage/pull/2475) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Existing `timeout` config parameter is ambiguous: implementation treats value as double in seconds, although documentation claims it's milliseconds. New config param `timeoutInMillis` is added. Existing `timeout` gets removed from docs and will be deprecated in 1.13.*
Expand Down
1 change: 1 addition & 0 deletions client/java/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ bin

src/main/java/io/openlineage/client/OpenLineage.java
src/main/java/io/openlineage/server/*
.sdkmanrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.openlineage.client.OpenLineage.RunEvent;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

/** Utilities class for {@link OpenLineageClient}. */
/**
* Utility class for {@link OpenLineageClient} that provides common functionalities for object
* mapping, JSON and YAML parsing, and URI manipulation.
*/
@Slf4j
public final class OpenLineageClientUtils {
private OpenLineageClientUtils() {}

Expand All @@ -42,7 +51,12 @@ private OpenLineageClientUtils() {}
@JsonFilter("disabledFacets")
public class DisabledFacetsMixin {}

/** Returns a new {@link ObjectMapper} instance. */
/**
* Creates a new {@link ObjectMapper} instance configured with modules for JDK8 and JavaTime,
* including settings to ignore unknown properties and to not write dates as timestamps.
*
* @return A configured {@link ObjectMapper} instance.
*/
public static ObjectMapper newObjectMapper() {
final ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new Jdk8Module());
Expand All @@ -54,9 +68,9 @@ public static ObjectMapper newObjectMapper() {
}

/**
* Configure object mapper to exclude specified facets from being serialized
* Configures the object mapper to exclude specified facets from being serialized.
*
* @param disableFacets
* @param disableFacets Array of facet names to be excluded from serialization.
*/
public static void configureObjectMapper(String... disableFacets) {
if (disableFacets == null) {
Expand All @@ -70,17 +84,32 @@ public static void configureObjectMapper(String... disableFacets) {
MAPPER.addMixIn(Object.class, DisabledFacetsMixin.class);
}

/** Converts the provided {@code value} to a Json {@code string}. */
public static String toJson(@NonNull final Object value) {
/**
* Converts the provided value to a JSON string.
*
* @param value The object to be converted to JSON.
* @return A JSON string representation of the object.
* @throws UncheckedIOException If an I/O error occurs during conversion.
*/
public static String toJson(@NonNull final Object value) throws UncheckedIOException {
try {
return MAPPER.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new UncheckedIOException(e);
}
}

/** Converts the provided Json {@code string} to the specified {@code type}. */
public static <T> T fromJson(@NonNull final String json, @NonNull final TypeReference<T> type) {
/**
* Converts the provided JSON string to an instance of the specified type.
*
* @param json The JSON string to be converted.
* @param type The type to convert the JSON string into.
* @param <T> The generic type of the return value.
* @return An instance of the specified type.
* @throws UncheckedIOException If an I/O error occurs during conversion.
*/
public static <T> T fromJson(@NonNull final String json, @NonNull final TypeReference<T> type)
throws UncheckedIOException {
try {
return MAPPER.readValue(json, type);
} catch (IOException e) {
Expand All @@ -89,20 +118,38 @@ public static <T> T fromJson(@NonNull final String json, @NonNull final TypeRefe
}

/**
* Converts the provided Json {@code string} to {@link RunEvent}. Useful as other `fromJson`
* method gets shaded.
* Convenience method to convert a JSON string directly into a {@link RunEvent} instance.
*
* @param json The JSON string representing a {@link RunEvent}.
* @return An instance of {@link RunEvent}.
* @throws UncheckedIOException If an I/O error occurs during conversion.
*/
public static RunEvent runEventFromJson(@NonNull final String json) {
public static RunEvent runEventFromJson(@NonNull final String json) throws UncheckedIOException {
return fromJson(json, new TypeReference<RunEvent>() {});
}

/**
* Converts the value of an object from one type to another.
*
* @param fromValue The object whose value is to be converted.
* @param toValueType The target type for the conversion.
* @param <T> The generic type of the target type.
* @return An object of the target type with the value converted from the original object.
*/
public static <T> T convertValue(Object fromValue, Class<T> toValueType) {
return MAPPER.convertValue(fromValue, toValueType);
}

/**
* Create a new instance of the facets container with all values merged from the original
* facetsContainer and the given facets Map, with precedence given to the facets Map.
* Merges the given facets map with an existing facets container, giving precedence to the values
* in the facets map.
*
* @param facetsMap A map containing facets to be merged.
* @param facetsContainer The existing container of facets.
* @param klass The class of the facets container.
* @param <T> The type of the facets container.
* @param <F> The type of facets in the map.
* @return A new instance of the facets container with merged values.
*/
public static <T, F> T mergeFacets(Map<String, F> facetsMap, T facetsContainer, Class<T> klass) {
if (facetsContainer == null) {
Expand All @@ -115,38 +162,110 @@ public static <T, F> T mergeFacets(Map<String, F> facetsMap, T facetsContainer,
return MAPPER.convertValue(targetMap, klass);
}

/** Converts the provided {@code urlString} to an {@link URI} object. */
public static URI toUri(@NonNull final String urlString) {
/**
* Converts a string URL into an {@link URI} object.
*
* @param urlString The string URL to be converted.
* @return An {@link URI} object.
* @throws OpenLineageClientException If the given string does not conform to the URI
* specification.
*/
public static URI toUri(@NonNull final String urlString) throws OpenLineageClientException {
try {
final String urlStringWithNoTrailingSlash =
(urlString.endsWith("/") ? urlString.substring(0, urlString.length() - 1) : urlString);
return new URI(urlStringWithNoTrailingSlash);
} catch (URISyntaxException e) {
final OpenLineageClientException error =
new OpenLineageClientException("Malformed URI: " + urlString);
error.initCause(e);
throw error;
throw new OpenLineageClientException("Malformed URI: " + urlString, e);
}
}

public static OpenLineageYaml loadOpenLineageYaml(ConfigPathProvider configPathProvider) {
/**
* Loads and parses OpenLineage configuration from the provided paths. Throws an {@link
* OpenLineageClientException} if one of the following conditions are met:
*
* <ol>
* <li>The provided configPathProvider is null
* <li>No configuration file could be found at any of the provided paths
* <li>Load the default configuration from the classpath if no file is found
* </ol>
*
* @param configPathProvider Provides the paths where the configuration files can be found.
* @return An instance of {@link OpenLineageYaml} containing the parsed configuration.
* @throws OpenLineageClientException According to the rules defined above.
*/
public static OpenLineageYaml loadOpenLineageYaml(ConfigPathProvider configPathProvider)
throws OpenLineageClientException {
try {
for (final Path path : configPathProvider.getPaths()) {
Objects.requireNonNull(configPathProvider);
List<Path> paths = configPathProvider.getPaths();
for (final Path path : paths) {
if (Files.exists(path)) {
return YML.readValue(path.toFile(), OpenLineageYaml.class);
}
}
throw new IllegalArgumentException();
String concatenatedPaths =
paths.stream().map(Path::toString).collect(Collectors.joining(";", "[", "]"));
throw new FileNotFoundException(
"No OpenLineage configuration file found at provided paths, looked in: "
+ concatenatedPaths);
} catch (NullPointerException e) {
throw new OpenLineageClientException("ConfigPathProvider was null");
} catch (FileNotFoundException e) {
throw new OpenLineageClientException("No OpenLineage configuration file found");
} catch (IOException e) {
throw new OpenLineageClientException(e);
}
}

public static OpenLineageYaml loadOpenLineageYaml(InputStream inputStream) {
/**
* Loads and parses OpenLineage YAML configuration from an {@link InputStream}.
*
* @param inputStream The {@link InputStream} from which to load the configuration.
* @return An instance of {@link OpenLineageYaml} containing the parsed YAML configuration.
* @throws OpenLineageClientException If an error occurs while reading or parsing the
* configuration.
*/
public static OpenLineageYaml loadOpenLineageYaml(InputStream inputStream)
throws OpenLineageClientException {
try {
return deserializeInputStream(YML, inputStream);
} catch (IOException e) {
log.warn("Error deserializing OpenLineage YAML, falling back to JSON", e);
// Some clients may actually pass in a JSON, because of how this method used to operate
// So, we'll use OpenLineageClientUtils#loadOpenLineageJson(InputStream) as a fallback
return loadOpenLineageJson(inputStream);
}
}

/**
* Loads and parses OpenLineage JSON configuration from an {@link InputStream}.
*
* @param inputStream The {@link InputStream} from which to load the configuration.
* @return An instance of {@link OpenLineageYaml} containing the parsed JSON configuration.
* @throws OpenLineageClientException If an error occurs while reading or parsing the
* configuration.
*/
public static OpenLineageYaml loadOpenLineageJson(InputStream inputStream)
throws OpenLineageClientException {
try {
return JSON.readValue(inputStream, OpenLineageYaml.class);
return deserializeInputStream(JSON, inputStream);
} catch (IOException e) {
throw new OpenLineageClientException(e);
}
}

/**
* Internal method to deserialize an {@link InputStream} into an {@link OpenLineageYaml} instance,
* using the specified {@link ObjectMapper} for either JSON or YAML.
*
* @param deserializer The {@link ObjectMapper} to use for deserialization.
* @param inputStream The {@link InputStream} containing the configuration data.
* @return An instance of {@link OpenLineageYaml}.
* @throws IOException If an error occurs while reading or parsing the configuration.
*/
private static OpenLineageYaml deserializeInputStream(
ObjectMapper deserializer, InputStream inputStream) throws IOException {
return deserializer.readValue(inputStream, OpenLineageYaml.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import io.openlineage.client.transports.HttpConfig;
import io.openlineage.client.transports.TransportConfig;
import java.io.ByteArrayInputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -106,4 +110,55 @@ static final class ObjectWithDisabledFacets {
this.notExcludedValue = notExcludedValue;
}
}

@Test
void loadOpenLineageYaml_shouldDeserialiseYamlEncodedInputStreams() {
String yamlString =
"transport:\n"
+ " type: http\n"
+ " url: http://localhost:5050\n"
+ " endpoint: api/v1/lineage\n";
System.out.println(yamlString);

byte[] bytes = yamlString.getBytes(StandardCharsets.UTF_8);

OpenLineageYaml yaml =
OpenLineageClientUtils.loadOpenLineageYaml(new ByteArrayInputStream(bytes));
TransportConfig transportConfig = yaml.getTransportConfig();
assertThat(transportConfig).isNotNull();
assertThat(transportConfig).isInstanceOf(HttpConfig.class);
HttpConfig httpConfig = (HttpConfig) transportConfig;
assertThat(httpConfig.getUrl()).isEqualTo(URI.create("http://localhost:5050"));
assertThat(httpConfig.getEndpoint()).isEqualTo("api/v1/lineage");
}

@Test
void loadOpenLineageYaml_shouldFallbackAndDeserialiseJsonEncodedInputStreams() {
byte[] bytes =
"{\"transport\":{\"type\":\"http\",\"url\":\"https://localhost:1234/api/v1/lineage\"}}"
.getBytes(StandardCharsets.UTF_8);

OpenLineageYaml yaml =
OpenLineageClientUtils.loadOpenLineageYaml(new ByteArrayInputStream(bytes));
TransportConfig transportConfig = yaml.getTransportConfig();
assertThat(transportConfig).isNotNull();
assertThat(transportConfig).isInstanceOf(HttpConfig.class);
HttpConfig httpConfig = (HttpConfig) transportConfig;
assertThat(httpConfig.getUrl()).isEqualTo(URI.create("https://localhost:1234/api/v1/lineage"));
}

@Test
void loadOpenLineageJson_ShouldDeserialiseJsonEncodedInputStreams() {
byte[] bytes =
"{\"transport\":{\"type\":\"http\",\"url\":\"https://localhost:1234/api/v1/lineage\"}}"
.getBytes(StandardCharsets.UTF_8);

OpenLineageYaml yaml =
OpenLineageClientUtils.loadOpenLineageJson(new ByteArrayInputStream(bytes));
TransportConfig transportConfig = yaml.getTransportConfig();
assertThat(transportConfig).isNotNull();
assertThat(transportConfig).isInstanceOf(HttpConfig.class);
HttpConfig httpConfig = (HttpConfig) transportConfig;
assertThat(httpConfig.getUrl()).isEqualTo(URI.create("https://localhost:1234/api/v1/lineage"));
}
}