Skip to content

Commit

Permalink
[FLINK-34239][core,table] Add copy() in SerializerConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
kumar-mallikarjuna committed Mar 12, 2024
1 parent 94b55d1 commit 195c8ab
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 38 deletions.
Expand Up @@ -209,4 +209,6 @@ <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
* @param classLoader a class loader to use when loading classes
*/
void configure(ReadableConfig configuration, ClassLoader classLoader);

SerializerConfigImpl copy();
}
Expand Up @@ -557,4 +557,29 @@ private void registerTypeWithTypeInfoFactory(
public ExecutionConfig getExecutionConfig() {
return executionConfig;
}

@Override
public SerializerConfigImpl copy() {
final SerializerConfigImpl newSerializerConfig = new SerializerConfigImpl();
newSerializerConfig.configure(configuration, this.getClass().getClassLoader());

getRegisteredTypesWithKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.registerTypeWithKryoSerializer(
c, s.getSerializer()));
getRegisteredTypesWithKryoSerializerClasses()
.forEach(newSerializerConfig::registerTypeWithKryoSerializer);
getDefaultKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.addDefaultKryoSerializer(c, s.getSerializer()));
getDefaultKryoSerializerClasses().forEach(newSerializerConfig::addDefaultKryoSerializer);
getRegisteredKryoTypes().forEach(newSerializerConfig::registerKryoType);
getRegisteredPojoTypes().forEach(newSerializerConfig::registerPojoType);
getRegisteredTypeInfoFactories()
.forEach(newSerializerConfig::registerTypeWithTypeInfoFactory);

return newSerializerConfig;
}
}
Expand Up @@ -131,46 +131,12 @@ public LogicalType createLogicalType(UnresolvedIdentifier identifier) {
private static Supplier<SerializerConfig> createSerializerConfig(
ClassLoader classLoader, ReadableConfig config, SerializerConfig serializerConfig) {
return () -> {
final SerializerConfig newSerializerConfig = new SerializerConfigImpl();

SerializerConfig newSerializerConfig = new SerializerConfigImpl();
if (serializerConfig != null) {
if (serializerConfig.isForceKryoEnabled()) {
newSerializerConfig.setForceKryo(true);
}

if (serializerConfig.isForceAvroEnabled()) {
newSerializerConfig.setForceAvro(true);
}

serializerConfig
.getDefaultKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.addDefaultKryoSerializer(
c, s.getSerializer()));

serializerConfig
.getDefaultKryoSerializerClasses()
.forEach(newSerializerConfig::addDefaultKryoSerializer);

serializerConfig
.getRegisteredKryoTypes()
.forEach(newSerializerConfig::registerKryoType);

serializerConfig
.getRegisteredTypesWithKryoSerializerClasses()
.forEach(newSerializerConfig::registerTypeWithKryoSerializer);

serializerConfig
.getRegisteredTypesWithKryoSerializers()
.forEach(
(c, s) ->
newSerializerConfig.registerTypeWithKryoSerializer(
c, s.getSerializer()));
newSerializerConfig = serializerConfig.copy();
} else {
newSerializerConfig.configure(config, classLoader);
}

newSerializerConfig.configure(config, classLoader);

return newSerializerConfig;
};
}
Expand Down

0 comments on commit 195c8ab

Please sign in to comment.