package com.unum.ksql.udaf; import io.confluent.ksql.function.udaf.Udaf; import io.confluent.ksql.function.udaf.UdafDescription; import io.confluent.ksql.function.udaf.UdafFactory; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @UdafDescription(description = "This accepts a struct and returns an array of struct as function union All", name = "UDAFSTLIST") public class UdafStCollectList { private UdafStCollectList() { } @UdafFactory(description = "Input a struct of order and returns list of order schema", paramSchema = "STRUCT", aggregateSchema = "ARRAY>", returnSchema = " ARRAY>") public static Udaf, List> listCollector1() { final Schema mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build(); final Schema arrayMapSchema = SchemaBuilder.array(mapSchema).optional().build(); final Schema arraySchem=SchemaBuilder.array(Schema.STRING_SCHEMA).build(); final Schema finalSchema = SchemaBuilder.struct().optional() .field("ORDERID", Schema.STRING_SCHEMA). field("CUSTOMERID", Schema.STRING_SCHEMA) // .field("CUSTOEMRORDER", arraySchem) //.field("TESTORDER", mapSchema) .build(); final Schema arraySchema = SchemaBuilder.array(finalSchema).optional().build(); final Schema arrayFinalSchema = SchemaBuilder.struct().field("ORDERS", arraySchema).optional().build(); return new Udaf, List>() { @Override public List initialize() { Struct st = new Struct(finalSchema); List aList = new ArrayList<>(); aList.add(st); return aList; } @Override public List aggregate(Struct struct, List structs) { Struct st = new Struct(finalSchema); List fields = struct.schema().fields(); List fieldName = new ArrayList<>(); for (int i = 0; i < fields.size(); i++) { fieldName.add(fields.get(i).name()); } for (int i = 0; i < fieldName.size(); i++) { st.put(fieldName.get(i), struct.get(fieldName.get(i))); } //List stList=new ArrayList<>(); structs.add(st); return structs; } @Override public List merge(List structs, List a1) { for (int k = 0; k < a1.size(); k++) { Struct struct = a1.get(k); Struct st = new Struct(finalSchema); List fields = struct.schema().fields(); List fieldName = new ArrayList<>(); for (int i = 0; i < fields.size(); i++) { fieldName.add(fields.get(i).name()); } for (int i = 0; i < fieldName.size(); i++) { st.put(fieldName.get(i), struct.get(fieldName.get(i))); } structs.add(st); } return structs; } @Override public List map(List structs) { // Struct finalStruct = new Struct(arrayFinalSchema); // finalStruct.put("ORDERS", structs); return structs; } }; } }