-
-
Notifications
You must be signed in to change notification settings - Fork 399
/
ConsumerLocalConfigs.java
189 lines (162 loc) · 6.52 KB
/
ConsumerLocalConfigs.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package org.jsmart.zerocode.core.kafka.consume;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.RAW;
//@JsonIgnoreProperties(ignoreUnknown = true) //<--- Do not enable this. All properties need to be aware of and processed
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ConsumerLocalConfigs {
private final String recordType;
private final String fileDumpTo;
private final Boolean commitAsync;
private final Boolean commitSync;
private final Boolean showRecordsConsumed;
private final Integer maxNoOfRetryPollsOrTimeouts;
private final Long pollingTime;
private final String seek;
private final String protoClassType;
private final Boolean cacheByTopic;
private final String filterByJsonPath;
private final String seekEpoch;
private final SeekTimestamp seekTimestamp;
@JsonCreator
public ConsumerLocalConfigs(
@JsonProperty("recordType") String recordType,
@JsonProperty("protoClassType") String protobufMessageClassType,
@JsonProperty("fileDumpTo") String fileDumpTo,
@JsonProperty("commitAsync") Boolean commitAsync,
@JsonProperty("commitSync") Boolean commitSync,
@JsonProperty("showRecordsConsumed") Boolean showRecordsConsumed,
@JsonProperty("maxNoOfRetryPollsOrTimeouts") Integer maxNoOfRetryPollsOrTimeouts,
@JsonProperty("pollingTime") Long pollingTime,
@JsonProperty("cacheByTopic") Boolean cacheByTopic,
@JsonProperty("filterByJsonPath") String filterByJsonPath,
@JsonProperty("seek") String seek,
@JsonProperty("seekEpoch") String seekEpoch,
@JsonProperty("seekTimestamp") SeekTimestamp seekTimestamp) {
this.recordType = recordType;
this.protoClassType = protobufMessageClassType;
this.fileDumpTo = fileDumpTo;
this.commitAsync = commitAsync;
this.commitSync = commitSync;
this.showRecordsConsumed = showRecordsConsumed;
this.maxNoOfRetryPollsOrTimeouts = maxNoOfRetryPollsOrTimeouts;
this.pollingTime = pollingTime;
this.cacheByTopic = cacheByTopic;
this.filterByJsonPath = filterByJsonPath;
this.seek = seek;
this.seekEpoch = seekEpoch;
this.seekTimestamp = seekTimestamp;
}
public ConsumerLocalConfigs(
String recordType,
String fileDumpTo,
Boolean commitAsync,
Boolean commitSync,
Boolean showRecordsConsumed,
Integer maxNoOfRetryPollsOrTimeouts,
Long pollingTime,
Boolean cacheByTopic,
String filterByJsonPath,
String seek,
String seekEpoch,
SeekTimestamp seekTimestamp) {
this(recordType, null,
fileDumpTo,
commitAsync,
commitSync,
showRecordsConsumed,
maxNoOfRetryPollsOrTimeouts,
pollingTime,
cacheByTopic,
filterByJsonPath,
seek,
seekEpoch,
seekTimestamp);
}
public String getRecordType() {
return recordType != null ? recordType : RAW;
}
public String getProtoClassType() {
return protoClassType;
}
public String getFileDumpTo() {
return fileDumpTo;
}
public Boolean getCommitAsync() {
return commitAsync;
}
public Boolean getCommitSync() {
return commitSync;
}
public Boolean getShowRecordsConsumed() {
return showRecordsConsumed != null ? showRecordsConsumed : true;
}
public Integer getMaxNoOfRetryPollsOrTimeouts() {
return maxNoOfRetryPollsOrTimeouts;
}
public Long getPollingTime() {
return pollingTime;
}
public Boolean getCacheByTopic() {
return cacheByTopic;
}
public String getFilterByJsonPath() {
return filterByJsonPath;
}
public String getSeek() {
return seek;
}
public String getSeekEpoch() {
return seekEpoch;
}
public SeekTimestamp getSeekTimestamp() {
return seekTimestamp;
}
@JsonIgnore
public String[] getSeekTopicPartitionOffset() {
return seek.split(",");
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConsumerLocalConfigs that = (ConsumerLocalConfigs) o;
return Objects.equals(recordType, that.recordType) &&
Objects.equals(protoClassType, that.protoClassType) &&
Objects.equals(fileDumpTo, that.fileDumpTo) &&
Objects.equals(commitAsync, that.commitAsync) &&
Objects.equals(commitSync, that.commitSync) &&
Objects.equals(showRecordsConsumed, that.showRecordsConsumed) &&
Objects.equals(maxNoOfRetryPollsOrTimeouts, that.maxNoOfRetryPollsOrTimeouts) &&
Objects.equals(pollingTime, that.pollingTime) &&
Objects.equals(filterByJsonPath, that.filterByJsonPath) &&
Objects.equals(cacheByTopic, that.cacheByTopic) &&
Objects.equals(seek, that.seek) &&
Objects.equals(seekEpoch, that.seekEpoch);
}
@Override
public int hashCode() {
return Objects.hash(recordType, fileDumpTo, commitAsync, commitSync, showRecordsConsumed, maxNoOfRetryPollsOrTimeouts, pollingTime, cacheByTopic, filterByJsonPath, seek);
}
@Override
public String toString() {
return "ConsumerLocalConfigs{" +
"recordType='" + recordType + '\'' +
"protobufMessageClassType='" + protoClassType + '\'' +
", fileDumpTo='" + fileDumpTo + '\'' +
", commitAsync=" + commitAsync +
", commitSync=" + commitSync +
", showRecordsConsumed=" + showRecordsConsumed +
", maxNoOfRetryPollsOrTimeouts=" + maxNoOfRetryPollsOrTimeouts +
", pollingTime=" + pollingTime +
", cacheByTopic=" + cacheByTopic +
", filterByJsonPath=" + filterByJsonPath +
", seek=" + seek +
", seekEpoch=" + seekEpoch +
", seekTimestamp=" + seekTimestamp +
'}';
}
}