Skip to content

Commit

Permalink
feat: Add non google default creds provider to subscribers in Spark C…
Browse files Browse the repository at this point in the history
…onnector (#440)
  • Loading branch information
jiangmichaellll committed Jan 9, 2021
1 parent 5190989 commit 2099751
Show file tree
Hide file tree
Showing 17 changed files with 150 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.google.cloud.pubsublite;

import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
Expand All @@ -32,8 +33,6 @@
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import org.threeten.bp.Duration;

Expand Down Expand Up @@ -110,19 +109,14 @@ public Publisher<Offset> build() throws ApiException {
serviceClient = autoBuilt.serviceClient().get();
} else {
try {
Map<String, String> metadata = autoBuilt.context().getMetadata();
Map<String, String> routingMetadata =
RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition());
Map<String, String> allMetadata =
ImmutableMap.<String, String>builder()
.putAll(metadata)
.putAll(routingMetadata)
.build();
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
addDefaultMetadata(
autoBuilt.context(),
RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition()),
settingsBuilder);
serviceClient =
PublisherServiceClient.create(
addDefaultSettings(
autoBuilt.topic().location().region(),
PublisherServiceSettings.newBuilder().setHeaderProvider(() -> allMetadata)));
addDefaultSettings(autoBuilt.topic().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,43 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class RoutingMetadata {
private RoutingMetadata() {}
public final class RoutingMetadata {

static final String PARAMS_HEADER = "x-goog-request-params";
private static final String PARAMS_HEADER = "x-goog-request-params";
private final Map<String, String> metadata;

static Map<String, String> of(TopicPath topic, Partition partition) throws ApiException {
public static RoutingMetadata of(TopicPath topic, Partition partition) throws ApiException {
return new RoutingMetadata(topic, partition);
}

public static RoutingMetadata of(SubscriptionPath subscription, Partition partition)
throws ApiException {
return new RoutingMetadata(subscription, partition);
}

private RoutingMetadata(TopicPath topic, Partition partition) {
try {
String topic_value = URLEncoder.encode(topic.toString(), StandardCharsets.UTF_8.toString());
String params = String.format("partition=%s&topic=%s", partition.value(), topic_value);
return ImmutableMap.of(PARAMS_HEADER, params);
this.metadata = ImmutableMap.of(PARAMS_HEADER, params);
} catch (UnsupportedEncodingException e) {
throw toCanonical(e).underlying;
}
}

static Map<String, String> of(SubscriptionPath subscription, Partition partition)
throws ApiException {
private RoutingMetadata(SubscriptionPath subscription, Partition partition) {
try {
String subscription_value =
URLEncoder.encode(subscription.toString(), StandardCharsets.UTF_8.toString());
String params =
String.format("partition=%s&subscription=%s", partition.value(), subscription_value);
return ImmutableMap.of(PARAMS_HEADER, params);
this.metadata = ImmutableMap.of(PARAMS_HEADER, params);
} catch (UnsupportedEncodingException e) {
throw toCanonical(e).underlying;
}
}

public Map<String, String> getMetadata() {
return metadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

Expand All @@ -25,6 +25,8 @@
import com.google.api.gax.rpc.ClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -60,4 +62,18 @@ Settings addDefaultSettings(CloudRegion target, Builder builder) throws ApiExcep
throw toCanonical(t).underlying;
}
}

// Adds context routing metadata for publisher or subscriber.
public static <
Settings extends ClientSettings<Settings>,
Builder extends ClientSettings.Builder<Settings, Builder>>
Builder addDefaultMetadata(
PubsubContext context, RoutingMetadata routingMetadata, Builder builder) {
return builder.setHeaderProvider(
() ->
ImmutableMap.<String, String>builder()
.putAll(context.getMetadata())
.putAll(routingMetadata.getMetadata())
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand All @@ -28,8 +29,6 @@
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

Expand Down Expand Up @@ -77,19 +76,16 @@ public Subscriber build() throws ApiException {
serviceClient = autoBuilt.serviceClient().get();
} else {
try {
Map<String, String> metadata = autoBuilt.context().getMetadata();
Map<String, String> routingMetadata =
RoutingMetadata.of(autoBuilt.subscriptionPath(), autoBuilt.partition());
Map<String, String> allMetadata =
ImmutableMap.<String, String>builder()
.putAll(metadata)
.putAll(routingMetadata)
.build();
SubscriberServiceSettings.Builder settingsBuilder =
SubscriberServiceSettings.newBuilder();
addDefaultMetadata(
autoBuilt.context(),
RoutingMetadata.of(autoBuilt.subscriptionPath(), autoBuilt.partition()),
settingsBuilder);
serviceClient =
SubscriberServiceClient.create(
addDefaultSettings(
autoBuilt.subscriptionPath().location().region(),
SubscriberServiceSettings.newBuilder().setHeaderProvider(() -> allMetadata)));
autoBuilt.subscriptionPath().location().region(), settingsBuilder));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.function.Consumer;

public interface PartitionSubscriberFactory extends Serializable {
Subscriber newSubscriber(
Partition partition, Consumer<ImmutableList<SequencedMessage>> message_consumer)
throws ApiException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import java.io.Serializable;
Expand All @@ -33,14 +32,17 @@
public class PslContinuousInputPartition
implements ContinuousInputPartition<InternalRow>, Serializable {

private final SubscriberFactory subscriberFactory;
private final SparkPartitionOffset startOffset;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;

public PslContinuousInputPartition(
SubscriberFactory subscriberFactory,
SparkPartitionOffset startOffset,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings) {
this.subscriberFactory = subscriberFactory;
this.startOffset = startOffset;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
Expand All @@ -59,14 +61,7 @@ public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset
try {
subscriber =
new BlockingPullSubscriberImpl(
// TODO(jiangmichael): Pass credentials settings here.
(consumer) ->
SubscriberBuilder.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setPartition(pslPartitionOffset.partition())
.setContext(PubsubContext.of(Constants.FRAMEWORK))
.setMessageConsumer(consumer)
.build(),
subscriberFactory,
flowControlSettings,
SeekRequest.newBuilder()
.setCursor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class PslContinuousReader implements ContinuousReader {

private final CursorClient cursorClient;
private final MultiPartitionCommitter committer;
private final PartitionSubscriberFactory partitionSubscriberFactory;
private final SubscriptionPath subscriptionPath;
private final FlowControlSettings flowControlSettings;
private final long topicPartitionCount;
Expand All @@ -44,11 +45,13 @@ public class PslContinuousReader implements ContinuousReader {
public PslContinuousReader(
CursorClient cursorClient,
MultiPartitionCommitter committer,
PartitionSubscriberFactory partitionSubscriberFactory,
SubscriptionPath subscriptionPath,
FlowControlSettings flowControlSettings,
long topicPartitionCount) {
this.cursorClient = cursorClient;
this.committer = committer;
this.partitionSubscriberFactory = partitionSubscriberFactory;
this.subscriptionPath = subscriptionPath;
this.flowControlSettings = flowControlSettings;
this.topicPartitionCount = topicPartitionCount;
Expand Down Expand Up @@ -104,10 +107,12 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {

return startOffset.getPartitionOffsetMap().values().stream()
.map(
v ->
new PslContinuousInputPartition(
(consumer) -> partitionSubscriberFactory.newSubscriber(v.partition(), consumer),
SparkPartitionOffset.builder()
.partition(v.partition())
.offset(v.offset())
Expand Down
Loading

0 comments on commit 2099751

Please sign in to comment.