Add gzip compression #196

Merged 1 commit on Oct 11, 2022
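This PR carries no written description; from the diff below, it threads an optional compression setting through the `Backup` configuration (the new fourth argument, `None` in the tests) and wraps multipart-upload state in `StateDetails` alongside `BackupObjectMetadata` recovered from the S3 object key. A rough sketch of what the updated config could look like; every name below other than `Backup` itself is an assumption inferred from the PR title and the call sites in the diff, not copied from the Guardian source:

```scala
import scala.concurrent.duration.FiniteDuration

// Hypothetical sketch only: TimeConfiguration, CompressionType, Gzip and
// Compression are assumed names, not Guardian's actual definitions.
sealed trait TimeConfiguration // e.g. PeriodFromFirst(...) in the tests below
sealed trait CompressionType
case object Gzip extends CompressionType
final case class Compression(compressionType: CompressionType, level: Option[Int])

final case class Backup(kafkaGroupId: String,
                        timeConfiguration: TimeConfiguration,
                        commitTimeoutBufferWindow: FiniteDuration,
                        compression: Option[Compression] // None = no compression
)
```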
```diff
@@ -12,6 +12,7 @@ import com.typesafe.scalalogging.LazyLogging
 import io.aiven.guardian.kafka.backup.BackupClientInterface
 import io.aiven.guardian.kafka.backup.KafkaClientInterface
 import io.aiven.guardian.kafka.backup.configs.Backup
+import io.aiven.guardian.kafka.models.BackupObjectMetadata
 import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
 
 import scala.collection.immutable
```
```diff
@@ -167,8 +168,11 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
       else
         Future.successful(None)
 
-    } yield UploadStateResult(current.map(_._1),
-                              previous.map { case (state, previousKey) => PreviousState(state, previousKey) }
+    } yield UploadStateResult(
+      current.map { case (state, key) => StateDetails(state, BackupObjectMetadata.fromKey(key)) },
+      previous.map { case (previousState, previousKey) =>
+        PreviousState(StateDetails(previousState, BackupObjectMetadata.fromKey(previousKey)), previousKey)
+      }
     )
 
   }
```
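`StateDetails`, `PreviousState`, `UploadStateResult` and `BackupObjectMetadata.fromKey` are not shown in this diff; the following is a minimal reconstruction consistent with the call sites above. The `.gz`-suffix detection and the `UploadState` stand-in are assumptions, not Guardian's actual code:

```scala
sealed trait CompressionType
case object Gzip extends CompressionType
final case class Compression(compressionType: CompressionType, level: Option[Int])

final case class BackupObjectMetadata(compression: Option[Compression])

object BackupObjectMetadata {
  // Assumption: gzip-compressed backup objects are keyed with a ".gz" suffix,
  // so the metadata can be recovered from the key alone.
  def fromKey(key: String): BackupObjectMetadata =
    if (key.endsWith(".gz")) BackupObjectMetadata(Some(Compression(Gzip, None)))
    else BackupObjectMetadata(None)
}

// Stand-in for the real multipart-upload state; it must expose uploadId and
// parts, which the resume logic below reads via stateDetails.state.
final case class UploadState(uploadId: String, parts: List[String])

final case class StateDetails(state: UploadState, backupObjectMetadata: BackupObjectMetadata)

final case class PreviousState(stateDetails: StateDetails, previousKey: String)

final case class UploadStateResult(current: Option[StateDetails], previous: Option[PreviousState])
```

This keeps the resume logic key-driven: whether a dangling upload was gzip-compressed can be decided from its key alone, without downloading the object.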
```diff
@@ -238,19 +242,19 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
           // the same key that means that in fact the upload has already been completed so in this case lets not do anything
           if (exists) {
             logger.debug(
-              s"Previous upload with uploadId: ${previousState.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
+              s"Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
             )
             Sink.ignore
           } else {
             logger.info(
-              s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.state.uploadId}"
+              s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
             )
             val sink = S3
               .resumeMultipartUploadWithHeaders(
                 s3Config.dataBucket,
                 previousState.previousKey,
-                previousState.state.uploadId,
-                previousState.state.parts,
+                previousState.stateDetails.state.uploadId,
+                previousState.stateDetails.state.parts,
                 s3Headers = s3Headers,
                 chunkingParallelism = 1
               )
```
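The resumed sink built above has type `Sink[ByteString, Future[MultipartUploadResult]]`, so one way to terminate the dangling upload (how the diff actually drives the sink is hidden behind the fold) is to run it against an empty source, committing the already-uploaded parts. A minimal sketch under that assumption:

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

// Completes a resumed multipart upload (such as `sink` above) without
// appending any data; the previously uploaded parts are finalized as-is.
def terminatePrevious(
    sink: Sink[ByteString, Future[MultipartUploadResult]]
)(implicit system: ActorSystem): Future[MultipartUploadResult] =
  Source.empty[ByteString].runWith(sink)
```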
```diff
@@ -57,7 +57,7 @@ class MockedKafkaClientBackupClientSpec
 
   implicit val config: S3Config = s3Config
   implicit val backupConfig: Backup =
-    Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds)
+    Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds, None)
 
   val backupClient =
     new BackupClient(Some(s3Settings))(new MockedKafkaClientInterface(Source(data)),
```
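Both test diffs pass `None` for the new argument, i.e. compression disabled. Enabling gzip would presumably just mean supplying it; a hypothetical variant of the configuration above, with `Compression` and `Gzip` as assumed names:

```scala
// Hypothetical: the same test configuration with gzip enabled. Compression
// and Gzip are inferred from the PR title, not from Guardian's actual API.
implicit val backupConfig: Backup =
  Backup(MockedBackupClientInterface.KafkaGroupId,
         PeriodFromFirst(1 second),
         10 seconds,
         Some(Compression(Gzip, None))
  )
```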
```diff
@@ -21,9 +21,11 @@ class MockedS3BackupClientInterface(
     s3Config: S3Config,
     maybeS3Settings: Option[S3Settings]
 )(implicit val s3Headers: S3Headers, system: ActorSystem)
-    extends BackupClient(maybeS3Settings)(
+    extends BackupClient(
+      maybeS3Settings
+    )(
       new MockedKafkaClientInterface(kafkaData),
-      Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds),
+      Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds, None),
       implicitly,
       s3Config,
       implicitly
```