Skip to content

Commit

Permalink
feat: fix offsets type check
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Jan 19, 2021
1 parent 54dc6ae commit 0571f75
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ public PslContinuousInputPartition(
@Override
public InputPartitionReader<InternalRow> createContinuousReader(PartitionOffset offset) {
checkArgument(
SparkPartitionOffset.class.isAssignableFrom(offset.getClass()),
"offset is not assignable to SparkPartitionOffset");
offset instanceof SparkPartitionOffset, "offset is not instance of SparkPartitionOffset");

SparkPartitionOffset sparkPartitionOffset = (SparkPartitionOffset) offset;
PslPartitionOffset pslPartitionOffset =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ public PslContinuousReader(

@Override
public Offset mergeOffsets(PartitionOffset[] offsets) {
checkArgument(
SparkPartitionOffset.class.isAssignableFrom(offsets.getClass().getComponentType()),
"PartitionOffset object is not assignable to SparkPartitionOffset.");
return SparkSourceOffset.merge(
Arrays.copyOf(offsets, offsets.length, SparkPartitionOffset[].class));
}
Expand All @@ -83,8 +80,8 @@ public Offset getStartOffset() {
public void setStartOffset(Optional<Offset> start) {
if (start.isPresent()) {
checkArgument(
SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
"start offset is not assignable to PslSourceOffset.");
start.get() instanceof SparkSourceOffset,
"start offset is not instance of SparkSourceOffset.");
startOffset = (SparkSourceOffset) start.get();
return;
}
Expand All @@ -95,8 +92,7 @@ public void setStartOffset(Optional<Offset> start) {
@Override
public void commit(Offset end) {
checkArgument(
SparkSourceOffset.class.isAssignableFrom(end.getClass()),
"end offset is not assignable to SparkSourceOffset.");
end instanceof SparkSourceOffset, "end offset is not assignable to SparkSourceOffset.");
committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ public PslMicroBatchReader(
public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
if (start.isPresent()) {
checkArgument(
SparkSourceOffset.class.isAssignableFrom(start.get().getClass()),
"start offset is not assignable to PslSourceOffset.");
start.get() instanceof SparkSourceOffset,
"start offset is not instance of SparkSourceOffset.");
startOffset = (SparkSourceOffset) start.get();
} else {
startOffset =
PslSparkUtils.getSparkStartOffset(cursorClient, subscriptionPath, topicPartitionCount);
}
if (end.isPresent()) {
checkArgument(
SparkSourceOffset.class.isAssignableFrom(end.get().getClass()),
"start offset is not assignable to PslSourceOffset.");
end.get() instanceof SparkSourceOffset,
"end offset is not instance of SparkSourceOffset.");
endOffset = (SparkSourceOffset) end.get();
} else {
endOffset = PslSparkUtils.toSparkSourceOffset(headOffsetReader.getHeadOffset());
Expand All @@ -101,8 +101,7 @@ public Offset deserializeOffset(String json) {
@Override
public void commit(Offset end) {
checkArgument(
SparkSourceOffset.class.isAssignableFrom(end.getClass()),
"end offset is not assignable to SparkSourceOffset.");
end instanceof SparkSourceOffset, "end offset is not instance of SparkSourceOffset.");
committer.commit(PslSparkUtils.toPslSourceOffset((SparkSourceOffset) end));
}

Expand Down

0 comments on commit 0571f75

Please sign in to comment.