forked from infinispan/infinispan
-
Notifications
You must be signed in to change notification settings - Fork 1
/
BackupSenderImpl.java
325 lines (284 loc) · 13.3 KB
/
BackupSenderImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
/*
* JBoss, Home of Professional Open Source
* Copyright 2012 Red Hat Inc. and/or its affiliates and other contributors
* as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/
package org.infinispan.xsite;
import org.infinispan.Cache;
import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.BackupFailurePolicy;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.SitesConfiguration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.remoting.transport.BackupResponse;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.transaction.LocalTransaction;
import org.infinispan.transaction.TransactionTable;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.ConcurrentMapFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import javax.transaction.Transaction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
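/**
* {@link BackupSender} implementation that sends write and transactional commands to the backup sites
* configured for this cache, applies the configured failure policy to the backups that failed, and keeps
* track of the sites that have been taken offline.
*
* @author Mircea Markus
* @since 5.2
*/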
public class BackupSenderImpl implements BackupSender {
/** Logger for this class; {@code final} so the shared static reference can never be reassigned. */
private static final Log log = LogFactory.getLog(BackupSenderImpl.class);

// Collaborators handed over through init(...).
private Cache cache;
private Transport transport;
private Configuration config;
private TransactionTable txTable;

/** Custom failure policy instances keyed by backup site name; populated in start(). */
private final Map<String, CustomFailurePolicy> siteFailurePolicy = new HashMap<String, CustomFailurePolicy>();

/** Offline tracking keyed by backup site name, only for sites with 'takeOffline' enabled; populated in start(). */
private final ConcurrentMap<String, OfflineStatus> offlineStatus = ConcurrentMapFactory.makeConcurrentMap();

/** Name of the local site, as given to the constructor. */
private final String localSiteName;

/** Name of the cache, read from the cache in start(). */
private String cacheName;

private GlobalConfiguration globalConfig;

/**
 * Creates a sender bound to the given local site.
 *
 * @param localSiteName name of the local site; a backup configured for a site with this same name is
 *                      skipped when the list of backup targets is computed
 */
public BackupSenderImpl(String localSiteName) {
   this.localSiteName = localSiteName;
}
/**
 * Stores the components this sender works with. Nothing is read from the arguments here; the cache is
 * only queried later, in {@code start()}.
 *
 * @param cache     cache whose configuration and name are used by this sender
 * @param transport transport used to send the backup commands
 * @param txTable   transaction table used to look up local transactions
 * @param gc        global configuration, source of the class loader for custom failure policies
 */
@Inject
public void init(Cache cache, Transport transport, TransactionTable txTable, GlobalConfiguration gc) {
   this.globalConfig = gc;
   this.txTable = txTable;
   this.transport = transport;
   this.cache = cache;
}
/**
 * Reads the cache configuration and name, then prepares the per-site state for every in-use backup:
 * an instance of the custom failure policy for sites whose failure policy is CUSTOM, and an
 * {@code OfflineStatus} for sites that have 'takeOffline' enabled.
 *
 * @throws IllegalStateException if a site uses the CUSTOM failure policy but names no policy class
 */
@Start
public void start() {
   config = cache.getCacheConfiguration();
   cacheName = cache.getName();

   for (BackupConfiguration backup : config.sites().inUseBackups()) {
      boolean usesCustomPolicy = backup.backupFailurePolicy() == BackupFailurePolicy.CUSTOM;
      if (usesCustomPolicy) {
         String policyClassName = backup.failurePolicyClass();
         if (policyClassName == null) {
            throw new IllegalStateException("Backup policy class missing for custom failure policy!");
         }
         // Instantiated with the class loader taken from the global configuration.
         CustomFailurePolicy policy = Util.getInstance(policyClassName, globalConfig.classLoader());
         siteFailurePolicy.put(backup.site(), policy);
      }

      if (backup.takeOffline().enabled()) {
         OfflineStatus status = new OfflineStatus(backup.takeOffline());
         offlineStatus.put(backup.site(), status);
      }
   }
}
/** Selects which of the configured backups are kept when the list of backup targets is computed. */
enum BackupFilter {
   /** Keep only the backups whose strategy is SYNC. */
   KEEP_SYNC_ONLY,
   /** Keep only the backups whose strategy is not SYNC. */
   KEEP_ASYNC_ONLY,
   /** Keep the backups regardless of their strategy. */
   KEEP_ALL
}
/**
 * Sends the given prepare command to the backup sites.
 *
 * @param command the prepare command to back up
 * @return the response object returned by the transport for this backup call
 */
@Override
public BackupResponse backupPrepare(PrepareCommand command) throws Exception {
   // A one-phase prepare goes to every backup. For a two-phase commit the ASYNC backups are left out
   // here: they receive a one-phase prepare later, when the commit is backed up, at which point the
   // transaction is known to succeed.
   final BackupFilter filter;
   if (command.isOnePhaseCommit()) {
      filter = BackupFilter.KEEP_ALL;
   } else {
      filter = BackupFilter.KEEP_SYNC_ONLY;
   }
   return backupCommand(command, calculateBackupInfo(filter));
}
/**
 * Same as {@link #processResponses(BackupResponse, VisitableCommand, Transaction)} with no transaction.
 */
@Override
public void processResponses(BackupResponse backupResponse, VisitableCommand command) throws Throwable {
   final Transaction noTransaction = null;
   processResponses(backupResponse, command, noTransaction);
}

/**
 * Waits for the backup call to finish, updates the offline status of the tracked sites from the
 * outcome, and then applies the failure policy of each site whose backup failed.
 *
 * @param backupResponse response of a previous backup call
 * @param command        the command that was backed up
 * @param transaction    transaction handed to a custom failure policy; may be {@code null}
 * @throws Throwable whatever waiting for the backup or handling the failures throws
 */
@Override
public void processResponses(BackupResponse backupResponse, VisitableCommand command, Transaction transaction) throws Throwable {
   // Order matters: the response must be complete before it is inspected.
   backupResponse.waitForBackupToFinish();
   updateOfflineSites(backupResponse);
   processFailedResponses(backupResponse, command, transaction);
}
/**
 * Feeds the outcome of a backup call into the offline status of every tracked site: a site listed among
 * the communication errors records a failure, any other site that is not offline records a success.
 * Does nothing when no site is tracked.
 */
private void updateOfflineSites(BackupResponse backupResponse) {
   if (offlineStatus.isEmpty()) {
      return;
   }
   Set<String> unreachableSites = backupResponse.getCommunicationErrors();
   for (Map.Entry<String, OfflineStatus> entry : offlineStatus.entrySet()) {
      OfflineStatus siteStatus = entry.getValue();
      boolean communicationFailed = unreachableSites.contains(entry.getKey());
      if (communicationFailed) {
         siteStatus.updateOnCommunicationFailure(backupResponse.getSendTimeMillis());
         log.tracef("OfflineStatus updated %s", siteStatus);
         continue;
      }
      if (!siteStatus.isOffline()) {
         siteStatus.updateOnCommunicationSuccess();
      }
   }
}
/**
 * Sends the given write command to all backup sites, whatever their strategy.
 *
 * @param command the write command to back up
 * @return the response object returned by the transport for this backup call
 */
@Override
public BackupResponse backupWrite(WriteCommand command) throws Exception {
   return backupCommand(command, calculateBackupInfo(BackupFilter.KEEP_ALL));
}
/**
 * Backs up a commit. The ASYNC backups, which were not sent a prepare, first get the whole transaction
 * as a one-phase prepare; the commit command itself is then sent to the SYNC backups only.
 *
 * @param command the commit command to back up
 * @return the response object for the commit sent to the SYNC backups
 */
@Override
public BackupResponse backupCommit(CommitCommand command) throws Exception {
   // Two-phase commit: the async backups were skipped during prepare, so they are served now.
   send1PcToAsyncBackups(command);
   List<XSiteBackup> syncBackups = calculateBackupInfo(BackupFilter.KEEP_SYNC_ONLY);
   BackupResponse commitResponse = backupCommand(command, syncBackups);
   return commitResponse;
}
/**
 * Sends the given rollback command to the SYNC backups only.
 *
 * @param command the rollback command to back up
 * @return the response object returned by the transport for this backup call
 */
@Override
public BackupResponse backupRollback(RollbackCommand command) throws Exception {
   return backupCommand(command, calculateBackupInfo(BackupFilter.KEEP_SYNC_ONLY));
}
/**
 * Tries to bring the given backup site back online.
 *
 * @param siteName name of the site to bring online
 * @return {@code NO_SUCH_SITE} if the site is not an in-use backup, {@code OFFLINE_NOT_ENABLED} if no
 *         offline status is tracked for it, otherwise {@code BROUGHT_ONLINE} or {@code ALREADY_ONLINE}
 *         depending on what {@code OfflineStatus.bringOnline()} reports
 */
@Override
public BringSiteOnlineResponse bringSiteOnline(String siteName) {
   if (!config.sites().hasInUseBackup(siteName)) {
      log.tryingToBringOnlineUnexistentSite(siteName);
      return BringSiteOnlineResponse.NO_SUCH_SITE;
   }

   OfflineStatus status = offlineStatus.get(siteName);
   if (status == null) {
      log.tracef("The site %s doesn't have enabled the 'takeOffline' functionality", siteName);
      return BringSiteOnlineResponse.OFFLINE_NOT_ENABLED;
   }

   if (status.bringOnline()) {
      return BringSiteOnlineResponse.BROUGHT_ONLINE;
   }
   return BringSiteOnlineResponse.ALREADY_ONLINE;
}
/**
 * Wraps the command in a {@code SingleRpcCommand} addressed to this cache and hands it to the transport
 * for delivery to the given backups.
 */
private BackupResponse backupCommand(ReplicableCommand command, List<XSiteBackup> xSiteBackups) throws Exception {
   SingleRpcCommand rpc = new SingleRpcCommand(cacheName, command);
   return transport.backupRemotely(xSiteBackups, rpc);
}
/**
 * Sends the modifications of the committing local transaction to the ASYNC backups as a single
 * one-phase prepare. The response of that call is not inspected.
 */
private void send1PcToAsyncBackups(CommitCommand command) throws Exception {
   List<XSiteBackup> asyncBackups = calculateBackupInfo(BackupFilter.KEEP_ASYNC_ONLY);
   LocalTransaction tx = txTable.getLocalTransaction(command.getGlobalTransaction());
   // The trailing 'true' is the one-phase-commit flag of the prepare.
   PrepareCommand onePhasePrepare =
         new PrepareCommand(cacheName, tx.getGlobalTransaction(), tx.getModifications(), true);
   backupCommand(onePhasePrepare, asyncBackups);
}
/**
 * Applies the configured failure policy to every site whose backup failed: CUSTOM delegates to the
 * site's custom policy through a visitor, WARN logs a warning, FAIL collects the failure. All collected
 * failures are thrown together, as one {@code BackupFailureException}, after every site was handled.
 */
private void processFailedResponses(BackupResponse backupResponse, VisitableCommand command, Transaction transaction) throws Throwable {
   SitesConfiguration sites = config.sites();
   BackupFailureException collected = null;

   for (Map.Entry<String, Throwable> entry : backupResponse.getFailedBackups().entrySet()) {
      String site = entry.getKey();
      BackupFailurePolicy policy = sites.getFailurePolicy(site);

      if (policy == BackupFailurePolicy.CUSTOM) {
         CustomFailurePolicy custom = siteFailurePolicy.get(site);
         // No invocation context is available here, hence the null first argument.
         command.acceptVisitor(null, new CustomBackupPolicyInvoker(site, custom, transaction));
      } else if (policy == BackupFailurePolicy.WARN) {
         log.warnXsiteBackupFailed(cacheName, site, entry.getValue());
      } else if (policy == BackupFailurePolicy.FAIL) {
         // Created lazily so that nothing is thrown when no site uses the FAIL policy.
         if (collected == null) {
            collected = new BackupFailureException(cacheName);
         }
         collected.addFailure(site, entry.getValue());
      }
   }

   if (collected != null) {
      throw collected;
   }
}
/**
 * Builds the list of backup targets for one backup call. An in-use backup is left out when it points
 * to the local site, when its strategy does not pass the given filter, or when it is currently offline.
 */
private List<XSiteBackup> calculateBackupInfo(BackupFilter backupFilter) {
   List<XSiteBackup> selected = new ArrayList<XSiteBackup>(2);
   for (BackupConfiguration backup : config.sites().inUseBackups()) {
      String site = backup.site();
      if (site.equals(localSiteName)) {
         log.cacheBackupsDataToSameSite(localSiteName);
         continue;
      }

      boolean sync = backup.strategy() == BackupConfiguration.BackupStrategy.SYNC;
      boolean rejectedByFilter = (backupFilter == BackupFilter.KEEP_ASYNC_ONLY && sync)
            || (backupFilter == BackupFilter.KEEP_SYNC_ONLY && !sync);
      if (rejectedByFilter) {
         continue;
      }

      if (isOffline(site)) {
         log.tracef("The site '%s' is offline, not backing up information to it", site);
         continue;
      }

      selected.add(new XSiteBackup(site, sync, backup.replicationTimeout()));
   }
   return selected;
}
/**
 * Tells whether the given site is currently offline. A site with no tracked offline status is never
 * reported as offline.
 */
private boolean isOffline(String site) {
   OfflineStatus status = offlineStatus.get(site);
   if (status == null) {
      return false;
   }
   return status.isOffline();
}
/**
 * Visitor that reports a failed backup to a {@code CustomFailurePolicy}: each supported command type is
 * mapped to the matching {@code handle...Failure} callback for one site. The invocation context passed
 * to the visit methods is not used. Commands without a dedicated visit method end up in
 * {@link #handleDefault} and are rejected.
 */
public static final class CustomBackupPolicyInvoker extends AbstractVisitor {

   private final String site;
   private final CustomFailurePolicy failurePolicy;
   private final Transaction tx;

   /**
    * @param site          name of the site whose backup failed
    * @param failurePolicy policy to notify
    * @param tx            transaction passed on to the prepare/commit/rollback callbacks
    */
   public CustomBackupPolicyInvoker(String site, CustomFailurePolicy failurePolicy, Transaction tx) {
      this.site = site;
      this.failurePolicy = failurePolicy;
      this.tx = tx;
   }

   // ---- write commands ----

   @Override
   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
      Object key = command.getKey();
      Object value = command.getValue();
      failurePolicy.handlePutFailure(site, key, value, command.isPutIfAbsent());
      return null;
   }

   @Override
   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
      Object key = command.getKey();
      Object value = command.getValue();
      failurePolicy.handleRemoveFailure(site, key, value);
      return null;
   }

   @Override
   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
      Object key = command.getKey();
      Object oldValue = command.getOldValue();
      Object newValue = command.getNewValue();
      failurePolicy.handleReplaceFailure(site, key, oldValue, newValue);
      return null;
   }

   @Override
   public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
      failurePolicy.handleClearFailure(site);
      return null;
   }

   @Override
   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
      failurePolicy.handlePutAllFailure(site, command.getMap());
      return null;
   }

   // ---- transactional commands: the policy gets the transaction given to the constructor ----

   @Override
   public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
      failurePolicy.handlePrepareFailure(site, tx);
      return null;
   }

   @Override
   public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
      failurePolicy.handleRollbackFailure(site, tx);
      return null;
   }

   @Override
   public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
      failurePolicy.handleCommitFailure(site, tx);
      return null;
   }

   /**
    * Rejects every command that has no dedicated visit method above.
    *
    * @throws IllegalStateException always, after delegating to the superclass
    */
   @Override
   protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
      super.handleDefault(ctx, command);
      throw new IllegalStateException("Unknown command: " + command);
   }
}
/**
 * Looks up the offline status tracked for a site.
 *
 * @param site name of the backup site
 * @return the tracked status, or {@code null} if none is registered for that site
 */
public OfflineStatus getOfflineStatus(String site) {
   final OfflineStatus status = offlineStatus.get(site);
   return status;
}
}