Skip to content

Commit

Permalink
Addressing code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Apr 6, 2022
1 parent 43dbd3c commit 753c587
Show file tree
Hide file tree
Showing 24 changed files with 178 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.stream.alpakka.elasticsearch;

public enum ApiVersion implements akka.stream.alpakka.common.ApiVersion {
public enum ApiVersion implements akka.stream.alpakka.elasticsearch.ApiVersionBase {
V5,
V7
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.common;
package akka.stream.alpakka.elasticsearch;

/**
* Common interface to represent Opensearch / Elasticsearch versions.
*/
public interface ApiVersion {
public interface ApiVersionBase {

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

package akka.stream.alpakka.opensearch;

public enum ApiVersion implements akka.stream.alpakka.common.ApiVersion {
public enum ApiVersion implements akka.stream.alpakka.elasticsearch.ApiVersionBase {
V1
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ package akka.stream.alpakka.elasticsearch

import java.util.concurrent.TimeUnit

import akka.stream.alpakka.common.SourceSettings

import scala.concurrent.duration.FiniteDuration

/**
Expand All @@ -18,14 +16,18 @@ final class ElasticsearchSourceSettings private (connection: ElasticsearchConnec
bufferSize: Int,
includeDocumentVersion: Boolean,
scrollDuration: FiniteDuration,
apiVersion: ApiVersion)
extends SourceSettings[ApiVersion, ElasticsearchSourceSettings](connection, bufferSize, includeDocumentVersion, scrollDuration, apiVersion) {
apiVersion: ApiVersion)
extends SourceSettingsBase[ApiVersion, ElasticsearchSourceSettings](connection,
bufferSize,
includeDocumentVersion,
scrollDuration,
apiVersion) {

protected override def copy(connection: ElasticsearchConnectionSettings,
bufferSize: Int,
includeDocumentVersion: Boolean,
scrollDuration: FiniteDuration,
apiVersion: ApiVersion): ElasticsearchSourceSettings =
bufferSize: Int,
includeDocumentVersion: Boolean,
scrollDuration: FiniteDuration,
apiVersion: ApiVersion): ElasticsearchSourceSettings =
new ElasticsearchSourceSettings(connection = connection,
bufferSize = bufferSize,
includeDocumentVersion = includeDocumentVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package akka.stream.alpakka.elasticsearch

import akka.stream.alpakka.common.WriteSettings

import akka.util.JavaDurationConverters._

import scala.concurrent.duration._
Expand Down Expand Up @@ -66,15 +64,20 @@ final class ElasticsearchWriteSettings private (connection: ElasticsearchConnect
retryLogic: RetryLogic,
versionType: Option[String],
apiVersion: ApiVersion,
allowExplicitIndex: Boolean)
extends WriteSettings[ApiVersion, ElasticsearchWriteSettings](connection, bufferSize, retryLogic, versionType, apiVersion, allowExplicitIndex) {
allowExplicitIndex: Boolean)
extends WriteSettingsBase[ApiVersion, ElasticsearchWriteSettings](connection,
bufferSize,
retryLogic,
versionType,
apiVersion,
allowExplicitIndex) {

protected override def copy(connection: ElasticsearchConnectionSettings,
bufferSize: Int,
retryLogic: RetryLogic,
versionType: Option[String],
apiVersion: ApiVersion,
allowExplicitIndex: Boolean): ElasticsearchWriteSettings =
bufferSize: Int,
retryLogic: RetryLogic,
versionType: Option[String],
apiVersion: ApiVersion,
allowExplicitIndex: Boolean): ElasticsearchWriteSettings =
new ElasticsearchWriteSettings(connection, bufferSize, retryLogic, versionType, apiVersion, allowExplicitIndex)

override def toString: String =
Expand All @@ -89,6 +92,7 @@ final class ElasticsearchWriteSettings private (connection: ElasticsearchConnect
}

object ElasticsearchWriteSettings {

/** Scala API */
def apply(connection: ElasticsearchConnectionSettings): ElasticsearchWriteSettings =
new ElasticsearchWriteSettings(connection, 10, RetryNever, None, ApiVersion.V7, allowExplicitIndex = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.common
package akka.stream.alpakka.elasticsearch

import akka.util.JavaDurationConverters._
import java.util.concurrent.TimeUnit
Expand All @@ -15,12 +15,13 @@ import scala.concurrent.duration.FiniteDuration
* Configure Elastiscsearch/Opensearch sources.
*
*/
abstract class SourceSettings[Version <: ApiVersion, S <: SourceSettings[Version, S]] private[alpakka] (
val connection: ElasticsearchConnectionSettings,
val bufferSize: Int,
val includeDocumentVersion: Boolean,
val scrollDuration: FiniteDuration,
val apiVersion: Version) { this: S =>
abstract class SourceSettingsBase[Version <: ApiVersionBase, S <: SourceSettingsBase[Version, S]] private[alpakka] (
val connection: ElasticsearchConnectionSettings,
val bufferSize: Int,
val includeDocumentVersion: Boolean,
val scrollDuration: FiniteDuration,
val apiVersion: Version
) { this: S =>
def withConnection(value: ElasticsearchConnectionSettings): S = copy(connection = value)

def withBufferSize(value: Int): S = copy(bufferSize = value)
Expand Down Expand Up @@ -55,9 +56,9 @@ abstract class SourceSettings[Version <: ApiVersion, S <: SourceSettings[Version
}

protected def copy(connection: ElasticsearchConnectionSettings = connection,
bufferSize: Int = bufferSize,
includeDocumentVersion: Boolean = includeDocumentVersion,
scrollDuration: FiniteDuration = scrollDuration,
apiVersion: Version = apiVersion): S;
bufferSize: Int = bufferSize,
includeDocumentVersion: Boolean = includeDocumentVersion,
scrollDuration: FiniteDuration = scrollDuration,
apiVersion: Version = apiVersion): S;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.elasticsearch

import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings
import akka.stream.alpakka.elasticsearch.RetryLogic

/**
* Configure Elasticsearch/Opensearch sinks and flows.
*/
abstract class WriteSettingsBase[Version <: ApiVersionBase, W <: WriteSettingsBase[Version, W]] private[alpakka] (
val connection: ElasticsearchConnectionSettings,
val bufferSize: Int,
val retryLogic: RetryLogic,
val versionType: Option[String],
val apiVersion: Version,
val allowExplicitIndex: Boolean
) { this: W =>

def withConnection(value: ElasticsearchConnectionSettings): W = copy(connection = value)

def withBufferSize(value: Int): W = copy(bufferSize = value)

def withRetryLogic(value: RetryLogic): W =
copy(retryLogic = value)

def withVersionType(value: String): W = copy(versionType = Option(value))

def withApiVersion(value: Version): W =
if (apiVersion == value) this else copy(apiVersion = value)

def withAllowExplicitIndex(value: Boolean): W = copy(allowExplicitIndex = value)

protected def copy(connection: ElasticsearchConnectionSettings = connection,
bufferSize: Int = bufferSize,
retryLogic: RetryLogic = retryLogic,
versionType: Option[String] = versionType,
apiVersion: Version = apiVersion,
allowExplicitIndex: Boolean = allowExplicitIndex): W;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import akka.http.scaladsl.HttpExt
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.alpakka.common.WriteSettings
import akka.stream.alpakka.elasticsearch._
import akka.stream.stage._
import akka.stream._
Expand All @@ -25,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Future}
@InternalApi
private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
writer: MessageWriter[T]
)(implicit http: HttpExt, mat: Materializer, ec: ExecutionContext)
extends GraphStage[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import akka.http.scaladsl.HttpExt
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.alpakka.common.SourceSettings
import akka.stream.alpakka.elasticsearch.{ApiVersion, ElasticsearchParams, ReadResult}
import akka.stream.alpakka.elasticsearch.{ApiVersion, ElasticsearchParams, ReadResult, SourceSettingsBase}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging}
import akka.stream.{Attributes, Materializer, Outlet, SourceShape}
import spray.json.DefaultJsonProtocol._
Expand Down Expand Up @@ -46,7 +45,7 @@ private[elasticsearch] trait MessageReader[T] {
private[elasticsearch] final class ElasticsearchSourceStage[T](
elasticsearchParams: ElasticsearchParams,
searchParams: Map[String, String],
settings: SourceSettings[_, _],
settings: SourceSettingsBase[_, _],
reader: MessageReader[T]
)(implicit http: HttpExt, mat: Materializer, ec: ExecutionContext)
extends GraphStage[SourceShape[ReadResult[T]]] {
Expand All @@ -72,7 +71,7 @@ object ElasticsearchSourceStage {
private[elasticsearch] final class ElasticsearchSourceLogic[T](
elasticsearchParams: ElasticsearchParams,
searchParams: Map[String, String],
settings: SourceSettings[_, _],
settings: SourceSettingsBase[_, _],
out: Outlet[ReadResult[T]],
shape: SourceShape[ReadResult[T]],
reader: MessageReader[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.stream.alpakka.elasticsearch.javadsl

import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.stream.alpakka.common.WriteSettings
import akka.stream.alpakka.elasticsearch.{scaladsl, _}
import com.fasterxml.jackson.databind.ObjectMapper

Expand All @@ -28,7 +27,7 @@ object ElasticsearchFlow {
*/
def create[T](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper
): akka.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
create(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
Expand All @@ -44,7 +43,7 @@ object ElasticsearchFlow {
*/
def create[T](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T]
): akka.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
scaladsl.ElasticsearchFlow
Expand All @@ -63,7 +62,7 @@ object ElasticsearchFlow {
*/
def createWithPassThrough[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper
): akka.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
createWithPassThrough(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
Expand All @@ -80,7 +79,7 @@ object ElasticsearchFlow {
*/
def createWithPassThrough[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T]
): akka.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
scaladsl.ElasticsearchFlow
Expand All @@ -100,7 +99,7 @@ object ElasticsearchFlow {
*/
def createBulk[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper
): akka.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]], java.util.List[WriteResult[T, C]], NotUsed] =
createBulk(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
Expand All @@ -118,7 +117,7 @@ object ElasticsearchFlow {
*/
def createBulk[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T]
): akka.stream.javadsl.Flow[java.util.List[WriteMessage[T, C]], java.util.List[WriteResult[T, C]], NotUsed] =
akka.stream.scaladsl
Expand All @@ -143,7 +142,7 @@ object ElasticsearchFlow {
@ApiMayChange
def createWithContext[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper
): akka.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
createWithContext(elasticsearchParams, settings, new JacksonWriter[T](objectMapper))
Expand All @@ -160,7 +159,7 @@ object ElasticsearchFlow {
@ApiMayChange
def createWithContext[T, C](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
messageWriter: MessageWriter[T]
): akka.stream.javadsl.FlowWithContext[WriteMessage[T, NotUsed], C, WriteResult[T, C], C, NotUsed] =
scaladsl.ElasticsearchFlow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.stream.alpakka.elasticsearch.javadsl

import java.util.concurrent.CompletionStage

import akka.stream.alpakka.common.WriteSettings
import akka.stream.alpakka.elasticsearch._
import akka.stream.javadsl._
import akka.{Done, NotUsed}
Expand All @@ -22,7 +21,7 @@ object ElasticsearchSink {
*/
def create[T](
elasticsearchParams: ElasticsearchParams,
settings: WriteSettings[_, _],
settings: WriteSettingsBase[_, _],
objectMapper: ObjectMapper
): akka.stream.javadsl.Sink[WriteMessage[T, NotUsed], CompletionStage[Done]] =
ElasticsearchFlow
Expand Down
Loading

0 comments on commit 753c587

Please sign in to comment.