Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

separate tap and target in sync worker 2 #113

Merged
merged 1 commit into from
Aug 27, 2020
Merged

Conversation

cgardens
Copy link
Contributor

@cgardens cgardens commented Aug 26, 2020

Note: I will NOT merge this until the final two items in the checklist below are complete. I think this is in a good state, however, to get feedback now. Really looking for a yay or nay on whether we are happy with this approach and it's worth bringing across the finish line.

What

  • Per our discussion on RFC PR, this PR attempts to implement the sync worker in such a way that separate tap and target from each other. This opens up in the future the ability to 1. use non-singer taps / targets 2) introduce our own protocol 3) provide a well-defined interface to build new integrations against.

How

  • Separate tap and target behind two new interface. For now they are just implemented specifically for singer.

Checklist

  • try tap and target as try-as-resource
  • redo all exception handling within the workers (NOT GOING TO DO THIS IN THIS PR. Need to look at all workers holistically next to get this right.)
  • reimplement the test coverage of SingerSyncWorkerTest for DefaultSyncWorker, SingerTap, and SingerTarget.
  • write tests in general

Recommended reading order

  1. WorkerRunner.java
  2. DefaultSyncWorker
  3. SyncTap
  4. SyncTarget
  5. SingerTap
  6. SingerTarget
  7. the rest

closes #90

@@ -1,239 +0,0 @@
/*
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced by DefaultSyncWorker.

@@ -1,208 +0,0 @@
/*
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to reimplement these tests for DefaultSyncWorker, SingerTap, and SingerTarget.

import java.util.Iterator;
import java.util.function.Consumer;

/**
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i tried to find a guava iterator that did this, but could not find one. feels like something that should exist. i guess the counterargument here is we just used a stream we could use peek instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A stream with peek sounds better for this.

import java.util.Iterator;

public interface SyncTap<T> extends AutoCloseable {
Iterator<T> run(StandardTapConfig tapConfig, Path workspacePath)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have an opinion here on Iterator versus Stream?

final State state;
try (SyncTap<SingerProtocol> tap = new SingerTap(tapDockerImage)) {
final Iterator<SingerProtocol> iterator = tap.run(tapConfig, workspacePath);
tapCloser = tap::close;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a little weird, but it was the only way i could figure out how to do this while still instantiating the tap in the try-with-resource block.

@cgardens cgardens marked this pull request as ready for review August 26, 2020 19:09
Copy link
Contributor

@jrhizor jrhizor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally think this is in the right direction, but the implementation DefaultSyncWorker and the other interfaces would be cleaner with Stream-ing.

import java.nio.file.Path;
import java.util.Iterator;

public interface SyncTap<T> extends AutoCloseable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't actually seem tied to the Sync concept. Maybe TapStream? If it extended Stream, it would already be autocloseable.

It seems like you separately want a factory from (StandardTapConfig tapConfig, Path workspacePath) -> TapStream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment for SyncTarget. I'm also disliking how we're using Tap/Target in some places that isn't Singer-specific instead of Source/Destination.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment for SyncTarget. I'm also disliking how we're using Tap/Target in some places that isn't Singer-specific instead of Source/Destination.

good call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haven't done this name change yet. but will do it.


@Override
public void cancel() {
WorkerUtils.cancelHelper(tapProcess);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this manifest in the Iterator? Does it throw an IOException or InterruptedException or something else?

import java.util.Iterator;
import java.util.function.Consumer;

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A stream with peek sounds better for this.


StandardSyncSummary summary = new StandardSyncSummary();
summary.setRecordsSynced(recordCount.getValue());
summary.setStartTime(startTime);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we show this time, the Job timestamps, or both in the UI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess this one? don't feel strongly. this timestamp feels boarding on redundant.

}

@Override
public State run(
Copy link
Contributor

@jrhizor jrhizor Aug 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this would basically be a stateful consumer of the tap stream if this is reimplemented using streams?

SingerTarget st = new SingerTarget(...);
TapFactory.create(...)
   .peek(statsPeeker)
   .forEach(st);
State state = st.getState();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. check it out.


@Override
public void cancel() {
WorkerUtils.cancelHelper(targetProcess);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a little confused about the cancellation lifecycle for targets.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think i was confused too. lmk if new version clarifies it.

@Override
public void cancel() {
try {
if (tapCloser != null) {
Copy link
Contributor

@jrhizor jrhizor Aug 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe

Optional.ofNullable(tapCloser).ifPresent(closer -> closer.close());
Optional.ofNullable(targetCloser).ifPresent(closer -> closer.close());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this ends up uglier because you need to handle the Exception inside the lambda which explodes the lines of the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay. ended up having to handle the exceptions in lambda for other reasons so went with this now :D

@cgardens cgardens marked this pull request as draft August 26, 2020 23:25
@cgardens
Copy link
Contributor Author

I got some really useful feedback from Jared. Switching this back to draft while I incorporate it. Always happy to get more feedback if you read it anyway, but feel free to hold off until I un-draft it again.

}

stdout = tapProcess.getInputStream();
return Streams.stream(new SingerJsonIterator(stdout)).onClose(getCloseFunction());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrhizor - would be curious to hear what you think about this. i'm a little wary because 1. people don't usually call close on streams in java 2. it doesn't codify close in the interface so it's a little harder to enforce the close step in the lifecycle. don't hate it. don't love it. what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine to me. The fact that the close can be called separately in the cancellation project is the part that feels clunky, so I'm not sure changing this makes the overall usage "nicer".

@cgardens cgardens marked this pull request as ready for review August 27, 2020 01:11
@cgardens
Copy link
Contributor Author

Okay, I think I have now integrated @jrhizor 's feedback and we are now all set for another round of review.

@cgardens cgardens requested a review from jrhizor August 27, 2020 01:11
try (Target<SingerMessage> target = new SingerTarget(targetDockerImage)) {
targetCloser = getTargetCloser(target);

target.consume(peakedTapStream, targetConfig, workspacePath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels a bit strange that the targetConfig and workspacePath are inputs to the consumer instead of the consumer actually being a Consumer and being called on the stream's forEach. They are always going to be the same for the target's lifespan.

import java.util.stream.Stream;

public interface Target<T> extends AutoCloseable {
void consume(Stream<T> data, StandardTargetConfig targetConfig, Path workspacePath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above comment, feels like this should implement void accept(T t) and be an AutoCloseable Consumer, with a TargetFactory that takes in the config and path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the interface is written from the point of view of what interface could we given an OSS developer to write an integration against. i think this interface does that part well, but i think internal to worker, your critique makes sense.

I think I have an idea on how to do do both the interface thing that i set out to do and fix the part that you think is weird too.

}

stdout = tapProcess.getInputStream();
return Streams.stream(new SingerJsonIterator(stdout)).onClose(getCloseFunction());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems fine to me. The fact that the close can be called separately in the cancellation project is the part that feels clunky, so I'm not sure changing this makes the overall usage "nicer".

final SingerTapFactory singerTapFactory = new SingerTapFactory(tapDockerImage);
final SingerTargetFactory singerTargetFactory = new SingerTargetFactory(targetDockerImage);

try (Stream<SingerMessage> tap = singerTapFactory.create(tapConfig, workspacePath)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going to dependency inject the tapfactory and targetfactory.

Copy link
Contributor

@michel-tricot michel-tricot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really cool PR


target.consume(peakedTapStream, targetConfig, workspacePath);
try (CloseableConsumer<SingerMessage> consumer =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can create both resources in the same try:
try (tap = xxx; consumer = yyy)


import java.util.function.Consumer;

public interface CloseableConsumer<T> extends Consumer<T>, AutoCloseable {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this belongs to commons maybe add a functional package.
add the @ FunctionalInterface annotation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will move. i think not technically functional interface since it has consume and close, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hum right... :)

@@ -89,7 +89,11 @@ public void run() {
new WorkerRun<>(
jobId,
syncInput,
new SingerSyncWorker(
// todo (cgardens) - still locked into only using SingerTaps and Targets. Next step
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see nothing related to singer in that code. I think this comment is mis-leading

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll fix this. going to dependency inject the factories and then this comment will make sense again.

private final String tapDockerImage;
private final String targetDockerImage;

Runnable tapCloser = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not private?

final SingerTargetFactory singerTargetFactory = new SingerTargetFactory(targetDockerImage);

try (Stream<SingerMessage> tap = singerTapFactory.create(tapConfig, workspacePath)) {
tapCloser = tap::close;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a nasty state. My suggestion: don't use forEach instead loop with an iterator and have a boolean that gets set when it is time to stop.


@Override
public void cancel() {
Optional.ofNullable(tapCloser).ifPresent(Runnable::run);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set a boolean instead. Make sure you set the proper JobStatus in the run.

@@ -56,6 +81,9 @@ public static String readFileFromWorkspace(Path workspaceRoot, String fileName)
}

public static void cancelHelper(Process workerProcess) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not a great name for a function what about cancelProcess?

import java.io.InputStream;
import java.util.Iterator;

public class SingerJsonIterator implements Iterator<SingerMessage> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest you extend Guava's AbstractIterator to build iterators. They make writing iterators less error prone and more readable (you don't have to put tons of code in your ctor.

Copy link
Contributor Author

@cgardens cgardens Aug 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy to use the AbstractIterator. I don't think it effects the constructor code though. There's still something we want to check when we first start the iterator that seems like it only fits in the constructor. just pushed what the version with AbstractIterator looks like. Actually not sure that it's better.

@cgardens cgardens force-pushed the cgardens/abstract_this branch 2 times, most recently from 8c772c6 to e759cfe Compare August 27, 2020 20:44
wip

remove old version of singer protocol

 clean up

add comment

clean up

remove SingerSyncWorker

attempt to clean up the resources

improve comment

re-used schema from catalog

clean up

 gradle style

new way of handling closing tap and target; still some jenk to it

share docker command

style

Update dataline-workers/src/main/java/io/dataline/workers/DefaultSyncWorker.java

Co-authored-by: Jared Rhizor <jared@dataline.io>

Update dataline-workers/src/main/java/io/dataline/workers/singer/SingerTap.java

Co-authored-by: Jared Rhizor <jared@dataline.io>

do it

remove Docker*Workers for sake of not overcrowding the PR

target factory (#118)

fix object mapper properties

unused import

code review feedback

dependency inject

take while

job status

iteratore
@cgardens cgardens merged commit e53c0e1 into master Aug 27, 2020
@cgardens cgardens deleted the cgardens/abstract_this branch August 27, 2020 21:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants