Skip to content

Commit

Permalink
feat: add pubsublite sink support for credentials settings (#251)
Browse files Browse the repository at this point in the history
psl source support requires more work

also unify credentialsProvider creation
  • Loading branch information
dpcollins-google committed Mar 31, 2023
1 parent c9c3beb commit 7290786
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,58 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ConnectorCredentialsProvider implements CredentialsProvider {
private static final List<String> GCP_SCOPE =
Arrays.asList("https://www.googleapis.com/auth/cloud-platform");

private static final List<String> CPS_SCOPE =
Arrays.asList("https://www.googleapis.com/auth/pubsub");
CredentialsProvider impl;

GoogleCredentials credentials;
private ConnectorCredentialsProvider(CredentialsProvider impl) {
this.impl = impl;
}

public static ConnectorCredentialsProvider fromConfig(Map<String, Object> config) {
String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString();
String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString();
if (!credentialsPath.isEmpty()) {
if (!credentialsJson.isEmpty()) {
throw new IllegalArgumentException(
"May not set both "
+ ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG
+ " and "
+ ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
}
return ConnectorCredentialsProvider.fromFile(credentialsPath);
} else if (!credentialsJson.isEmpty()) {
return ConnectorCredentialsProvider.fromJson(credentialsJson);
} else {
return ConnectorCredentialsProvider.fromDefault();
}
}

public void loadFromFile(String credentialPath) throws IOException {
this.credentials = GoogleCredentials.fromStream(new FileInputStream(credentialPath));
public static ConnectorCredentialsProvider fromFile(String credentialPath) {
return new ConnectorCredentialsProvider(
() ->
GoogleCredentials.fromStream(new FileInputStream(credentialPath))
.createScoped(GCP_SCOPE));
}

public void loadJson(String credentialsJson) throws IOException {
ByteArrayInputStream bs = new ByteArrayInputStream(credentialsJson.getBytes());
this.credentials = credentials = GoogleCredentials.fromStream(bs);
public static ConnectorCredentialsProvider fromJson(String credentialsJson) {
return new ConnectorCredentialsProvider(
() ->
GoogleCredentials.fromStream(new ByteArrayInputStream(credentialsJson.getBytes()))
.createScoped(GCP_SCOPE));
}

public static ConnectorCredentialsProvider fromDefault() {
return new ConnectorCredentialsProvider(
() -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE));
}

@Override
public Credentials getCredentials() throws IOException {
if (this.credentials == null) {
return GoogleCredentials.getApplicationDefault().createScoped(this.CPS_SCOPE);
} else {
return this.credentials.createScoped(this.CPS_SCOPE);
}
return impl.getCredentials();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ public ConfigDef config() {
.define(
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
Type.STRING,
null,
"",
Importance.HIGH,
"The path to the GCP credentials file")
.define(
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
Type.STRING,
null,
"",
Importance.HIGH,
"GCP JSON credentials")
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.OrderingKeySource;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -136,24 +135,7 @@ public void start(Map<String, String> props) {
orderingKeySource =
OrderingKeySource.getEnum(
(String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE));
gcpCredentialsProvider = new ConnectorCredentialsProvider();
String credentialsPath =
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson =
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
if (credentialsPath != null) {
try {
gcpCredentialsProvider.loadFromFile(credentialsPath);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (credentialsJson != null) {
try {
gcpCredentialsProvider.loadJson(credentialsJson);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps);
if (publisher == null) {
// Only do this if we did not use the constructor.
createPublisher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.v1.GetSubscriptionRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -138,25 +137,11 @@ public String version() {
public void start(Map<String, String> props) {
// Do a validation of configs here too so that we do not pass null objects to
// verifySubscription().
config().parse(props);
String cpsProject = props.get(ConnectorUtils.CPS_PROJECT_CONFIG);
String cpsSubscription = props.get(CPS_SUBSCRIPTION_CONFIG);
String credentialsPath = props.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson = props.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
ConnectorCredentialsProvider credentialsProvider = new ConnectorCredentialsProvider();
if (credentialsPath != null) {
try {
credentialsProvider.loadFromFile(credentialsPath);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (credentialsJson != null) {
try {
credentialsProvider.loadJson(credentialsJson);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
Map<String, Object> validated = config().parse(props);
String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString();
ConnectorCredentialsProvider credentialsProvider =
ConnectorCredentialsProvider.fromConfig(validated);

verifySubscription(cpsProject, cpsSubscription, credentialsProvider);
this.props = props;
Expand Down Expand Up @@ -271,13 +256,13 @@ public ConfigDef config() {
.define(
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
Type.STRING,
null,
"",
Importance.HIGH,
"The path to the GCP credentials file")
.define(
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
Type.STRING,
null,
"",
Importance.HIGH,
"GCP JSON credentials")
.define(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -114,11 +113,6 @@ public void start(Map<String, String> props) {
useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
makeOrderingKeyAttribute =
(Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE);
ConnectorCredentialsProvider gcpCredentialsProvider = new ConnectorCredentialsProvider();
String gcpCredentialsFilePath =
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
String credentialsJson =
(String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
boolean useStreamingPull =
(Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED);
long streamingPullBytes =
Expand All @@ -136,19 +130,9 @@ public void start(Map<String, String> props) {
(Long)
validatedProps.get(
CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION);
if (gcpCredentialsFilePath != null) {
try {
gcpCredentialsProvider.loadFromFile(gcpCredentialsFilePath);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else if (credentialsJson != null) {
try {
gcpCredentialsProvider.loadJson(credentialsJson);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
ConnectorCredentialsProvider gcpCredentialsProvider =
ConnectorCredentialsProvider.fromConfig(validatedProps);

// Only do this if we did not set it through the constructor.
if (subscriber == null) {
if (useStreamingPull) {
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.pubsublite.kafka.sink;

import com.google.pubsub.kafka.common.ConnectorUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;

Expand Down Expand Up @@ -49,6 +50,18 @@ static ConfigDef config() {
ConfigDef.Type.STRING,
OrderingMode.DEFAULT.name(),
Importance.HIGH,
"The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.");
"The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.")
.define(
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
ConfigDef.Type.STRING,
"",
Importance.HIGH,
"The path to the GCP credentials file")
.define(
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
ConfigDef.Type.STRING,
"",
Importance.HIGH,
"GCP JSON credentials");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,21 @@
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigValue;

class PublisherFactoryImpl implements PublisherFactory {

private static final Framework FRAMEWORK = Framework.of("KAFKA_CONNECT");

private PartitionPublisherFactory getPartitionPublisherFactory(TopicPath topic) {
private PartitionPublisherFactory getPartitionPublisherFactory(
TopicPath topic, ConnectorCredentialsProvider credentialsProvider) {

return new PartitionPublisherFactory() {
private Optional<PublisherServiceClient> publisherServiceClient = Optional.empty();
Expand All @@ -61,9 +65,7 @@ private synchronized PublisherServiceClient getServiceClient() throws ApiExcepti
addDefaultSettings(
topic.location().extractRegion(),
PublisherServiceSettings.newBuilder()
.setCredentialsProvider(
PublisherServiceSettings.defaultCredentialsProviderBuilder()
.build()))));
.setCredentialsProvider(credentialsProvider))));
return publisherServiceClient.get();
} catch (Throwable t) {
throw toCanonical(t).underlying;
Expand Down Expand Up @@ -95,25 +97,38 @@ public void close() {}

@Override
public Publisher<MessageMetadata> newPublisher(Map<String, String> params) {
Map<String, ConfigValue> config = ConfigDefs.config().validateAll(params);
Map<String, Object> config = ConfigDefs.config().parse(params);
ConnectorCredentialsProvider credentialsProvider =
ConnectorCredentialsProvider.fromConfig(config);
CloudRegionOrZone location =
CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString());
CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).toString());
PartitionCountWatchingPublisherSettings.Builder builder =
PartitionCountWatchingPublisherSettings.newBuilder();
TopicPath topic =
TopicPath.newBuilder()
.setProject(
ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value())
.project())
ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG)).project())
.setLocation(location)
.setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).value().toString()))
.setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).toString()))
.build();
builder.setTopic(topic);
builder.setPublisherFactory(getPartitionPublisherFactory(topic));
builder.setAdminClient(
AdminClient.create(
AdminClientSettings.newBuilder().setRegion(location.extractRegion()).build()));
if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).value().toString())
builder.setPublisherFactory(getPartitionPublisherFactory(topic, credentialsProvider));
try {
builder.setAdminClient(
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(location.extractRegion())
.setServiceClient(
AdminServiceClient.create(
addDefaultSettings(
location.extractRegion(),
AdminServiceSettings.newBuilder()
.setCredentialsProvider(credentialsProvider))))
.build()));
} catch (IOException e) {
throw new IllegalStateException(e);
}
if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).toString())
== OrderingMode.KAFKA) {
builder.setRoutingPolicyFactory(KafkaPartitionRoutingPolicy::new);
}
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.pubsublite.kafka.source;

import com.google.pubsub.kafka.common.ConnectorUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;

Expand Down Expand Up @@ -63,6 +64,18 @@ static ConfigDef config() {
ConfigDef.Type.LONG,
20_000_000,
Importance.MEDIUM,
"The number of outstanding bytes per-partition allowed. Set to 20MB by default.");
"The number of outstanding bytes per-partition allowed. Set to 20MB by default.")
.define(
ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
ConfigDef.Type.STRING,
"",
Importance.HIGH,
"The path to the GCP credentials file")
.define(
ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
ConfigDef.Type.STRING,
"",
Importance.HIGH,
"GCP JSON credentials");
}
}

0 comments on commit 7290786

Please sign in to comment.