/
Backup.java
642 lines (542 loc) · 22.7 KB
/
Backup.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
/* (c) 2016 Open Source Geospatial Foundation - all rights reserved
* This code is licensed under the GPL 2.0 license, available at the root
* application directory.
*/
package org.geoserver.backuprestore;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.FileUtils;
import org.geoserver.backuprestore.utils.BackupUtils;
import org.geoserver.catalog.Catalog;
import org.geoserver.config.GeoServer;
import org.geoserver.config.GeoServerDataDirectory;
import org.geoserver.config.util.XStreamPersister;
import org.geoserver.config.util.XStreamPersisterFactory;
import org.geoserver.platform.ContextLoadedEvent;
import org.geoserver.platform.GeoServerExtensions;
import org.geoserver.platform.GeoServerResourceLoader;
import org.geoserver.platform.resource.Resource;
import org.geotools.factory.Hints;
import org.geotools.filter.text.ecql.ECQL;
import org.geotools.util.logging.Logging;
import org.opengis.filter.Filter;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.job.AbstractJob;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import com.thoughtworks.xstream.XStream;
/**
* Primary controller/facade of the backup and restore subsystem.
*
* @author Alessio Fabiani, GeoSolutions
*
*/
@SuppressWarnings("rawtypes")
public class Backup extends JobExecutionListenerSupport
implements DisposableBean, ApplicationContextAware, ApplicationListener {
static Logger LOGGER = Logging.getLogger(Backup.class);
/* Job Parameters Keys **/
public static final String PARAM_TIME = "time";
public static final String PARAM_JOB_NAME = "job.execution.name";
public static final String PARAM_OUTPUT_FILE_PATH = "output.file.path";
public static final String PARAM_INPUT_FILE_PATH = "input.file.path";
public static final String PARAM_CLEANUP_TEMP = "BK_CLEANUP_TEMP";
public static final String PARAM_DRY_RUN_MODE = "BK_DRY_RUN";
public static final String PARAM_BEST_EFFORT_MODE = "BK_BEST_EFFORT";
/* Jobs Context Keys **/
public static final String BACKUP_JOB_NAME = "backupJob";
public static final String RESTORE_JOB_NAME = "restoreJob";
public static final String RESTORE_CATALOG_KEY = "restore.catalog";
/** catalog */
Catalog catalog;
GeoServer geoServer;
GeoServerResourceLoader resourceLoader;
GeoServerDataDirectory geoServerDataDirectory;
XStreamPersisterFactory xpf;
JobOperator jobOperator;
JobLauncher jobLauncher;
JobRepository jobRepository;
Job backupJob;
Job restoreJob;
ConcurrentHashMap<Long, BackupExecutionAdapter> backupExecutions = new ConcurrentHashMap<Long, BackupExecutionAdapter>();
ConcurrentHashMap<Long, RestoreExecutionAdapter> restoreExecutions = new ConcurrentHashMap<Long, RestoreExecutionAdapter>();
Integer totalNumberOfBackupSteps;
Integer totalNumberOfRestoreSteps;
/**
* A static application context
*/
private static ApplicationContext context;
/**
 * Creates the facade.
 *
 * <p>Stores the given catalog and resource loader, looks up the {@link GeoServer} and
 * {@link XStreamPersisterFactory} beans through {@link GeoServerExtensions#bean(Class)}, and
 * wraps the resource loader in a new {@link GeoServerDataDirectory}.
 *
 * @param catalog the catalog kept by this facade
 * @param rl the resource loader kept by this facade and used to build the data directory
 */
public Backup(Catalog catalog, GeoServerResourceLoader rl) {
this.catalog = catalog;
this.geoServer = GeoServerExtensions.bean(GeoServer.class);
this.resourceLoader = rl;
this.geoServerDataDirectory = new GeoServerDataDirectory(rl);
this.xpf = GeoServerExtensions.bean(XStreamPersisterFactory.class);
}
/**
 * Returns the statically held application context.
 *
 * @return the context most recently passed to
 *         {@link #setApplicationContext(ApplicationContext)}, or {@code null} if that method
 *         has not been called yet
 */
public static ApplicationContext getContext() {
return context;
}
/**
 * Returns the job operator used by this facade.
 *
 * @return the jobOperator; it is assigned in {@link #onApplicationEvent(ApplicationEvent)}
 *         when a {@link ContextLoadedEvent} arrives, so it is {@code null} before that
 */
public JobOperator getJobOperator() {
return jobOperator;
}
/**
 * Returns the job launcher used to start backup and restore jobs.
 *
 * @return the jobLauncher (the {@code "jobLauncherAsync"} bean); it is assigned in
 *         {@link #onApplicationEvent(ApplicationEvent)}, so it is {@code null} before the
 *         {@link ContextLoadedEvent} has been received
 */
public JobLauncher getJobLauncher() {
return jobLauncher;
}
/**
 * Returns the job launched by {@code runBackupAsync}.
 *
 * @return the Backup job (the bean named {@link #BACKUP_JOB_NAME}); {@code null} until
 *         {@link #onApplicationEvent(ApplicationEvent)} has handled a
 *         {@link ContextLoadedEvent}
 */
public Job getBackupJob() {
return backupJob;
}
/**
 * Returns the job launched by {@code runRestoreAsync}.
 *
 * @return the Restore job (the bean named {@link #RESTORE_JOB_NAME}); {@code null} until
 *         {@link #onApplicationEvent(ApplicationEvent)} has handled a
 *         {@link ContextLoadedEvent}
 */
public Job getRestoreJob() {
return restoreJob;
}
/**
 * Returns the registry of backup executions started through this facade.
 *
 * @return the backupExecutions map, keyed by execution id; this is the live internal map,
 *         not a copy
 */
public ConcurrentHashMap<Long, BackupExecutionAdapter> getBackupExecutions() {
return backupExecutions;
}
/**
 * Returns the registry of restore executions started through this facade.
 *
 * @return the restoreExecutions map, keyed by execution id; this is the live internal map,
 *         not a copy
 */
public ConcurrentHashMap<Long, RestoreExecutionAdapter> getRestoreExecutions() {
return restoreExecutions;
}
/**
 * Resolves the Spring Batch collaborators once the application context is fully loaded.
 *
 * <p>Only {@link ContextLoadedEvent}s are handled; every other event is ignored. The beans
 * are looked up here rather than injected at construction time to avoid a circular
 * dependency on creation.
 *
 * @param event the application event being dispatched
 */
@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (!(event instanceof ContextLoadedEvent)) {
        return;
    }

    // Batch infrastructure
    this.jobOperator = (JobOperator) Backup.context.getBean("jobOperator");
    this.jobLauncher = (JobLauncher) Backup.context.getBean("jobLauncherAsync");
    this.jobRepository = (JobRepository) Backup.context.getBean("jobRepository");

    // Job definitions
    this.backupJob = (Job) Backup.context.getBean(BACKUP_JOB_NAME);
    this.restoreJob = (Job) Backup.context.getBean(RESTORE_JOB_NAME);
}
/**
 * Lists the backup executions the job operator currently reports as running.
 *
 * @return the ids returned by the job operator for {@link #BACKUP_JOB_NAME}, or a new empty
 *         set when the operator answers with a {@link NoSuchJobException}
 */
public Set<Long> getBackupRunningExecutions() {
    synchronized (jobOperator) {
        try {
            return jobOperator.getRunningExecutions(BACKUP_JOB_NAME);
        } catch (NoSuchJobException e) {
            // Deliberately treated as "nothing is running" rather than as an error.
            return new HashSet<>();
        }
    }
}
/**
 * Lists the restore executions the job operator currently reports as running.
 *
 * @return the ids returned by the job operator for {@link #RESTORE_JOB_NAME}, or a new empty
 *         set when the operator answers with a {@link NoSuchJobException}
 */
public Set<Long> getRestoreRunningExecutions() {
    synchronized (jobOperator) {
        try {
            return jobOperator.getRunningExecutions(RESTORE_JOB_NAME);
        } catch (NoSuchJobException e) {
            // Deliberately treated as "nothing is running" rather than as an error.
            return new HashSet<>();
        }
    }
}
/**
 * @return the catalog passed to the constructor
 */
public Catalog getCatalog() {
return catalog;
}
/**
 * @return the {@link GeoServer} bean looked up through {@link GeoServerExtensions} when this
 *         facade was constructed
 */
public GeoServer getGeoServer() {
return geoServer;
}
/**
 * @return the resourceLoader, i.e. the loader passed to the constructor unless it was
 *         replaced through {@link #setResourceLoader(GeoServerResourceLoader)}
 */
public GeoServerResourceLoader getResourceLoader() {
return resourceLoader;
}
/**
 * Replaces the resource loader. Only the field is updated; the
 * {@link GeoServerDataDirectory} built in the constructor is left as it is.
 *
 * @param resourceLoader the resourceLoader to set
 */
public void setResourceLoader(GeoServerResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
/**
 * @return the geoServerDataDirectory; by default the one built from the constructor's
 *         resource loader
 */
public GeoServerDataDirectory getGeoServerDataDirectory() {
return geoServerDataDirectory;
}
/**
 * Replaces the data directory handle; the backup and restore entry points use it to obtain
 * their temporary working folder.
 *
 * @param geoServerDataDirectory the geoServerDataDirectory to set
 */
public void setGeoServerDataDirectory(GeoServerDataDirectory geoServerDataDirectory) {
this.geoServerDataDirectory = geoServerDataDirectory;
}
/**
 * @return the totalNumberOfBackupSteps; {@code null} until it has been assigned, either by
 *         {@link #setApplicationContext(ApplicationContext)} or through the setter
 */
public Integer getTotalNumberOfBackupSteps() {
return totalNumberOfBackupSteps;
}
/**
 * Sets the step count handed to every {@link BackupExecutionAdapter} created afterwards.
 *
 * @param totalNumberOfBackupSteps the totalNumberOfBackupSteps to set
 */
public void setTotalNumberOfBackupSteps(Integer totalNumberOfBackupSteps) {
this.totalNumberOfBackupSteps = totalNumberOfBackupSteps;
}
/**
 * @return the totalNumberOfRestoreSteps; {@code null} until it has been assigned, either by
 *         {@link #setApplicationContext(ApplicationContext)} or through the setter
 */
public Integer getTotalNumberOfRestoreSteps() {
return totalNumberOfRestoreSteps;
}
/**
 * Sets the step count handed to every {@link RestoreExecutionAdapter} created afterwards.
 *
 * @param totalNumberOfRestoreSteps the totalNumberOfRestoreSteps to set
 */
public void setTotalNumberOfRestoreSteps(Integer totalNumberOfRestoreSteps) {
this.totalNumberOfRestoreSteps = totalNumberOfRestoreSteps;
}
/**
 * {@link DisposableBean} callback. This facade currently releases nothing on shutdown.
 */
@Override
public void destroy() throws Exception {
// Nothing to do.
}
/**
 * Stores the application context statically and derives the number of steps of the backup
 * and restore jobs from their bean definitions.
 *
 * <p>The step counts are looked up on a best-effort basis: any failure (missing bean,
 * bean that is not an {@link AbstractJob}, ...) is logged as a warning and leaves the
 * affected counter(s) untouched instead of aborting context start-up.
 *
 * @param context the application context this bean runs in
 * @throws BeansException declared by {@link ApplicationContextAware}; lookup failures are
 *         caught and logged by this implementation
 */
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
    Backup.context = context;

    try {
        // Locals are named differently from the backupJob/restoreJob fields on purpose:
        // those fields are only assigned later, in onApplicationEvent.
        final AbstractJob backupJobBean = (AbstractJob) context.getBean(BACKUP_JOB_NAME);
        if (backupJobBean != null) {
            this.setTotalNumberOfBackupSteps(backupJobBean.getStepNames().size());
        }

        // The restore counter must come from the restore job definition, not the backup one.
        final AbstractJob restoreJobBean = (AbstractJob) context.getBean(RESTORE_JOB_NAME);
        if (restoreJobBean != null) {
            this.setTotalNumberOfRestoreSteps(restoreJobBean.getStepNames().size());
        }
    } catch (Exception e) {
        LOGGER.log(Level.WARNING, "Could not fully configure the Backup Facade!", e);
    }
}
/**
 * Resolves the name under which a class is serialized by the given persister.
 *
 * @param xp the persister whose class-aliasing mapper is consulted
 * @param clazz the class to look up
 * @return whatever {@code serializedClass(clazz)} returns on the persister's class-aliasing
 *         mapper (presumably the configured alias; verify against XStream's mapper contract)
 */
protected String getItemName(XStreamPersister xp, Class clazz) {
return xp.getClassAliasingMapper().serializedClass(clazz);
}
/**
 * Launches an asynchronous backup job that targets the given archive.
 *
 * <p>The request is rejected up front, before the target archive is touched, when any
 * backup or restore execution is reported as running. Otherwise the target file is prepared
 * (an existing file is replaced when {@code overwrite} is set or when it is empty), the job
 * parameters are assembled and the job is launched. The "no running jobs" condition is
 * verified again while holding the {@code jobOperator} monitor, so that two concurrent
 * requests cannot both pass the check and start overlapping jobs.
 *
 * @param archiveFile the archive the backup is meant to produce
 * @param overwrite whether an existing, non-empty archive may be replaced
 * @param filter optional filter; when not {@code null} its ECQL text is passed to the job as
 *        the {@code "filter"} parameter
 * @param params optional hints; recognised option keys are turned into job parameters
 * @return the adapter wrapping the launched job execution; it is also registered in
 *         {@link #getBackupExecutions()}
 * @throws IOException if another backup/restore is running, if the archive already exists
 *         and must not be overwritten, if the target location cannot be created, or if the
 *         job could not be launched
 */
public BackupExecutionAdapter runBackupAsync(final Resource archiveFile,
        final boolean overwrite, final Filter filter, final Hints params) throws IOException {
    final String busyMessage =
            "Could not start a new Backup Job Execution since there are currently Running jobs.";

    // Fail fast: do not delete/re-create an existing archive for a request that is going
    // to be rejected anyway.
    if (!getRestoreRunningExecutions().isEmpty() || !getBackupRunningExecutions().isEmpty()) {
        throw new IOException(busyMessage);
    }

    // Check if archiveFile exists
    if (archiveFile.file().exists()) {
        if (!overwrite && FileUtils.sizeOf(archiveFile.file()) > 0) {
            // Unless the user explicitly wants to overwrite the archiveFile, throw an
            // exception whenever it already exists
            throw new IOException(
                    "The target archive file already exists. Use 'overwrite=TRUE' if you want to overwrite it.");
        } else {
            FileUtils.forceDelete(archiveFile.file());
        }
    } else {
        // Make sure the parent path exists
        if (!archiveFile.file().getParentFile().exists()) {
            try {
                archiveFile.file().getParentFile().mkdirs();
            } finally {
                if (!archiveFile.file().getParentFile().exists()) {
                    throw new IOException("The path to target archive file is unreachable.");
                }
            }
        }
    }

    // Initialize ZIP
    FileUtils.touch(archiveFile.file());

    // Write flat files into a temporary folder
    Resource tmpDir = BackupUtils.geoServerTmpDir(getGeoServerDataDirectory());

    // Fill Job Parameters
    JobParametersBuilder paramsBuilder = new JobParametersBuilder();

    if (filter != null) {
        paramsBuilder.addString("filter", ECQL.toCQL(filter));
    }

    paramsBuilder
            .addString(PARAM_JOB_NAME, BACKUP_JOB_NAME)
            .addString(PARAM_OUTPUT_FILE_PATH,
                    BackupUtils.getArchiveURLProtocol(tmpDir) + tmpDir.path())
            .addLong(PARAM_TIME, System.currentTimeMillis());

    parseParams(params, paramsBuilder);

    JobParameters jobParameters = paramsBuilder.toJobParameters();

    // Send Execution Signal
    try {
        synchronized (jobOperator) {
            // Authoritative check, performed under the same monitor as the launch.
            // The monitor is reentrant, so the synchronized getters can be called here.
            if (!getRestoreRunningExecutions().isEmpty()
                    || !getBackupRunningExecutions().isEmpty()) {
                throw new IOException(busyMessage);
            }

            // Start a new Job
            JobExecution jobExecution = jobLauncher.run(backupJob, jobParameters);
            BackupExecutionAdapter backupExecution = new BackupExecutionAdapter(jobExecution,
                    totalNumberOfBackupSteps);

            backupExecution.setArchiveFile(archiveFile);
            backupExecution.setOverwrite(overwrite);
            backupExecution.setFilter(filter);

            backupExecution.getOptions().add("OVERWRITE=" + overwrite);
            for (Entry<Object, Object> jobParam : jobParameters.toProperties().entrySet()) {
                // Paths and the timestamp are internal details, not user options
                if (!PARAM_OUTPUT_FILE_PATH.equals(jobParam.getKey())
                        && !PARAM_INPUT_FILE_PATH.equals(jobParam.getKey())
                        && !PARAM_TIME.equals(jobParam.getKey())) {
                    backupExecution.getOptions()
                            .add(jobParam.getKey() + "=" + jobParam.getValue());
                }
            }

            // Publish the adapter only once it is fully configured
            backupExecutions.put(backupExecution.getId(), backupExecution);

            return backupExecution;
        }
    } catch (JobExecutionAlreadyRunningException | JobRestartException
            | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
        throw new IOException("Could not start a new Backup Job Execution: ", e);
    }
}
/**
 * Launches an asynchronous restore job that reads from the given archive.
 *
 * <p>The request is rejected up front, before the archive is extracted, when any backup or
 * restore execution is reported as running. Otherwise the archive is extracted into a
 * temporary folder, the job parameters are assembled and the job is launched. The "no
 * running jobs" condition is verified again while holding the {@code jobOperator} monitor,
 * so that two concurrent requests cannot both pass the check and start overlapping jobs.
 *
 * @param archiveFile the archive to restore from
 * @param filter optional filter; when not {@code null} its ECQL text is passed to the job as
 *        the {@code "filter"} parameter
 * @param params optional hints; recognised option keys are turned into job parameters
 * @return the adapter wrapping the launched job execution; it is also registered in
 *         {@link #getRestoreExecutions()}
 * @throws IOException if another backup/restore is running, if the archive cannot be
 *         extracted, or if the job could not be launched
 */
public RestoreExecutionAdapter runRestoreAsync(final Resource archiveFile, final Filter filter,
        final Hints params) throws IOException {
    final String busyMessage =
            "Could not start a new Restore Job Execution since there are currently Running jobs.";

    // Fail fast: do not extract the archive for a request that is going to be rejected.
    if (!getRestoreRunningExecutions().isEmpty() || !getBackupRunningExecutions().isEmpty()) {
        throw new IOException(busyMessage);
    }

    // Extract archive into a temporary folder
    Resource tmpDir = BackupUtils.geoServerTmpDir(getGeoServerDataDirectory());
    BackupUtils.extractTo(archiveFile, tmpDir);

    // Fill Job Parameters
    JobParametersBuilder paramsBuilder = new JobParametersBuilder();

    if (filter != null) {
        paramsBuilder.addString("filter", ECQL.toCQL(filter));
    }

    paramsBuilder
            .addString(PARAM_JOB_NAME, RESTORE_JOB_NAME)
            .addString(PARAM_INPUT_FILE_PATH,
                    BackupUtils.getArchiveURLProtocol(tmpDir) + tmpDir.path())
            .addLong(PARAM_TIME, System.currentTimeMillis());

    parseParams(params, paramsBuilder);

    JobParameters jobParameters = paramsBuilder.toJobParameters();

    try {
        synchronized (jobOperator) {
            // Authoritative check, performed under the same monitor as the launch.
            // The monitor is reentrant, so the synchronized getters can be called here.
            if (!getRestoreRunningExecutions().isEmpty()
                    || !getBackupRunningExecutions().isEmpty()) {
                throw new IOException(busyMessage);
            }

            // Start a new Job
            JobExecution jobExecution = jobLauncher.run(restoreJob, jobParameters);
            RestoreExecutionAdapter restoreExecution = new RestoreExecutionAdapter(
                    jobExecution, totalNumberOfRestoreSteps);

            restoreExecution.setArchiveFile(archiveFile);
            restoreExecution.setFilter(filter);

            for (Entry<Object, Object> jobParam : jobParameters.toProperties().entrySet()) {
                // Paths and the timestamp are internal details, not user options
                if (!PARAM_OUTPUT_FILE_PATH.equals(jobParam.getKey())
                        && !PARAM_INPUT_FILE_PATH.equals(jobParam.getKey())
                        && !PARAM_TIME.equals(jobParam.getKey())) {
                    restoreExecution.getOptions()
                            .add(jobParam.getKey() + "=" + jobParam.getValue());
                }
            }

            // Publish the adapter only once it is fully configured
            restoreExecutions.put(restoreExecution.getId(), restoreExecution);

            return restoreExecution;
        }
    } catch (JobExecutionAlreadyRunningException | JobRestartException
            | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
        throw new IOException("Could not start a new Restore Job Execution: ", e);
    }
}
/**
 * Job listener hook invoked once a job has ended: notifies every registered
 * {@link BackupRestoreCallback} that the request is over, so that locks on the GeoServer
 * configuration get released.
 *
 * <p>Failures are logged and never propagated to the batch infrastructure.
 *
 * @param jobExecution the execution that has just ended (not inspected here)
 */
@Override
public void afterJob(JobExecution jobExecution) {
    try {
        for (BackupRestoreCallback listener : GeoServerExtensions
                .extensions(BackupRestoreCallback.class)) {
            listener.onEndRequest();
        }
    } catch (Exception e) {
        LOGGER.log(Level.SEVERE, "Could not unlock GeoServer Catalog Configuration!", e);
    }
}
/**
 * Job listener hook invoked right before a job starts: notifies every registered
 * {@link BackupRestoreCallback} that a request begins, passing the value of the
 * {@link #PARAM_JOB_NAME} job parameter. The callbacks are expected to take the GeoServer
 * configuration lock.
 *
 * @param jobExecution the execution about to start
 */
@Override
public void beforeJob(JobExecution jobExecution) {
    for (BackupRestoreCallback listener : GeoServerExtensions
            .extensions(BackupRestoreCallback.class)) {
        final String jobName = jobExecution.getJobParameters().getString(PARAM_JOB_NAME);
        listener.onBeginRequest(jobName);
    }
}
/**
 * Stops a running Backup/Restore execution.
 *
 * <p>The stop signal is sent through the job operator. Whatever the outcome, an execution
 * known to this facade whose status is not beyond {@link BatchStatus#STARTED} is flagged as
 * {@link BatchStatus#STOPPING}, given an end time and saved to the job repository; after
 * that the {@link BackupRestoreCallback}s are notified so configuration locks get released.
 *
 * @param executionId id of the execution to stop
 * @throws NoSuchJobExecutionException as raised by the job operator
 * @throws JobExecutionNotRunningException as raised by the job operator
 */
public void stopExecution(Long executionId)
        throws NoSuchJobExecutionException, JobExecutionNotRunningException {
    LOGGER.info("Stopping execution id [" + executionId + "]");

    JobExecution delegate = null;
    try {
        // Resolve the underlying execution from whichever registry knows the id
        if (this.backupExecutions.containsKey(executionId)) {
            delegate = this.backupExecutions.get(executionId).getDelegate();
        } else if (this.restoreExecutions.containsKey(executionId)) {
            delegate = this.restoreExecutions.get(executionId).getDelegate();
        }

        jobOperator.stop(executionId);
    } finally {
        if (delegate != null && !delegate.getStatus().isGreaterThan(BatchStatus.STARTED)) {
            delegate.setStatus(BatchStatus.STOPPING);
            delegate.setEndTime(new Date());
            jobRepository.update(delegate);
        }

        // Release locks on GeoServer Configuration:
        try {
            for (BackupRestoreCallback listener : GeoServerExtensions
                    .extensions(BackupRestoreCallback.class)) {
                listener.onEndRequest();
            }
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Could not unlock GeoServer Catalog Configuration!", e);
        }
    }
}
/**
 * Restarts a Backup/Restore Execution by delegating to the job operator's
 * {@code restart(executionId)}.
 *
 * @param executionId id of the execution to restart
 * @return the value returned by the job operator (per the Spring Batch {@code JobOperator}
 *         contract, the id of the newly started execution)
 * @throws JobInstanceAlreadyCompleteException as raised by the job operator
 * @throws NoSuchJobExecutionException as raised by the job operator
 * @throws NoSuchJobException as raised by the job operator
 * @throws JobRestartException as raised by the job operator
 * @throws JobParametersInvalidException as raised by the job operator
 */
public Long restartExecution(Long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException, JobParametersInvalidException {
return jobOperator.restart(executionId);
}
/**
 * Abandons a Backup/Restore execution.
 *
 * <p>The request is forwarded to the job operator. Whatever the outcome, an execution known
 * to this facade is flagged as {@link BatchStatus#ABANDONED}, given an end time and saved to
 * the job repository; after that the {@link BackupRestoreCallback}s are notified so
 * configuration locks get released.
 *
 * @param executionId id of the execution to abandon
 * @throws NoSuchJobExecutionException as raised by the job operator
 * @throws JobExecutionAlreadyRunningException as raised by the job operator
 */
public void abandonExecution(Long executionId)
        throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException {
    LOGGER.info("Aborting execution id [" + executionId + "]");

    JobExecution delegate = null;
    try {
        // Resolve the underlying execution from whichever registry knows the id
        if (this.backupExecutions.containsKey(executionId)) {
            delegate = this.backupExecutions.get(executionId).getDelegate();
        } else if (this.restoreExecutions.containsKey(executionId)) {
            delegate = this.restoreExecutions.get(executionId).getDelegate();
        }

        jobOperator.abandon(executionId);
    } finally {
        if (delegate != null) {
            delegate.setStatus(BatchStatus.ABANDONED);
            delegate.setEndTime(new Date());
            jobRepository.update(delegate);
        }

        // Release locks on GeoServer Configuration:
        try {
            for (BackupRestoreCallback listener : GeoServerExtensions
                    .extensions(BackupRestoreCallback.class)) {
                listener.onEndRequest();
            }
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Could not unlock GeoServer Catalog Configuration!", e);
        }
    }
}
/**
 * Copies the recognised option hints into the job parameters.
 *
 * <p>Only hint entries keyed by a {@link Hints.OptionKey} are considered. For each of the
 * key's options equal to {@link #PARAM_CLEANUP_TEMP}, {@link #PARAM_DRY_RUN_MODE} or
 * {@link #PARAM_BEST_EFFORT_MODE}, a string parameter of that name with value
 * {@code "true"} is added unless the builder already holds one. The hint's own value is not
 * read.
 *
 * @param params the hints to scan; may be {@code null}, in which case nothing happens
 * @param paramsBuilder the builder receiving the parameters
 */
private void parseParams(final Hints params, JobParametersBuilder paramsBuilder) {
    if (params == null) {
        return;
    }

    for (Entry<Object, Object> hint : params.entrySet()) {
        if (!(hint.getKey() instanceof Hints.OptionKey)) {
            continue;
        }

        final Hints.OptionKey optionKey = (Hints.OptionKey) hint.getKey();
        final Set<String> options = optionKey.getOptions();
        for (String option : options) {
            switch (option) {
            case PARAM_CLEANUP_TEMP:
            case PARAM_DRY_RUN_MODE:
            case PARAM_BEST_EFFORT_MODE: {
                // Never override a value that is already present in the builder
                final boolean alreadySet =
                        paramsBuilder.toJobParameters().getString(option) != null;
                if (!alreadySet) {
                    paramsBuilder.addString(option, "true");
                }
                break;
            }
            default:
                // Unknown options are ignored
                break;
            }
        }
    }
}
/**
 * Creates an XML persister and configures it through
 * {@link #initXStreamPersister(XStreamPersister)}.
 *
 * <p>Note that a new {@link XStreamPersisterFactory} is instantiated on every call; the
 * factory bean looked up in the constructor is not used here.
 *
 * @return the configured XML persister
 */
public XStreamPersister createXStreamPersisterXML() {
return initXStreamPersister(new XStreamPersisterFactory().createXMLPersister());
}
/**
 * Creates a JSON persister and configures it through
 * {@link #initXStreamPersister(XStreamPersister)}.
 *
 * <p>Note that a new {@link XStreamPersisterFactory} is instantiated on every call; the
 * factory bean looked up in the constructor is not used here.
 *
 * @return the configured JSON persister
 */
public XStreamPersister createXStreamPersisterJSON() {
return initXStreamPersister(new XStreamPersisterFactory().createJSONPersister());
}
/**
 * Configures a persister for (de)serializing backup executions: binds it to this facade's
 * catalog, registers the {@code "backup"} alias for {@link BackupExecutionAdapter} and
 * whitelists the types XStream is allowed to handle.
 *
 * @param xp the persister to configure
 * @return the same persister instance, for call chaining
 */
public XStreamPersister initXStreamPersister(XStreamPersister xp) {
    xp.setCatalog(catalog);

    final XStream xstream = xp.getXStream();

    // Element name used for backup executions
    xstream.alias("backup", BackupExecutionAdapter.class);

    // security: explicitly permit the types that take part in the serialization
    final Class[] permittedTypes = { BackupExecutionAdapter.class };
    xstream.allowTypes(permittedTypes);
    xstream.allowTypeHierarchy(Resource.class);

    return xp;
}
}