-
Notifications
You must be signed in to change notification settings - Fork 49
/
zip.scala
113 lines (93 loc) · 4.06 KB
/
zip.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package sharry.common
import java.io.OutputStream
import java.nio.file.{Files, Path}
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import scala.concurrent.ExecutionContext
import fs2.{async, io, Chunk, Pipe, Sink, Stream, Segment}
import scala.concurrent.SyncVar
import cats.effect.{Effect, IO}
object zip {

  /** This implementation is from @wedens and @pchlupacek
    * https://gitter.im/functional-streams-for-scala/fs2?at=592affb6c4d73f445af10e45
    * http://lpaste.net/9043000581702025216
    *
    * Zips a stream of named byte streams into a single stream of ZIP-archive
    * bytes. The trick is bridging the blocking, push-based
    * `java.util.zip.ZipOutputStream` API with pull-based fs2 streams: a custom
    * `OutputStream` buffers written bytes into chunks of `chunkSize` and hands
    * them to a synchronous queue, blocking the writing thread (via `SyncVar`)
    * until the downstream consumer dequeues. The writer runs `concurrently`
    * with the dequeuing side, so bytes flow with bounded buffering.
    *
    * @param chunkSize target size of the byte chunks emitted downstream
    */
  def zip[F[_]](chunkSize: Int)(implicit F: Effect[F], EC: ExecutionContext): Pipe[F, (String, Stream[F,Byte]), Byte] = entries =>
    // Synchronous (zero-capacity) queue: each enqueue blocks until matched by
    // a dequeue. `None` is the end-of-stream marker (see unNoneTerminate below).
    Stream.eval(fs2.async.synchronousQueue[F, Option[Chunk[Byte]]]).flatMap { q =>

      // Writes each (entryName, data) pair into the ZipOutputStream:
      // open the entry, stream its bytes, then close the entry — in that order.
      def writeEntry(zos: ZipOutputStream): Sink[F, (String, Stream[F, Byte])] =
        _.flatMap {
          case (name, data) =>
            val mkEntry = Stream.eval(F.delay {
              val ze = new ZipEntry(name)
              zos.putNextEntry(ze)
            })
            // closeAfterUse = false: zos stays open across entries; it is
            // closed once, by the onFinalize below, after ALL entries are done.
            val writeData = data.to(
              io.writeOutputStream(
                F.delay(zos),
                closeAfterUse = false))
            val closeEntry = Stream.eval(F.delay(zos.closeEntry()))
            mkEntry ++ writeData ++ closeEntry
        }

      // suspend: defer creation of the mutable OutputStream until the stream
      // is actually run, so each run gets its own instance.
      Stream.suspend {
        val os = new OutputStream {

          // Enqueues a chunk and BLOCKS the calling (writer) thread until the
          // consumer has taken it. The effectful enqueue is kicked off
          // asynchronously; SyncVar.get blocks until the callback fires.
          // A failed enqueue is rethrown here, on the writer thread.
          private def enqueueChunkSync(a: Option[Chunk[Byte]]) = {
            val done = new SyncVar[Either[Throwable, Unit]]
            //F.unsafeRunAsync(q.enqueue1(a))(done.put)
            async.unsafeRunAsync(q.enqueue1(a))(e => IO(done.put(e)))
            done.get.fold(throw _, identity)
          }

          // Accumulates incoming bytes into `chunk` until it would exceed
          // chunkSize; then flushes a full chunkSize-sized chunk downstream
          // and recurses (tail-recursively) on the remainder.
          @annotation.tailrec
          private def addChunk(c: Chunk[Byte]): Unit = {
            val free = chunkSize - chunk.size
            if (c.size > free) {
              enqueueChunkSync(Some((Segment.chunk(chunk) ++ Segment.chunk(c.take(free))).force.toChunk))
              chunk = Chunk.empty
              addChunk(c.drop(free))
            } else {
              chunk = (Segment.chunk(chunk) ++ Segment.chunk(c)).force.toChunk
            }
          }

          // Buffer of bytes not yet handed to the queue. Only touched by the
          // single writer thread driving this OutputStream.
          private var chunk: Chunk[Byte] = Chunk.empty

          // Flush whatever is buffered, then enqueue the `None` terminator so
          // the dequeuing side (unNoneTerminate) knows the archive is complete.
          override def close(): Unit = {
            enqueueChunkSync(Some(chunk))
            chunk = Chunk.empty
            enqueueChunkSync(None)
          }

          override def write(bytes: Array[Byte]): Unit =
            addChunk(Chunk.bytes(bytes))
          override def write(bytes: Array[Byte], off: Int, len: Int): Unit =
            addChunk(Chunk.bytes(bytes, off, len))
          override def write(b: Int): Unit =
            addChunk(Chunk.singleton(b.toByte))
        }

        val zos = new ZipOutputStream(os)

        // Closing zos writes the ZIP central directory and then closes `os`,
        // which triggers the None terminator above.
        val write = entries.to(writeEntry(zos))
          .onFinalize(F.delay(zos.close()))

        // Output side: emit dequeued chunks until the None marker; the writer
        // runs concurrently and is cancelled/finalized with this stream.
        q.dequeue
          .unNoneTerminate
          .flatMap(Stream.chunk(_))
          .concurrently(write)
      }
    }

  /** Convenience overload: zips `entries` by piping them through
    * [[zip(chunkSize:Int)*]].
    */
  def zip[F[_]](entries: Stream[F, (String, Stream[F, Byte])], chunkSize: Int)(implicit F: Effect[F], EC: ExecutionContext): Stream[F, Byte] = {
    entries.through(zip(chunkSize))
  }

  /** Lists the immediate children of `dir` that satisfy `include`
    * (non-recursive). The underlying DirectoryStream is closed by the
    * bracket's release action even if the stream terminates early.
    *
    * NOTE(review): `iter.next` and the directory iteration are side effects
    * performed outside F.delay inside unfold — presumably acceptable here
    * because the stream is consumed once; confirm if reuse is ever intended.
    */
  def dirEntries[F[_]](dir: Path, include: Path => Boolean = _ => true)(implicit F: Effect[F]): Stream[F, Path] =
    Stream.bracket(F.delay(Files.newDirectoryStream(dir)))(
      dirs => Stream.unfold(dirs.iterator) {
        iter => if (iter.hasNext) Some((iter.next, iter)) else None
      },
      dirs => F.delay(dirs.close)).
      filter(include)

  /** Recursively lists entries under `dir` that satisfy `include`.
    * Directories are always descended into, regardless of `include`;
    * `include` only controls which paths are emitted.
    */
  def dirEntriesRecursive[F[_]](dir: Path, include: Path => Boolean = _ => true)(implicit F: Effect[F],EC: ExecutionContext): Stream[F, Path] =
    dirEntries[F](dir).flatMap { p =>
      val r = if (include(p)) Stream.emit(p) else Stream.empty
      if (Files.isDirectory(p)) r ++ dirEntriesRecursive(p, include)
      else r
    }

  /** Zips all regular files under `dir` (recursively) that satisfy `include`.
    * Entry names are the paths relativized against `dir`; directories
    * themselves get no entries. File contents are read in `chunkSize` chunks.
    */
  def zipDir[F[_]](dir: Path, chunkSize: Int, include: Path => Boolean = _ => true)(implicit F: Effect[F], EC: ExecutionContext): Stream[F, Byte] = {
    val entries = dirEntriesRecursive(dir, e => !Files.isDirectory(e) && include(e))
    zip(entries.
      map(e => dir.relativize(e).toString -> io.file.readAll(e, chunkSize)), chunkSize)
  }
}