1818
1919import static com .google .common .base .Preconditions .checkArgument ;
2020
21- import com .fasterxml .jackson .core .JsonProcessingException ;
22- import com .fasterxml .jackson .databind .ObjectMapper ;
23- import com .fasterxml .jackson .databind .SerializationFeature ;
2421import com .google .cloud .pubsublite .Partition ;
2522import com .google .common .collect .ImmutableList ;
2623import com .google .common .collect .ImmutableMap ;
27- import java .io .IOException ;
24+ import com .google .gson .Gson ;
25+ import com .google .gson .reflect .TypeToken ;
2826import java .util .Collections ;
2927import java .util .HashMap ;
3028import java .util .Map ;
3129import java .util .Objects ;
30+ import java .util .TreeMap ;
3231import java .util .stream .Collectors ;
3332
3433public final class SparkSourceOffset
3534 extends org .apache .spark .sql .sources .v2 .reader .streaming .Offset {
36- private static final ObjectMapper objectMapper =
37- new ObjectMapper ().configure (SerializationFeature .ORDER_MAP_ENTRIES_BY_KEYS , true );
35+ private static final Gson gson = new Gson ();
3836
3937 // Using a map to ensure unique partitions.
4038 private final ImmutableMap <Partition , SparkPartitionOffset > partitionOffsetMap ;
@@ -79,26 +77,17 @@ public static SparkSourceOffset merge(SparkPartitionOffset[] offsets) {
7977 return new SparkSourceOffset (map );
8078 }
8179
82- @ SuppressWarnings ("unchecked" )
8380 public static SparkSourceOffset fromJson (String json ) {
84- Map <String , Number > map ;
85- try {
86- // TODO: Use TypeReference instead of Map.class, currently TypeReference breaks spark with
87- // java.lang.LinkageError: loader constraint violation: loader previously initiated loading
88- // for a different type.
89- map = objectMapper .readValue (json , Map .class );
90- } catch (IOException e ) {
91- throw new IllegalStateException ("Unable to deserialize PslSourceOffset." , e );
92- }
81+ Map <Long , Long > map = gson .fromJson (json , new TypeToken <Map <Long , Long >>() {}.getType ());
9382 Map <Partition , SparkPartitionOffset > partitionOffsetMap =
9483 map .entrySet ().stream ()
9584 .collect (
9685 Collectors .toMap (
97- e -> Partition .of (Long . parseLong ( e .getKey () )),
86+ e -> Partition .of (e .getKey ()),
9887 e ->
9988 SparkPartitionOffset .builder ()
100- .partition (Partition .of (Long . parseLong ( e .getKey () )))
101- .offset (e .getValue (). longValue () )
89+ .partition (Partition .of (e .getKey ()))
90+ .offset (e .getValue ())
10291 .build ()));
10392 return new SparkSourceOffset (partitionOffsetMap );
10493 }
@@ -109,13 +98,9 @@ public Map<Partition, SparkPartitionOffset> getPartitionOffsetMap() {
10998
11099 @ Override
111100 public String json () {
112- try {
113- Map <Long , Long > map =
114- partitionOffsetMap .entrySet ().stream ()
115- .collect (Collectors .toMap (e -> e .getKey ().value (), e -> e .getValue ().offset ()));
116- return objectMapper .writeValueAsString (map );
117- } catch (JsonProcessingException e ) {
118- throw new IllegalStateException ("Unable to serialize PslSourceOffset." , e );
119- }
101+ Map <Long , Long > map =
102+ partitionOffsetMap .entrySet ().stream ()
103+ .collect (Collectors .toMap (e -> e .getKey ().value (), e -> e .getValue ().offset ()));
104+ return gson .toJson (new TreeMap <>(map ));
120105 }
121106}
0 commit comments