Skip to content

Commit

Permalink
Add gzip compression
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Oct 11, 2022
1 parent 33bb4af commit 07bd7bf
Show file tree
Hide file tree
Showing 29 changed files with 1,988 additions and 1,450 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.typesafe.scalalogging.LazyLogging
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.KafkaClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.models.BackupObjectMetadata
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.collection.immutable
Expand Down Expand Up @@ -167,8 +168,11 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
else
Future.successful(None)

} yield UploadStateResult(current.map(_._1),
previous.map { case (state, previousKey) => PreviousState(state, previousKey) }
} yield UploadStateResult(
current.map { case (state, key) => StateDetails(state, BackupObjectMetadata.fromKey(key)) },
previous.map { case (previousState, previousKey) =>
PreviousState(StateDetails(previousState, BackupObjectMetadata.fromKey(previousKey)), previousKey)
}
)

}
Expand Down Expand Up @@ -238,19 +242,19 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
// the same key that means that in fact the upload has already been completed so in this case lets not do anything
if (exists) {
logger.debug(
s"Previous upload with uploadId: ${previousState.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
s"Previous upload with uploadId: ${previousState.stateDetails.state.uploadId} and key: ${previousState.previousKey} doesn't actually exist, skipping terminating"
)
Sink.ignore
} else {
logger.info(
s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.state.uploadId}"
s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId: ${previousState.stateDetails.state.uploadId}"
)
val sink = S3
.resumeMultipartUploadWithHeaders(
s3Config.dataBucket,
previousState.previousKey,
previousState.state.uploadId,
previousState.state.parts,
previousState.stateDetails.state.uploadId,
previousState.stateDetails.state.parts,
s3Headers = s3Headers,
chunkingParallelism = 1
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class MockedKafkaClientBackupClientSpec

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup =
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds)
Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 second), 10 seconds, None)

val backupClient =
new BackupClient(Some(s3Settings))(new MockedKafkaClientInterface(Source(data)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ class MockedS3BackupClientInterface(
s3Config: S3Config,
maybeS3Settings: Option[S3Settings]
)(implicit val s3Headers: S3Headers, system: ActorSystem)
extends BackupClient(maybeS3Settings)(
extends BackupClient(
maybeS3Settings
)(
new MockedKafkaClientInterface(kafkaData),
Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds),
Backup(MockedBackupClientInterface.KafkaGroupId, timeConfiguration, 10 seconds, None),
implicitly,
s3Config,
implicitly
Expand Down

0 comments on commit 07bd7bf

Please sign in to comment.