Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FTP: Add for FTPS the ability to set KeyManager and TrustManager #205

Merged
merged 9 commits into from
Aug 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ private[ftp] trait FtpsOperations extends CommonFtpOperations {
Try {
connectionSettings.proxy.foreach(ftpClient.setProxy)

connectionSettings.keyManager.foreach(ftpClient.setKeyManager)
connectionSettings.trustManager.foreach(ftpClient.setTrustManager)

ftpClient.connect(connectionSettings.host, connectionSettings.port)

connectionSettings.configureConnection(ftpClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package org.apache.pekko.stream.connectors.ftp

import java.net.InetAddress
import java.net.Proxy
import javax.net.ssl.KeyManager
import javax.net.ssl.TrustManager
import java.nio.file.attribute.PosixFilePermission

import org.apache.pekko.annotation.{ DoNotInherit, InternalApi }
Expand Down Expand Up @@ -173,7 +175,9 @@ final class FtpsSettings private (
val binary: Boolean,
val passiveMode: Boolean,
val configureConnection: FTPSClient => Unit,
val proxy: Option[Proxy]) extends FtpFileSettings {
val proxy: Option[Proxy],
val keyManager: Option[KeyManager],
val trustManager: Option[TrustManager]) extends FtpFileSettings {

def withHost(value: java.net.InetAddress): FtpsSettings = copy(host = value)
def withPort(value: Int): FtpsSettings = copy(port = value)
Expand All @@ -182,6 +186,8 @@ final class FtpsSettings private (
def withPassiveMode(value: Boolean): FtpsSettings =
if (passiveMode == value) this else copy(passiveMode = value)
def withProxy(value: Proxy): FtpsSettings = copy(proxy = Some(value))
def withKeyManager(value: KeyManager): FtpsSettings = copy(keyManager = Some(value))
def withTrustManager(value: TrustManager): FtpsSettings = copy(trustManager = Some(value))

/**
* Scala API:
Expand All @@ -205,14 +211,18 @@ final class FtpsSettings private (
binary: Boolean = binary,
passiveMode: Boolean = passiveMode,
configureConnection: FTPSClient => Unit = configureConnection,
proxy: Option[Proxy] = proxy): FtpsSettings = new FtpsSettings(
proxy: Option[Proxy] = proxy,
keyManager: Option[KeyManager] = keyManager,
trustManager: Option[TrustManager] = trustManager): FtpsSettings = new FtpsSettings(
host = host,
port = port,
credentials = credentials,
binary = binary,
passiveMode = passiveMode,
configureConnection = configureConnection,
proxy = proxy)
proxy = proxy,
keyManager = keyManager,
trustManager = trustManager)

override def toString =
"FtpsSettings(" +
Expand All @@ -222,7 +232,9 @@ final class FtpsSettings private (
s"binary=$binary," +
s"passiveMode=$passiveMode," +
s"configureConnection=$configureConnection," +
s"proxy=$proxy)"
s"proxy=$proxy" +
s"keyManager=$keyManager" +
s"trustManager=$trustManager)"
}

/**
Expand All @@ -241,7 +253,9 @@ object FtpsSettings {
binary = false,
passiveMode = false,
configureConnection = _ => (),
proxy = None)
proxy = None,
keyManager = None,
trustManager = None)

/** Java API */
def create(host: java.net.InetAddress): FtpsSettings = apply(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
TheDeadOne marked this conversation as resolved.
Show resolved Hide resolved
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.stream.connectors.ftp;

import nl.altindag.ssl.util.PemUtils;
import org.apache.pekko.NotUsed;
import org.apache.pekko.stream.IOResult;
import org.apache.pekko.stream.connectors.ftp.javadsl.Ftps;
import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.util.ByteString;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import javax.net.ssl.*;
import java.net.Socket;
import java.security.cert.X509Certificate;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;

public class FtpsWithTrustAndKeyManagersStageTest extends BaseFtpSupport implements CommonFtpStageTest {
private static final String PEM_PATH = "ftpd/pure-ftpd.pem";

@Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();

private X509ExtendedKeyManager keyManager;
private X509ExtendedTrustManager trustManager;

@Test
public void listFiles() throws Exception {
CommonFtpStageTest.super.listFiles();

verify(trustManager).checkServerTrusted(any(X509Certificate[].class), anyString(), any(Socket.class));
}

public Source<FtpFile, NotUsed> getBrowserSource(String basePath) throws Exception {
return Ftps.ls(basePath, settings());
}

public Source<ByteString, CompletionStage<IOResult>> getIOSource(String path) throws Exception {
return Ftps.fromPath(path, settings());
}

public Sink<ByteString, CompletionStage<IOResult>> getIOSink(String path) throws Exception {
return Ftps.toPath(path, settings());
}

public Sink<FtpFile, CompletionStage<IOResult>> getRemoveSink() throws Exception {
return Ftps.remove(settings());
}

public Sink<FtpFile, CompletionStage<IOResult>> getMoveSink(
Function<FtpFile, String> destinationPath) throws Exception {
return Ftps.move(destinationPath, settings());
}

private FtpsSettings settings() throws Exception {
keyManager = keyManager();
trustManager = trustManager();

return FtpsSettings.create(InetAddress.getByName(HOSTNAME))
.withPort(PORT)
.withCredentials(CREDENTIALS)
.withBinary(false)
.withPassiveMode(true)
.withTrustManager(trustManager)
.withKeyManager(keyManager);
}

private X509ExtendedKeyManager keyManager() throws IOException {
try (InputStream stream = classLoader().getResourceAsStream(PEM_PATH)) {
X509ExtendedKeyManager manager = PemUtils.loadIdentityMaterial(stream);
return Mockito.spy(manager);
}
}

private X509ExtendedTrustManager trustManager() throws IOException {
try (InputStream stream = classLoader().getResourceAsStream(PEM_PATH)) {
X509ExtendedTrustManager manager = PemUtils.loadTrustMaterial(stream);
return Mockito.spy(manager);
}
}

private ClassLoader classLoader() {
return FtpsWithTrustAndKeyManagersStageTest.class.getClassLoader();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline
6 changes: 4 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ object Dependencies {
val Ftp = Seq(
libraryDependencies ++= Seq(
"commons-net" % "commons-net" % "3.8.0", // ApacheV2
"com.hierynomus" % "sshj" % "0.33.0" // ApacheV2
))
"com.hierynomus" % "sshj" % "0.33.0", // ApacheV2
"io.github.hakky54" % "sslcontext-kickstart-for-pem" % "6.8.0" % Test,
"org.mockito" % "mockito-core" % "4.11.0" % Test,
"org.mockito" % "mockito-inline" % "4.11.0" % Test))

val GeodeVersion = "1.15.0"
val GeodeVersionForDocs = "115"
Expand Down