Skip to content

Commit 3c43ef3

Browse files
feat: Implement interfaces and utilities needed for Pub/Sub Lite Kafka shim (#276)
* fix: Implement rethrowAsRuntime and clean up ExtractStatus * feat: Implement interfaces and utilities needed for Pub/Sub Lite Kafka shim * fix: Formatting changes * deps: Fix dependencies * chore: Update exception type for already exists * chore: Run linter
1 parent 5c0e7cc commit 3c43ef3

File tree

12 files changed

+656
-6
lines changed

12 files changed

+656
-6
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/ExtractStatus.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,21 @@
2323
import io.grpc.StatusRuntimeException;
2424
import java.util.Optional;
2525
import java.util.concurrent.ExecutionException;
26+
import java.util.function.BiConsumer;
2627
import java.util.function.Consumer;
28+
import java.util.function.Function;
2729

2830
public final class ExtractStatus {
2931
public static Optional<Status> extract(Throwable t) {
30-
if (t instanceof StatusException) {
31-
return Optional.of(((StatusException) t).getStatus());
32+
try {
33+
throw t;
34+
} catch (StatusException e) {
35+
return Optional.of(e.getStatus());
36+
} catch (StatusRuntimeException e) {
37+
return Optional.of(e.getStatus());
38+
} catch (Throwable e) {
39+
return Optional.empty();
3240
}
33-
if (t instanceof StatusRuntimeException) {
34-
return Optional.of(((StatusRuntimeException) t).getStatus());
35-
}
36-
return Optional.empty();
3741
}
3842

3943
public static StatusException toCanonical(Throwable t) {
@@ -56,5 +60,47 @@ public static void addFailureHandler(ApiFuture<?> future, Consumer<StatusExcepti
5660
MoreExecutors.directExecutor());
5761
}
5862

63+
public interface StatusFunction<I, O> {
64+
O apply(I input) throws StatusException;
65+
}
66+
67+
public interface StatusConsumer<I> {
68+
void apply(I input) throws StatusException;
69+
}
70+
71+
public interface StatusBiconsumer<K, V> {
72+
void apply(K key, V value) throws StatusException;
73+
}
74+
75+
public static <I, O> Function<I, O> rethrowAsRuntime(StatusFunction<I, O> function) {
76+
return i -> {
77+
try {
78+
return function.apply(i);
79+
} catch (StatusException e) {
80+
throw e.getStatus().asRuntimeException();
81+
}
82+
};
83+
}
84+
85+
public static <I> Consumer<I> rethrowAsRuntime(StatusConsumer<I> consumer) {
86+
return i -> {
87+
try {
88+
consumer.apply(i);
89+
} catch (StatusException e) {
90+
throw e.getStatus().asRuntimeException();
91+
}
92+
};
93+
}
94+
95+
public static <K, V> BiConsumer<K, V> rethrowAsRuntime(StatusBiconsumer<K, V> consumer) {
96+
return (k, v) -> {
97+
try {
98+
consumer.apply(k, v);
99+
} catch (StatusException e) {
100+
throw e.getStatus().asRuntimeException();
101+
}
102+
};
103+
}
104+
59105
private ExtractStatus() {}
60106
}

pubsublite-kafka-shim/pom.xml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,59 @@
1414
<name>Pub/Sub Lite Kafka Shim</name>
1515
<url>https://github.com/googleapis/java-pubsublite</url>
1616
<description>Kafka Producer and Consumer for Google Cloud Pub/Sub Lite</description>
17+
<dependencies>
18+
<dependency>
19+
<groupId>com.google.api.grpc</groupId>
20+
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
21+
<version>0.4.2-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
22+
</dependency>
23+
<dependency>
24+
<groupId>com.google.cloud</groupId>
25+
<artifactId>google-cloud-pubsublite</artifactId>
26+
<version>0.4.2-SNAPSHOT</version><!-- {x-version-update:google-cloud-pubsublite:current} -->
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.kafka</groupId>
30+
<artifactId>kafka-clients</artifactId>
31+
<version>2.5.0</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>io.grpc</groupId>
35+
<artifactId>grpc-api</artifactId>
36+
</dependency>
37+
<dependency>
38+
<groupId>com.google.protobuf</groupId>
39+
<artifactId>protobuf-java</artifactId>
40+
</dependency>
41+
<dependency>
42+
<groupId>com.google.protobuf</groupId>
43+
<artifactId>protobuf-java-util</artifactId>
44+
</dependency>
45+
<dependency>
46+
<groupId>com.google.api</groupId>
47+
<artifactId>api-common</artifactId>
48+
</dependency>
49+
<dependency>
50+
<groupId>com.google.guava</groupId>
51+
<artifactId>guava</artifactId>
52+
</dependency>
53+
<!--test dependencies-->
54+
<dependency>
55+
<groupId>junit</groupId>
56+
<artifactId>junit</artifactId>
57+
<scope>test</scope>
58+
</dependency>
59+
<dependency>
60+
<groupId>com.google.truth</groupId>
61+
<artifactId>truth</artifactId>
62+
<scope>test</scope>
63+
</dependency>
64+
<dependency>
65+
<groupId>com.google.truth.extensions</groupId>
66+
<artifactId>truth-java8-extension</artifactId>
67+
<scope>test</scope>
68+
</dependency>
69+
</dependencies>
1770
<build>
1871
<plugins>
1972
<plugin>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.kafka;
18+
19+
import com.google.cloud.pubsublite.Partition;
20+
import com.google.cloud.pubsublite.internal.wire.Committer;
21+
import io.grpc.StatusException;
22+
23+
/** A factory for making new PullSubscribers for a given partition of a subscription. */
24+
interface CommitterFactory {
25+
Committer newCommitter(Partition partition) throws StatusException;
26+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.kafka;
18+
19+
interface ConsumerFactory {
20+
SingleSubscriptionConsumer newConsumer();
21+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.kafka;
18+
19+
import com.google.cloud.pubsublite.internal.ExtractStatus;
20+
import io.grpc.StatusException;
21+
import java.util.concurrent.TimeoutException;
22+
import org.apache.kafka.common.KafkaException;
23+
import org.apache.kafka.common.errors.AuthenticationException;
24+
import org.apache.kafka.common.errors.AuthorizationException;
25+
import org.apache.kafka.common.errors.BrokerNotAvailableException;
26+
import org.apache.kafka.common.errors.InterruptException;
27+
import org.apache.kafka.common.errors.InvalidRequestException;
28+
29+
class KafkaExceptionUtils {
30+
private KafkaExceptionUtils() {}
31+
32+
static KafkaException toKafkaException(StatusException source) {
33+
switch (source.getStatus().getCode()) {
34+
case OK:
35+
throw source.getStatus().asRuntimeException();
36+
case ABORTED:
37+
return new BrokerNotAvailableException("Aborted.", source);
38+
case ALREADY_EXISTS:
39+
return new KafkaException("Already exists.", source);
40+
case CANCELLED:
41+
return new BrokerNotAvailableException("Cancelled.", source);
42+
case DATA_LOSS:
43+
return new KafkaException("Data loss.", source);
44+
case DEADLINE_EXCEEDED:
45+
return new BrokerNotAvailableException("Deadline exceeded.", source);
46+
case FAILED_PRECONDITION:
47+
return new InvalidRequestException("Failed precondition.", source);
48+
case INTERNAL:
49+
return new BrokerNotAvailableException("Internal.", source);
50+
case INVALID_ARGUMENT:
51+
return new InvalidRequestException("Invalid argument.", source);
52+
case NOT_FOUND:
53+
return new KafkaException("Not found.", source);
54+
case OUT_OF_RANGE:
55+
return new KafkaException("Out of range.", source);
56+
case PERMISSION_DENIED:
57+
return new AuthorizationException("Permission denied.", source);
58+
case RESOURCE_EXHAUSTED:
59+
return new KafkaException("Resource exhausted.", source);
60+
case UNAUTHENTICATED:
61+
return new AuthenticationException("Unauthenticated.", source);
62+
case UNAVAILABLE:
63+
return new BrokerNotAvailableException("Unavailable.", source);
64+
case UNIMPLEMENTED:
65+
return new KafkaException("Unimplemented.", source);
66+
case UNKNOWN:
67+
return new KafkaException("Unknown.", source);
68+
}
69+
return new KafkaException("No case.", source);
70+
}
71+
72+
/**
73+
* Transform an exception into a kind that is likely to be thrown through kafka interfaces.
74+
*
75+
* @param t A throwable to transform.
76+
* @return The transformed exception suitable for re-throwing.
77+
*/
78+
static RuntimeException toKafka(Throwable t) {
79+
try {
80+
throw t;
81+
} catch (KafkaException | UnsupportedOperationException | IllegalStateException e) {
82+
return e;
83+
} catch (InterruptedException e) {
84+
return new InterruptException(e);
85+
} catch (TimeoutException e) {
86+
return new org.apache.kafka.common.errors.TimeoutException(e);
87+
} catch (Throwable e) {
88+
return KafkaExceptionUtils.toKafkaException(ExtractStatus.toCanonical(t));
89+
}
90+
}
91+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.kafka;
18+
19+
import com.google.common.collect.ImmutableList;
20+
import com.google.common.collect.ImmutableListMultimap;
21+
import com.google.common.collect.Iterables;
22+
import com.google.common.collect.Iterators;
23+
import com.google.protobuf.ByteString;
24+
import java.util.Collection;
25+
import java.util.Iterator;
26+
import java.util.List;
27+
import org.apache.kafka.common.header.Header;
28+
import org.apache.kafka.common.header.Headers;
29+
30+
class LiteHeaders implements Headers {
31+
private ImmutableListMultimap<String, ByteString> attributes;
32+
33+
LiteHeaders(ImmutableListMultimap<String, ByteString> attributes) {
34+
this.attributes = attributes;
35+
}
36+
37+
static Header toHeader(String key, ByteString value) {
38+
return new Header() {
39+
@Override
40+
public String key() {
41+
return key;
42+
}
43+
44+
@Override
45+
public byte[] value() {
46+
return value.toByteArray();
47+
}
48+
};
49+
}
50+
51+
private static List<Header> toHeaders(String key, Collection<ByteString> values) {
52+
ImmutableList.Builder<Header> headersBuilder = ImmutableList.builder();
53+
values.forEach(value -> headersBuilder.add(toHeader(key, value)));
54+
return headersBuilder.build();
55+
}
56+
57+
@Override
58+
public Headers add(Header header) throws IllegalStateException {
59+
throw new IllegalStateException();
60+
}
61+
62+
@Override
63+
public Headers add(String s, byte[] bytes) throws IllegalStateException {
64+
throw new IllegalStateException();
65+
}
66+
67+
@Override
68+
public Headers remove(String s) throws IllegalStateException {
69+
throw new IllegalStateException();
70+
}
71+
72+
@Override
73+
public Header lastHeader(String s) {
74+
return Iterables.getLast(this);
75+
}
76+
77+
@Override
78+
public Iterable<Header> headers(String s) {
79+
if (attributes.containsKey(s))
80+
return Iterables.transform(attributes.get(s), value -> toHeader(s, value));
81+
return ImmutableList.of();
82+
}
83+
84+
@Override
85+
public Header[] toArray() {
86+
ImmutableList.Builder<Header> arrayBuilder = ImmutableList.builder();
87+
attributes
88+
.entries()
89+
.forEach(entry -> arrayBuilder.add(toHeader(entry.getKey(), entry.getValue())));
90+
return (Header[]) arrayBuilder.build().toArray();
91+
}
92+
93+
@Override
94+
public Iterator<Header> iterator() {
95+
return Iterators.transform(
96+
attributes.entries().iterator(), entry -> toHeader(entry.getKey(), entry.getValue()));
97+
}
98+
}

0 commit comments

Comments
 (0)