-
Notifications
You must be signed in to change notification settings - Fork 119
/
RemoteTemplateStorage.java
210 lines (197 loc) · 7.34 KB
/
RemoteTemplateStorage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/*
* Copyright 2019-2023 CloudNetService team & contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package eu.cloudnetservice.driver.template.defaults;
import eu.cloudnetservice.common.io.FileUtil;
import eu.cloudnetservice.common.io.ListenableOutputStream;
import eu.cloudnetservice.common.io.ZipUtil;
import eu.cloudnetservice.driver.ComponentInfo;
import eu.cloudnetservice.driver.channel.ChannelMessage;
import eu.cloudnetservice.driver.network.NetworkClient;
import eu.cloudnetservice.driver.network.buffer.DataBuf;
import eu.cloudnetservice.driver.network.chunk.ChunkedPacketSender;
import eu.cloudnetservice.driver.network.chunk.TransferStatus;
import eu.cloudnetservice.driver.network.def.NetworkConstants;
import eu.cloudnetservice.driver.service.ServiceTemplate;
import eu.cloudnetservice.driver.template.TemplateStorage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;
/**
* The default implementation of a template storage which pulls its information from a remote provider.
*
* @since 4.0
*/
public abstract class RemoteTemplateStorage implements TemplateStorage {
private final String name;
private final ComponentInfo componentInfo;
private final NetworkClient networkClient;
/**
* Constructs a new remote template storage instance.
*
* @param name the name of the storage which was created.
* @param componentInfo the information about the current component.
* @param networkClient the network client of the current component.
* @throws NullPointerException if the given name, component info or network client is null.
*/
public RemoteTemplateStorage(
@NonNull String name,
@NonNull ComponentInfo componentInfo,
@NonNull NetworkClient networkClient
) {
this.name = name;
this.componentInfo = componentInfo;
this.networkClient = networkClient;
}
/**
* {@inheritDoc}
*/
@Override
public @NonNull String name() {
return this.name;
}
/**
* {@inheritDoc}
*/
@Override
public boolean deployDirectory(
@NonNull ServiceTemplate target,
@NonNull Path directory,
@Nullable Predicate<Path> filter
) {
try (var inputStream = ZipUtil.zipToStream(directory, filter)) {
return this.deploy(target, inputStream);
} catch (IOException exception) {
return false;
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inputStream) {
return ChunkedPacketSender.forFileTransfer()
.source(inputStream)
.transferChannel("deploy_service_template")
.withExtraData(DataBuf.empty().writeString(this.name).writeObject(target).writeBoolean(true))
.toChannels(this.networkClient.firstChannel())
.build()
.transferChunkedData()
.get(5, TimeUnit.MINUTES, TransferStatus.FAILURE) == TransferStatus.SUCCESS;
}
/**
* {@inheritDoc}
*/
@Override
public @Nullable InputStream zipTemplate(@NonNull ServiceTemplate template) throws IOException {
// send a request for the template to the node
var responseId = UUID.randomUUID();
var response = ChannelMessage.builder()
.message("remote_templates_zip_template")
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
.targetNode(this.componentInfo.nodeUniqueId())
.buffer(DataBuf.empty().writeString(this.name).writeObject(template).writeUniqueId(responseId))
.build()
.sendSingleQuery();
// check if we got a response
if (response == null || !response.content().readBoolean()) {
return null;
}
// the file is transferred and should be readable
return Files.newInputStream(FileUtil.TEMP_DIR.resolve(responseId.toString()), StandardOpenOption.DELETE_ON_CLOSE);
}
/**
* {@inheritDoc}
*/
@Override
public @NonNull OutputStream appendOutputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return this.openLocalOutputStream(template, path, FileUtil.createTempFile(), true);
}
/**
* {@inheritDoc}
*/
@Override
public @NonNull OutputStream newOutputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return this.openLocalOutputStream(template, path, FileUtil.createTempFile(), false);
}
/**
* Opens a new output stream to a temporary local file which will be deployed to the remote component once the stream
* was closed.
*
* @param template the template in which the file to open the stream for is located.
* @param path the path to the file in the given template for which the stream was opened.
* @param localPath the local path to the file which should be used as a temporary location to write to.
* @param append if the stream should append to the content in the remote file.
* @return a new output stream which writes to a local temp files and deploys its data once closed.
* @throws IOException if an I/O error occurs while opening the stream.
* @throws NullPointerException if one of the given parameters is null.
*/
private @NonNull OutputStream openLocalOutputStream(
@NonNull ServiceTemplate template,
@NonNull String path,
@NonNull Path localPath,
boolean append
) throws IOException {
return new ListenableOutputStream<>(
Files.newOutputStream(localPath),
$ -> ChunkedPacketSender.forFileTransfer()
.forFile(localPath)
.transferChannel("deploy_single_file")
.toChannels(this.networkClient.firstChannel())
.withExtraData(
DataBuf.empty().writeString(this.name).writeObject(template).writeString(path).writeBoolean(append))
.build()
.transferChunkedData()
.get(5, TimeUnit.MINUTES, null));
}
/**
* {@inheritDoc}
*/
@Override
public @Nullable InputStream newInputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
// send a request for the file to the node
var responseId = UUID.randomUUID();
var response = ChannelMessage.builder()
.message("remote_templates_template_file")
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
.targetNode(this.componentInfo.nodeUniqueId())
.buffer(DataBuf.empty().writeString(path).writeString(this.name).writeObject(template).writeUniqueId(responseId))
.build()
.sendSingleQuery();
// check if we got a response
if (response == null || !response.content().readBoolean()) {
return null;
}
// the file is transferred and should be readable
return Files.newInputStream(FileUtil.TEMP_DIR.resolve(responseId.toString()), StandardOpenOption.DELETE_ON_CLOSE);
}
}