Skip to content

Commit

Permalink
Runless events - consume dataset event (#2641)
Browse files Browse the repository at this point in the history
* Runless events - consume dataset event

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* Runless events - add container to store daos

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* Runless events - extract common methods

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* Runless events - run upsert builder

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* Runless events - review feedback

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* fix daos container - speeds up tests twice

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

---------

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Co-authored-by: Willy Lulciuc <willy@datakin.com>
  • Loading branch information
pawel-big-lebowski and wslulciuc committed Nov 6, 2023
1 parent 7c19162 commit 42cadbb
Show file tree
Hide file tree
Showing 14 changed files with 646 additions and 268 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
* Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://github.com/MarquezProject/marquez/pull/2647) [@merobi-hub](https://github.com/merobi-hub)
*Fixes the issue of the GUI displaying Unix epoch time (midnight on January 1, 1970) in the case of running jobs/null `endedAt` values.*

### Added
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Save into Marquez model datasets sent via `DatasetEvent` event type

## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
### Added
* API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc)
Expand Down
24 changes: 15 additions & 9 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
import marquez.service.models.DatasetEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

Expand Down Expand Up @@ -67,15 +68,11 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
if (event instanceof LineageEvent) {
openLineageService
.createAsync((LineageEvent) event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else if (event instanceof DatasetEvent) {
openLineageService
.createAsync((DatasetEvent) event)
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else {
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());

Expand All @@ -84,6 +81,15 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
}
}

private void onComplete(Void result, Throwable err, AsyncResponse asyncResponse) {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
}

private int determineStatusCode(Throwable e) {
if (e instanceof CompletionException) {
return determineStatusCode(e.getCause());
Expand Down
5 changes: 3 additions & 2 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.service.models.LineageEvent;
Expand Down Expand Up @@ -126,9 +127,9 @@ void insertDatasetFacet(
default void insertDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
@Nullable UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@Nullable String lineageEventType,
@NonNull LineageEvent.DatasetFacets datasetFacets) {
final Instant now = Instant.now();

Expand Down
Loading

0 comments on commit 42cadbb

Please sign in to comment.