S3, GCS Destinations: Fix interface duplication (#9577)
* Interface reorganization

* Add missing implementation

* Update related classes

* Address review remarks
DoNotPanicUA authored Jan 25, 2022
1 parent 7806dbc commit ed1d26a
Showing 24 changed files with 134 additions and 86 deletions.
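
For orientation, the consolidated hierarchy appears to look roughly like the sketch below. This is inferred only from the usages visible in this diff — the overrides in BigQueryTableWriter and the getOutputPath() call in S3StreamCopier — so the real interfaces under io.airbyte.integrations.destination.s3.writer may well declare additional methods.

// Sketch only: inferred from this diff, not copied from the repository.
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.util.UUID;

// Replaces the former CommonWriter.
public interface DestinationWriter {

  void initialize() throws IOException;

  // Record-level entry point; not every writer supports it (see BigQueryTableWriter below).
  void write(UUID id, AirbyteRecordMessage recordMessage) throws IOException;

  // Entry point for rows that have already been formatted by the destination.
  void write(JsonNode formattedData) throws IOException;

  void close(boolean hasFailed) throws Exception;
}

// Collapses the duplicated S3Writer/GscWriter pair for file-based writers.
public interface DestinationFileWriter extends DestinationWriter {

  String getOutputPath();
}
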
@@ -19,7 +19,7 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.bigquery.BigQueryUtils;
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
-import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
+import io.airbyte.integrations.destination.s3.writer.DestinationWriter;
import io.airbyte.protocol.models.AirbyteMessage;
import java.io.IOException;
import java.util.function.Consumer;
@@ -28,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public abstract class AbstractBigQueryUploader<T extends CommonWriter> {
+public abstract class AbstractBigQueryUploader<T extends DestinationWriter> {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractBigQueryUploader.class);

@@ -18,14 +18,14 @@
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.GcsS3Helper;
-import io.airbyte.integrations.destination.gcs.writer.GscWriter;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteMessage;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public abstract class AbstractGscBigQueryUploader<T extends GscWriter> extends AbstractBigQueryUploader<GscWriter> {
+public abstract class AbstractGscBigQueryUploader<T extends DestinationFileWriter> extends AbstractBigQueryUploader<DestinationFileWriter> {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGscBigQueryUploader.class);

@@ -8,13 +8,15 @@
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.common.base.Charsets;
import io.airbyte.commons.json.Jsons;
-import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
+import io.airbyte.integrations.destination.s3.writer.DestinationWriter;
+import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class BigQueryTableWriter implements CommonWriter {
+public class BigQueryTableWriter implements DestinationWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTableWriter.class);

@@ -27,13 +29,18 @@ public BigQueryTableWriter(TableDataWriteChannel writeChannel) {
@Override
public void initialize() throws IOException {}

+  @Override
+  public void write(UUID id, AirbyteRecordMessage recordMessage) {
+    throw new RuntimeException("This write method is not used!");
+  }

@Override
public void write(JsonNode formattedData) throws IOException {
writeChannel.write(ByteBuffer.wrap((Jsons.serialize(formattedData) + "\n").getBytes(Charsets.UTF_8)));
}

@Override
-  public void close(boolean hasFailed) throws Exception {
+  public void close(boolean hasFailed) throws IOException {
this.writeChannel.close();
}
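
Note: the record-level write(UUID, AirbyteRecordMessage) stub above appears to be the "add missing implementation" from the commit message. BigQuery's table writer only consumes rows already shaped by BigQueryRecordFormatter, so callers go through write(JsonNode) and the record-level entry point deliberately throws.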

@@ -9,7 +9,7 @@
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
@@ -28,7 +28,7 @@ public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer {
private final ConfiguredAirbyteCatalog configuredCatalog;
private final GcsWriterFactory writerFactory;
private final Consumer<AirbyteMessage> outputRecordCollector;
-  private final Map<AirbyteStreamNameNamespacePair, S3Writer> streamNameAndNamespaceToWriters;
+  private final Map<AirbyteStreamNameNamespacePair, DestinationFileWriter> streamNameAndNamespaceToWriters;

private AirbyteMessage lastStateMessage = null;

@@ -50,7 +50,7 @@ protected void startTracked() throws Exception {
final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {
-      final S3Writer writer = writerFactory
+      final DestinationFileWriter writer = writerFactory
.create(gcsDestinationConfig, s3Client, configuredStream, uploadTimestamp);
writer.initialize();

@@ -87,7 +87,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception

@Override
protected void close(final boolean hasFailed) throws Exception {
-    for (final S3Writer handler : streamNameAndNamespaceToWriters.values()) {
+    for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) {
handler.close(hasFailed);
}
// Gcs stream uploader is all or nothing if a failure happens in the destination.
@@ -11,14 +11,12 @@
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.util.GcsUtils;
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
-import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
-import io.airbyte.integrations.destination.gcs.writer.GscWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.IOException;
@@ -33,7 +31,7 @@
import org.slf4j.LoggerFactory;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

-public class GcsAvroWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
+public class GcsAvroWriter extends BaseGcsWriter implements DestinationFileWriter {

protected static final Logger LOGGER = LoggerFactory.getLogger(GcsAvroWriter.class);

@@ -10,13 +10,11 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
-import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
-import io.airbyte.integrations.destination.gcs.writer.GscWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.csv.CsvSheetGenerator;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.IOException;
@@ -30,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class GcsCsvWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
+public class GcsCsvWriter extends BaseGcsWriter implements DestinationFileWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsCsvWriter.class);

@@ -15,11 +15,9 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
-import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
-import io.airbyte.integrations.destination.gcs.writer.GscWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.util.S3StreamTransferManagerHelper;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.IOException;
@@ -30,7 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class GcsJsonlWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
+public class GcsJsonlWriter extends BaseGcsWriter implements DestinationFileWriter {

protected static final Logger LOGGER = LoggerFactory.getLogger(GcsJsonlWriter.class);

@@ -10,12 +10,10 @@
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
import io.airbyte.integrations.destination.gcs.credential.GcsHmacKeyCredentialConfig;
import io.airbyte.integrations.destination.gcs.writer.BaseGcsWriter;
-import io.airbyte.integrations.destination.gcs.writer.CommonWriter;
-import io.airbyte.integrations.destination.gcs.writer.GscWriter;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.AvroRecordFactory;
import io.airbyte.integrations.destination.s3.parquet.S3ParquetFormatConfig;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.IOException;
@@ -35,7 +33,7 @@
import org.slf4j.LoggerFactory;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

-public class GcsParquetWriter extends BaseGcsWriter implements S3Writer, GscWriter, CommonWriter {
+public class GcsParquetWriter extends BaseGcsWriter implements DestinationFileWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(GcsParquetWriter.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
@@ -12,7 +12,7 @@
import io.airbyte.integrations.destination.s3.S3DestinationConstants;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.util.S3OutputPathHelper;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
@@ -33,7 +33,7 @@
* <li>Create the bucket and prepare the bucket path.</li>
* </ul>
*/
-public abstract class BaseGcsWriter implements S3Writer, CommonWriter {
+public abstract class BaseGcsWriter implements DestinationFileWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(BaseGcsWriter.class);


This file was deleted.

@@ -6,7 +6,7 @@

import com.amazonaws.services.s3.AmazonS3;
import io.airbyte.integrations.destination.gcs.GcsDestinationConfig;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;

@@ -15,7 +15,7 @@
*/
public interface GcsWriterFactory {

-  S3Writer create(GcsDestinationConfig config,
+  DestinationFileWriter create(GcsDestinationConfig config,
AmazonS3 s3Client,
ConfiguredAirbyteStream configuredStream,
Timestamp uploadTimestamp)
@@ -13,7 +13,7 @@
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.AvroConstants;
import io.airbyte.integrations.destination.s3.avro.JsonToAvroSchemaConverter;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
@@ -26,7 +26,7 @@ public class ProductionWriterFactory implements GcsWriterFactory {
protected static final Logger LOGGER = LoggerFactory.getLogger(ProductionWriterFactory.class);

@Override
-  public S3Writer create(final GcsDestinationConfig config,
+  public DestinationFileWriter create(final GcsDestinationConfig config,
final AmazonS3 s3Client,
final ConfiguredAirbyteStream configuredStream,
final Timestamp uploadTimestamp)
@@ -14,7 +14,7 @@
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig;
import io.airbyte.integrations.destination.s3.csv.S3CsvWriter;
import io.airbyte.integrations.destination.s3.csv.StagingDatabaseCsvSheetGenerator;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
@@ -47,7 +47,7 @@ public abstract class S3StreamCopier implements StreamCopier {
private final ConfiguredAirbyteStream configuredAirbyteStream;
private final Timestamp uploadTime;
protected final String stagingFolder;
-  protected final Map<String, S3Writer> stagingWritersByFile = new HashMap<>();
+  protected final Map<String, DestinationFileWriter> stagingWritersByFile = new HashMap<>();
private final boolean purgeStagingData;

// The number of batches of records that will be inserted into each file.
@@ -129,7 +129,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage, final

@Override
public void closeStagingUploader(final boolean hasFailed) throws Exception {
-    for (final S3Writer writer : stagingWritersByFile.values()) {
+    for (final DestinationFileWriter writer : stagingWritersByFile.values()) {
writer.close(hasFailed);
}
}
@@ -149,7 +149,7 @@ public void createTemporaryTable() throws Exception {
@Override
public void copyStagingFileToTemporaryTable() throws Exception {
LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName);
-    for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
+    for (final Map.Entry<String, DestinationFileWriter> entry : stagingWritersByFile.entrySet()) {
final String objectKey = entry.getValue().getOutputPath();
copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), objectKey), schemaName, tmpTableName, s3Config);
}
@@ -181,7 +181,7 @@ public String generateMergeStatement(final String destTableName) {
@Override
public void removeFileAndDropTmpTable() throws Exception {
if (purgeStagingData) {
-      for (final Map.Entry<String, S3Writer> entry : stagingWritersByFile.entrySet()) {
+      for (final Map.Entry<String, DestinationFileWriter> entry : stagingWritersByFile.entrySet()) {
final String suffix = entry.getKey();
final String objectKey = entry.getValue().getOutputPath();

@@ -208,7 +208,7 @@ public String getTmpTableName() {
}

@VisibleForTesting
-  public Map<String, S3Writer> getStagingWritersByFile() {
+  public Map<String, DestinationFileWriter> getStagingWritersByFile() {
return stagingWritersByFile;
}
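
Taken together, the S3StreamCopier hunks trace the staging lifecycle that now runs against DestinationFileWriter: records are appended to per-file staging writers, closeStagingUploader(hasFailed) closes them, copyStagingFileToTemporaryTable() resolves each staged CSV via the writer's getOutputPath() before issuing the COPY, and removeFileAndDropTmpTable() deletes the staged objects when purgeStagingData is set.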

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-s3/README.md
@@ -23,5 +23,5 @@ As a community contributor, you will need access to AWS to run the integration tests
- Modify `spec.json` to specify the configuration of this new format.
- Update `S3FormatConfigs` to be able to construct a config for this new format.
- Create a new package under `io.airbyte.integrations.destination.s3`.
-- Implement a new `S3Writer`. The implementation can extend `BaseS3Writer`.
+- Implement a new `DestinationFileWriter`. The implementation can extend `BaseS3Writer`.
- Write an acceptance test for the new output format. The test can extend `S3DestinationAcceptanceTest`.
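
To make those last steps concrete, below is a rough sketch of a writer for a hypothetical plain-text format. It is illustrative only: it assumes BaseS3Writer's constructor takes (config, s3Client, configuredStream) and handles initialize() (bucket creation, as BaseGcsWriter's javadoc in this commit describes), that DestinationFileWriter declares only the methods visible in this diff, and that S3DestinationConfig exposes getBucketName()/getBucketPath(). Names such as S3TextWriter are invented for the example, and a real implementation should stream parts (e.g. via S3StreamTransferManagerHelper) rather than buffer a whole file in memory.

// Sketch only — not part of this commit; all assumptions flagged above.
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
import io.airbyte.integrations.destination.s3.writer.BaseS3Writer;
import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
import java.util.UUID;

public class S3TextWriter extends BaseS3Writer implements DestinationFileWriter {

  private final AmazonS3 s3;
  private final String bucket;
  private final String objectKey;
  private final StringBuilder buffer = new StringBuilder();

  public S3TextWriter(final S3DestinationConfig config,
                      final AmazonS3 s3Client,
                      final ConfiguredAirbyteStream configuredStream,
                      final Timestamp uploadTimestamp) {
    super(config, s3Client, configuredStream); // assumed BaseS3Writer constructor
    this.s3 = s3Client;
    this.bucket = config.getBucketName();
    // One object per stream per sync; the naming scheme is up to the implementer.
    this.objectKey = String.format("%s/%s_%d.txt",
        config.getBucketPath(),
        configuredStream.getStream().getName(),
        uploadTimestamp.getTime());
  }

  @Override
  public void write(final UUID id, final AirbyteRecordMessage recordMessage) {
    buffer.append(Jsons.serialize(recordMessage.getData())).append('\n');
  }

  @Override
  public void write(final JsonNode formattedData) {
    buffer.append(Jsons.serialize(formattedData)).append('\n');
  }

  @Override
  public void close(final boolean hasFailed) {
    if (!hasFailed) {
      // AmazonS3#putObject(String bucket, String key, String content) uploads the buffer.
      s3.putObject(bucket, objectKey, buffer.toString());
    }
  }

  @Override
  public String getOutputPath() {
    return objectKey;
  }
}

A real writer would also be wired into the S3WriterFactory implementation so that S3Consumer can create one per configured stream, as the startTracked() hunk below shows.
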
@@ -8,7 +8,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
-import io.airbyte.integrations.destination.s3.writer.S3Writer;
+import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
import io.airbyte.integrations.destination.s3.writer.S3WriterFactory;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
@@ -28,7 +28,7 @@ public class S3Consumer extends FailureTrackingAirbyteMessageConsumer {
private final ConfiguredAirbyteCatalog configuredCatalog;
private final S3WriterFactory writerFactory;
private final Consumer<AirbyteMessage> outputRecordCollector;
-  private final Map<AirbyteStreamNameNamespacePair, S3Writer> streamNameAndNamespaceToWriters;
+  private final Map<AirbyteStreamNameNamespacePair, DestinationFileWriter> streamNameAndNamespaceToWriters;

private AirbyteMessage lastStateMessage = null;

@@ -49,7 +49,7 @@ protected void startTracked() throws Exception {
final Timestamp uploadTimestamp = new Timestamp(System.currentTimeMillis());

for (final ConfiguredAirbyteStream configuredStream : configuredCatalog.getStreams()) {
-      final S3Writer writer = writerFactory
+      final DestinationFileWriter writer = writerFactory
.create(s3DestinationConfig, s3Client, configuredStream, uploadTimestamp);
writer.initialize();

@@ -85,7 +85,7 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Exception

@Override
protected void close(final boolean hasFailed) throws Exception {
-    for (final S3Writer handler : streamNameAndNamespaceToWriters.values()) {
+    for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) {
handler.close(hasFailed);
}
// S3 stream uploader is all or nothing if a failure happens in the destination.
(Diffs for the remaining changed files did not load.)