Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FTP: Add for FTPS the ability to set KeyManager and TrustManager #205

Merged
merged 9 commits into from
Aug 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ private[ftp] trait FtpsOperations extends CommonFtpOperations {
Try {
connectionSettings.proxy.foreach(ftpClient.setProxy)

connectionSettings.keyManager.foreach(ftpClient.setKeyManager)
connectionSettings.trustManager.foreach(ftpClient.setTrustManager)

if (ftpClient.getAutodetectUTF8() != connectionSettings.autodetectUTF8) {
ftpClient.setAutodetectUTF8(connectionSettings.autodetectUTF8)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package org.apache.pekko.stream.connectors.ftp

import java.net.InetAddress
import java.net.Proxy
import javax.net.ssl.KeyManager
import javax.net.ssl.TrustManager
import java.nio.file.attribute.PosixFilePermission

import org.apache.pekko.annotation.{ DoNotInherit, InternalApi }
Expand Down Expand Up @@ -185,7 +187,9 @@ final class FtpsSettings private (
val passiveMode: Boolean,
val autodetectUTF8: Boolean,
val configureConnection: FTPSClient => Unit,
val proxy: Option[Proxy]) extends FtpFileSettings {
val proxy: Option[Proxy],
val keyManager: Option[KeyManager],
val trustManager: Option[TrustManager]) extends FtpFileSettings {

def withHost(value: java.net.InetAddress): FtpsSettings = copy(host = value)
def withPort(value: Int): FtpsSettings = copy(port = value)
Expand All @@ -196,6 +200,8 @@ final class FtpsSettings private (
def withAutodetectUTF8(value: Boolean): FtpsSettings =
if (autodetectUTF8 == value) this else copy(autodetectUTF8 = value)
def withProxy(value: Proxy): FtpsSettings = copy(proxy = Some(value))
def withKeyManager(value: KeyManager): FtpsSettings = copy(keyManager = Some(value))
def withTrustManager(value: TrustManager): FtpsSettings = copy(trustManager = Some(value))

/**
* Scala API:
Expand All @@ -220,15 +226,19 @@ final class FtpsSettings private (
passiveMode: Boolean = passiveMode,
autodetectUTF8: Boolean = autodetectUTF8,
configureConnection: FTPSClient => Unit = configureConnection,
proxy: Option[Proxy] = proxy): FtpsSettings = new FtpsSettings(
proxy: Option[Proxy] = proxy,
keyManager: Option[KeyManager] = keyManager,
trustManager: Option[TrustManager] = trustManager): FtpsSettings = new FtpsSettings(
host = host,
port = port,
credentials = credentials,
binary = binary,
passiveMode = passiveMode,
autodetectUTF8 = autodetectUTF8,
configureConnection = configureConnection,
proxy = proxy)
proxy = proxy,
keyManager = keyManager,
trustManager = trustManager)

override def toString =
"FtpsSettings(" +
Expand All @@ -239,7 +249,9 @@ final class FtpsSettings private (
s"passiveMode=$passiveMode," +
s"autodetectUTF8=$autodetectUTF8" +
s"configureConnection=$configureConnection," +
s"proxy=$proxy)"
s"proxy=$proxy" +
s"keyManager=$keyManager" +
s"trustManager=$trustManager)"
}

/**
Expand All @@ -259,7 +271,9 @@ object FtpsSettings {
passiveMode = false,
autodetectUTF8 = false,
configureConnection = _ => (),
proxy = None)
proxy = None,
keyManager = None,
trustManager = None)

/** Java API */
def create(host: java.net.InetAddress): FtpsSettings = apply(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.connectors.ftp

import org.apache.pekko
import pekko.stream.IOResult
import pekko.stream.scaladsl.{ Sink, Source }
import pekko.util.ByteString
import pekko.{ Done, NotUsed }
import org.mockito.ArgumentMatchers.{ any, anyString }
import org.mockito.Mockito.{ atLeastOnce, doNothing, verify }
import org.scalatestplus.mockito.MockitoSugar

import java.net.{ InetAddress, Socket }
import java.security.cert.X509Certificate
import javax.net.ssl.{ X509ExtendedKeyManager, X509ExtendedTrustManager }
import scala.concurrent.Future

class FtpsWithTrustAndKeyManagersStageSpec extends BaseFtpsSpec with CommonFtpStageSpec with MockitoSugar {

// The implementation of X509ExtendedTrustManager and X509ExtendedKeyManager is final so
// its not possible to put a Mockito spy on it, instead lets just mock the classes and the
// checkServerTrusted method which is executed only when trustManager/keyManager is setup in FtpsSettings

val keyManager: X509ExtendedKeyManager = mock[X509ExtendedKeyManager]
val trustManager: X509ExtendedTrustManager = mock[X509ExtendedTrustManager]

doNothing().when(trustManager).checkServerTrusted(any(classOf[Array[X509Certificate]]), anyString,
any(classOf[Socket]))

override val settings =
FtpsSettings(
InetAddress.getByName(HOSTNAME)).withPort(PORT)
.withCredentials(CREDENTIALS)
.withBinary(true)
.withPassiveMode(true)
.withTrustManager(trustManager)
.withKeyManager(keyManager)

private def verifyServerCheckCertificate(): Unit =
verify(trustManager, atLeastOnce()).checkServerTrusted(any(classOf[Array[X509Certificate]]), anyString,
any(classOf[Socket]))

private def verifyAfterStream[O, Mat](source: Source[O, Mat]): Source[O, Mat] =
source.map { result =>
verifyServerCheckCertificate()
result
}

private def verifyAfterStream[I, Mat](sink: Sink[I, Mat]): Sink[I, Mat] =
sink.mapMaterializedValue { result =>
verifyServerCheckCertificate()
result
}

override protected def listFiles(basePath: String): Source[FtpFile, NotUsed] =
verifyAfterStream(super.listFiles(basePath))

override protected def listFilesWithFilter(basePath: String, branchSelector: FtpFile => Boolean,
emitTraversedDirectories: Boolean): Source[FtpFile, NotUsed] =
verifyAfterStream(super.listFilesWithFilter(basePath, branchSelector, emitTraversedDirectories))

override protected def retrieveFromPath(path: String, fromRoot: Boolean): Source[ByteString, Future[IOResult]] =
verifyAfterStream(super.retrieveFromPath(path, fromRoot))

override protected def retrieveFromPathWithOffset(path: String, offset: Long): Source[ByteString, Future[IOResult]] =
verifyAfterStream(super.retrieveFromPathWithOffset(path, offset))

override protected def storeToPath(path: String, append: Boolean): Sink[ByteString, Future[IOResult]] =
verifyAfterStream(super.storeToPath(path, append))

override protected def remove(): Sink[FtpFile, Future[IOResult]] =
verifyAfterStream(super.remove())

override protected def move(destinationPath: FtpFile => String): Sink[FtpFile, Future[IOResult]] =
verifyAfterStream(super.move(destinationPath))

override protected def mkdir(basePath: String, name: String): Source[Done, NotUsed] =
verifyAfterStream(super.mkdir(basePath, name))

}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ object Dependencies {
val Ftp = Seq(
libraryDependencies ++= Seq(
"commons-net" % "commons-net" % "3.8.0",
"com.hierynomus" % "sshj" % "0.33.0"))
"com.hierynomus" % "sshj" % "0.33.0") ++ Mockito)

val GeodeVersion = "1.15.0"
val GeodeVersionForDocs = "115"
Expand Down