Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Is needed to convert MessageExt body to String in WorkerSinkTask convertToSinkDataEntry method. #799

Open
lizhiboo opened this issue Aug 29, 2021 · 0 comments
Labels

Comments

@lizhiboo
Copy link
Contributor

    private SinkDataEntry convertToSinkDataEntry(MessageExt message) {
        Map<String, String> properties = message.getProperties();
        String queueName;
        EntryType entryType;
        Schema schema;
        Long timestamp;
        Object[] datas = new Object[1];
        if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
            queueName = properties.get(RuntimeConfigDefine.CONNECT_TOPICNAME);
            String connectEntryType = properties.get(RuntimeConfigDefine.CONNECT_ENTRYTYPE);
            entryType = StringUtils.isNotEmpty(connectEntryType) ? EntryType.valueOf(connectEntryType) : null;
            String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
            timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
            String connectSchema = properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
            schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null;
            datas = new Object[1];
            datas[0] = new String(message.getBody());
        } else {
            final byte[] messageBody = message.getBody();
            final SourceDataEntry sourceDataEntry = JSON.parseObject(new String(messageBody), SourceDataEntry.class);
            final Object[] payload = sourceDataEntry.getPayload();
            final byte[] decodeBytes = Base64.getDecoder().decode((String) payload[0]);
            Object recodeObject;
            if (recordConverter instanceof JsonConverter) {
                JsonConverter jsonConverter = (JsonConverter) recordConverter;
                jsonConverter.setClazz(Object[].class);
                recodeObject = recordConverter.byteToObject(decodeBytes);
                datas = (Object[]) recodeObject;
            }
            schema = sourceDataEntry.getSchema();
            entryType = sourceDataEntry.getEntryType();
            queueName = sourceDataEntry.getQueueName();
            timestamp = sourceDataEntry.getTimestamp();
        }
        DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
        dataEntryBuilder.entryType(entryType);
        dataEntryBuilder.queue(queueName);
        dataEntryBuilder.timestamp(timestamp);
        List<Field> fields = schema.getFields();
        if (null != fields && !fields.isEmpty()) {
            for (Field field : fields) {
                dataEntryBuilder.putFiled(field.getName(), datas[field.getIndex()]);
            }
        }
        SinkDataEntry sinkDataEntry = dataEntryBuilder.buildSinkDataEntry(message.getQueueOffset());
        return sinkDataEntry;
    }
        datas[0] = new String(message.getBody()); 

IMO, it will occur some problem in encode and decode.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants