Skip to content
Permalink
Browse files
[hotfix][python][connector/pulsar] Improve PulsarDeserializationSchem…
…a.flink_type_info set execution_config default to None
  • Loading branch information
dianfu committed May 21, 2022
1 parent 26deac8 commit e7d004d81c01a19e7d0d9736b6e9638404324e08
Showing 2 changed files with 6 additions and 5 deletions.
@@ -52,16 +52,17 @@ def flink_schema(deserialization_schema: DeserializationSchema) \
return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)

@staticmethod
def flink_type_info(type_information: TypeInformation, execution_config: ExecutionConfig) \
-> 'PulsarDeserializationSchema':
def flink_type_info(type_information: TypeInformation,
execution_config: ExecutionConfig = None) -> 'PulsarDeserializationSchema':
"""
Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
only used for treating message that was written into pulsar by TypeInformation.
"""
JPulsarDeserializationSchema = get_gateway().jvm.org.apache.flink \
.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
JExecutionConfig = get_gateway().jvm.org.apache.flink.api.common.ExecutionConfig
_j_execution_config = execution_config._j_execution_config \
if execution_config is not None else None
if execution_config is not None else JExecutionConfig()
_j_pulsar_deserialization_schema = JPulsarDeserializationSchema.flinkTypeInfo(
type_information.get_java_type_info(), _j_execution_config)
return PulsarDeserializationSchema(_j_pulsar_deserialization_schema)
@@ -183,7 +183,7 @@ def test_pulsar_source(self):
.set_subscription_name('ff') \
.set_subscription_type(SubscriptionType.Exclusive) \
.set_deserialization_schema(
PulsarDeserializationSchema.flink_type_info(Types.STRING(), None)) \
PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
.set_deserialization_schema(
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
.set_config(TEST_OPTION_NAME, True) \
@@ -256,7 +256,7 @@ def test_source_deprecated_method(self):
.set_admin_url('http://localhost:8080') \
.set_topics_pattern('ada.*') \
.set_deserialization_schema(
PulsarDeserializationSchema.flink_type_info(Types.STRING(), None)) \
PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
.set_subscription_name('ff') \
.set_config(test_option, True) \
.set_config_with_dict({'pulsar.source.autoCommitCursorInterval': '1000'}) \

0 comments on commit e7d004d

Please sign in to comment.