Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to paradox theme with new cookie banner #2564

Merged
merged 4 commits into from Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -53,7 +53,8 @@
// #session
import akka.stream.alpakka.couchbase.CouchbaseSessionSettings;
import akka.stream.alpakka.couchbase.javadsl.CouchbaseSession;
// #session #registry
// #session
// #registry
import java.util.stream.Collectors;
// #sessionFromBucket
import com.couchbase.client.java.Bucket;
Expand Down
12 changes: 6 additions & 6 deletions docs/src/main/paradox/jms/consumer.md
Expand Up @@ -142,12 +142,12 @@ Java

## Raw JVM type sources

| Stream element type | Alpakka source factory |
|-------------------------------------------------------|--------------------------|
| String | [`JmsConsumer.textSource`](#text-sources) |
| @scala[Array[Byte]]@java[byte[]] | [`JmsConsumer.bytesSource`](#byte-array-sources) |
| @scala[Map[String, AnyRef]]@java[Map<String, Object>] | [`JmsConsumer.mapSource`](#map-messages-sources) |
| Object (`java.io.Serializable`) | [`JmsConsumer.objectSource`](#object-sources) |
| Stream element type | Alpakka source factory |
|-----------------------------------------------------------|--------------------------|
| `String` | [`JmsConsumer.textSource`](#text-sources) |
| @scala[`Array[Byte]`]@java[`byte[]`] | [`JmsConsumer.bytesSource`](#byte-array-sources) |
| @scala[`Map[String, AnyRef]`]@java[`Map<String, Object>`] | [`JmsConsumer.mapSource`](#map-messages-sources) |
| `Object` (`java.io.Serializable`) | [`JmsConsumer.objectSource`](#object-sources) |

### Text sources

Expand Down
22 changes: 11 additions & 11 deletions docs/src/main/paradox/jms/producer.md
Expand Up @@ -8,17 +8,17 @@ The Alpakka JMS connector offers producing JMS messages to topics or queues in t

The JMS message model supports several types of message bodies in (see @javadoc[javax.jms.Message](javax.jms.Message)), which may be created directly from the Akka Stream elements, or in wrappers to access more advanced features.

| Stream element type | Alpakka producer |
|-------------------------------------------------------|--------------------------|
| String | [`JmsProducer.textSink`](#text-sinks) |
| @scala[Array[Byte]]@java[byte[]] | [`JmsProducer.bytesSink`](#byte-array-sinks) |
| @scala[Map[String, AnyRef]]@java[Map<String, Object>] | [`JmsProducer.mapSink`](#map-messages-sinks) |
| Object (`java.io.Serializable`) | [`JmsProducer.objectSink`](#object-sinks) |
| `JmsTextMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsByteMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsByteStringMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsMapMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsObjectMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| Stream element type | Alpakka producer |
|-----------------------------------------------------------|--------------------------|
| `String` | [`JmsProducer.textSink`](#text-sinks) |
| @scala[`Array[Byte]`]@java[`byte[]`] | [`JmsProducer.bytesSink`](#byte-array-sinks) |
| @scala[`Map[String, AnyRef]`]@java[`Map<String, Object>`] | [`JmsProducer.mapSink`](#map-messages-sinks) |
| `Object` (`java.io.Serializable`) | [`JmsProducer.objectSink`](#object-sinks) |
| `JmsTextMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsByteMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsByteStringMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsMapMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| `JmsObjectMessage` | [`JmsProducer.sink`](#a-jmsmessage-sub-type-sink) or [`JmsProducer.flow`](#sending-messages-as-a-flow) |
| @scala[`JmsEnvelope[PassThrough]`]@java[`JmsEnvelope<PassThrough>`] with instances `JmsPassThrough`, `JmsTextMessagePassThrough`, `JmsByteMessagePassThrough`, `JmsByteStringMessagePassThrough`, `JmsMapMessagePassThrough`, `JmsObjectMessagePassThrough` | [`JmsProducer.flexiFlow`](#passing-context-through-the-producer) |


Expand Down
96 changes: 96 additions & 0 deletions ftp/src/test/scala/docs/scaladsl/SftpSpec.scala
@@ -0,0 +1,96 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.scaladsl

import java.net.InetAddress

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.alpakka.ftp.scaladsl.{Sftp, SftpApi}
import akka.stream.alpakka.ftp.{FtpCredentials, SftpSettings}
import akka.stream.scaladsl.Sink
import net.schmizz.sshj.transport.verification.PromiscuousVerifier
import net.schmizz.sshj.userauth.method.AuthPassword
import net.schmizz.sshj.userauth.password.{PasswordFinder, Resource}
import net.schmizz.sshj.{DefaultConfig, SSHClient}

import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

class StreamingSftpTransport {
implicit val system: ActorSystem = ActorSystem("my-service")

private val PORT = 22
private val USER = "conor.griffin"
private val CREDENTIALS = FtpCredentials.create(USER, "password!")
private val BASE_PATH = s"/Users/$USER"
private val FILE_NAME = "10mfile"
private val CHUNK_SIZE = 131072

// Set up the source system connection
private val SOURCE_HOSTNAME = "suv1"

private val sourceSettings = SftpSettings(host = InetAddress.getByName(SOURCE_HOSTNAME))
.withCredentials(FtpCredentials.create("testsftp", "t3st123"))
.withPort(PORT)
.withStrictHostKeyChecking(false)

private val sourceClient = new SSHClient(new DefaultConfig) {}
private val configuredSourceClient: SftpApi = Sftp(sourceClient)

// Set up the destination system connection

private val DEST_HOSTNAME = "localhost"
private val destSettings = SftpSettings(host = InetAddress.getByName(DEST_HOSTNAME))
.withCredentials(CREDENTIALS)
.withPort(PORT)
.withStrictHostKeyChecking(false)

private val destClient = new SSHClient(new DefaultConfig)
private val configuredDestClient: SftpApi = Sftp(destClient)

private val decider: Supervision.Decider = {
case a =>
print(a.getMessage)
Supervision.resume
}

implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))

def doTransfer(): Unit = {
println("Streaming")
val source = configuredSourceClient.fromPath(s"/home/testsftp/$FILE_NAME", sourceSettings, CHUNK_SIZE)
// val sink = configuredDestClient.toPath(s"$BASE_PATH/$FILE_NAME.out", destSettings)
val runnable = source
.runWith(Sink.ignore)

println("Streaming: Starting")
val start = System.currentTimeMillis()
Await.result(runnable, 180.seconds)
val end = System.currentTimeMillis()
println(s"Streaming: ${end - start}")

}

def doSftpTransfer(): Unit = {
println("SFTP")
val ssh = new SSHClient(new DefaultConfig)
ssh.addHostKeyVerifier(new PromiscuousVerifier)
ssh.connect(SOURCE_HOSTNAME, 22)
val passwordAuth: AuthPassword = new AuthPassword(new PasswordFinder() {
def reqPassword(resource: Resource[_]): Array[Char] = "t3st123".toCharArray
def shouldRetry(resource: Resource[_]) = false
})
ssh.auth("testsftp", passwordAuth)

println("SFTP: Starting")
val start = System.currentTimeMillis()
ssh.newSFTPClient().get("/home/testsftp/10mfile", "/Users/conor.griffin/Downloads/10mfile.sftp")
val end = System.currentTimeMillis()
println(s"SFTP: ${end - start}")

}

}
24 changes: 16 additions & 8 deletions jms/src/test/java/docs/javadsl/JmsConnectorsTest.java
Expand Up @@ -121,11 +121,13 @@ private List<JmsTextMessage> createTestMessageList() {
public void publishAndConsumeJmsTextMessage() throws Exception {
withServer(
server -> {
// #connection-factory #text-sink
// #connection-factory
// #text-sink
// #text-source
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
// #text-source
// #connection-factory #text-sink
// #connection-factory
// #text-sink

// #text-sink

Expand Down Expand Up @@ -155,15 +157,17 @@ public void publishAndConsumeJmsTextMessage() throws Exception {
public void publishAndConsumeJmsObjectMessage() throws Exception {
withServer(
server -> {
// #connection-factory-object #object-sink
// #connection-factory-object
// #object-sink
// #object-source
ActiveMQConnectionFactory connectionFactory =
(ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
Arrays.asList(DummyJavaTests.class.getPackage().getName()));

// #object-source
// #connection-factory-object #object-sink
// #connection-factory-object
// #object-sink

// #object-sink
Sink<java.io.Serializable, CompletionStage<Done>> jmsSink =
Expand Down Expand Up @@ -193,10 +197,12 @@ public void publishAndConsumeJmsObjectMessage() throws Exception {
public void publishAndConsumeJmsByteMessage() throws Exception {
withServer(
server -> {
// #bytearray-sink #bytearray-source
// #bytearray-sink
// #bytearray-source
ConnectionFactory connectionFactory = server.createConnectionFactory();

// #bytearray-sink #bytearray-source
// #bytearray-sink
// #bytearray-source

// #bytearray-sink
Sink<byte[], CompletionStage<Done>> jmsSink =
Expand Down Expand Up @@ -225,10 +231,12 @@ public void publishAndConsumeJmsByteMessage() throws Exception {
public void publishAndConsumeJmsMapMessage() throws Exception {
withServer(
server -> {
// #map-sink #map-source
// #map-sink
// #map-source
ConnectionFactory connectionFactory = server.createConnectionFactory();

// #map-sink #map-source
// #map-sink
// #map-source
// #map-sink
Sink<Map<String, Object>, CompletionStage<Done>> jmsSink =
JmsProducer.mapSink(
Expand Down
26 changes: 9 additions & 17 deletions jms/src/test/scala/docs/scaladsl/JmsConnectorsSpec.scala
Expand Up @@ -40,10 +40,12 @@ class JmsConnectorsSpec extends JmsSpec {
"The JMS Connectors" should {
"publish and consume strings through a queue" in withServer() { server =>
val url = server.brokerUri
//#connection-factory #text-sink
//#connection-factory
//#text-sink
//#text-source
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
//#connection-factory #text-sink
//#connection-factory
//#text-sink
//#text-source

//#text-sink
Expand Down Expand Up @@ -71,10 +73,12 @@ class JmsConnectorsSpec extends JmsSpec {
}

"publish and consume serializable objects through a queue" in withConnectionFactory() { connFactory =>
//#object-sink #object-source
//#object-sink
//#object-source
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)
//#object-sink #object-source
//#object-sink
//#object-source

//#object-sink

Expand Down Expand Up @@ -104,8 +108,6 @@ class JmsConnectorsSpec extends JmsSpec {
}

"publish and consume bytearray through a queue" in withConnectionFactory() { connectionFactory =>
//#bytearray-sink #bytearray-source
//#bytearray-sink #bytearray-source
//#bytearray-sink
val jmsSink: Sink[Array[Byte], Future[Done]] = JmsProducer.bytesSink(
JmsProducerSettings(system, connectionFactory).withQueue("test")
Expand Down Expand Up @@ -133,9 +135,6 @@ class JmsConnectorsSpec extends JmsSpec {
}

"publish and consume map through a queue" in withConnectionFactory() { connectionFactory =>
//#map-sink #map-source
//#map-sink #map-source

//#map-sink
val jmsSink: Sink[Map[String, Any], Future[Done]] = JmsProducer.mapSink(
JmsProducerSettings(system, connectionFactory).withQueue("test")
Expand Down Expand Up @@ -188,9 +187,6 @@ class JmsConnectorsSpec extends JmsSpec {

"publish and consume JMS text messages with properties through a queue" in withConnectionFactory() {
connectionFactory =>
//#jms-source
//#jms-source

val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink(
JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")
)
Expand Down Expand Up @@ -227,7 +223,7 @@ class JmsConnectorsSpec extends JmsSpec {
//#jms-source

control.shutdown()
//#jms-source
//#jms-source
}

"publish and consume JMS text messages" in withConnectionFactory() { connectionFactory =>
Expand Down Expand Up @@ -760,10 +756,6 @@ class JmsConnectorsSpec extends JmsSpec {
}

"browse" in withConnectionFactory() { connectionFactory =>
// format: off
//#browse-source
//#browse-source
// format: on
val in = List(1 to 100).map(_.toString())

withClue("write some messages") {
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Expand Up @@ -8,7 +8,7 @@ addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.7.0")
addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.18")
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.5.1")
// docs
addSbtPlugin("com.lightbend.akka" % "sbt-paradox-akka" % "0.35")
addSbtPlugin("com.lightbend.akka" % "sbt-paradox-akka" % "0.37")
addSbtPlugin("com.lightbend.paradox" % "sbt-paradox-dependencies" % "0.2.1")
addSbtPlugin("com.lightbend.sbt" % "sbt-publish-rsync" % "0.2")
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.3")
Expand Down