Skip to content

Commit

Permalink
feat(core): add a multifile outputstream to split a video in multiple…
Browse files Browse the repository at this point in the history
… parts
  • Loading branch information
ThibaultBee committed Apr 11, 2023
1 parent 9001929 commit 75df86a
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 2 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ android {
* Processing: Noise suppressor or echo cancellation
* Audio only mode
* Device audio capabilities
* Network: RTMP/RTMPS or SRT
* File: TS or FLV
* Write to a single file or multiple chunk files
* Streaming: RTMP/RTMPS or SRT
* Ultra low-latency based on [SRT](https://github.com/Haivision/srt)
* Network adaptive bitrate mechanism for [SRT](https://github.com/Haivision/srt)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright (C) 2022 Thibault B.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.thibaultbee.streampack.utils

import io.github.thibaultbee.streampack.streamers.interfaces.IFileStreamer
import java.io.File
import java.io.FileOutputStream
import java.io.OutputStream

/**
* A class that allows to write to multiple files.
* If you are looking to write the video or audio in multiple files, use this class
* as an [OutputStream] for a [IFileStreamer].
*
* @param filesDir the directory where the files will be written
* @param partSize the size of each file
* @param namePrefix the prefix of each file
* @param listener the listener that will be called when a file is created
*/
class MultiFileOutputStream(
val filesDir: File,
private val partSize: Long,
private val namePrefix: String = "part_",
private val listener: Listener
) : OutputStream() {
private var currentFileBytesWritten = 0L
private var bytesWritten = 0

private var _isClosed = false

/**
* Get if the stream is closed.
*/
val isClosed: Boolean
get() = _isClosed

private var _outputStream: FileOutputStream? = null
private val outputStream: FileOutputStream
get() {
if (_isClosed) {
throw IllegalStateException("Stream is closed")
}
synchronized(this) {
if ((_outputStream == null) || (currentFileBytesWritten >= partSize)) {
_outputStream?.let {
it.close()
listener.onFileCreated(
numOfFileWritten,
false,
getFile(numOfFileWritten)
)
}

currentFileBytesWritten = 0
_numOfFileWritten++

_outputStream = FileOutputStream(getFile(numOfFileWritten))
}
return _outputStream!!
}
}

private var _numOfFileWritten: Int = 0
/**
* Get the number of files written.
*/
val numOfFileWritten: Int
get() = _numOfFileWritten

init {
require(partSize > 0) { "Part size must be greater than 0" }
require(filesDir.isDirectory) { "Files directory must be a directory" }
require(filesDir.canWrite()) { "Files directory must be writable" }
}

private fun getFile(fileIndex: Int): File {
return File(filesDir, "$namePrefix$fileIndex")
}

/**
* Write [b] to the stream.
*
* @param b the byte to write
*/
override fun write(b: Int) {
outputStream.write(b)
currentFileBytesWritten++
bytesWritten++
}

/**
* Write [b] to the stream.
*
* @param b the byte to write
*/
override fun write(b: ByteArray) {
outputStream.write(b)
currentFileBytesWritten += b.size
bytesWritten += b.size
}

/**
* Write [len] bytes from [b] starting at [off].
*
* @param b the bytes to write
* @param off the offset in [b] to start writing
* @param len the number of bytes to write
*/
override fun write(b: ByteArray, off: Int, len: Int) {
outputStream.write(b, off, len)
currentFileBytesWritten += len
bytesWritten += len
}

/**
* Close the stream.
* This will close the current file and call [Listener.onFileCreated] with the last file.
*/
override fun close() {
if (_isClosed) {
return
}
_isClosed = true
_outputStream?.let {
it.close()
listener.onFileCreated(numOfFileWritten, true, getFile(numOfFileWritten))
}
_outputStream = null
}

override fun flush() {
_outputStream?.flush()
}

/**
* Delete all files
*/
fun delete() {
filesDir.deleteRecursively()
}

/**
* Listener for [MultiFileOutputStream]
*/
interface Listener {
/**
* Called when a file is created.
*
* @param index the index of the file
* @param isLast true if this is the last file
* @param file the file
*/
fun onFileCreated(index: Int, isLast: Boolean, file: File) {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package io.github.thibaultbee.streampack.utils

import org.junit.After
import org.junit.Assert
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.io.File
import java.util.concurrent.CountDownLatch


class MultiFileOutputStreamTest {
@get:Rule
val rootFolder: TemporaryFolder = TemporaryFolder()
private lateinit var multiFileOutputStream: MultiFileOutputStream

@After
fun tearDown() {
multiFileOutputStream.close()
}

@Test
fun `write data`() {
val chunkReadyCountDownLatch = CountDownLatch(3)
val isLastChunkCountDownLatch = CountDownLatch(1)
var lastChunkId = 0
val listener = object : MultiFileOutputStream.Listener {
override fun onFileCreated(index: Int, isLast: Boolean, file: File) {
Assert.assertEquals(lastChunkId + 1, index)
chunkReadyCountDownLatch.countDown()
if (isLast) {
isLastChunkCountDownLatch.countDown()
}
lastChunkId = index
}
}
multiFileOutputStream =
MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener)

multiFileOutputStream.write(Utils.generateRandomArray(2048))
multiFileOutputStream.write(Utils.generateRandomArray(2048))
multiFileOutputStream.write(Utils.generateRandomArray(600))
multiFileOutputStream.close()

Assert.assertEquals(0, chunkReadyCountDownLatch.count)
Assert.assertEquals(0, isLastChunkCountDownLatch.count)
Assert.assertEquals(3, lastChunkId)
}

@Test
fun `write data size == chunk size`() {
val countDownLatch = CountDownLatch(4)
val listener = object : MultiFileOutputStream.Listener {
override fun onFileCreated(index: Int, isLast: Boolean, file: File) {
countDownLatch.countDown()
}
}
multiFileOutputStream =
MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener)

multiFileOutputStream.write(Utils.generateRandomArray(DEFAULT_CHUNK_SIZE))
multiFileOutputStream.write(Utils.generateRandomArray(DEFAULT_CHUNK_SIZE))
multiFileOutputStream.write(Utils.generateRandomArray(DEFAULT_CHUNK_SIZE))
multiFileOutputStream.close() // Must not create an empty chunk

Assert.assertEquals(1, countDownLatch.count)
}

@Test
fun `multiple close test`() {
val countDownLatch = CountDownLatch(3)
val listener = object : MultiFileOutputStream.Listener {
override fun onFileCreated(index: Int, isLast: Boolean, file: File) {
countDownLatch.countDown()
}
}
multiFileOutputStream =
MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener)
multiFileOutputStream.write(Utils.generateRandomArray(2048))
multiFileOutputStream.write(Utils.generateRandomArray(600))
multiFileOutputStream.close()
multiFileOutputStream.close()
multiFileOutputStream.close()
multiFileOutputStream.close()

Assert.assertEquals(1, countDownLatch.count)
}

@Test
fun `close without writing data`() {
val countDownLatch = CountDownLatch(1)
val listener = object : MultiFileOutputStream.Listener {
override fun onFileCreated(index: Int, isLast: Boolean, file: File) {
countDownLatch.countDown()
}
}
multiFileOutputStream =
MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener)
multiFileOutputStream.close()

Assert.assertEquals(1, countDownLatch.count)
}

companion object {
const val DEFAULT_CHUNK_SIZE = 1024L
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,13 @@ object Utils {
fun generateRandomArray(size: Int): ByteArray {
return Random.nextBytes(size)
}

/**
* Generates a randomized ByteArray
* @param size size of buffer to generates
* @return random ByteBuffer
*/
fun generateRandomArray(size: Long): ByteArray {
return generateRandomArray(size.toInt())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class StreamerManager(
streamer?.getFileStreamer()?.let {
/**
* Use OutputStream.
* FYI, outputStream is closed by stopStream
* FYI, outputStream is closed by stopStream.
* To cut the video into multiple parts/chunks, use [MultiFileOutputStream].
*/
it.outputStream =
context.createVideoMediaOutputStream(configuration.endpoint.file.filename)
Expand Down

0 comments on commit 75df86a

Please sign in to comment.