Describe the bug
For a stateful Spark Structured Streaming query I use the RocksDB state store, and the existing state data was serialized by Spark's Encoders.bean serializer. To keep backward compatibility I changed the serializer to Encoders.kryo (with the corresponding registration changes) and used CompatibleFieldSerializer, but it failed with the error below. (Encoders.kryo stores the whole object as a single binary column, which is why the provided value schema is a single BinaryType field while the existing bean-encoded schema lists the individual fields.)
```
- Provided value schema: StructType(StructField(groupState,StructType(StructField(value,BinaryType,true)),true))
- Existing value schema: StructType(StructField(groupState,StructType(StructField(count,LongType,true),StructField(word,StringType,true),StructField(wordLen,LongType,true),StructField(wordsTs,ArrayType(LongType,true),true)),true))
If you want to force running query without schema validation, please set spark.sql.streaming.stateStore.stateSchemaCheck to false.
Please note running query with incompatible schema could cause indeterministic behavior.
```
To Reproduce
Register the CompatibleFieldSerializer as shown below:
```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import org.apache.spark.serializer.KryoRegistrator;

public class CustomKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        //kryo.register(StateInfo.class, new StateInfoSerdes()); // custom serdes
        kryo.register(StateInfo.class, new CompatibleFieldSerializer<>(kryo, StateInfo.class)); // built-in impl
        kryo.register(java.util.Collections.emptyList().getClass());
        kryo.register(java.util.HashMap.class);
        kryo.register(java.util.LinkedHashMap.class);
    }
}
```
Create a StateInfo class for the state data with the fields below:
```java
private String word;
private Long count;
private List<Long> wordsTs;
private Long wordLen;
private Map<String, LinkedHashMap<String, String>> hashSrcEvts;
```
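Since the word-count snippet below calls a five-argument constructor and getters such as getCount() and getHashSrcEvts(), the class presumably looks roughly like this (a sketch; everything beyond the listed fields is inferred, and note that Encoders.bean additionally requires setters while Kryo requires a no-arg constructor):

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StateInfo implements Serializable {
    private String word;
    private Long count;
    private List<Long> wordsTs;
    private Long wordLen;
    private Map<String, LinkedHashMap<String, String>> hashSrcEvts; // field added later

    public StateInfo() {} // no-arg constructor: needed by both the bean encoder and Kryo

    public StateInfo(String word, Long count, List<Long> wordsTs, Long wordLen,
                     Map<String, LinkedHashMap<String, String>> hashSrcEvts) {
        this.word = word;
        this.count = count;
        this.wordsTs = wordsTs;
        this.wordLen = wordLen;
        this.hashSrcEvts = hashSrcEvts;
    }

    public String getWord() { return word; }
    public Long getCount() { return count; }
    public List<Long> getWordsTs() { return wordsTs; }
    public Long getWordLen() { return wordLen; }
    public Map<String, LinkedHashMap<String, String>> getHashSrcEvts() { return hashSrcEvts; }

    public void setWord(String word) { this.word = word; }
    public void setCount(Long count) { this.count = count; }
    public void setWordsTs(List<Long> wordsTs) { this.wordsTs = wordsTs; }
    public void setWordLen(Long wordLen) { this.wordLen = wordLen; }
    public void setHashSrcEvts(Map<String, LinkedHashMap<String, String>> hashSrcEvts) { this.hashSrcEvts = hashSrcEvts; }
}
```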
Write a sample word count Spark streaming query:
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SparkStreamingWordCount {

    public static void start() throws StreamingQueryException {
        SparkSession spark = createSparkSession();

        List<String> values = Arrays.asList("29--2141741193=1713852879594", "48--2141741193=1713853467453");
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<String, String>();
        values.forEach(v -> linkedHashMap.put(v.split("=")[0], v.split("=")[1]));

        // Define the function to update state and compute word count
        MapGroupsWithStateFunction<String, String, StateInfo, StateOutput> stateAggregationFunc =
            new MapGroupsWithStateFunction<String, String, StateInfo, StateOutput>() {
                private static final long serialVersionUID = 1L;
                private StateInfo state;

                @Override
                public StateOutput call(String key, Iterator<String> values, GroupState<StateInfo> oldState) throws Exception {
                    state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L, Collections.emptyMap());
                    //state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L);
                    while (values.hasNext()) {
                        String wordKey = values.next();
                        state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L, Collections.emptyMap());
                        //state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L);
                        StateInfo updatedState = new StateInfo(key, state.getCount() + 1, Collections.emptyList(), Long.valueOf(key.length()), populateHashSrcEvts(key, state.getHashSrcEvts()));
                        //StateInfo updatedState = new StateInfo(key, state.getCount() + 1, Collections.emptyList(), Long.valueOf(key.length()));
                        oldState.update(updatedState);
                    }
                    return new StateOutput(key, state.getCount());
                }

                private Map<String, LinkedHashMap<String, String>> populateHashSrcEvts(String key, Map<String, LinkedHashMap<String, String>> map) {
                    Map<String, LinkedHashMap<String, String>> map2 = new HashMap<String, LinkedHashMap<String, String>>();
                    map2.put(key, linkedHashMap);
                    return map2;
                }
            };

        Dataset<Row> kafkaStream = spark
            .readStream()
            .format("kafka")
            .options(getKafkaParams())
            .option("subscribe", "topic1")
            .option("maxOffsetsPerTrigger", 1000)
            .option("checkpointLocation", "/Users/amol/rocksdb/")
            .load();

        Dataset<String> stringStream = kafkaStream.selectExpr("CAST(value AS STRING)").as(Encoders.STRING());

        Dataset<StateOutput> wordCounts = null;
        try {
            // Generate running word count
            wordCounts = stringStream
                .groupByKey((MapFunction<String, String>) value -> value, Encoders.STRING())
                .mapGroupsWithState(
                    stateAggregationFunc,
                    Encoders.kryo(StateInfo.class),
                    Encoders.bean(StateOutput.class));
        } catch (Exception e) {
            e.printStackTrace(); // println(e.getStackTrace()) would only print the array reference
            System.exit(1);
        }

        // Start running the query that prints the running counts to the console
        StreamingQuery query = null;
        try {
            query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .option("checkpointLocation", "/Users/amol/rocksdb/")
                .start();
            System.out.println("spark streaming query started...");
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        query.awaitTermination();
        System.out.println("spark streaming query stopped...");
    }

    private static SparkSession createSparkSession() {
        SparkSession spark = SparkSession
            .builder()
            .appName("SparkStreamingWordCount")
            .master("local[*]")
            .getOrCreate();
        spark.sparkContext().conf().set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider");
        spark.sparkContext().conf().set("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "false");
        spark.sparkContext().conf().set("spark.sql.streaming.stateStore.stateSchemaCheck", "false");
        spark.sparkContext().conf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        spark.sparkContext().conf().set("spark.kryo.registrationRequired", "true");
        spark.sparkContext().conf().set("spark.kryo.classesToRegister", "com.test.rockdb.StateInfo");
        spark.sparkContext().conf().set("spark.kryo.referenceTracking", "false");
        spark.sparkContext().conf().set("spark.kryo.registrator", CustomKryoRegistrator.class.getName());
        System.out.println("spark session created");
        return spark;
    }

    private static Map<String, String> getKafkaParams() {
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("kafka.bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "consumer-group");
        return kafkaParams;
    }
}
```
Now, to reproduce the issue:
1. Create a state with the bean serializer.
2. Change the StateInfo class to add a new field, e.g. `private Map<String, LinkedHashMap<String, String>> hashSrcEvts;`
3. Change the serializer to Kryo.
4. Generate an event and try to update the state using the Kryo serializer.
Environment:
- OS: macOS (Eclipse)
- JDK Version: 1.8
- Kryo Version: 4.0.2
Additional context
This issue occurs while evolving the RocksDB state structure in Spark Structured Streaming: the existing RocksDB state data was serialized by the bean serializer, and the schema change (adding/removing a field in StateInfo) is attempted with Kryo's CompatibleFieldSerializer.
Thanks for the report @ams45. Can you try to reproduce this using Kryo alone? At the moment the setup is much too complicated for me to look into.
The best approach would be to write a simple unit test without dependencies that reproduces your exact Kryo setup. You can serialize the original StateInfo class and then deserialize it into a StateInfoNew class with the new field.
Something like this but with the exact Kryo config:
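For illustration, a minimal Kryo-only round trip might look like the sketch below: serialize the old shape and deserialize the bytes into a new shape that adds the hashSrcEvts field. The class names StateInfoOld/StateInfoNew and the KryoEvolutionTest wrapper are hypothetical stand-ins (this assumes Kryo 4.x, where registration is not required for field value classes, and public fields are used for brevity):

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KryoEvolutionTest {

    // Old shape: the four original fields.
    public static class StateInfoOld {
        public String word;
        public Long count;
        public List<Long> wordsTs;
        public Long wordLen;
        public StateInfoOld() {}
    }

    // New shape: same fields plus the added map.
    public static class StateInfoNew {
        public String word;
        public Long count;
        public List<Long> wordsTs;
        public Long wordLen;
        public Map<String, LinkedHashMap<String, String>> hashSrcEvts; // new field
        public StateInfoNew() {}
    }

    public static StateInfoNew roundTrip() {
        // Writer side: register the old class with CompatibleFieldSerializer,
        // mirroring the registrator from the bug report.
        Kryo writer = new Kryo();
        writer.register(StateInfoOld.class,
                new CompatibleFieldSerializer<>(writer, StateInfoOld.class));

        StateInfoOld old = new StateInfoOld();
        old.word = "spark";
        old.count = 3L;
        old.wordsTs = Collections.emptyList();
        old.wordLen = 5L;

        Output output = new Output(1024, -1);
        writer.writeObject(output, old);
        byte[] bytes = output.toBytes();

        // Reader side: register the new class. writeObject/readObject do not
        // embed the class name, so the old bytes can be read as the new shape;
        // CompatibleFieldSerializer matches fields by name and leaves the
        // missing hashSrcEvts field at its default (null).
        Kryo reader = new Kryo();
        reader.register(StateInfoNew.class,
                new CompatibleFieldSerializer<>(reader, StateInfoNew.class));

        return reader.readObject(new Input(bytes), StateInfoNew.class);
    }

    public static void main(String[] args) {
        StateInfoNew read = roundTrip();
        System.out.println(read.word + " " + read.count + " hashSrcEvts=" + read.hashSrcEvts);
    }
}
```

If this round trip succeeds in isolation, the failure reported above is more likely in Spark's state-schema check than in Kryo itself.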