File: TAR archive generation #2241

Merged: 4 commits, Mar 31, 2020

28 changes: 26 additions & 2 deletions docs/src/main/paradox/file.md
@@ -177,7 +177,31 @@ is not limited to memory size.
This example shows how to compress files from disk.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample }
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample-zip }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample }
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-zip }

## TAR Archive

The @apidoc[Archive$]
contains a flow for packaging multiple files into one TAR file.

The output of the flow can be sent to a sink even before the whole TAR file has been created, so the size of the
resulting TAR archive is not limited by the available memory.

This example shows how to package files from disk.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample-tar }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-tar }

To produce a gzipped TAR file, see the following example.

Scala
: @@snip [snip](/file/src/test/scala/docs/scaladsl/ArchiveSpec.scala) { #sample-tar-gz }

Java
: @@snip [snip](/file/src/test/java/docs/javadsl/ArchiveTest.java) { #sample-tar-gz }
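
A note on the snippets: the referenced #sample-tar and #sample-tar-gz sections live in the test sources (ArchiveSpec.scala and ArchiveTest.java) and are not part of this diff hunk. As a rough orientation only, a minimal Scala sketch of what the #sample-tar usage boils down to could look like this; the file names, contents and the "sample.tar" output path are illustrative, not taken from the real snippet:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.alpakka.file.TarArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("tar-docs-sketch")

// Illustrative in-memory contents; the documented example streams files from disk.
val content1 = ByteString("first file")
val content2 = ByteString("second file")

// Each archive entry is a pair of metadata (path and exact size) and a ByteString source.
val entries = Source(
  List(
    (TarArchiveMetadata("akka_full_color.svg", content1.size), Source.single(content1)),
    (TarArchiveMetadata("akka_icon_reverse.svg", content2.size), Source.single(content2))
  )
)

// TAR bytes are emitted as soon as they are produced and can be streamed straight to disk.
val ioResult = entries
  .via(Archive.tar())
  .runWith(FileIO.toPath(Paths.get("sample.tar")))
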
48 changes: 48 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/impl/archive/EnsureByteStreamSize.scala
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.annotation.InternalApi
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

/**
* INTERNAL API
*/
@InternalApi private[file] class EnsureByteStreamSize(expectedSize: Long)
extends GraphStage[FlowShape[ByteString, ByteString]] {

val in = Inlet[ByteString]("EnsureByteStreamSize.in")
val out = Outlet[ByteString]("EnsureByteStreamSize.out")

override val shape = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var currentSize = 0L

setHandler(
in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
currentSize = currentSize + elem.size
push(out, elem)
}

override def onUpstreamFinish(): Unit = {
if (currentSize == expectedSize) super.onUpstreamFinish()
else failStage(new IllegalStateException(s"Expected ${expectedSize} bytes but got ${currentSize} bytes"))
}
}
)
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}

}
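
EnsureByteStreamSize is internal (private[file]) and is only wired in by TarArchiveManager below, but its contract is simple: elements pass through unchanged, and the stage fails at completion if the running byte count differs from expectedSize. A hedged, in-package sketch of that behaviour (the actor system name and contents are made up):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("ensure-size-sketch")

val bytes = Source(List(ByteString("abc"), ByteString("de"))) // 5 bytes in total

// Completes normally: the declared size matches the streamed size.
bytes.via(new EnsureByteStreamSize(5)).runWith(Sink.ignore)

// Fails the stream with IllegalStateException("Expected 4 bytes but got 5 bytes").
bytes.via(new EnsureByteStreamSize(4)).runWith(Sink.ignore)
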
79 changes: 79 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/impl/archive/TarArchiveEntry.scala
@@ -0,0 +1,79 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import java.lang.Long.toOctalString

import akka.annotation.InternalApi
import akka.stream.alpakka.file.TarArchiveMetadata
import akka.util.ByteString

/**
* INTERNAL API
*/
@InternalApi private[file] final class TarArchiveEntry(metadata: TarArchiveMetadata) {

def headerBytes: ByteString = {
val withoutChecksum = headerBytesWithoutChecksum
val checksumLong = withoutChecksum.foldLeft(0L)((sum, byte) => sum + byte)
val checksumBytes = ByteString(toOctalString(checksumLong).reverse.padTo(6, '0').take(6).reverse) ++ ByteString(
new Array[Byte](1) ++ ByteString(" ")
)
val withChecksum = withoutChecksum.take(148) ++ checksumBytes ++ withoutChecksum.drop(148 + 8)
withChecksum.compact
}

def trailingBytes: ByteString = {
val paddingSize = if (metadata.size % 512 > 0) (512 - metadata.size % 512).toInt else 0
padded(ByteString.empty, paddingSize)
}

private def headerBytesWithoutChecksum: ByteString = {
// [0, 100)
val fileNameBytes = padded(ByteString(metadata.filePathName), 100)
// [100, 108)
val fileModeBytes = padded(ByteString("0755"), 8)
// [108, 116)
val ownerIdBytes = padded(ByteString.empty, 8)
// [116, 124)
val groupIdBytes = padded(ByteString.empty, 8)
// [124, 136)
val fileSizeBytes = padded(ByteString("0" + toOctalString(metadata.size)), 12)
// [136, 148)
val lastModificationBytes = padded(ByteString(toOctalString(metadata.lastModification.getEpochSecond)), 12)
// [148, 156)
val checksumPlaceholderBytes = ByteString(" ")
// [156, 157)
val linkIndicatorBytes = padded(ByteString.empty, 1)
// [157, 257)
val linkFileNameBytes = padded(ByteString.empty, 100)
// [257, 263)
val ustarIndicatorBytes = ByteString("ustar") ++ ByteString(new Array[Byte](1))
// [263, 265)
val ustarVersionBytes = ByteString(new Array[Byte](2))
// [265, 297)
val ownerNameBytes = padded(ByteString.empty, 32)
// [297, 329)
val groupNameBytes = padded(ByteString.empty, 32)
// [329, 337)
val deviceMajorNumberBytes = padded(ByteString.empty, 8)
// [337, 345)
val deviceMinorNumberBytes = padded(ByteString.empty, 8)
// [345, 500)
val fileNamePrefixBytes = padded(metadata.filePathPrefix.map(ByteString.apply).getOrElse(ByteString.empty), 155)

padded(
fileNameBytes ++ fileModeBytes ++ ownerIdBytes ++ groupIdBytes ++ fileSizeBytes ++ lastModificationBytes ++ checksumPlaceholderBytes ++ linkIndicatorBytes ++ linkFileNameBytes ++ ustarIndicatorBytes ++ ustarVersionBytes ++ ownerNameBytes ++ groupNameBytes ++ deviceMajorNumberBytes ++ deviceMinorNumberBytes ++ fileNamePrefixBytes,
512
)
}

private def padded(bytes: ByteString, targetSize: Int): ByteString = {
require(bytes.size <= targetSize)
if (bytes.size < targetSize) bytes ++ ByteString(new Array[Byte](targetSize - bytes.size))
else bytes
}

}
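
Each archive entry is framed by a single 512-byte USTAR header block plus NUL padding that rounds the content up to the next 512-byte boundary. The checksum field at [148, 156) is computed over the header with those eight bytes treated as spaces, then written back as a six-digit octal number followed by a NUL and a space. A small sketch of the resulting sizes for a hypothetical 5-byte entry (TarArchiveEntry is internal, so this only compiles inside the akka.stream.alpakka.file package):

import akka.stream.alpakka.file.TarArchiveMetadata

// Hypothetical 5-byte entry.
val entry = new TarArchiveEntry(TarArchiveMetadata("hello.txt", 5))

entry.headerBytes.length   // 512: one full USTAR header block
entry.trailingBytes.length // 507: NUL padding up to the next 512-byte boundary
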
30 changes: 30 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/impl/archive/TarArchiveManager.scala
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.file.impl.archive

import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.alpakka.file.TarArchiveMetadata
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

/**
* INTERNAL API
*/
@InternalApi private[file] object TarArchiveManager {

def tarFlow(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString, NotUsed] = {
Flow[(TarArchiveMetadata, Source[ByteString, Any])]
.flatMapConcat {
case (metadata, stream) =>
val entry = new TarArchiveEntry(metadata)
Source
.single(entry.headerBytes)
.concat(stream.via(new EnsureByteStreamSize(metadata.size)))
.concat(Source.single(entry.trailingBytes))
}
}

}
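
For every (metadata, data) pair the flow lazily concatenates the header, the size-checked content and the trailing padding, so the archive is produced as one continuous stream of ByteStrings. A hedged in-package sketch of what that means for a single small entry (names and sizes are made up):

import akka.actor.ActorSystem
import akka.stream.alpakka.file.TarArchiveMetadata
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

implicit val system: ActorSystem = ActorSystem("tar-manager-sketch")

// One hypothetical 5-byte entry: 512 header bytes + 5 content bytes + 507 padding bytes.
val data = ByteString("hello")
val archiveSize =
  Source
    .single((TarArchiveMetadata("hello.txt", data.size), Source.single(data)))
    .via(TarArchiveManager.tarFlow())
    .runWith(Sink.fold(0L)(_ + _.size))
// archiveSize completes with 1024, i.e. 2 * 512
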
file/src/main/scala/akka/stream/alpakka/file/javadsl/Archive.scala
@@ -5,7 +5,7 @@
package akka.stream.alpakka.file.javadsl

import akka.NotUsed
import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata}
import akka.stream.alpakka.file.{scaladsl, ArchiveMetadata, TarArchiveMetadata}
import akka.stream.javadsl.Flow
import akka.util.ByteString
import akka.japi.Pair
@@ -25,6 +25,15 @@ object Archive {
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.zip().asJava)

/**
* Flow for packaging multiple files into one TAR file.
*/
def tar(): Flow[Pair[TarArchiveMetadata, Source[ByteString, NotUsed]], ByteString, NotUsed] =
Flow
.create[Pair[TarArchiveMetadata, Source[ByteString, NotUsed]]]()
.map(func(pair => (pair.first, pair.second.asScala)))
.via(scaladsl.Archive.tar().asJava)

private def func[T, R](f: T => R) = new akka.japi.function.Function[T, R] {
override def apply(param: T): R = f(param)
}
28 changes: 28 additions & 0 deletions file/src/main/scala/akka/stream/alpakka/file/model.scala
@@ -4,6 +4,8 @@

package akka.stream.alpakka.file

import java.time.Instant

final class ArchiveMetadata private (
val filePath: String
)
@@ -12,3 +12,29 @@ object ArchiveMetadata {
def apply(filePath: String): ArchiveMetadata = new ArchiveMetadata(filePath)
def create(filePath: String): ArchiveMetadata = new ArchiveMetadata(filePath)
}

final class TarArchiveMetadata private (
val filePath: String,
val size: Long,
val lastModification: Instant
) {
private val filePathSegments: Array[String] = filePath.split("/")
val filePathPrefix = Option(filePathSegments.init).filter(_.nonEmpty).map(_.mkString("/"))
val filePathName = filePathSegments.last

require(
filePathPrefix.forall(fnp => fnp.length > 0 && fnp.length <= 154),
"File path prefix must be between 1 and 154 characters long"
)
require(filePathName.length > 0 && filePathName.length <= 99,
"File path name must be between 1 and 99 characters long")
}

object TarArchiveMetadata {
def apply(filePath: String, size: Long): TarArchiveMetadata = apply(filePath, size, Instant.now)
def apply(filePath: String, size: Long, lastModification: Instant): TarArchiveMetadata =
new TarArchiveMetadata(filePath, size, lastModification)
def create(filePath: String, size: Long): TarArchiveMetadata = create(filePath, size, Instant.now)
def create(filePath: String, size: Long, lastModification: Instant): TarArchiveMetadata =
new TarArchiveMetadata(filePath, size, lastModification)
}
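
TarArchiveMetadata splits the given path at the last "/" into the USTAR prefix and name fields, which is why the two length limits (154 and 99 characters) are checked separately. A hedged sketch of the split, with made-up paths and sizes:

import akka.stream.alpakka.file.TarArchiveMetadata

// Nested path: everything before the last '/' becomes the USTAR prefix field.
val nested = TarArchiveMetadata("logos/akka/akka_full_color.svg", 17846L)
// nested.filePathPrefix == Some("logos/akka"), nested.filePathName == "akka_full_color.svg"

// Flat path: no prefix at all.
val flat = TarArchiveMetadata("akka_full_color.svg", 17846L)
// flat.filePathPrefix == None, flat.filePathName == "akka_full_color.svg"
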
file/src/main/scala/akka/stream/alpakka/file/scaladsl/Archive.scala
@@ -5,8 +5,8 @@
package akka.stream.alpakka.file.scaladsl

import akka.NotUsed
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.impl.archive.ZipArchiveManager
import akka.stream.alpakka.file.{ArchiveMetadata, TarArchiveMetadata}
import akka.stream.alpakka.file.impl.archive.{TarArchiveManager, ZipArchiveManager}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

@@ -21,4 +21,10 @@ object Archive {
def zip(): Flow[(ArchiveMetadata, Source[ByteString, Any]), ByteString, NotUsed] =
ZipArchiveManager.zipFlow()

/**
* Flow for packaging multiple files into one TAR file.
*/
def tar(): Flow[(TarArchiveMetadata, Source[ByteString, _]), ByteString, NotUsed] =
TarArchiveManager.tarFlow()

}
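
Combined with Akka's built-in Compression stage this produces a gzipped TAR directly, mirroring the Java #sample-tar-gz test further down. A minimal sketch, reusing the entries source (and implicit system) from the earlier documentation sketch; the output path is illustrative:

import java.nio.file.Paths

import akka.stream.scaladsl.{Compression, FileIO}

// Gzip the TAR byte stream on the fly and write it to disk.
val gzResult = entries
  .via(Archive.tar().via(Compression.gzip))
  .runWith(FileIO.toPath(Paths.get("sample.tar.gz")))
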
65 changes: 60 additions & 5 deletions file/src/test/java/docs/javadsl/ArchiveTest.java
@@ -11,6 +11,7 @@
import akka.stream.IOResult;
import akka.stream.Materializer;
import akka.stream.alpakka.file.ArchiveMetadata;
import akka.stream.alpakka.file.TarArchiveMetadata;
import akka.stream.alpakka.file.javadsl.Archive;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import akka.stream.javadsl.FileIO;
@@ -20,9 +20,9 @@
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import org.junit.*;
import scala.concurrent.duration.FiniteDuration;
import static akka.util.ByteString.emptyByteString;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -66,13 +67,14 @@ public void flowShouldCreateZIPArchive() throws Exception {
Source<ByteString, NotUsed> source2 = toSource(fileContent2);

/*
// #sample
// #sample-zip
Source<ByteString, NotUsed> source1 = ...
Source<ByteString, NotUsed> source2 = ...
// #sample

// #sample-zip
*/

// #sample
// #sample-zip
Pair<ArchiveMetadata, Source<ByteString, NotUsed>> pair1 =
Pair.create(ArchiveMetadata.create("akka_full_color.svg"), source1);
Pair<ArchiveMetadata, Source<ByteString, NotUsed>> pair2 =
@@ -84,7 +86,7 @@
Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("logo.zip"));
CompletionStage<IOResult> ioResult = source.via(Archive.zip()).runWith(fileSink, mat);

// #sample
// #sample-zip

ioResult.toCompletableFuture().get(3, TimeUnit.SECONDS);

@@ -105,6 +107,59 @@ public void flowShouldCreateZIPArchive() throws Exception {
new File("logo.zip").delete();
}

@Test
public void flowShouldCreateTARArchive() throws Exception {
Path filePath1 = getFileFromResource("akka_full_color.svg");
Path filePath2 = getFileFromResource("akka_icon_reverse.svg");

ByteString fileContent1 = readFileAsByteString(filePath1);
ByteString fileContent2 = readFileAsByteString(filePath2);

Source<ByteString, NotUsed> source1 = toSource(fileContent1);
Long size1 = Files.size(filePath1);
Source<ByteString, NotUsed> source2 = toSource(fileContent2);
Long size2 = Files.size(filePath2);

/*
// #sample-tar
Source<ByteString, NotUsed> source1 = ...
Source<ByteString, NotUsed> source2 = ...
Long size1 = ...
Long size2 = ...

// #sample-tar
*/

// #sample-tar
Pair<TarArchiveMetadata, Source<ByteString, NotUsed>> pair1 =
Pair.create(TarArchiveMetadata.create("akka_full_color.svg", size1), source1);
Pair<TarArchiveMetadata, Source<ByteString, NotUsed>> pair2 =
Pair.create(TarArchiveMetadata.create("akka_icon_reverse.svg", size2), source2);

Source<Pair<TarArchiveMetadata, Source<ByteString, NotUsed>>, NotUsed> source =
Source.from(Arrays.asList(pair1, pair2));

Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("logo.tar"));
CompletionStage<IOResult> ioResult = source.via(Archive.tar()).runWith(fileSink, mat);
// #sample-tar

// #sample-tar-gz
Sink<ByteString, CompletionStage<IOResult>> fileSinkGz =
FileIO.toPath(Paths.get("logo.tar.gz"));
CompletionStage<IOResult> ioResultGz =
source
.via(Archive.tar().via(akka.stream.javadsl.Compression.gzip()))
.runWith(fileSinkGz, mat);
// #sample-tar-gz

ioResult.toCompletableFuture().get(3, TimeUnit.SECONDS);
ioResultGz.toCompletableFuture().get(3, TimeUnit.SECONDS);

// cleanup
new File("logo.tar").delete();
new File("logo.tar.gz").delete();
}

@After
public void tearDown() throws Exception {
StreamTestKit.assertAllStagesStopped(mat);