Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename: tap => source; target => destination #3120

Merged
merged 1 commit into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -436,15 +436,15 @@ private List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog configuredCatalog)

// todo (cgardens) - assume no state since we are all full refresh right now.
private List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception {
final StandardTapConfig tapConfig = new StandardTapConfig()
final StandardTapConfig sourceConfig = new StandardTapConfig()
.withSourceConnectionConfiguration(getConfig())
.withState(state == null ? null : new State().withState(state))
.withCatalog(catalog);

final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf));
final List<AirbyteMessage> messages = new ArrayList<>();

source.start(tapConfig, jobRoot);
source.start(sourceConfig, jobRoot);
while (!source.isFinished()) {
source.attemptRead().ifPresent(messages::add);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws
.stream()
.collect(Collectors.toMap(s -> s.getStream().getNamespace() + "." + s.getStream().getName(),
s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode()))));
final StandardTapConfig tapConfig = WorkerUtils.syncToTapConfig(syncInput);
final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput);
targetConfig.setCatalog(mapper.mapCatalog(targetConfig.getCatalog()));
final StandardTapConfig sourceConfig = WorkerUtils.syncToTapConfig(syncInput);
final StandardTargetConfig destinationConfig = WorkerUtils.syncToTargetConfig(syncInput);
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));

// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (destination; source) {
destination.start(targetConfig, jobRoot);
source.start(tapConfig, jobRoot);
destination.start(destinationConfig, jobRoot);
source.start(sourceConfig, jobRoot);

while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> maybeMessage = source.attemptRead();
Expand All @@ -114,7 +114,7 @@ public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws
LOGGER.info("Running normalization.");
normalizationRunner.start();
final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize"));
if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, syncInput.getDestinationConfiguration(), targetConfig.getCatalog())) {
if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, syncInput.getDestinationConfiguration(), destinationConfig.getCatalog())) {
throw new WorkerException("Normalization Failed.");
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ public void close() throws Exception {
return;
}

LOGGER.debug("Closing tap process");
LOGGER.debug("Closing source process");
WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
if (process.isAlive() || process.exitValue() != 0) {
throw new WorkerException("Tap process wasn't successful");
throw new WorkerException("Source process wasn't successful");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {

private final IntegrationLauncher integrationLauncher;

private Process targetProcess = null;
private Process destinationProcess = null;
private BufferedWriter writer = null;
private boolean endOfStream = false;

Expand All @@ -58,33 +58,33 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher)
}

@Override
public void start(StandardTargetConfig targetConfig, Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(targetProcess == null);
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(targetConfig.getDestinationConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(targetConfig.getCatalog()));
public void start(StandardTargetConfig destinationConfig, Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(destinationProcess == null);
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(destinationConfig.getCatalog()));

LOGGER.info("Running target...");
targetProcess = integrationLauncher.write(
LOGGER.info("Running destination...");
destinationProcess = integrationLauncher.write(
jobRoot,
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME).start();
LineGobbler.gobble(targetProcess.getInputStream(), LOGGER::info);
LineGobbler.gobble(targetProcess.getErrorStream(), LOGGER::error);
LineGobbler.gobble(destinationProcess.getInputStream(), LOGGER::info);
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error);

writer = new BufferedWriter(new OutputStreamWriter(targetProcess.getOutputStream(), Charsets.UTF_8));
writer = new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8));
}

@Override
public void accept(AirbyteMessage message) throws IOException {
Preconditions.checkState(targetProcess != null && !endOfStream);
Preconditions.checkState(destinationProcess != null && !endOfStream);

writer.write(Jsons.serialize(message));
writer.newLine();
}

@Override
public void notifyEndOfStream() throws IOException {
Preconditions.checkState(targetProcess != null && !endOfStream);
Preconditions.checkState(destinationProcess != null && !endOfStream);

writer.flush();
writer.close();
Expand All @@ -93,30 +93,30 @@ public void notifyEndOfStream() throws IOException {

@Override
public void close() throws WorkerException, IOException {
if (targetProcess == null) {
if (destinationProcess == null) {
return;
}

if (!endOfStream) {
notifyEndOfStream();
}

LOGGER.debug("Closing target process");
WorkerUtils.gentleClose(targetProcess, 10, TimeUnit.HOURS);
if (targetProcess.isAlive() || targetProcess.exitValue() != 0) {
throw new WorkerException("target process wasn't successful");
LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS);
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
throw new WorkerException("destination process wasn't successful");
}
}

@Override
public void cancel() throws Exception {
LOGGER.info("Attempting to cancel destination process...");

if (targetProcess == null) {
LOGGER.info("Target process no longer exists, cancellation is a no-op.");
if (destinationProcess == null) {
LOGGER.info("Destination process no longer exists, cancellation is a no-op.");
} else {
LOGGER.info("Target process exists, cancelling...");
WorkerUtils.cancelProcess(targetProcess);
LOGGER.info("Destination process exists, cancelling...");
WorkerUtils.cancelProcess(destinationProcess);
LOGGER.info("Cancelled destination process!");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
private final AirbyteStreamFactory streamFactory;
private final HeartbeatMonitor heartbeatMonitor;

private Process tapProcess = null;
private Process sourceProcess = null;
private Iterator<AirbyteMessage> messageIterator = null;

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
Expand All @@ -77,69 +77,69 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {

@Override
public void start(StandardTapConfig input, Path jobRoot) throws Exception {
Preconditions.checkState(tapProcess == null);
Preconditions.checkState(sourceProcess == null);

IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(input.getSourceConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, Jsons.serialize(input.getCatalog()));
if (input.getState() != null) {
IOs.writeFile(jobRoot, WorkerConstants.INPUT_STATE_JSON_FILENAME, Jsons.serialize(input.getState().getState()));
}

tapProcess = integrationLauncher.read(jobRoot,
sourceProcess = integrationLauncher.read(jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
WorkerConstants.SOURCE_CATALOG_JSON_FILENAME,
input.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME).start();
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(tapProcess.getErrorStream(), LOGGER::error);
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error);

messageIterator = streamFactory.create(IOs.newBufferedReader(tapProcess.getInputStream()))
messageIterator = streamFactory.create(IOs.newBufferedReader(sourceProcess.getInputStream()))
.peek(message -> heartbeatMonitor.beat())
.filter(message -> message.getType() == Type.RECORD || message.getType() == Type.STATE)
.iterator();
}

@Override
public boolean isFinished() {
Preconditions.checkState(tapProcess != null);
Preconditions.checkState(sourceProcess != null);

return !tapProcess.isAlive() && !messageIterator.hasNext();
return !sourceProcess.isAlive() && !messageIterator.hasNext();
}

@Override
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(tapProcess != null);
Preconditions.checkState(sourceProcess != null);

return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
}

@Override
public void close() throws Exception {
if (tapProcess == null) {
if (sourceProcess == null) {
return;
}

LOGGER.debug("Closing tap process");
LOGGER.debug("Closing source process");
WorkerUtils.gentleCloseWithHeartbeat(
tapProcess,
sourceProcess,
heartbeatMonitor,
GRACEFUL_SHUTDOWN_DURATION,
CHECK_HEARTBEAT_DURATION,
FORCED_SHUTDOWN_DURATION);

if (tapProcess.isAlive() || tapProcess.exitValue() != 0) {
throw new WorkerException("Tap process wasn't successful");
if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
throw new WorkerException("Source process wasn't successful");
}
}

@Override
public void cancel() throws Exception {
LOGGER.info("Attempting to cancel source process...");

if (tapProcess == null) {
if (sourceProcess == null) {
LOGGER.info("Source process no longer exists, cancellation is a no-op.");
} else {
LOGGER.info("Source process exists, cancelling...");
WorkerUtils.cancelProcess(tapProcess);
WorkerUtils.cancelProcess(sourceProcess);
LOGGER.info("Cancelled source process!");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Stream<AirbyteMessage> create(BufferedReader bufferedReader) {
Optional<JsonNode> j = Jsons.tryDeserialize(s);
if (j.isEmpty()) {
// we log as info all the lines that are not valid json
// some taps actually logs their process on stdout, we
// some sources actually log their process on stdout, we
// want to make sure this info is available in the logs.
logger.info(s);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ class DefaultSyncWorkerTest {

private Path jobRoot;
private Path normalizationRoot;
private AirbyteSource tap;
private AirbyteSource source;
private NamespacingMapper mapper;
private AirbyteDestination target;
private StandardSyncInput syncInput;
private StandardTapConfig tapConfig;
private StandardTapConfig sourceConfig;
private StandardTargetConfig targetConfig;
private NormalizationRunner normalizationRunner;

Expand All @@ -85,16 +85,16 @@ void setup() throws Exception {
final ImmutablePair<StandardSync, StandardSyncInput> syncPair = TestConfigHelpers.createSyncConfig();
syncInput = syncPair.getValue();

tapConfig = WorkerUtils.syncToTapConfig(syncInput);
sourceConfig = WorkerUtils.syncToTapConfig(syncInput);
targetConfig = WorkerUtils.syncToTargetConfig(syncInput);

tap = mock(AirbyteSource.class);
source = mock(AirbyteSource.class);
mapper = mock(NamespacingMapper.class);
target = mock(AirbyteDestination.class);
normalizationRunner = mock(NormalizationRunner.class);

when(tap.isFinished()).thenReturn(false, false, false, true);
when(tap.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2));
when(source.isFinished()).thenReturn(false, false, false, true);
when(source.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2));
when(mapper.mapCatalog(targetConfig.getCatalog())).thenReturn(targetConfig.getCatalog());
when(mapper.mapMessage(RECORD_MESSAGE1)).thenReturn(RECORD_MESSAGE1);
when(mapper.mapMessage(RECORD_MESSAGE2)).thenReturn(RECORD_MESSAGE2);
Expand All @@ -106,19 +106,19 @@ void setup() throws Exception {
@Test
void test() throws Exception {
final DefaultSyncWorker defaultSyncWorker =
new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, tap, mapper, target, new AirbyteMessageTracker(), normalizationRunner);
new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, source, mapper, target, new AirbyteMessageTracker(), normalizationRunner);

defaultSyncWorker.run(syncInput, jobRoot);

verify(tap).start(tapConfig, jobRoot);
verify(source).start(sourceConfig, jobRoot);
verify(target).start(targetConfig, jobRoot);
verify(target).accept(RECORD_MESSAGE1);
verify(target).accept(RECORD_MESSAGE2);
verify(normalizationRunner).start();
verify(normalizationRunner).normalize(JOB_ID, JOB_ATTEMPT, normalizationRoot, targetConfig.getDestinationConnectionConfiguration(),
targetConfig.getCatalog());
verify(normalizationRunner).close();
verify(tap).close();
verify(source).close();
verify(target).close();
}

Expand All @@ -131,7 +131,7 @@ void testPopulatesSyncSummary() throws WorkerException {
when(messageTracker.getBytesCount()).thenReturn(100L);
when(messageTracker.getOutputState()).thenReturn(Optional.of(expectedState));

final DefaultSyncWorker defaultSyncWorker = new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, tap, mapper, target, messageTracker, normalizationRunner);
final DefaultSyncWorker defaultSyncWorker = new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, source, mapper, target, messageTracker, normalizationRunner);
final StandardSyncOutput actual = defaultSyncWorker.run(syncInput, jobRoot);
final StandardSyncOutput expectedSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class DefaultAirbyteDestinationTest {
private static final String STREAM_NAME = "user_preferences";
private static final String FIELD_NAME = "favorite_color";

private static final StandardTargetConfig TARGET_CONFIG = WorkerUtils.syncToTargetConfig(TestConfigHelpers.createSyncConfig().getValue());
private static final StandardTargetConfig DESTINATION_CONFIG = WorkerUtils.syncToTargetConfig(TestConfigHelpers.createSyncConfig().getValue());

private Path jobRoot;
private IntegrationLauncher integrationLauncher;
Expand All @@ -86,19 +86,19 @@ public void setup() throws IOException, WorkerException {
@SuppressWarnings("BusyWait")
@Test
public void testSuccessfulLifecycle() throws Exception {
final AirbyteDestination target = new DefaultAirbyteDestination(integrationLauncher);
target.start(TARGET_CONFIG, jobRoot);
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

final AirbyteMessage recordMessage = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue");
target.accept(recordMessage);
destination.accept(recordMessage);

verify(outputStream, never()).close();

target.notifyEndOfStream();
destination.notifyEndOfStream();

verify(outputStream).close();

target.close();
destination.close();

final String actualOutput = new String(outputStream.toByteArray());
assertEquals(Jsons.serialize(recordMessage) + "\n", actualOutput);
Expand All @@ -119,22 +119,22 @@ public void testSuccessfulLifecycle() throws Exception {

@Test
public void testCloseNotifiesLifecycle() throws Exception {
final AirbyteDestination target = new DefaultAirbyteDestination(integrationLauncher);
target.start(TARGET_CONFIG, jobRoot);
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

verify(outputStream, never()).close();

target.close();
destination.close();
verify(outputStream).close();
}

@Test
public void testProcessFailLifecycle() throws Exception {
final AirbyteDestination target = new DefaultAirbyteDestination(integrationLauncher);
target.start(TARGET_CONFIG, jobRoot);
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

when(process.exitValue()).thenReturn(1);
Assertions.assertThrows(WorkerException.class, target::close);
Assertions.assertThrows(WorkerException.class, destination::close);
}

}
Loading