Skip to content

Commit

Permalink
feat: Changed assert to Preconditions check (#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Jan 14, 2021
1 parent d1c2a24 commit f6f1bc4
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
Expand Down Expand Up @@ -50,8 +52,9 @@ public PslContinuousInputPartition(

@Override
public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset offset) {
assert SparkPartitionOffset.class.isAssignableFrom(offset.getClass())
: "offset is not assignable to SparkPartitionOffset";
checkArgument(
SparkPartitionOffset.class.isAssignableFrom(offset.getClass()),
"offset is not assignable to SparkPartitionOffset");

SparkPartitionOffset sparkPartitionOffset = (SparkPartitionOffset) offset;
PslPartitionOffset pslPartitionOffset =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
Expand Down Expand Up @@ -56,7 +58,7 @@ public boolean next() {
// since next() will not be called concurrently, we are sure that the message
// is available to this thread.
Optional<SequencedMessage> msg = subscriber.messageIfAvailable();
assert msg.isPresent();
checkState(msg.isPresent());
currentMsg = msg.get();
currentOffset =
SparkPartitionOffset.builder()
Expand All @@ -71,7 +73,7 @@ public boolean next() {

@Override
public InternalRow get() {
assert currentMsg != null;
checkState(currentMsg != null);
return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, currentOffset.partition());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
Expand Down Expand Up @@ -60,8 +62,9 @@ public PslContinuousReader(

@Override
public Offset mergeOffsets(PartitionOffset[] offsets) {
assert SparkPartitionOffset.class.isAssignableFrom(offsets.getClass().getComponentType())
: "PartitionOffset object is not assignable to SparkPartitionOffset.";
checkArgument(
SparkPartitionOffset.class.isAssignableFrom(offsets.getClass().getComponentType()),
"PartitionOffset object is not assignable to SparkPartitionOffset.");
return SparkSourceOffset.merge(
Arrays.copyOf(offsets, offsets.length, SparkPartitionOffset[].class));
}
Expand All @@ -79,8 +82,9 @@ public Offset getStartOffset() {
@Override
public void setStartOffset(Optional<Offset> start) {
if (start.isPresent()) {
assert SparkSourceOffset.class.isAssignableFrom(start.get().getClass())
: "start offset is not assignable to PslSourceOffset.";
checkArgument(
SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
"start offset is not assignable to PslSourceOffset.");
startOffset = (SparkSourceOffset) start.get();
return;
}
Expand All @@ -90,8 +94,9 @@ public void setStartOffset(Optional<Offset> start) {

@Override
public void commit(Offset end) {
assert SparkSourceOffset.class.isAssignableFrom(end.getClass())
: "end offset is not assignable to SparkSourceOffset.";
checkArgument(
SparkSourceOffset.class.isAssignableFrom(end.getClass()),
"end offset is not assignable to SparkSourceOffset.");
committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
Expand Down Expand Up @@ -69,7 +71,7 @@ public boolean next() {
}
// since next() is only called on one thread at a time, we are sure that the message is
// available to this thread.
assert msg.isPresent();
checkState(msg.isPresent());
currentMsg = msg.get();
if (currentMsg.offset().value() == endOffset.offset()) {
// this is the last msg for the batch.
Expand All @@ -83,7 +85,7 @@ public boolean next() {

@Override
public InternalRow get() {
assert currentMsg != null;
checkState(currentMsg != null);
return PslSparkUtils.toInternalRow(currentMsg, subscriptionPath, endOffset.partition());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.spark.sql.catalyst.InternalRow;
Expand Down Expand Up @@ -63,16 +65,18 @@ public PslMicroBatchReader(
@Override
public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
if (start.isPresent()) {
assert SparkSourceOffset.class.isAssignableFrom(start.get().getClass())
: "start offset is not assignable to PslSourceOffset.";
checkArgument(
SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
"start offset is not assignable to PslSourceOffset.");
startOffset = (SparkSourceOffset) start.get();
} else {
startOffset =
PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
}
if (end.isPresent()) {
assert SparkSourceOffset.class.isAssignableFrom(end.get().getClass())
: "start offset is not assignable to PslSourceOffset.";
checkArgument(
SparkSourceOffset.class.isAssignableFrom(end.get().getClass()),
"start offset is not assignable to PslSourceOffset.");
endOffset = (SparkSourceOffset) end.get();
} else {
endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
Expand All @@ -96,8 +100,9 @@ public Offset deserializeOffset(String json) {

@Override
public void commit(Offset end) {
assert SparkSourceOffset.class.isAssignableFrom(end.getClass())
: "end offset is not assignable to SparkSourceOffset.";
checkArgument(
SparkSourceOffset.class.isAssignableFrom(end.getClass()),
"end offset is not assignable to SparkSourceOffset.");
committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
}

Expand All @@ -113,10 +118,9 @@ public StructType readSchema() {

@Override
public List<InputPartition<InternalRow>> planInputPartitions() {
checkState(startOffset != null);
List<InputPartition<InternalRow>> list = new ArrayList<>();

for (SparkPartitionOffset offset :
Objects.requireNonNull(startOffset).getPartitionOffsetMap().values()) {
for (SparkPartitionOffset offset : startOffset.getPartitionOffsetMap().values()) {
SparkPartitionOffset endPartitionOffset =
endOffset.getPartitionOffsetMap().get(offset.partition());
if (offset.equals(endPartitionOffset)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkArgument;
import static scala.collection.JavaConverters.asScalaBufferConverter;

import com.google.cloud.pubsublite.Offset;
Expand Down Expand Up @@ -99,7 +100,7 @@ public static PslSourceOffset toPslSourceOffset(SparkSourceOffset sparkSourceOff
Map<Partition, Offset> pslSourceOffsetMap = new HashMap<>();
for (long i = 0; i < partitionCount; i++) {
Partition p = Partition.of(i);
assert sparkSourceOffset.getPartitionOffsetMap().containsKey(p);
checkArgument(sparkSourceOffset.getPartitionOffsetMap().containsKey(p));
pslSourceOffsetMap.put(
p, Offset.of(sparkSourceOffset.getPartitionOffsetMap().get(p).offset() + 1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.spark;

import static com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand Down Expand Up @@ -44,10 +46,10 @@ public SparkSourceOffset(Map<Partition, SparkPartitionOffset> map) {

private static void validateMap(Map<Partition, SparkPartitionOffset> map) {
map.forEach(
(k, v) -> {
assert Objects.equals(k, v.partition())
: "Key(Partition) and value(SparkPartitionOffset)'s partition don't match.";
});
(k, v) ->
checkArgument(
Objects.equals(k, v.partition()),
"Key(Partition) and value(SparkPartitionOffset)'s partition don't match."));
}

public static SparkSourceOffset merge(SparkSourceOffset o1, SparkSourceOffset o2) {
Expand All @@ -68,7 +70,8 @@ public static SparkSourceOffset merge(SparkSourceOffset o1, SparkSourceOffset o2
public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
Map<Partition, SparkPartitionOffset> map = new HashMap<>();
for (SparkPartitionOffset po : offsets) {
assert !map.containsKey(po.partition()) : "Multiple PslPartitionOffset has same partition.";
checkArgument(
!map.containsKey(po.partition()), "Multiple PslPartitionOffset has same partition.");
map.put(
po.partition(),
SparkPartitionOffset.builder().partition(po.partition()).offset(po.offset()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void mergeSparkPartitionOffsetsDuplicatePartition() {
try {
SparkSourceOffset.merge(offsets);
fail();
} catch (AssertionError e) {
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageThat().contains("same partition");
}
}
Expand Down Expand Up @@ -123,7 +123,7 @@ public void invalidMap() {
Partition.of(3L),
SparkPartitionOffset.builder().partition(Partition.of(2L)).offset(10L).build()));
fail();
} catch (AssertionError e) {
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageThat().contains("don't match");
}
}
Expand Down

0 comments on commit f6f1bc4

Please sign in to comment.