Skip to content

Commit

Permalink
feat: Separates Offsets from different context (Spark vs PSL) (#404)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangmichaellll committed Dec 9, 2020
1 parent 8f26f54 commit b745f58
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import java.io.Serializable;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

@AutoValue
public abstract class PslPartitionOffset implements PartitionOffset, Serializable {
private static final long serialVersionUID = -3398208694782540866L;
public abstract class PslPartitionOffset implements Serializable {
private static final long serialVersionUID = -5446439851334065339L;

public abstract Partition partition();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,72 +16,25 @@

package com.google.cloud.pubsublite.spark;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public final class PslSourceOffset extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {
private static final ObjectMapper objectMapper =
new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
@AutoValue
public abstract class PslSourceOffset {

private final ImmutableMap<Partition, Offset> partitionOffsetMap;
public abstract Map<Partition, Offset> partitionOffsetMap();

public PslSourceOffset(Map<Partition, Offset> map) {
this.partitionOffsetMap = ImmutableMap.copyOf(map);
public static Builder builder() {
return new AutoValue_PslSourceOffset.Builder();
}

public static PslSourceOffset merge(PslSourceOffset o1, PslSourceOffset o2) {
Map<Partition, Offset> result = new HashMap<>(o1.partitionOffsetMap);
o2.partitionOffsetMap.forEach(
(k, v) -> result.merge(k, v, (v1, v2) -> Collections.max(ImmutableList.of(v1, v2))));
return new PslSourceOffset(result);
}

public static PslSourceOffset merge(PslPartitionOffset[] offsets) {
Map<Partition, Offset> map = new HashMap<>();
for (PslPartitionOffset po : offsets) {
assert !map.containsKey(po.partition()) : "Multiple PslPartitionOffset has same partition.";
map.put(po.partition(), po.offset());
}
return new PslSourceOffset(map);
}
@AutoValue.Builder
public abstract static class Builder {

public static PslSourceOffset fromJson(String json) {
Map<Long, Long> map;
try {
map = objectMapper.readValue(json, new TypeReference<Map<Long, Long>>() {});
} catch (IOException e) {
throw new IllegalStateException("Unable to deserialize PslSourceOffset.", e);
}
Map<Partition, Offset> partitionOffsetMap =
map.entrySet().stream()
.collect(Collectors.toMap(e -> Partition.of(e.getKey()), e -> Offset.of(e.getValue())));
return new PslSourceOffset(partitionOffsetMap);
}

public Map<Partition, Offset> getPartitionOffsetMap() {
return this.partitionOffsetMap;
}
public abstract Builder partitionOffsetMap(Map<Partition, Offset> partitionOffsetMap);

@Override
public String json() {
try {
Map<Long, Long> map =
partitionOffsetMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().value()));
return objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new IllegalStateException("Unable to serialize PslSourceOffset.", e);
}
public abstract PslSourceOffset build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@

package com.google.cloud.pubsublite.spark;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.sql.catalyst.InternalRow;

public class PslSparkUtils {
Expand All @@ -37,4 +41,36 @@ public static InternalRow toInternalRow(
msg.message().eventTime(),
msg.message().attributes())));
}

public static SparkSourceOffset toSparkSourceOffset(PslSourceOffset pslSourceOffset) {
return new SparkSourceOffset(
pslSourceOffset.partitionOffsetMap().entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
e ->
SparkPartitionOffset.builder()
.partition(Partition.of(e.getKey().value()))
.offset(e.getValue().value() - 1)
.build())));
}

public static PslSourceOffset toPslSourceOffset(SparkSourceOffset sparkSourceOffset) {
long partitionCount = sparkSourceOffset.getPartitionOffsetMap().size();
Map<Partition, Offset> pslSourceOffsetMap = new HashMap<>();
for (long i = 0; i < partitionCount; i++) {
Partition p = Partition.of(i);
assert sparkSourceOffset.getPartitionOffsetMap().containsKey(p);
pslSourceOffsetMap.put(
p, Offset.of(sparkSourceOffset.getPartitionOffsetMap().get(p).offset() + 1));
}
return PslSourceOffset.builder().partitionOffsetMap(pslSourceOffsetMap).build();
}

public static PslPartitionOffset toPslPartitionOffset(SparkPartitionOffset sparkPartitionOffset) {
return PslPartitionOffset.builder()
.partition(sparkPartitionOffset.partition())
.offset(Offset.of(sparkPartitionOffset.offset() + 1))
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Partition;
import java.io.Serializable;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

@AutoValue
abstract class SparkPartitionOffset implements PartitionOffset, Serializable {
private static final long serialVersionUID = -3398208694782540866L;

abstract Partition partition();

abstract long offset();

public static Builder builder() {
return new AutoValue_SparkPartitionOffset.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder partition(Partition partition);

public abstract Builder offset(long offset);

public abstract SparkPartitionOffset build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.spark;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.cloud.pubsublite.Partition;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public final class SparkSourceOffset
extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {
private static final ObjectMapper objectMapper =
new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);

// Using a map to ensure unique partitions.
private final ImmutableMap<Partition, SparkPartitionOffset> partitionOffsetMap;

public SparkSourceOffset(Map<Partition, SparkPartitionOffset> map) {
validateMap(map);
this.partitionOffsetMap = ImmutableMap.copyOf(map);
}

private static void validateMap(Map<Partition, SparkPartitionOffset> map) {
map.forEach(
(k, v) -> {
assert Objects.equals(k, v.partition())
: "Key(Partition) and value(SparkPartitionOffset)'s partition don't match.";
});
}

public static SparkSourceOffset merge(SparkSourceOffset o1, SparkSourceOffset o2) {
Map<Partition, SparkPartitionOffset> result = new HashMap<>(o1.partitionOffsetMap);
o2.partitionOffsetMap.forEach(
(k, v) ->
result.merge(
k,
v,
(v1, v2) ->
SparkPartitionOffset.builder()
.partition(Partition.of(k.value()))
.offset(Collections.max(ImmutableList.of(v1.offset(), v2.offset())))
.build()));
return new SparkSourceOffset(result);
}

public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
Map<Partition, SparkPartitionOffset> map = new HashMap<>();
for (SparkPartitionOffset po : offsets) {
assert !map.containsKey(po.partition()) : "Multiple PslPartitionOffset has same partition.";
map.put(
po.partition(),
SparkPartitionOffset.builder().partition(po.partition()).offset(po.offset()).build());
}
return new SparkSourceOffset(map);
}

@SuppressWarnings("unchecked")
public static SparkSourceOffset fromJson(String json) {
Map<String, Number> map;
try {
// TODO: Use TypeReference instead of Map.class, currently TypeReference breaks spark with
// java.lang.LinkageError: loader constraint violation: loader previously initiated loading
// for a different type.
map = objectMapper.readValue(json, Map.class);
} catch (IOException e) {
throw new IllegalStateException("Unable to deserialize PslSourceOffset.", e);
}
Map<Partition, SparkPartitionOffset> partitionOffsetMap =
map.entrySet().stream()
.collect(
Collectors.toMap(
e -> Partition.of(Long.parseLong(e.getKey())),
e ->
SparkPartitionOffset.builder()
.partition(Partition.of(Long.parseLong(e.getKey())))
.offset(e.getValue().longValue())
.build()));
return new SparkSourceOffset(partitionOffsetMap);
}

public Map<Partition, SparkPartitionOffset> getPartitionOffsetMap() {
return this.partitionOffsetMap;
}

@Override
public String json() {
try {
Map<Long, Long> map =
partitionOffsetMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().value(), e -> e.getValue().offset()));
return objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
throw new IllegalStateException("Unable to serialize PslSourceOffset.", e);
}
}
}
Loading

0 comments on commit b745f58

Please sign in to comment.