Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kinesis and solr Akka 2.6 updates and other things #2575

Merged
merged 10 commits into from
Mar 2, 2021
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ lazy val jms = alpakkaProject("jms", "jms", Dependencies.Jms, fatalWarnings := t

lazy val jsonStreaming = alpakkaProject("json-streaming", "json.streaming", Dependencies.JsonStreaming)

lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis)
lazy val kinesis = alpakkaProject("kinesis", "aws.kinesis", Dependencies.Kinesis, fatalWarnings := true)

lazy val kudu = alpakkaProject("kudu", "kudu", Dependencies.Kudu, fork in Test := false)

Expand Down Expand Up @@ -267,7 +267,7 @@ lazy val eventbridge =

lazy val sns = alpakkaProject("sns", "aws.sns", Dependencies.Sns)

lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr)
lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr, fatalWarnings := true)

lazy val sqs = alpakkaProject("sqs", "aws.sqs", Dependencies.Sqs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private[kinesis] class KinesisSchedulerSourceStage(
callback.invoke(NewRecord(record))
}
override def onPull(): Unit = awaitingRecords(Pump)
override def onDownstreamFinish(): Unit = awaitingRecords(Complete)
override def onDownstreamFinish(cause: Throwable): Unit = awaitingRecords(Complete)
@tailrec
private def awaitingRecords(in: Command): Unit = in match {
case NewRecord(record) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object KinesisSchedulerSource {
settings: KinesisSchedulerSourceSettings
): Source[CommittableRecord, Future[Scheduler]] =
Source
.setup { (mat, _) =>
.fromMaterializer { (mat, _) =>
import mat.executionContext
Source
.fromGraph(new KinesisSchedulerSourceStage(settings, schedulerBuilder))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.kinesis.ShardSettings;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import akka.stream.javadsl.Sink;
Expand All @@ -32,7 +30,6 @@ public class KinesisTest {
@Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();

private static ActorSystem system;
private static ActorMaterializer materializer;
private static ShardSettings settings;
private static KinesisAsyncClient amazonKinesisAsync;

Expand All @@ -42,7 +39,6 @@ public static void setup() throws Exception {
System.setProperty("aws.secretKey", "someSecretKey");

system = ActorSystem.create();
materializer = ActorMaterializer.create(system);

settings = ShardSettings.create("my-stream", "shard-id");
amazonKinesisAsync = mock(KinesisAsyncClient.class);
Expand Down Expand Up @@ -71,24 +67,21 @@ public void PullRecord() throws Exception {
when(amazonKinesisAsync.getShardIterator((GetShardIteratorRequest) any()))
.thenAnswer(
(Answer)
invocation -> {
return CompletableFuture.completedFuture(
GetShardIteratorResponse.builder().build());
});
invocation ->
CompletableFuture.completedFuture(GetShardIteratorResponse.builder().build()));

when(amazonKinesisAsync.getRecords((GetRecordsRequest) any()))
.thenAnswer(
(Answer)
invocation -> {
return CompletableFuture.completedFuture(
GetRecordsResponse.builder()
.records(Record.builder().sequenceNumber("1").build())
.nextShardIterator("iter")
.build());
});
invocation ->
CompletableFuture.completedFuture(
GetRecordsResponse.builder()
.records(Record.builder().sequenceNumber("1").build())
.nextShardIterator("iter")
.build()));

final Source<Record, NotUsed> source = KinesisSource.basic(settings, amazonKinesisAsync);
final CompletionStage<Record> record = source.runWith(Sink.head(), materializer);
final CompletionStage<Record> record = source.runWith(Sink.head(), system);

assertEquals("1", record.toCompletableFuture().get(10, TimeUnit.SECONDS).sequenceNumber());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.kinesisfirehose.KinesisFirehoseFlowSettings;
import akka.stream.alpakka.kinesisfirehose.javadsl.KinesisFirehoseFlow;
import akka.stream.alpakka.kinesisfirehose.javadsl.KinesisFirehoseSink;
Expand All @@ -25,7 +24,6 @@ public void snippets() {
// #init-client

final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final software.amazon.awssdk.services.firehose.FirehoseAsyncClient amazonFirehoseAsync =
FirehoseAsyncClient.builder()
Expand Down
2 changes: 0 additions & 2 deletions kinesis/src/test/java/docs/javadsl/KinesisSnippets.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.alpakka.kinesis.KinesisFlowSettings;
import akka.stream.alpakka.kinesis.ShardIterators;
import akka.stream.alpakka.kinesis.ShardSettings;
Expand Down Expand Up @@ -37,7 +36,6 @@ public void snippets() {
// #init-client

final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final software.amazon.awssdk.services.kinesis.KinesisAsyncClient amazonKinesisAsync =
KinesisAsyncClient.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package akka.stream.alpakka.kinesis
import java.util.concurrent.{Executors, TimeoutException}

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import org.scalatestplus.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
Expand All @@ -24,7 +23,6 @@ trait DefaultTestContext extends BeforeAndAfterAll with BeforeAndAfterEach with
akka.stream.materializer.max-input-buffer-size = 1
""")
)
implicit protected val materializer: Materializer = ActorMaterializer()
private val threadPool = Executors.newFixedThreadPool(10)
implicit protected val executionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(threadPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.kinesis

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import org.mockito.Mockito.reset
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
import org.scalatestplus.mockito.MockitoSugar
Expand All @@ -17,7 +16,6 @@ import scala.concurrent.duration._
trait KinesisMock extends BeforeAndAfterAll with BeforeAndAfterEach with MockitoSugar { this: Suite =>

implicit protected val system: ActorSystem = ActorSystem()
implicit protected val materializer: Materializer = ActorMaterializer()
implicit protected val amazonKinesisAsync: KinesisAsyncClient = mock[KinesisAsyncClient]

override protected def beforeEach(): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.kinesisfirehose

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import org.mockito.Mockito.reset
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite}
import org.scalatestplus.mockito.MockitoSugar
Expand All @@ -17,7 +16,6 @@ import scala.concurrent.duration._
trait KinesisFirehoseMock extends BeforeAndAfterAll with BeforeAndAfterEach with MockitoSugar { this: Suite =>

implicit protected val system: ActorSystem = ActorSystem()
implicit protected val materializer: Materializer = ActorMaterializer()
implicit protected val amazonKinesisFirehoseAsync: FirehoseAsyncClient = mock[FirehoseAsyncClient]

override protected def beforeEach(): Unit =
Expand Down
6 changes: 2 additions & 4 deletions kinesis/src/test/scala/docs/scaladsl/KclSnippets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ package docs.scaladsl
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.alpakka.kinesis.{KinesisSchedulerCheckpointSettings, KinesisSchedulerSourceSettings}
import akka.stream.alpakka.kinesis.scaladsl.KinesisSchedulerSource
import akka.stream.alpakka.kinesis.{KinesisSchedulerCheckpointSettings, KinesisSchedulerSourceSettings}
import akka.stream.scaladsl.Sink
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
Expand All @@ -25,7 +24,6 @@ class KclSnippets {

//#init-system
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
//#init-system

//#init-clients
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import akka.actor.ActorSystem
import akka.stream.alpakka.kinesisfirehose.KinesisFirehoseFlowSettings
import akka.stream.alpakka.kinesisfirehose.scaladsl.{KinesisFirehoseFlow, KinesisFirehoseSink}
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.{ActorMaterializer, Materializer}
import software.amazon.awssdk.services.firehose.model.{PutRecordBatchResponseEntry, Record}

object KinesisFirehoseSnippets {
Expand All @@ -19,7 +18,6 @@ object KinesisFirehoseSnippets {
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()

implicit val amazonKinesisFirehoseAsync: software.amazon.awssdk.services.firehose.FirehoseAsyncClient =
FirehoseAsyncClient
Expand Down
2 changes: 0 additions & 2 deletions kinesis/src/test/scala/docs/scaladsl/KinesisSnippets.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import akka.actor.ActorSystem
import akka.stream.alpakka.kinesis.scaladsl.{KinesisFlow, KinesisSink, KinesisSource}
import akka.stream.alpakka.kinesis.{KinesisFlowSettings, ShardIterator, ShardSettings}
import akka.stream.scaladsl.{Flow, FlowWithContext, Sink, Source}
import akka.stream.{ActorMaterializer, Materializer}
import akka.util.ByteString
import software.amazon.awssdk.services.kinesis.model.{PutRecordsRequestEntry, PutRecordsResultEntry, Record}

Expand All @@ -24,7 +23,6 @@ object KinesisSnippets {
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()

implicit val amazonKinesisAsync: software.amazon.awssdk.services.kinesis.KinesisAsyncClient =
KinesisAsyncClient
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object Dependencies {
val InfluxDBJavaVersion = "2.15"

val AwsSdk2Version = "2.11.14"
val AwsSpiAkkaHttpVersion = "0.0.10"
val AwsSpiAkkaHttpVersion = "0.0.11"
// Sync with plugins.sbt
val AkkaGrpcBinaryVersion = "1.0"
val AkkaHttp101 = "10.1.11"
Expand Down Expand Up @@ -321,7 +321,7 @@ object Dependencies {
) ++ Seq(
"software.amazon.awssdk" % "kinesis" % AwsSdk2Version, // ApacheV2
"software.amazon.awssdk" % "firehose" % AwsSdk2Version, // ApacheV2
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.2.7" // ApacheV2
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.3.3" // ApacheV2
).map(
_.excludeAll(
ExclusionRule("software.amazon.awssdk", "apache-client"),
Expand Down
Loading