forked from pinterest/secor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MessageParser.java
89 lines (80 loc) · 3.67 KB
/
MessageParser.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.secor.parser;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
import net.minidev.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Pattern;
// TODO(pawel): should we offer a multi-message parser capable of parsing multiple types of
// messages? E.g., it could be implemented as a composite that tries different parsers and uses
// the one that works. What is the performance cost of such an approach?
/**
 * Message parser extracts partitions from messages.
 *
 * <p>Subclasses implement {@link #extractPartitions(Message)}; {@link #parse(Message)} combines
 * the extracted partitions with the fields of the source message into a {@link ParsedMessage}.
 *
 * @author Pawel Garbacki (pawel@pinterest.com)
 */
public abstract class MessageParser {
    private static final Logger LOG = LoggerFactory.getLogger(MessageParser.class);

    protected SecorConfig mConfig;

    /**
     * Segments of the configured timestamp field name, obtained by splitting it on the configured
     * separator. Stays {@code null} when either the timestamp name or the separator is not
     * configured (null or empty).
     */
    protected String[] mNestedFields;

    /** Value of the {@code secor.offsets.prefix} configuration property. */
    protected final String offsetPrefix;

    /**
     * Creates a parser backed by the given configuration.
     *
     * @param config configuration supplying the timestamp field name, its separator and the
     *               offsets prefix
     */
    public MessageParser(SecorConfig config) {
        mConfig = config;
        offsetPrefix = usingOffsetPrefix(mConfig);

        String timestampName = mConfig.getMessageTimestampName();
        String separator = mConfig.getMessageTimestampNameSeparator();
        boolean hasTimestampName = timestampName != null && !timestampName.isEmpty();
        boolean hasSeparator = separator != null && !separator.isEmpty();
        if (hasTimestampName && hasSeparator) {
            // The separator is a literal, not a regex, so it is quoted before splitting.
            mNestedFields = timestampName.split(Pattern.quote(separator));
        }
    }

    /**
     * Reads the offsets prefix from the configuration.
     *
     * @param config configuration to read from
     * @return the value of the {@code secor.offsets.prefix} property
     */
    static String usingOffsetPrefix(SecorConfig config) {
        return config.getString("secor.offsets.prefix");
    }

    /**
     * Builds a {@link ParsedMessage} from the given message and its extracted partitions.
     *
     * @param message message to parse
     * @return the parsed message carrying the original message fields plus its partitions
     * @throws Exception if {@link #extractPartitions(Message)} fails
     */
    public ParsedMessage parse(Message message) throws Exception {
        String[] partitions = extractPartitions(message);
        return new ParsedMessage(message.getTopic(), message.getKafkaPartition(),
                                 message.getOffset(), message.getKafkaKey(),
                                 message.getPayload(), partitions, message.getTimestamp());
    }

    /**
     * Extracts the partitions of a message.
     *
     * @param payload message to extract partitions from
     * @return the partitions of the message
     * @throws Exception if the partitions cannot be extracted
     */
    public abstract String[] extractPartitions(Message payload) throws Exception;

    /**
     * Looks up the value of the configured timestamp field in a JSON object.
     *
     * <p>When nested field segments are configured, the object is walked one segment at a time
     * and the value under the last segment is returned. Otherwise the configured timestamp name
     * is used as a single top-level key.
     *
     * @param jsonObject JSON object to read from
     * @return the value found, or {@code null} when a segment is missing, when an intermediate
     *         segment does not hold a JSON object, or when the stored value itself is null
     */
    public Object getJsonFieldValue(JSONObject jsonObject) {
        if (mNestedFields == null) {
            return jsonObject.get(mConfig.getMessageTimestampName());
        }

        JSONObject current = jsonObject;
        final int lastIndex = mNestedFields.length - 1;
        for (int i = 0; i <= lastIndex; i++) {
            String key = mNestedFields[i];
            if (!current.containsKey(key)) {
                LOG.warn("Could not find key {} in message", mConfig.getMessageTimestampName());
                return null;
            }
            Object value = current.get(key);
            if (i == lastIndex) {
                return value;
            }
            // An intermediate segment must be a JSON object to descend into it. A scalar, an
            // array or an explicit null is treated like a missing key instead of failing with
            // ClassCastException / NullPointerException.
            if (!(value instanceof JSONObject)) {
                LOG.warn("Value at key {} is not a JSON object; could not resolve {} in message",
                         key, mConfig.getMessageTimestampName());
                return null;
            }
            current = (JSONObject) value;
        }
        // Only reached when there are no segments to walk.
        return null;
    }
}