This issue was moved to a discussion.
You can continue the conversation there. Go to discussion →
Pulsar Functions should support generic output messages #9892
Is your feature request related to a problem? Please describe.
In a Pulsar function, I don't want to fix a particular class as O in
public interface Function<I, O> {
    O process(I var1, Context var2) throws Exception;
}
because I have a POJO class, but I always want to emit a wrapper class around that POJO as the output. The only difference between the wrapper class and the POJO class is that the wrapper always adds a JSON field to whichever POJO class it wraps.
We can produce a generic message with pulsar.client.api:
RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record(USER_CONST.SCHEMA_NAME);
recordSchemaBuilder.field(USER_CONST.AGE).type(SchemaType.INT32);
recordSchemaBuilder.field(USER_CONST.NAME).type(SchemaType.STRING);
recordSchemaBuilder.field(USER_CONST.JSON_FOR_EVENT).type(SchemaType.STRING);
SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
GenericSchema userSchema = Schema.generic(schemaInfo);

Producer<GenericRecord> producer = pulsarClient.newProducer(userSchema)
        .topic(TOPIC_CONST.TOPIC_FUN_GENERIC)
        .create();

User user = new User();
user.age = 27;
user.name = "Simon Generic";
Gson gson = new Gson();
String jsonForEvent = gson.toJson(user);

GenericRecordBuilder genericRecordBuilder = userSchema.newRecordBuilder();
GenericRecord genericRecord = genericRecordBuilder
        .set(USER_CONST.AGE, user.age)
        .set(USER_CONST.NAME, user.name)
        .set(USER_CONST.JSON_FOR_EVENT, jsonForEvent)
        .build();
However, I have tried many ways to produce such a dynamic object from a Pulsar function, for example:
public class FunWrapGeneric implements Function<String, GenericRecord> {
    @Override
    public GenericRecord process(String input, Context context) throws Exception {
but it always fails. Can we support this?
Describe the solution you'd like
Add GenericRecord support for Pulsar functions:
public class FunWrapGeneric implements Function<String, GenericRecord>
or add a generic class:
public class FunWrapGeneric implements Function<String, RecordWrap<T>> { @Override public RecordWrap<T> process(String input, Context context) throws Exception {
where the RecordWrap interface supports adding fields to, or removing fields from, T.
or add a variant of the Pulsar function interface that takes a generic class (a schema) as its output type parameter:
FunWrapGeneric implements Function<String,Schema>
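To make the RecordWrap option more concrete, here is a rough plain-Java sketch of what I have in mind. Everything in it is hypothetical: RecordWrap, MapRecordWrap, the with/without method names, and the field names "age", "name" and "jsonForEvent" are my own placeholders and not part of the Pulsar API. The hand-built JSON string only stands in for gson.toJson(user) so the sketch has no dependencies, and it does no escaping.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical interface (not part of Pulsar): a view over the fields of a
// POJO of type T that lets a function add or remove fields before output.
interface RecordWrap<T> {
    T source();
    Map<String, Object> fields();
    RecordWrap<T> with(String name, Object value);
    RecordWrap<T> without(String name);
}

// Sample POJO mirroring the User class from the producer example.
class User {
    int age;
    String name;
}

// Hypothetical immutable implementation backed by an insertion-ordered map.
final class MapRecordWrap<T> implements RecordWrap<T> {
    private final T source;
    private final Map<String, Object> fields;

    MapRecordWrap(T source, Map<String, Object> fields) {
        this.source = source;
        this.fields = new LinkedHashMap<>(fields); // defensive copy
    }

    @Override
    public T source() {
        return source;
    }

    @Override
    public Map<String, Object> fields() {
        return Collections.unmodifiableMap(fields);
    }

    @Override
    public RecordWrap<T> with(String name, Object value) {
        Map<String, Object> copy = new LinkedHashMap<>(fields);
        copy.put(name, value);
        return new MapRecordWrap<>(source, copy);
    }

    @Override
    public RecordWrap<T> without(String name) {
        Map<String, Object> copy = new LinkedHashMap<>(fields);
        copy.remove(name);
        return new MapRecordWrap<>(source, copy);
    }
}

class RecordWrapDemo {
    // Wraps a User and adds the extra JSON field, as the wrapper class would.
    static RecordWrap<User> wrap(User user) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("age", user.age);
        fields.put("name", user.name);
        // Stand-in for gson.toJson(user); assumes the name needs no escaping.
        String json = "{\"age\":" + user.age + ",\"name\":\"" + user.name + "\"}";
        return new MapRecordWrap<>(user, fields).with("jsonForEvent", json);
    }

    public static void main(String[] args) {
        User user = new User();
        user.age = 27;
        user.name = "Simon Generic";
        System.out.println(wrap(user).fields());
    }
}
```

A function returning something like RecordWrap<User> would then need the runtime to derive the output schema from fields() instead of from a fixed class O, which is the part that would need support in Pulsar itself.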
cc @sijie @codelipenghui