Skip to content

Commit

Permalink
Setting expiry time for http download requests
Browse files Browse the repository at this point in the history
  • Loading branch information
DImuthuUpe committed Jan 19, 2021
1 parent fe56f1c commit f7495b7
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 9 deletions.
11 changes: 10 additions & 1 deletion agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
Expand Up @@ -63,10 +63,19 @@ public class MFTAgent implements CommandLineRunner {

@org.springframework.beans.factory.annotation.Value("${agent.id}")
private String agentId;

@org.springframework.beans.factory.annotation.Value("${agent.host}")
private String agentHost;

@org.springframework.beans.factory.annotation.Value("${agent.user}")
private String agentUser;

@org.springframework.beans.factory.annotation.Value("${agent.http.port}")
private Integer agentHttpPort;

@org.springframework.beans.factory.annotation.Value("${agent.https.enabled}")
private boolean agentHttpsEnabled;

@org.springframework.beans.factory.annotation.Value("${agent.supported.protocols}")
private String supportedProtocols;

Expand Down Expand Up @@ -230,7 +239,7 @@ private void acceptHTTPRequests() {
logger.info("Starting the HTTP front end");

new Thread(() -> {
HttpServer httpServer = new HttpServer(3333, false, transferRequestsStore);
HttpServer httpServer = new HttpServer(agentHost, agentHttpPort, agentHttpsEnabled, transferRequestsStore);
try {
httpServer.run();
} catch (Exception e) {
Expand Down
Expand Up @@ -27,14 +27,20 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpServer {

private static final Logger logger = LoggerFactory.getLogger(HttpServer.class);

private final String host;
private final int port;
private final boolean enableSSL;
private HttpTransferRequestsStore transferRequestsStore;

public HttpServer(int port, boolean enableSSL, HttpTransferRequestsStore transferRequestsStore) {
public HttpServer(String host, int port, boolean enableSSL, HttpTransferRequestsStore transferRequestsStore) {
this.host = host;
this.port = port;
this.enableSSL = enableSSL;
this.transferRequestsStore = transferRequestsStore;
Expand All @@ -59,10 +65,9 @@ public void run() throws Exception {
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpServerInitializer(sslCtx, transferRequestsStore));

Channel ch = b.bind(port).sync().channel();
Channel ch = b.bind(host, port).sync().channel();

System.err.println("Open your web browser and navigate to " +
(enableSSL? "https" : "http") + "://127.0.0.1:" + port + '/');
logger.info("Http endpoint is open on {}://{}:{}", (enableSSL? "https" : "http"), host, port);

ch.closeFuture().sync();
} finally {
Expand Down
Expand Up @@ -25,6 +25,7 @@ public class HttpTransferRequest {
private MetadataCollector otherMetadataCollector;
private ConnectorParams connectorParams;
private String targetResourcePath;
private long createdTime = System.currentTimeMillis();

public Connector getOtherConnector() {
return otherConnector;
Expand Down Expand Up @@ -61,4 +62,13 @@ public HttpTransferRequest setConnectorParams(ConnectorParams connectorParams) {
this.connectorParams = connectorParams;
return this;
}

public long getCreatedTime() {
return createdTime;
}

public HttpTransferRequest setCreatedTime(long createdTime) {
this.createdTime = createdTime;
return this;
}
}
Expand Up @@ -17,14 +17,44 @@

package org.apache.airavata.mft.agent.http;

import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HttpTransferRequestsStore {

final private Map<String, HttpTransferRequest> downloadRequestStore = new HashMap<>();
final private Map<String, HttpTransferRequest> uploadRequestStore = new HashMap<>();
private static final Logger logger = LoggerFactory.getLogger(HttpTransferRequestsStore.class);

final private Map<String, HttpTransferRequest> downloadRequestStore = new ConcurrentHashMap<>();
final private Map<String, HttpTransferRequest> uploadRequestStore = new ConcurrentHashMap<>();

final private ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
private long entryExpiryTimeMS = 300 * 1000;

public HttpTransferRequestsStore() {
monitor.scheduleWithFixedDelay(()-> {
logger.debug("Cleaning up the request store..");
downloadRequestStore.keySet().forEach(key -> {
if ((System.currentTimeMillis() - downloadRequestStore.get(key).getCreatedTime()) > entryExpiryTimeMS) {
downloadRequestStore.remove(key);
logger.info("Removed url {} from download cache", key);
}
});

uploadRequestStore.keySet().forEach(key -> {
if ((System.currentTimeMillis() - uploadRequestStore.get(key).getCreatedTime()) > entryExpiryTimeMS) {
uploadRequestStore.remove(key);
logger.info("Removed url {} from upload cache", key);
}
});
}, 2, 10, TimeUnit.SECONDS);
}

public String addDownloadRequest(HttpTransferRequest request) {
String randomUrl = UUID.randomUUID().toString();
Expand Down
Expand Up @@ -51,6 +51,15 @@ public class RPCParser {
@org.springframework.beans.factory.annotation.Value("${secret.service.port}")
private int secretServicePort;

@org.springframework.beans.factory.annotation.Value("${agent.host}")
private String agentHost;

@org.springframework.beans.factory.annotation.Value("${agent.http.port}")
private Integer agentHttpPort;

@org.springframework.beans.factory.annotation.Value("${agent.https.enabled}")
private boolean agentHttpsEnabled;

@Autowired
private HttpTransferRequestsStore httpTransferRequestsStore;

Expand Down Expand Up @@ -143,7 +152,7 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
transferRequest.setOtherMetadataCollector(metadataCollectorOp.get());
transferRequest.setOtherConnector(connectorOp.get());
String url = httpTransferRequestsStore.addDownloadRequest(transferRequest);
return url;
return (agentHttpsEnabled? "https": "http") + "://" + agentHost + ":" + agentHttpPort + "/" + url;
}
break;
}
Expand Down
3 changes: 3 additions & 0 deletions agent/src/main/resources/application.properties
Expand Up @@ -19,7 +19,10 @@ spring.main.web-application-type=NONE
agent.id=agent0
agent.host=localhost
agent.user=dimuthu
agent.http.port=3333
agent.https.enabled=false
agent.supported.protocols=SCP,LOCAL,FTP

consul.host=localhost
consul.port=8500

Expand Down

0 comments on commit f7495b7

Please sign in to comment.