Skip to content

Commit

Permalink
rename: tap => source; target => destination (#3120)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Apr 28, 2021
1 parent 439ec00 commit 94d5b27
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,15 @@ private List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog configuredCatalog)

// todo (cgardens) - assume no state since we are all full refresh right now.
private List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception {
final StandardTapConfig tapConfig = new StandardTapConfig()
final StandardTapConfig sourceConfig = new StandardTapConfig()
.withSourceConnectionConfiguration(getConfig())
.withState(state == null ? null : new State().withState(state))
.withCatalog(catalog);

final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), pbf));
final List<AirbyteMessage> messages = new ArrayList<>();

source.start(tapConfig, jobRoot);
source.start(sourceConfig, jobRoot);
while (!source.isFinished()) {
source.attemptRead().ifPresent(messages::add);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws
.stream()
.collect(Collectors.toMap(s -> s.getStream().getNamespace() + "." + s.getStream().getName(),
s -> String.format("%s - %s", s.getSyncMode(), s.getDestinationSyncMode()))));
final StandardTapConfig tapConfig = WorkerUtils.syncToTapConfig(syncInput);
final StandardTargetConfig targetConfig = WorkerUtils.syncToTargetConfig(syncInput);
targetConfig.setCatalog(mapper.mapCatalog(targetConfig.getCatalog()));
final StandardTapConfig sourceConfig = WorkerUtils.syncToTapConfig(syncInput);
final StandardTargetConfig destinationConfig = WorkerUtils.syncToTargetConfig(syncInput);
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));

// note: resources are closed in the opposite order in which they are declared. thus source will be
// closed first (which is what we want).
try (destination; source) {
destination.start(targetConfig, jobRoot);
source.start(tapConfig, jobRoot);
destination.start(destinationConfig, jobRoot);
source.start(sourceConfig, jobRoot);

while (!cancelled.get() && !source.isFinished()) {
final Optional<AirbyteMessage> maybeMessage = source.attemptRead();
Expand All @@ -114,7 +114,7 @@ public StandardSyncOutput run(StandardSyncInput syncInput, Path jobRoot) throws
LOGGER.info("Running normalization.");
normalizationRunner.start();
final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize"));
if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, syncInput.getDestinationConfiguration(), targetConfig.getCatalog())) {
if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, syncInput.getDestinationConfiguration(), destinationConfig.getCatalog())) {
throw new WorkerException("Normalization Failed.");
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ public void close() throws Exception {
return;
}

LOGGER.debug("Closing tap process");
LOGGER.debug("Closing source process");
WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
if (process.isAlive() || process.exitValue() != 0) {
throw new WorkerException("Tap process wasn't successful");
throw new WorkerException("Source process wasn't successful");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {

private final IntegrationLauncher integrationLauncher;

private Process targetProcess = null;
private Process destinationProcess = null;
private BufferedWriter writer = null;
private boolean endOfStream = false;

Expand All @@ -58,33 +58,33 @@ public DefaultAirbyteDestination(final IntegrationLauncher integrationLauncher)
}

@Override
public void start(StandardTargetConfig targetConfig, Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(targetProcess == null);
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(targetConfig.getDestinationConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(targetConfig.getCatalog()));
public void start(StandardTargetConfig destinationConfig, Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(destinationProcess == null);
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(destinationConfig.getCatalog()));

LOGGER.info("Running target...");
targetProcess = integrationLauncher.write(
LOGGER.info("Running destination...");
destinationProcess = integrationLauncher.write(
jobRoot,
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME).start();
LineGobbler.gobble(targetProcess.getInputStream(), LOGGER::info);
LineGobbler.gobble(targetProcess.getErrorStream(), LOGGER::error);
LineGobbler.gobble(destinationProcess.getInputStream(), LOGGER::info);
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error);

writer = new BufferedWriter(new OutputStreamWriter(targetProcess.getOutputStream(), Charsets.UTF_8));
writer = new BufferedWriter(new OutputStreamWriter(destinationProcess.getOutputStream(), Charsets.UTF_8));
}

@Override
public void accept(AirbyteMessage message) throws IOException {
Preconditions.checkState(targetProcess != null && !endOfStream);
Preconditions.checkState(destinationProcess != null && !endOfStream);

writer.write(Jsons.serialize(message));
writer.newLine();
}

@Override
public void notifyEndOfStream() throws IOException {
Preconditions.checkState(targetProcess != null && !endOfStream);
Preconditions.checkState(destinationProcess != null && !endOfStream);

writer.flush();
writer.close();
Expand All @@ -93,30 +93,30 @@ public void notifyEndOfStream() throws IOException {

@Override
public void close() throws WorkerException, IOException {
if (targetProcess == null) {
if (destinationProcess == null) {
return;
}

if (!endOfStream) {
notifyEndOfStream();
}

LOGGER.debug("Closing target process");
WorkerUtils.gentleClose(targetProcess, 10, TimeUnit.HOURS);
if (targetProcess.isAlive() || targetProcess.exitValue() != 0) {
throw new WorkerException("target process wasn't successful");
LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS);
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
throw new WorkerException("destination process wasn't successful");
}
}

@Override
public void cancel() throws Exception {
LOGGER.info("Attempting to cancel destination process...");

if (targetProcess == null) {
LOGGER.info("Target process no longer exists, cancellation is a no-op.");
if (destinationProcess == null) {
LOGGER.info("Destination process no longer exists, cancellation is a no-op.");
} else {
LOGGER.info("Target process exists, cancelling...");
WorkerUtils.cancelProcess(targetProcess);
LOGGER.info("Destination process exists, cancelling...");
WorkerUtils.cancelProcess(destinationProcess);
LOGGER.info("Cancelled destination process!");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
private final AirbyteStreamFactory streamFactory;
private final HeartbeatMonitor heartbeatMonitor;

private Process tapProcess = null;
private Process sourceProcess = null;
private Iterator<AirbyteMessage> messageIterator = null;

public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {
Expand All @@ -77,69 +77,69 @@ public DefaultAirbyteSource(final IntegrationLauncher integrationLauncher) {

@Override
public void start(StandardTapConfig input, Path jobRoot) throws Exception {
Preconditions.checkState(tapProcess == null);
Preconditions.checkState(sourceProcess == null);

IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(input.getSourceConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, Jsons.serialize(input.getCatalog()));
if (input.getState() != null) {
IOs.writeFile(jobRoot, WorkerConstants.INPUT_STATE_JSON_FILENAME, Jsons.serialize(input.getState().getState()));
}

tapProcess = integrationLauncher.read(jobRoot,
sourceProcess = integrationLauncher.read(jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
WorkerConstants.SOURCE_CATALOG_JSON_FILENAME,
input.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME).start();
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(tapProcess.getErrorStream(), LOGGER::error);
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error);

messageIterator = streamFactory.create(IOs.newBufferedReader(tapProcess.getInputStream()))
messageIterator = streamFactory.create(IOs.newBufferedReader(sourceProcess.getInputStream()))
.peek(message -> heartbeatMonitor.beat())
.filter(message -> message.getType() == Type.RECORD || message.getType() == Type.STATE)
.iterator();
}

@Override
public boolean isFinished() {
Preconditions.checkState(tapProcess != null);
Preconditions.checkState(sourceProcess != null);

return !tapProcess.isAlive() && !messageIterator.hasNext();
return !sourceProcess.isAlive() && !messageIterator.hasNext();
}

@Override
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(tapProcess != null);
Preconditions.checkState(sourceProcess != null);

return Optional.ofNullable(messageIterator.hasNext() ? messageIterator.next() : null);
}

@Override
public void close() throws Exception {
if (tapProcess == null) {
if (sourceProcess == null) {
return;
}

LOGGER.debug("Closing tap process");
LOGGER.debug("Closing source process");
WorkerUtils.gentleCloseWithHeartbeat(
tapProcess,
sourceProcess,
heartbeatMonitor,
GRACEFUL_SHUTDOWN_DURATION,
CHECK_HEARTBEAT_DURATION,
FORCED_SHUTDOWN_DURATION);

if (tapProcess.isAlive() || tapProcess.exitValue() != 0) {
throw new WorkerException("Tap process wasn't successful");
if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
throw new WorkerException("Source process wasn't successful");
}
}

@Override
public void cancel() throws Exception {
LOGGER.info("Attempting to cancel source process...");

if (tapProcess == null) {
if (sourceProcess == null) {
LOGGER.info("Source process no longer exists, cancellation is a no-op.");
} else {
LOGGER.info("Source process exists, cancelling...");
WorkerUtils.cancelProcess(tapProcess);
WorkerUtils.cancelProcess(sourceProcess);
LOGGER.info("Cancelled source process!");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Stream<AirbyteMessage> create(BufferedReader bufferedReader) {
Optional<JsonNode> j = Jsons.tryDeserialize(s);
if (j.isEmpty()) {
// we log as info all the lines that are not valid json
// some taps actually logs their process on stdout, we
// some sources actually log their process on stdout, we
// want to make sure this info is available in the logs.
logger.info(s);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ class DefaultSyncWorkerTest {

private Path jobRoot;
private Path normalizationRoot;
private AirbyteSource tap;
private AirbyteSource source;
private NamespacingMapper mapper;
private AirbyteDestination target;
private StandardSyncInput syncInput;
private StandardTapConfig tapConfig;
private StandardTapConfig sourceConfig;
private StandardTargetConfig targetConfig;
private NormalizationRunner normalizationRunner;

Expand All @@ -85,16 +85,16 @@ void setup() throws Exception {
final ImmutablePair<StandardSync, StandardSyncInput> syncPair = TestConfigHelpers.createSyncConfig();
syncInput = syncPair.getValue();

tapConfig = WorkerUtils.syncToTapConfig(syncInput);
sourceConfig = WorkerUtils.syncToTapConfig(syncInput);
targetConfig = WorkerUtils.syncToTargetConfig(syncInput);

tap = mock(AirbyteSource.class);
source = mock(AirbyteSource.class);
mapper = mock(NamespacingMapper.class);
target = mock(AirbyteDestination.class);
normalizationRunner = mock(NormalizationRunner.class);

when(tap.isFinished()).thenReturn(false, false, false, true);
when(tap.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2));
when(source.isFinished()).thenReturn(false, false, false, true);
when(source.attemptRead()).thenReturn(Optional.of(RECORD_MESSAGE1), Optional.empty(), Optional.of(RECORD_MESSAGE2));
when(mapper.mapCatalog(targetConfig.getCatalog())).thenReturn(targetConfig.getCatalog());
when(mapper.mapMessage(RECORD_MESSAGE1)).thenReturn(RECORD_MESSAGE1);
when(mapper.mapMessage(RECORD_MESSAGE2)).thenReturn(RECORD_MESSAGE2);
Expand All @@ -106,19 +106,19 @@ void setup() throws Exception {
@Test
void test() throws Exception {
final DefaultSyncWorker defaultSyncWorker =
new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, tap, mapper, target, new AirbyteMessageTracker(), normalizationRunner);
new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, source, mapper, target, new AirbyteMessageTracker(), normalizationRunner);

defaultSyncWorker.run(syncInput, jobRoot);

verify(tap).start(tapConfig, jobRoot);
verify(source).start(sourceConfig, jobRoot);
verify(target).start(targetConfig, jobRoot);
verify(target).accept(RECORD_MESSAGE1);
verify(target).accept(RECORD_MESSAGE2);
verify(normalizationRunner).start();
verify(normalizationRunner).normalize(JOB_ID, JOB_ATTEMPT, normalizationRoot, targetConfig.getDestinationConnectionConfiguration(),
targetConfig.getCatalog());
verify(normalizationRunner).close();
verify(tap).close();
verify(source).close();
verify(target).close();
}

Expand All @@ -131,7 +131,7 @@ void testPopulatesSyncSummary() throws WorkerException {
when(messageTracker.getBytesCount()).thenReturn(100L);
when(messageTracker.getOutputState()).thenReturn(Optional.of(expectedState));

final DefaultSyncWorker defaultSyncWorker = new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, tap, mapper, target, messageTracker, normalizationRunner);
final DefaultSyncWorker defaultSyncWorker = new DefaultSyncWorker(JOB_ID, JOB_ATTEMPT, source, mapper, target, messageTracker, normalizationRunner);
final StandardSyncOutput actual = defaultSyncWorker.run(syncInput, jobRoot);
final StandardSyncOutput expectedSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(new StandardSyncSummary()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class DefaultAirbyteDestinationTest {
private static final String STREAM_NAME = "user_preferences";
private static final String FIELD_NAME = "favorite_color";

private static final StandardTargetConfig TARGET_CONFIG = WorkerUtils.syncToTargetConfig(TestConfigHelpers.createSyncConfig().getValue());
private static final StandardTargetConfig DESTINATION_CONFIG = WorkerUtils.syncToTargetConfig(TestConfigHelpers.createSyncConfig().getValue());

private Path jobRoot;
private IntegrationLauncher integrationLauncher;
Expand All @@ -86,19 +86,19 @@ public void setup() throws IOException, WorkerException {
@SuppressWarnings("BusyWait")
@Test
public void testSuccessfulLifecycle() throws Exception {
final AirbyteDestination target = new DefaultAirbyteDestination(integrationLauncher);
target.start(TARGET_CONFIG, jobRoot);
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

final AirbyteMessage recordMessage = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue");
target.accept(recordMessage);
destination.accept(recordMessage);

verify(outputStream, never()).close();

target.notifyEndOfStream();
destination.notifyEndOfStream();

verify(outputStream).close();

target.close();
destination.close();

final String actualOutput = new String(outputStream.toByteArray());
assertEquals(Jsons.serialize(recordMessage) + "\n", actualOutput);
Expand All @@ -119,22 +119,22 @@ public void testSuccessfulLifecycle() throws Exception {

@Test
public void testCloseNotifiesLifecycle() throws Exception {
final AirbyteDestination target = new DefaultAirbyteDestination(integrationLauncher);
target.start(TARGET_CONFIG, jobRoot);
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

verify(outputStream, never()).close();

target.close();
destination.close();
verify(outputStream).close();
}

@Test
public void testProcessFailLifecycle() throws Exception {
final AirbyteDestination target = new DefaultAirbyteDestination(integrationLauncher);
target.start(TARGET_CONFIG, jobRoot);
final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher);
destination.start(DESTINATION_CONFIG, jobRoot);

when(process.exitValue()).thenReturn(1);
Assertions.assertThrows(WorkerException.class, target::close);
Assertions.assertThrows(WorkerException.class, destination::close);
}

}

0 comments on commit 94d5b27

Please sign in to comment.