Skip to content

Commit

Permalink
🎉 New Destination: Google PubSub (#4339)
Browse files Browse the repository at this point in the history
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
prasrvenkat and sherifnada committed Jun 25, 2021
1 parent 0e0d0a1 commit 0c25e1c
Show file tree
Hide file tree
Showing 13 changed files with 686 additions and 0 deletions.
@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "356668e2-7e34-47f3-a3b0-67a8a481b692",
"name": "Google PubSub",
"dockerRepository": "airbyte/destination-pubsub",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/pubsub"
}
Expand Up @@ -24,6 +24,11 @@
dockerRepository: airbyte/destination-bigquery-denormalized
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 356668e2-7e34-47f3-a3b0-67a8a481b692
name: Google PubSub
dockerRepository: airbyte/destination-pubsub
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/pubsub
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
Expand Down
2 changes: 2 additions & 0 deletions airbyte-integrations/builds.md
Expand Up @@ -125,6 +125,8 @@

Postgres [![destination-postgres](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-postgres%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-postgres)

Google PubSub [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-pubsub)

Redshift [![destination-redshift](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-redshift%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-redshift)

S3 [![destination-s3](https://img.shields.io/endpoint?url=https%3A%2F%2Fstatus-api.airbyte.io%2Ftests%2Fsummary%2Fdestination-s3%2Fbadge.json)](https://status-api.airbyte.io/tests/summary/destination-s3)
Expand Down
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
11 changes: 11 additions & 0 deletions airbyte-integrations/connectors/destination-pubsub/Dockerfile
@@ -0,0 +1,11 @@
# Image for the Google PubSub destination connector, layered on the
# airbyte/integration-base-java base image.
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
# Connector name; reused below to locate and name the distribution tarball.
ENV APPLICATION destination-pubsub

# Copy the connector's distribution tarball from the local build output
# (presumably produced by the Gradle build) into the working directory.
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

# Unpack in place, dropping the archive's top-level directory.
RUN tar xf ${APPLICATION}.tar --strip-components=1

# Image metadata labels: connector version and image name.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-pubsub
21 changes: 21 additions & 0 deletions airbyte-integrations/connectors/destination-pubsub/README.md
@@ -0,0 +1,21 @@
# Google PubSub Test Configuration

In order to test the PubSub destination, you need a service account key file.

## Community Contributor

As a community contributor, you will need access to a GCP project and PubSub to run tests.

1. Go to the `Service Accounts` page on the GCP console
1. Click on the `+ Create Service Account` button
1. Fill out a descriptive name/id/description
1. Click the edit icon next to the service account you created on the `IAM` page
1. Add the `Pub/Sub Editor` role
1. Go back to the `Service Accounts` page and use the actions modal to `Create Key`
1. Download this key as a JSON file
1. Move and rename this file to `secrets/credentials.json`

## Airbyte Employee

1. Access the `google pubsub test credentials.json` secret on Lastpass under the `shared-integration-test` folder
1. Create a file with the contents at `secrets/credentials.json`
21 changes: 21 additions & 0 deletions airbyte-integrations/connectors/destination-pubsub/build.gradle
@@ -0,0 +1,21 @@
// Build script for the Google PubSub destination connector.
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
// Entry point class; it defines the connector's main(String[]) method.
mainClass = 'io.airbyte.integrations.destination.pubsub.PubsubDestination'
}

dependencies {
// Google Cloud Pub/Sub client library used to publish messages and check topic permissions.
implementation group: 'com.google.cloud', name: 'google-cloud-pubsub', version: '1.113.3'

// Airbyte config/protocol models and the shared Java connector base.
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')
// Depends on the outputs of the base-java project's airbyteDocker task.
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

// Integration tests use the standard destination test suite against this connector project.
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-pubsub')
}
@@ -0,0 +1,139 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.pubsub;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.io.ByteArrayInputStream;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message consumer that publishes every Airbyte record to a single Google PubSub topic.
 *
 * <p>Each record is wrapped in a JSON envelope (generated id, record data, emitted-at timestamp)
 * and published with the stream name, and the namespace when present, as message attributes. The
 * stream key is used as the ordering key, and message ordering is enabled on the publisher.
 *
 * <p>NOTE(review): the future returned by {@code publisher.publish(...)} is discarded, so
 * asynchronous publish failures are not observed here. Confirm whether that is acceptable.
 */
public class PubsubConsumer extends FailureTrackingAirbyteMessageConsumer {

  private static final Logger LOGGER = LoggerFactory.getLogger(PubsubConsumer.class);
  private final JsonNode config;
  private final ConfiguredAirbyteCatalog catalog;
  private final Consumer<AirbyteMessage> outputRecordCollector;
  // Pre-computed PubSub message attributes for each stream in the configured catalog.
  private final Map<AirbyteStreamNameNamespacePair, Map<String, String>> attributes;
  // Created in startTracked(); stays null if startup fails before the publisher is built.
  private Publisher publisher;
  // Most recent STATE message seen, or null if none has been received.
  private AirbyteMessage lastStateMessage;

  /**
   * @param config connector configuration holding the project id, topic id and credentials
   * @param catalog configured catalog listing the streams that may be written
   * @param outputRecordCollector sink that receives the STATE messages emitted by this consumer
   */
  public PubsubConsumer(JsonNode config,
                        ConfiguredAirbyteCatalog catalog,
                        Consumer<AirbyteMessage> outputRecordCollector) {
    this.outputRecordCollector = outputRecordCollector;
    this.config = config;
    this.catalog = catalog;
    this.lastStateMessage = null;
    this.attributes = Maps.newHashMap();
    this.publisher = null;
    LOGGER.info("initializing consumer.");
  }

  /**
   * Builds the PubSub publisher from the configuration and pre-computes the message attributes
   * for every stream in the catalog.
   *
   * @throws Exception if the credentials cannot be parsed or the publisher cannot be created
   */
  @Override
  protected void startTracked() throws Exception {
    // get publisher
    final String projectId = config.get(PubsubDestination.CONFIG_PROJECT_ID).asText();
    final String topicName = config.get(PubsubDestination.CONFIG_TOPIC_ID).asText();
    final TopicName topic = TopicName.of(projectId, topicName);
    // The credentials may be supplied either as a JSON object or as a string containing JSON.
    final JsonNode credentialsNode = config.get(PubsubDestination.CONFIG_CREDS);
    final String credentialsString =
        credentialsNode.isObject() ? Jsons.serialize(credentialsNode) : credentialsNode.asText();
    final ServiceAccountCredentials credentials = ServiceAccountCredentials
        .fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));
    publisher = Publisher.newBuilder(topic)
        .setEnableMessageOrdering(true)
        .setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
    for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
      final Map<String, String> attrs = Maps.newHashMap();
      var key = AirbyteStreamNameNamespacePair.fromAirbyteSteam(configStream.getStream());
      attrs.put(PubsubDestination.STREAM, key.getName());
      // The namespace attribute is only attached when the stream actually has one.
      if (!Strings.isNullOrEmpty(key.getNamespace())) {
        attrs.put(PubsubDestination.NAMESPACE, key.getNamespace());
      }
      attributes.put(key, attrs);
    }
  }

  /**
   * Handles one incoming message: STATE messages are remembered and forwarded to the output
   * collector, RECORD messages are published to the topic, and every other type is ignored.
   *
   * @param msg the message to process
   * @throws IllegalArgumentException if a record belongs to a stream that is not in the catalog
   */
  @Override
  protected void acceptTracked(AirbyteMessage msg) throws Exception {
    if (msg.getType() == Type.STATE) {
      lastStateMessage = msg;
      outputRecordCollector.accept(lastStateMessage);
      return;
    } else if (msg.getType() != Type.RECORD) {
      return;
    }
    final AirbyteRecordMessage recordMessage = msg.getRecord();
    final AirbyteStreamNameNamespacePair streamKey = AirbyteStreamNameNamespacePair
        .fromRecordMessage(recordMessage);

    if (!attributes.containsKey(streamKey)) {
      throw new IllegalArgumentException(
          String.format(
              "Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
              Jsons.serialize(catalog), Jsons.serialize(recordMessage)));
    }
    final JsonNode data = Jsons.jsonNode(ImmutableMap.of(
        JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(),
        JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getData(),
        JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt()));

    // One ordering key per stream, matching the ordering enabled on the publisher.
    publisher.publish(
        PubsubMessage.newBuilder().putAllAttributes(attributes.get(streamKey))
            .setOrderingKey(streamKey.toString())
            .setData(ByteString.copyFromUtf8(Jsons.serialize(data))).build());
  }

  /**
   * Shuts down the publisher and, on success, re-emits the last STATE message.
   *
   * <p>The publisher is shut down whether or not the sync failed, so its resources are always
   * released. The final STATE message is only emitted for a successful sync, and only when one
   * was actually received, so {@code null} is never passed to the output collector.
   *
   * @param hasFailed whether the sync failed
   */
  @Override
  protected void close(boolean hasFailed) throws Exception {
    // The publisher is null when startTracked() did not get as far as building it.
    if (publisher != null) {
      publisher.shutdown();
      LOGGER.info("shutting down consumer.");
    }
    if (!hasFailed && lastStateMessage != null) {
      outputRecordCollector.accept(lastStateMessage);
    }
  }

}
@@ -0,0 +1,103 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.pubsub;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.pubsub.v1.TopicName;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Airbyte destination connector that writes records to a Google PubSub topic.
 */
public class PubsubDestination extends BaseConnector implements Destination {

  // Configuration keys.
  static final String CONFIG_TOPIC_ID = "topic_id";
  static final String CONFIG_PROJECT_ID = "project_id";
  static final String CONFIG_CREDS = "credentials_json";
  // Attribute names attached to every published message.
  static final String STREAM = "_stream";
  static final String NAMESPACE = "_namespace";
  private static final Logger LOGGER = LoggerFactory.getLogger(PubsubDestination.class);

  public static void main(String[] args) throws Exception {
    new IntegrationRunner(new PubsubDestination()).run(args);
  }

  /**
   * Verifies that the configured service account holds the {@code pubsub.topics.publish}
   * permission on the configured topic.
   *
   * @param config connector configuration holding the project id, topic id and credentials
   * @return a SUCCEEDED status when the permission is granted; otherwise a FAILED status carrying
   *         the exception message (or its string form when the message is null)
   */
  @Override
  public AirbyteConnectionStatus check(JsonNode config) {
    try {
      final String projectId = config.get(CONFIG_PROJECT_ID).asText();
      final String topicId = config.get(CONFIG_TOPIC_ID).asText();
      // The credentials may be supplied either as a JSON object or as a string containing JSON.
      final JsonNode credentialsNode = config.get(CONFIG_CREDS);
      final String credentialsString =
          credentialsNode.isObject() ? Jsons.serialize(credentialsNode) : credentialsNode.asText();
      final ServiceAccountCredentials credentials = ServiceAccountCredentials
          .fromStream(new ByteArrayInputStream(credentialsString.getBytes(Charsets.UTF_8)));

      final TopicAdminSettings adminSettings = TopicAdminSettings.newBuilder()
          .setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();

      // try-with-resources closes the admin client on every exit path instead of leaking it
      try (TopicAdminClient adminClient = TopicAdminClient.create(adminSettings)) {
        // check if topic is present and the service account has necessary permissions on it
        final TopicName topicName = TopicName.of(projectId, topicId);
        final List<String> requiredPermissions = List.of("pubsub.topics.publish");
        final TestIamPermissionsResponse response = adminClient.testIamPermissions(
            TestIamPermissionsRequest.newBuilder().setResource(topicName.toString())
                .addAllPermissions(requiredPermissions).build());
        Preconditions.checkArgument(response.getPermissionsList().containsAll(requiredPermissions),
            "missing required permissions " + requiredPermissions);
      }

      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (Exception e) {
      // Any failure (missing config key, bad credentials, API error) is reported as FAILED.
      LOGGER.info("Check failed.", e);
      return new AirbyteConnectionStatus().withStatus(Status.FAILED)
          .withMessage(e.getMessage() != null ? e.getMessage() : e.toString());
    }
  }

  /**
   * Creates the consumer that publishes incoming records to PubSub.
   *
   * @param config connector configuration
   * @param configuredCatalog catalog of streams to be written
   * @param outputRecordCollector sink for messages emitted by the consumer
   * @return a new {@link PubsubConsumer}
   */
  @Override
  public AirbyteMessageConsumer getConsumer(JsonNode config,
                                            ConfiguredAirbyteCatalog configuredCatalog,
                                            Consumer<AirbyteMessage> outputRecordCollector) {
    return new PubsubConsumer(config, configuredCatalog, outputRecordCollector);
  }

}
@@ -0,0 +1,32 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/pubsub",
"supportsIncremental": true,
"supportsNormalization": false,
"supportsDBT": false,
"supported_destination_sync_modes": ["append"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Google PubSub Destination Spec",
"type": "object",
"required": ["project_id", "topic_id", "credentials_json"],
"additionalProperties": true,
"properties": {
"project_id": {
"type": "string",
"description": "The GCP project ID for the project containing the target PubSub",
"title": "Project ID"
},
"topic_id": {
"type": "string",
"description": "PubSub topic ID in the given GCP project ID",
"title": "PubSub Topic ID"
},
"credentials_json": {
"type": "string",
"description": "The contents of the JSON service account key. Check out the <a href=\"https://docs.airbyte.io/integrations/destinations/pubsub\">docs</a> if you need help generating this key.",
"title": "Credentials JSON",
"airbyte_secret": true
}
}
}
}

0 comments on commit 0c25e1c

Please sign in to comment.