Skip to content

Commit

Permalink
Publish Pubsub (#4354)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada committed Jun 25, 2021
1 parent 8bce174 commit 767f40b
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 6 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
Expand Up @@ -58,6 +58,7 @@ jobs:
AZURE_STORAGE_INTEGRATION_TEST_CREDS: ${{ secrets.AZURE_STORAGE_INTEGRATION_TEST_CREDS }}
BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }}
BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }}
DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }}
DRIFT_INTEGRATION_TEST_CREDS: ${{ secrets.DRIFT_INTEGRATION_TEST_CREDS }}
EXCHANGE_RATES_TEST_CREDS: ${{ secrets.EXCHANGE_RATES_TEST_CREDS }}
FACEBOOK_MARKETING_TEST_INTEGRATION_CREDS: ${{ secrets.FACEBOOK_MARKETING_TEST_INTEGRATION_CREDS }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
Expand Up @@ -58,6 +58,7 @@ jobs:
AZURE_STORAGE_INTEGRATION_TEST_CREDS: ${{ secrets.AZURE_STORAGE_INTEGRATION_TEST_CREDS }}
BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }}
BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }}
DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }}
DRIFT_INTEGRATION_TEST_CREDS: ${{ secrets.DRIFT_INTEGRATION_TEST_CREDS }}
EXCHANGE_RATES_TEST_CREDS: ${{ secrets.EXCHANGE_RATES_TEST_CREDS }}
FACEBOOK_MARKETING_TEST_INTEGRATION_CREDS: ${{ secrets.FACEBOOK_MARKETING_TEST_INTEGRATION_CREDS }}
Expand Down
Expand Up @@ -21,6 +21,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.pubsub;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -61,8 +62,8 @@ public class PubsubConsumer extends FailureTrackingAirbyteMessageConsumer {
private AirbyteMessage lastStateMessage;

public PubsubConsumer(JsonNode config,
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector) {
ConfiguredAirbyteCatalog catalog,
Consumer<AirbyteMessage> outputRecordCollector) {
this.outputRecordCollector = outputRecordCollector;
this.config = config;
this.catalog = catalog;
Expand Down Expand Up @@ -136,4 +137,5 @@ protected void close(boolean hasFailed) throws Exception {
outputRecordCollector.accept(lastStateMessage);
}
}

}
Expand Up @@ -96,8 +96,9 @@ public AirbyteConnectionStatus check(JsonNode config) {

@Override
public AirbyteMessageConsumer getConsumer(JsonNode config,
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
ConfiguredAirbyteCatalog configuredCatalog,
Consumer<AirbyteMessage> outputRecordCollector) {
return new PubsubConsumer(config, configuredCatalog, outputRecordCollector);
}

}
Expand Up @@ -79,7 +79,8 @@ public class PubsubDestinationAcceptanceTest extends DestinationAcceptanceTest {
private ProjectSubscriptionName subscriptionName;
private Credentials credentials;
private JsonNode configJson;
// Store retrieved data during the test run since we can't re-read it multiple times (ACKing messages causes them to be removed from pubsub)
// Store retrieved data during the test run since we can't re-read it multiple times (ACKing
// messages causes them to be removed from pubsub)
private List<JsonNode> records;

@Override
Expand Down Expand Up @@ -120,7 +121,7 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
// verification
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setCredentials(credentials)
Expand Down
1 change: 1 addition & 0 deletions tools/bin/ci_credentials.sh
Expand Up @@ -23,6 +23,7 @@ function write_standard_creds() {

write_standard_creds destination-bigquery "$BIGQUERY_INTEGRATION_TEST_CREDS" "credentials.json"
write_standard_creds destination-bigquery-denormalized "$BIGQUERY_INTEGRATION_TEST_CREDS" "credentials.json"
write_standard_creds destination-pubsub "$DESTINATION_PUBSUB_TEST_CREDS" "credentials.json"
write_standard_creds destination-snowflake "$SNOWFLAKE_INTEGRATION_TEST_CREDS" "insert_config.json"
write_standard_creds destination-snowflake "$SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS" "copy_s3_config.json"
write_standard_creds destination-snowflake "$SNOWFLAKE_GCS_COPY_INTEGRATION_TEST_CREDS" "copy_gcs_config.json"
Expand Down

0 comments on commit 767f40b

Please sign in to comment.