Skip to content

Commit

Permalink
Reorganise Worker Module. (#12950)
Browse files Browse the repository at this point in the history
I was reading this code when I realised this module is gradually developing into a sprawl. Reorganise to make this more readable and clarify certain operations.

A less-urgent follow up step here is to pull all process related code out of this module and combine it with the container-orchestrator module to achieve a clear split between 'job execution code' and 'process abstraction code'.

- Move all the worker implementations from top-level into the io.airbyte.workers.general package. Leave normalization related code in the normalization package as this is actually a special case of the dbt worker and should be deprecated once we implement general DBT support.
- Move all exceptions into an exceptions package.
- With this change, top level code in the airbyte workers module is now all interfaces + worker app set up code.
- Rename io.airbyte.workers.protocols.airbyte to io.airbyte.workers.internal to better represent this subpackage contains interfaces/implementations internal to the workers. Protocol is confusing since it implies some relation to the general airbyte protocol.
- Rename io.airbyte.workers.worker_run to io.airbyte.workers.run for conciseness.
- Merge io.airbyte.workers.test_helper to io.airbyte.workers.helper. Only a single class was present here.
  • Loading branch information
davinchia committed May 19, 2022
1 parent bdbd68f commit 6b7640c
Show file tree
Hide file tree
Showing 95 changed files with 193 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DbtTransformationRunner;
import io.airbyte.workers.DbtTransformationWorker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.general.DbtTransformationRunner;
import io.airbyte.workers.general.DbtTransformationWorker;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.ProcessFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package io.airbyte.container_orchestrator;

import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.storage.DocumentStoreClient;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import io.airbyte.config.NormalizationSummary;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultNormalizationWorker;
import io.airbyte.workers.NormalizationWorker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.general.DefaultNormalizationWorker;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.normalization.NormalizationWorker;
import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultReplicationWorker;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.ReplicationWorker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.general.DefaultReplicationWorker;
import io.airbyte.workers.general.ReplicationWorker;
import io.airbyte.workers.internal.AirbyteMessageTracker;
import io.airbyte.workers.internal.AirbyteSource;
import io.airbyte.workers.internal.DefaultAirbyteDestination;
import io.airbyte.workers.internal.DefaultAirbyteSource;
import io.airbyte.workers.internal.EmptyAirbyteSource;
import io.airbyte.workers.internal.NamespacingMapper;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.KubePodProcess;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteMessageTracker;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource;
import io.airbyte.workers.protocols.airbyte.EmptyAirbyteSource;
import io.airbyte.workers.protocols.airbyte.NamespacingMapper;
import io.airbyte.workers.temporal.sync.ReplicationLauncherWorker;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.when;

import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.KubePodInfo;
import io.airbyte.workers.storage.DocumentStoreClient;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.DbtTransformationRunner;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.general.DbtTransformationRunner;
import io.airbyte.workers.general.DefaultCheckConnectionWorker;
import io.airbyte.workers.general.DefaultGetSpecWorker;
import io.airbyte.workers.helper.EntrypointEnvChecker;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.DefaultAirbyteDestination;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.normalization.NormalizationRunnerFactory;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteDestination;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination;
import io.airbyte.workers.test_helpers.EntrypointEnvChecker;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.protocols.airbyte.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteDestination;
import java.nio.file.Path;
import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.general.DefaultCheckConnectionWorker;
import io.airbyte.workers.general.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.general.DefaultGetSpecWorker;
import io.airbyte.workers.helper.EntrypointEnvChecker;
import io.airbyte.workers.internal.AirbyteSource;
import io.airbyte.workers.internal.DefaultAirbyteSource;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource;
import io.airbyte.workers.test_helpers.EntrypointEnvChecker;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.airbyte.workers.worker_run.WorkerRun;
import io.airbyte.workers.run.TemporalWorkerRunFactory;
import io.airbyte.workers.run.WorkerRun;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.run.TemporalWorkerRunFactory;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState;
import io.airbyte.workers.JobStatus;
import io.airbyte.workers.OutputAndStatus;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.airbyte.workers.worker_run.WorkerRun;
import io.airbyte.workers.run.TemporalWorkerRunFactory;
import io.airbyte.workers.run.WorkerRun;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import java.util.Map;
import java.util.Objects;

Expand Down
2 changes: 2 additions & 0 deletions airbyte-workers/src/main/java/io/airbyte/workers/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.workers;

import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.general.DefaultReplicationWorker;
import java.nio.file.Path;

public interface Worker<InputType, OutputType> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.helper.ConnectionHelper;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.storage.DocumentStoreClient;
import io.airbyte.workers.run.TemporalWorkerRunFactory;
import io.airbyte.workers.storage.StateClients;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalJobType;
Expand All @@ -69,7 +70,6 @@
import io.airbyte.workers.temporal.sync.PersistStateActivityImpl;
import io.airbyte.workers.temporal.sync.ReplicationActivityImpl;
import io.airbyte.workers.temporal.sync.SyncWorkflowImpl;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.temporal.client.WorkflowClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.exception;

/**
* Exception thrown by the RecordSchemaValidator during a sync when AirbyteRecordMessage data does
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.exception;

public class WorkerException extends Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.workers.Worker;

public interface CheckConnectionWorker extends Worker<StandardCheckConnectionInput, StandardCheckConnectionOutput> {}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
Expand All @@ -14,6 +14,9 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.Worker;
import io.airbyte.workers.exception.WorkerException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.IOs;
Expand All @@ -14,9 +14,11 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.*;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
Expand All @@ -11,9 +11,11 @@
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.workers.*;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.normalization.NormalizationWorker;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.general;

import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
Expand All @@ -15,11 +15,14 @@
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.*;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.protocols.airbyte.AirbyteDestination;
import io.airbyte.workers.protocols.airbyte.AirbyteMapper;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.MessageTracker;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteMapper;
import io.airbyte.workers.internal.AirbyteSource;
import io.airbyte.workers.internal.MessageTracker;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down

0 comments on commit 6b7640c

Please sign in to comment.