Skip to content

Commit

Permalink
File: TAR archive generation (#2241)
Browse files Browse the repository at this point in the history
  • Loading branch information
choffmeister authored Mar 31, 2020
1 parent 6264cd2 commit 6020e28
Show file tree
Hide file tree
Showing 10 changed files with 478 additions and 21 deletions.
28 changes: 26 additions & 2 deletions docs/src/main/paradox/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,31 @@ is not limited to memory size.
This example shows how to compress files from disk.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample }
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample-zip }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample }
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip }

## TAR Archive

The @apidoc[Archive$]
contains a flow for packaging multiple files into one TAR archive.

The result of the flow can be sent to a sink even before the whole TAR file is created, so the size of the
resulting TAR archive is not limited by the available memory.

This example shows how to package files from disk.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample-tar }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-tar }

To produce a gzipped TAR file, see the following example.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample-tar-gz }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-tar-gz }
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.annotation.InternalApi
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

/**
* INTERNAL API
*/
@InternalApi private[file] class EnsureByteStreamSize(expectedSize: Long)
    extends GraphStage[FlowShape[ByteString, ByteString]] {

  val in = Inlet[ByteString]("EnsureByteStreamSize.in")
  val out = Outlet[ByteString]("EnsureByteStreamSize.out")

  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // Running total of bytes that have passed through this stage.
    private var seenBytes = 0L

    // A single handler serves both ports: elements are passed through
    // unchanged while their sizes are accumulated; the total is verified
    // against `expectedSize` once upstream completes.
    setHandlers(
      in,
      out,
      new InHandler with OutHandler {
        override def onPush(): Unit = {
          val chunk = grab(in)
          seenBytes += chunk.size
          push(out, chunk)
        }

        override def onUpstreamFinish(): Unit =
          if (seenBytes == expectedSize) completeStage()
          else failStage(new IllegalStateException(s"Expected ${expectedSize} bytes but got ${seenBytes} bytes"))

        override def onPull(): Unit = pull(in)
      }
    )
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import java.lang.Long.toOctalString

import akka.annotation.InternalApi
import akka.stream.alpakka.file.TarArchiveMetadata
import akka.util.ByteString

/**
* INTERNAL API
*/
/**
 * INTERNAL API
 *
 * Builds the 512-byte ustar header and the trailing block padding for a
 * single TAR archive entry described by `metadata`.
 */
@InternalApi private[file] final class TarArchiveEntry(metadata: TarArchiveMetadata) {

  /**
   * The complete 512-byte ustar header for this entry, with the checksum
   * field at offset [148, 156) filled in.
   */
  def headerBytes: ByteString = {
    val withoutChecksum = headerBytesWithoutChecksum
    // The ustar checksum is the sum of all 512 header bytes taken as UNSIGNED
    // values (with the checksum field counted as eight spaces). Scala's Byte
    // is signed, so mask with 0xff; otherwise bytes >= 0x80 (e.g. from
    // non-ASCII file names) would contribute negative values and corrupt the
    // checksum.
    val checksumLong = withoutChecksum.foldLeft(0L)((sum, byte) => sum + (byte & 0xff))
    // Checksum field layout: six octal digits followed by a NUL and a space.
    val checksumBytes = ByteString(toOctalString(checksumLong).reverse.padTo(6, '0').take(6).reverse) ++ ByteString(
      new Array[Byte](1) ++ ByteString(" ")
    )
    // Splice the checksum into its 8-byte field at [148, 156).
    val withChecksum = withoutChecksum.take(148) ++ checksumBytes ++ withoutChecksum.drop(148 + 8)
    withChecksum.compact
  }

  /**
   * NUL padding that aligns the entry's data section to the next 512-byte
   * block boundary, as required by the TAR format.
   */
  def trailingBytes: ByteString = {
    val paddingSize = if (metadata.size % 512 > 0) (512 - metadata.size % 512).toInt else 0
    padded(ByteString.empty, paddingSize)
  }

  // The header with the checksum field holding its placeholder (eight
  // spaces) — the state over which the checksum must be computed.
  private def headerBytesWithoutChecksum: ByteString = {
    // [0, 100)
    val fileNameBytes = padded(ByteString(metadata.filePathName), 100)
    // [100, 108)
    val fileModeBytes = padded(ByteString("0755"), 8)
    // [108, 116)
    val ownerIdBytes = padded(ByteString.empty, 8)
    // [116, 124)
    val groupIdBytes = padded(ByteString.empty, 8)
    // [124, 136)
    val fileSizeBytes = padded(ByteString("0" + toOctalString(metadata.size)), 12)
    // [136, 148)
    val lastModificationBytes = padded(ByteString(toOctalString(metadata.lastModification.getEpochSecond)), 12)
    // [148, 156) — must be exactly eight spaces so the placeholder fills the
    // whole 8-byte checksum field and keeps subsequent field offsets aligned.
    val checksumPlaceholderBytes = ByteString("        ")
    // [156, 157)
    val linkIndicatorBytes = padded(ByteString.empty, 1)
    // [157, 257)
    val linkFileNameBytes = padded(ByteString.empty, 100)
    // [257, 263)
    val ustarIndicatorBytes = ByteString("ustar") ++ ByteString(new Array[Byte](1))
    // [263, 265)
    val ustarVersionBytes = ByteString(new Array[Byte](2))
    // [265, 297)
    val ownerNameBytes = padded(ByteString.empty, 32)
    // [297, 329)
    val groupNameBytes = padded(ByteString.empty, 32)
    // [329, 337)
    val deviceMajorNumberBytes = padded(ByteString.empty, 8)
    // [337, 345)
    val deviceMinorNumberBytes = padded(ByteString.empty, 8)
    // [345, 500)
    val fileNamePrefixBytes = padded(metadata.filePathPrefix.map(ByteString.apply).getOrElse(ByteString.empty), 155)

    // Concatenate all fields and NUL-pad the remainder [500, 512).
    padded(
      fileNameBytes ++ fileModeBytes ++ ownerIdBytes ++ groupIdBytes ++ fileSizeBytes ++ lastModificationBytes ++ checksumPlaceholderBytes ++ linkIndicatorBytes ++ linkFileNameBytes ++ ustarIndicatorBytes ++ ustarVersionBytes ++ ownerNameBytes ++ groupNameBytes ++ deviceMajorNumberBytes ++ deviceMinorNumberBytes ++ fileNamePrefixBytes,
      512
    )
  }

  // Right-pads `bytes` with NUL bytes up to `targetSize`. The input must
  // already fit within the target size.
  private def padded(bytes: ByteString, targetSize: Int): ByteString = {
    require(bytes.size <= targetSize)
    if (bytes.size < targetSize) bytes ++ ByteString(new Array[Byte](targetSize - bytes.size))
    else bytes
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.alpakka.file.TarArchiveMetadata
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

/**
* INTERNAL API
*/
@InternalApi private[file] object TarArchiveManager {

  /**
   * Flow that turns (metadata, content source) pairs into the byte stream of
   * a TAR archive: for each pair it emits the entry header, the content
   * (size-checked against the metadata), and the trailing block padding.
   */
  def tarFlow(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString, NotUsed] = {
    Flow[(TarArchiveMetadata, Source[ByteString, Any])]
      .flatMapConcat {
        case (metadata, dataSource) =>
          val archiveEntry = new TarArchiveEntry(metadata)
          val header = Source.single(archiveEntry.headerBytes)
          // Fail the stream if the content size does not match the metadata.
          val body = dataSource.via(new EnsureByteStreamSize(metadata.size))
          val trailer = Source.single(archiveEntry.trailingBytes)
          header.concat(body).concat(trailer)
      }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package akka.stream.alpakka.file.javadsl

import akka.NotUsed
import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata}
import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata, TarArchiveMetadata}
import akka.stream.javadsl.Flow
import akka.util.ByteString
import akka.japi.Pair
Expand All @@ -25,6 +25,15 @@ object Archive {
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.zip().asJava)

/**
 * Flow for packaging multiple files into one TAR file.
 *
 * Each incoming pair carries the entry metadata (file path, size, last
 * modification time) and a source of the entry's content bytes; the flow
 * emits the raw bytes of the resulting TAR archive.
 */
def tar(): Flow[Pair[TarArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
  Flow
    .create[Pair[TarArchiveMetadata, Source[ByteString, NotUsed]]]()
    // Adapt the Java Pair and Source to the tuple shape of the scaladsl flow.
    .map(func(pair => (pair.first, pair.second.asScala)))
    .via(scaladsl.Archive.tar().asJava)

private def func[T, R](f: T => R) = new akka.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}
Expand Down
28 changes: 28 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.stream.alpakka.file

import java.time.Instant

/**
 * Metadata describing one entry of an archive (used by `Archive.zip`).
 *
 * @param filePath the path of the entry inside the archive
 */
final class ArchiveMetadata private (
val filePath: String
)
Expand All @@ -12,3 +14,29 @@ object ArchiveMetadata {
def apply(filePath: String): ArchiveMetadata = new ArchiveMetadata(filePath)
def create(filePath: String): ArchiveMetadata = new ArchiveMetadata(filePath)
}

/**
 * Metadata describing one entry of a TAR archive: its path inside the
 * archive, its content size in bytes, and its last modification time.
 */
final class TarArchiveMetadata private (
    val filePath: String,
    val size: Long,
    val lastModification: Instant
) {
  // The path split on '/': directory segments followed by the file name.
  private val segments: Array[String] = filePath.split("/")

  // All but the last segment, re-joined; None when there is no directory part.
  val filePathPrefix =
    if (segments.length > 1) Some(segments.init.mkString("/")) else None

  // The last segment, i.e. the bare file name.
  val filePathName = segments.last

  // The ustar "prefix" header field is limited in size — see TarArchiveEntry.
  require(
    filePathPrefix.forall(prefix => prefix.length > 0 && prefix.length <= 154),
    "File path prefix must be between 1 and 154 characters long"
  )
  // The ustar "name" header field is limited in size — see TarArchiveEntry.
  require(filePathName.length > 0 && filePathName.length <= 99,
          "File path name must be between 1 and 99 characters long")
}

object TarArchiveMetadata {
  /** Scala API: entry stamped with the current time. */
  def apply(filePath: String, size: Long): TarArchiveMetadata =
    apply(filePath, size, Instant.now)
  /** Scala API: entry with an explicit modification time. */
  def apply(filePath: String, size: Long, lastModification: Instant): TarArchiveMetadata =
    new TarArchiveMetadata(filePath, size, lastModification)
  /** Java API: entry stamped with the current time. */
  def create(filePath: String, size: Long): TarArchiveMetadata =
    apply(filePath, size, Instant.now)
  /** Java API: entry with an explicit modification time. */
  def create(filePath: String, size: Long, lastModification: Instant): TarArchiveMetadata =
    new TarArchiveMetadata(filePath, size, lastModification)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package akka.stream.alpakka.file.scaladsl

import akka.NotUsed
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.impl.archive.ZipArchiveManager
import akka.stream.alpakka.file.{ArchiveMetadata, TarArchiveMetadata}
import akka.stream.alpakka.file.impl.archive.{TarArchiveManager, ZipArchiveManager}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

Expand All @@ -21,4 +21,10 @@ object Archive {
def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow()

/**
 * Flow for packaging multiple files into one TAR file.
 *
 * Each incoming tuple carries the entry metadata (file path, size, last
 * modification time) and a source of the entry's content bytes; the flow
 * emits the raw bytes of the resulting TAR archive.
 */
def tar(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString, NotUsed] =
  TarArchiveManager.tarFlow()

}
65 changes: 60 additions & 5 deletions file/src/test/java/docs/javadsl/ArchiveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import akka.stream.IOResult;
import akka.stream.Materializer;
import akka.stream.alpakka.file.ArchiveMetadata;
import akka.stream.alpakka.file.TarArchiveMetadata;
import akka.stream.alpakka.file.javadsl.Archive;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import akka.stream.javadsl.FileIO;
Expand All @@ -20,9 +21,9 @@
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import org.junit.*;
import scala.concurrent.duration.FiniteDuration;
import static akka.util.ByteString.emptyByteString;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
Expand Down Expand Up @@ -66,13 +67,14 @@ public void flowShouldCreateZIPArchive() throws Exception {
Source<ByteString, NotUsed> source2 = toSource(fileContent2);

/*
// #sample
// #sample-zip
Source<ByteString, NotUsed> source1 = ...
Source<ByteString, NotUsed> source2 = ...
// #sample
// #sample-zip
*/

// #sample
// #sample-zip
Pair<ArchiveMetadata, Source<ByteString, NotUsed>> pair1 =
Pair.create(ArchiveMetadata.create("akka_full_color.svg"), source1);
Pair<ArchiveMetadata, Source<ByteString, NotUsed>> pair2 =
Expand All @@ -84,7 +86,7 @@ public void flowShouldCreateZIPArchive() throws Exception {
Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("logo.zip"));
CompletionStage<IOResult> ioResult = source.via(Archive.zip()).runWith(fileSink, mat);

// #sample
// #sample-zip

ioResult.toCompletableFuture().get(3, TimeUnit.SECONDS);

Expand All @@ -105,6 +107,59 @@ public void flowShouldCreateZIPArchive() throws Exception {
new File("logo.zip").delete();
}

// Packages two resource files into logo.tar via Archive.tar(), and gzips the
// same stream into logo.tar.gz. The regions between the #sample-tar and
// #sample-tar-gz markers are snipped into the documentation, so no comments
// are added inside them.
@Test
public void flowShouldCreateTARArchive() throws Exception {
Path filePath1 = getFileFromResource("akka_full_color.svg");
Path filePath2 = getFileFromResource("akka_icon_reverse.svg");

ByteString fileContent1 = readFileAsByteString(filePath1);
ByteString fileContent2 = readFileAsByteString(filePath2);

// TAR metadata needs the exact content size of each entry up front.
Source<ByteString, NotUsed> source1 = toSource(fileContent1);
Long size1 = Files.size(filePath1);
Source<ByteString, NotUsed> source2 = toSource(fileContent2);
Long size2 = Files.size(filePath2);

/*
// #sample-tar
Source<ByteString, NotUsed> source1 = ...
Source<ByteString, NotUsed> source2 = ...
Long size1 = ...
Long size2 = ...
// #sample-tar
*/

// #sample-tar
Pair<TarArchiveMetadata, Source<ByteString, NotUsed>> pair1 =
Pair.create(TarArchiveMetadata.create("akka_full_color.svg", size1), source1);
Pair<TarArchiveMetadata, Source<ByteString, NotUsed>> pair2 =
Pair.create(TarArchiveMetadata.create("akka_icon_reverse.svg", size2), source2);

Source<Pair<TarArchiveMetadata, Source<ByteString, NotUsed>>, NotUsed> source =
Source.from(Arrays.asList(pair1, pair2));

Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("logo.tar"));
CompletionStage<IOResult> ioResult = source.via(Archive.tar()).runWith(fileSink, mat);
// #sample-tar

// #sample-tar-gz
Sink<ByteString, CompletionStage<IOResult>> fileSinkGz =
FileIO.toPath(Paths.get("logo.tar.gz"));
CompletionStage<IOResult> ioResultGz =
source
.via(Archive.tar().via(akka.stream.javadsl.Compression.gzip()))
.runWith(fileSinkGz, mat);
// #sample-tar-gz

// Block until both archives are fully written (3 second timeout each).
ioResult.toCompletableFuture().get(3, TimeUnit.SECONDS);
ioResultGz.toCompletableFuture().get(3, TimeUnit.SECONDS);

// cleanup
new File("logo.tar").delete();
new File("logo.tar.gz").delete();
}

@After
public void tearDown() throws Exception {
StreamTestKit.assertAllStagesStopped(mat);
Expand Down
Loading

0 comments on commit 6020e28

Please sign in to comment.