-
Notifications
You must be signed in to change notification settings - Fork 24.3k
/
GetCcrRestoreFileChunkAction.java
175 lines (149 loc) · 6.93 KB
/
GetCcrRestoreFileChunkAction.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.ccr.action.repositories;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import java.io.IOException;
public class GetCcrRestoreFileChunkAction extends ActionType<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {
public static final GetCcrRestoreFileChunkAction INTERNAL_INSTANCE = new GetCcrRestoreFileChunkAction();
public static final String INTERNAL_NAME = "internal:admin/ccr/restore/file_chunk/get";
public static final String NAME = "indices:internal/admin/ccr/restore/file_chunk/get";
public static final GetCcrRestoreFileChunkAction INSTANCE = new GetCcrRestoreFileChunkAction(NAME);
private GetCcrRestoreFileChunkAction() {
this(INTERNAL_NAME);
}
private GetCcrRestoreFileChunkAction(String name) {
super(name, GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse::new);
}
abstract static class TransportGetCcrRestoreFileChunkAction extends HandledTransportAction<
GetCcrRestoreFileChunkRequest,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> {
protected final CcrRestoreSourceService restoreSourceService;
private final BigArrays bigArrays;
private TransportGetCcrRestoreFileChunkAction(
String actionName,
BigArrays bigArrays,
TransportService transportService,
ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService
) {
super(actionName, transportService, actionFilters, GetCcrRestoreFileChunkRequest::new, ThreadPool.Names.GENERIC);
TransportActionProxy.registerProxyAction(transportService, actionName, false, GetCcrRestoreFileChunkResponse::new);
this.restoreSourceService = restoreSourceService;
this.bigArrays = bigArrays;
}
@Override
protected void doExecute(
Task task,
GetCcrRestoreFileChunkRequest request,
ActionListener<GetCcrRestoreFileChunkResponse> listener
) {
validate(request);
int bytesRequested = request.getSize();
ByteArray array = bigArrays.newByteArray(bytesRequested, false);
String fileName = request.getFileName();
String sessionUUID = request.getSessionUUID();
BytesReference pagedBytesReference = BytesReference.fromByteArray(array, bytesRequested);
try (ReleasableBytesReference reference = new ReleasableBytesReference(pagedBytesReference, array)) {
try (CcrRestoreSourceService.SessionReader sessionReader = restoreSourceService.getSessionReader(sessionUUID)) {
long offsetAfterRead = sessionReader.readFileBytes(fileName, reference);
long offsetBeforeRead = offsetAfterRead - reference.length();
listener.onResponse(new GetCcrRestoreFileChunkResponse(offsetBeforeRead, reference));
}
} catch (IOException e) {
listener.onFailure(e);
}
}
// We don't enforce any validation by default so that the internal action stays the same for BWC reasons
protected void validate(GetCcrRestoreFileChunkRequest request) {}
}
public static class InternalTransportAction extends TransportGetCcrRestoreFileChunkAction {
@Inject
public InternalTransportAction(
BigArrays bigArrays,
TransportService transportService,
ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService
) {
super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService);
}
}
public static class TransportAction extends TransportGetCcrRestoreFileChunkAction {
@Inject
public TransportAction(
BigArrays bigArrays,
TransportService transportService,
ActionFilters actionFilters,
CcrRestoreSourceService restoreSourceService
) {
super(NAME, bigArrays, transportService, actionFilters, restoreSourceService);
}
@Override
protected void validate(GetCcrRestoreFileChunkRequest request) {
final ShardId shardId = request.getShardId();
assert shardId != null : "shardId must be specified for the request";
restoreSourceService.ensureSessionShardIdConsistency(request.getSessionUUID(), shardId);
restoreSourceService.ensureFileNameIsKnownToSession(request.getSessionUUID(), request.getFileName());
}
}
public static class GetCcrRestoreFileChunkResponse extends ActionResponse {
private final long offset;
private final ReleasableBytesReference chunk;
GetCcrRestoreFileChunkResponse(StreamInput streamInput) throws IOException {
super(streamInput);
offset = streamInput.readVLong();
chunk = streamInput.readReleasableBytesReference();
}
GetCcrRestoreFileChunkResponse(long offset, ReleasableBytesReference chunk) {
this.offset = offset;
this.chunk = chunk.retain();
}
public long getOffset() {
return offset;
}
public ReleasableBytesReference getChunk() {
return chunk;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(offset);
out.writeBytesReference(chunk);
}
@Override
public void incRef() {
chunk.incRef();
}
@Override
public boolean tryIncRef() {
return chunk.tryIncRef();
}
@Override
public boolean decRef() {
return chunk.decRef();
}
@Override
public boolean hasReferences() {
return chunk.hasReferences();
}
}
}