/
Batch.java
159 lines (123 loc) · 3.84 KB
/
Batch.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package org.jumpmind.symmetric.io.data;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.jumpmind.db.util.BinaryEncoding;
public class Batch {
public static final long UNKNOWN_BATCH_ID = -9999;
public enum BatchType { EXTRACT, LOAD };
protected long batchId = UNKNOWN_BATCH_ID;
protected String sourceNodeId;
protected String targetNodeId;
protected boolean initialLoad;
protected String channelId;
protected BinaryEncoding binaryEncoding;
protected Date startTime;
protected long lineCount;
protected long dataReadMillis;
protected long dataWriteMillis;
protected boolean ignored = false;
protected boolean common = false;
protected boolean complete = false;
protected BatchType batchType;
protected Map<String, Long> timers = new HashMap<String, Long>();
public Batch(BatchType batchType, long batchId, String channelId, BinaryEncoding binaryEncoding, String sourceNodeId, String targetNodeId, boolean common) {
this.batchType = batchType;
this.batchId = batchId;
this.channelId = channelId;
this.sourceNodeId = sourceNodeId;
this.targetNodeId = targetNodeId;
this.binaryEncoding = binaryEncoding;
this.common = common;
}
public Batch() {
this.startTime = new Date();
}
public long incrementLineCount() {
return ++lineCount;
}
public void incrementDataReadMillis(long millis) {
dataReadMillis += millis;
}
public void incrementDataWriteMillis(long millis) {
dataWriteMillis += millis;
}
public void startTimer(String name) {
timers.put(name, System.currentTimeMillis());
}
public long endTimer(String name) {
Long startTime = (Long)timers.remove(name);
if (startTime != null) {
return System.currentTimeMillis() - startTime;
} else {
return 0l;
}
}
public long getDataReadMillis() {
return dataReadMillis;
}
public long getDataWriteMillis() {
return dataWriteMillis;
}
public long getLineCount() {
return lineCount;
}
public void setLineCount(long lineCount) {
this.lineCount = lineCount;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getSourceNodeId() {
return sourceNodeId;
}
public String getTargetNodeId() {
return targetNodeId;
}
public String getSourceNodeBatchId() {
return String.format("%s-%d", sourceNodeId, batchId);
}
public long getBatchId() {
return batchId;
}
public String getChannelId() {
return channelId;
}
public boolean isInitialLoad() {
return initialLoad;
}
public BinaryEncoding getBinaryEncoding() {
return binaryEncoding;
}
public void setIgnored(boolean ignored) {
this.ignored = ignored;
}
public boolean isIgnored() {
return ignored;
}
public void setCommon(boolean commonFlag) {
this.common = commonFlag;
}
public boolean isCommon() {
return common;
}
public String getStagedLocation() {
if (batchType == BatchType.EXTRACT) {
return getStagedLocation(common, targetNodeId);
} else {
return getStagedLocation(common, sourceNodeId);
}
}
public static String getStagedLocation(boolean common, String nodeId) {
return common ? "common" : nodeId;
}
public void setComplete(boolean complete) {
this.complete = complete;
}
public boolean isComplete() {
return complete;
}
}