Skip to content

Commit

Permalink
feat: add pubsublite-kafka-auth module (#363)
Browse files Browse the repository at this point in the history
* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* feat: Create AuthServer/ClientParameters

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] committed Jan 6, 2023
1 parent c152761 commit 6c99767
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 27 deletions.
7 changes: 4 additions & 3 deletions README.md
Expand Up @@ -19,20 +19,20 @@ If you are using Maven, add this to your pom.xml file:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka</artifactId>
<version>1.0.3</version>
<version>1.0.4</version>
</dependency>
```

If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:pubsublite-kafka:1.0.3'
implementation 'com.google.cloud:pubsublite-kafka:1.0.4'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.3"
libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.0.4"
```

## Authentication
Expand Down Expand Up @@ -160,6 +160,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsublite-ka
| Sample | Source Code | Try it |
| --------------------------- | --------------------------------- | ------ |
| Consumer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ConsumerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ConsumerExample.java) |
| Kafka Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/KafkaProducerExample.java) |
| Producer Example | [source code](https://github.com/googleapis/java-pubsublite-kafka/blob/main/samples/snippets/src/main/java/pubsublite/ProducerExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsublite-kafka&page=editor&open_in_editor=samples/snippets/src/main/java/pubsublite/ProducerExample.java) |


Expand Down
70 changes: 70 additions & 0 deletions kafka_gcp_credentials.py
@@ -0,0 +1,70 @@
import base64
import datetime
import google.auth
import google.auth.transport.urllib3
import http.server
import json
import urllib3

_credentials, _project = google.auth.default()
_http_client = urllib3.PoolManager()


def valid_credentials():
if not _credentials.valid:
_credentials.refresh(
google.auth.transport.urllib3.Request(_http_client))
return _credentials


_HEADER = json.dumps(dict(typ='JWT', alg='GOOG_TOKEN'))


def get_jwt(creds):
return json.dumps(dict(exp=creds.expiry.timestamp(),
iat=datetime.datetime.utcnow().timestamp(),
scope='pubsub',
sub='unused'))


def b64_encode(source):
return base64.urlsafe_b64encode(source.encode('utf-8')).decode('utf-8')


def get_kafka_access_token(creds):
return '.'.join([b64_encode(_HEADER), b64_encode(get_jwt(creds)),
b64_encode(creds.token)])


def build_message():
creds = valid_credentials()
expiry_seconds = (creds.expiry - datetime.datetime.utcnow()).total_seconds()
return json.dumps(
dict(access_token=get_kafka_access_token(creds), token_type='bearer',
expires_in=expiry_seconds))


class AuthHandler(http.server.BaseHTTPRequestHandler):
def _handle(self):
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.end_headers()
self.wfile.write(build_message().encode('utf-8'))

def do_GET(self):
self._handle()

def do_POST(self):
self._handle()


def run_server():
server_address = ('localhost', 14293)
server = http.server.ThreadingHTTPServer(server_address, AuthHandler)
print("Serving on localhost:14293. This is not accessible outside of the "
"current machine.")
server.serve_forever()


if __name__ == '__main__':
run_server()
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -15,6 +15,7 @@
<description>Parent POM for Pub/Sub Lite Kafka Integrations</description>
<modules>
<module>pubsublite-kafka</module>
<module>pubsublite-kafka-auth</module>
</modules>
<properties>
<psl.version>1.9.2</psl.version>
Expand Down
34 changes: 34 additions & 0 deletions pubsublite-kafka-auth/pom.xml
@@ -0,0 +1,34 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka-parent</artifactId>
<version>1.0.5-SNAPSHOT</version><!-- {x-version-update:pubsublite-kafka:current} -->
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka-auth</artifactId>
<version>1.0.5-SNAPSHOT</version><!-- {x-version-update:pubsublite-kafka:current} -->
<packaging>jar</packaging>
<name>Pub/Sub Lite Kafka Auth</name>
<url>https://github.com/googleapis/java-pubsublite-kafka</url>
<description>Kafka Auth Provider for Google Cloud Pub/Sub Lite</description>
<dependencies>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,63 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.kafka;

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.ProjectIdOrNumber;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.kafka.internal.AuthServer;
import java.util.HashMap;
import java.util.Map;

/** A class providing the correct parameters for connecting a Kafka client to Pub/Sub Lite. */
public final class ClientParameters {
public static Map<String, Object> getProducerParams(ProjectId project, CloudRegion region) {
return getProducerParams(ProjectIdOrNumber.of(project), region);
}

public static Map<String, Object> getProducerParams(ProjectNumber project, CloudRegion region) {
return getProducerParams(ProjectIdOrNumber.of(project), region);
}

public static Map<String, Object> getProducerParams(
ProjectIdOrNumber project, CloudRegion region) {
HashMap<String, Object> params = new HashMap<>();
params.put("enable.idempotence", false);
params.put("bootstrap.servers", getEndpoint(region));
params.put("security.protocol", "SASL_SSL");
params.put("sasl.mechanism", "OAUTHBEARER");
params.put("sasl.oauthbearer.token.endpoint.url", "http://localhost:" + AuthServer.PORT);
params.put("sasl.jaas.config", getJaasConfig(project));
params.put(
"sasl.login.callback.handler.class",
"org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler");
return params;
}

private static String getEndpoint(CloudRegion region) {
return region.value() + "-kafka-pubsub.googleapis.com:443";
}

private static String getJaasConfig(ProjectIdOrNumber project) {
return String.format(
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=\"unused\" clientSecret=\"unused\" extension_pubsubProject=\"%s\";",
project);
}

private ClientParameters() {}
}
@@ -0,0 +1,118 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.kafka.internal;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singletonList;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.Instant;
import java.util.Base64;

public class AuthServer {
public static int PORT = 14293;
public static InetSocketAddress ADDRESS =
new InetSocketAddress(InetAddress.getLoopbackAddress(), PORT);

private static final String HEADER =
new Gson().toJson(ImmutableMap.of("typ", "JWT", "alg", "GOOG_TOKEN"));

static {
spawnDaemon();
}

private static String b64Encode(String data) {
return Base64.getUrlEncoder().encodeToString(data.getBytes(UTF_8));
}

private static String getJwt(AccessToken token) {
return new Gson()
.toJson(
ImmutableMap.of(
"exp",
token.getExpirationTime().toInstant().getEpochSecond(),
"iat",
Instant.now().getEpochSecond(),
"scope",
"pubsub",
"sub",
"unused"));
}

private static String getKafkaAccessToken(AccessToken token) {
return String.join(
".", b64Encode(HEADER), b64Encode(getJwt(token)), b64Encode(token.getTokenValue()));
}

private static String getResponse(GoogleCredentials creds) throws IOException {
creds.refreshIfExpired();
AccessToken token = creds.getAccessToken();
long exipiresInSeconds =
Duration.between(Instant.now(), token.getExpirationTime().toInstant()).getSeconds();
return new Gson()
.toJson(
ImmutableMap.of(
"access_token",
getKafkaAccessToken(token),
"token_type",
"bearer",
"expires_in",
Long.toString(exipiresInSeconds)));
}

private static void spawnDaemon() {
// Run spawn() in a daemon thread so the created threads are themselves daemons.
Thread thread = new Thread(AuthServer::spawn);
thread.setDaemon(true);
thread.start();
}

private static void spawn() {
try {
GoogleCredentials creds =
GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform");
HttpServer server = HttpServer.create(ADDRESS, 0);
server.createContext(
"/",
handler -> {
try {
byte[] response = getResponse(creds).getBytes(UTF_8);
handler.getResponseHeaders().put("Content-type", singletonList("text/plain"));
handler.sendResponseHeaders(200, response.length);
handler.getResponseBody().write(response);
handler.close();
} catch (Exception e) {
e.printStackTrace(System.err);
throw new RuntimeException(e);
}
});
server.start();
} catch (Exception e) {
e.printStackTrace(System.err);
throw new RuntimeException(e);
}
}
}
16 changes: 16 additions & 0 deletions samples/snapshot/pom.xml
Expand Up @@ -45,12 +45,28 @@
<artifactId>pubsublite-kafka</artifactId>
<version>1.0.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka-auth</artifactId>
<version>1.0.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.1</version>
</dependency>
<!-- {x-version-update-end} -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions samples/snippets/pom.xml
Expand Up @@ -47,6 +47,11 @@
<artifactId>pubsublite-kafka</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka-auth</artifactId>
<version>1.0.5-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down

0 comments on commit 6c99767

Please sign in to comment.