From 9440b6788410a3de53798c2bbacaa1a8c10ba804 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Thu, 14 Sep 2023 16:15:55 +0200 Subject: [PATCH 1/4] Added a StorageIndex for the source storage to reduce LIST calls Before every restructuring or cleaning operation, the index is updated. The StorageIndex can be updated with a configurable sync time to make a full sync, otherwise it just updates directories that have files in them. A separate sync time can be set to also scan empty directories. The first implementation is only an InMemoryStorageIndex. For very large datasets, a file-based index might be needed. --- README.md | 8 ++ restructure.yml | 6 + .../java/org/radarbase/output/Application.kt | 23 +++- .../org/radarbase/output/FileStoreFactory.kt | 3 + .../output/cleaner/SourceDataCleaner.kt | 25 ++-- .../radarbase/output/config/ResourceConfig.kt | 2 + .../output/config/StorageIndexConfig.kt | 12 ++ .../output/source/AzureSourceStorage.kt | 21 ++- .../output/source/DelegatingStorageIndex.kt | 10 ++ .../output/source/InMemoryStorageIndex.kt | 121 ++++++++++++++++++ .../output/source/MutableStorageIndex.kt | 7 + .../source/MutableStorageIndexManager.kt | 93 ++++++++++++++ .../output/source/S3SourceStorage.kt | 20 ++- .../radarbase/output/source/SourceStorage.kt | 13 +- .../radarbase/output/source/StorageIndex.kt | 13 ++ .../output/source/StorageIndexManager.kt | 8 ++ .../radarbase/output/source/StorageNode.kt | 24 ++++ .../radarbase/output/source/TopicFileList.kt | 2 - .../radarbase/output/util/AvroFileLister.kt | 22 ++-- .../radarbase/output/util/AvroTopicLister.kt | 15 ++- .../org/radarbase/output/util/TreeLister.kt | 8 +- .../org/radarbase/output/worker/FileCache.kt | 14 +- .../output/worker/RadarKafkaRestructure.kt | 23 ++-- src/main/resources/log4j2.properties | 9 -- src/main/resources/log4j2.xml | 13 ++ 25 files changed, 435 insertions(+), 80 deletions(-) create mode 100644 src/main/java/org/radarbase/output/config/StorageIndexConfig.kt create mode 100644 
src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt create mode 100644 src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt create mode 100644 src/main/java/org/radarbase/output/source/MutableStorageIndex.kt create mode 100644 src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt create mode 100644 src/main/java/org/radarbase/output/source/StorageIndex.kt create mode 100644 src/main/java/org/radarbase/output/source/StorageIndexManager.kt create mode 100644 src/main/java/org/radarbase/output/source/StorageNode.kt delete mode 100644 src/main/resources/log4j2.properties create mode 100644 src/main/resources/log4j2.xml diff --git a/README.md b/README.md index e1f8ab6..e9c2f5b 100644 --- a/README.md +++ b/README.md @@ -118,8 +118,16 @@ source: # only actually needed if source type is hdfs azure: # azure options + index: + # Interval to fully synchronize the index with the source storage + fullSyncInterval: 3600 + # Interval to sync empty directories with. + # They are also synced during a full sync. + emptyDirectorySyncInterval: 900 ``` +The index makes a scan of the source before any operations. Further list operations are done on the index only. This is especially relevant for S3 storage where list operations are priced. + The target is similar, and in addition supports the local file system (`local`). ```yaml diff --git a/restructure.yml b/restructure.yml index 3a5d967..b6d0c3d 100644 --- a/restructure.yml +++ b/restructure.yml @@ -34,6 +34,12 @@ source: # only actually needed if source type is hdfs hdfs: nameNodes: [hdfs-namenode-1, hdfs-namenode-2] + index: + # Interval to fully synchronize the index with the storage + fullSyncInterval: 3600 + # Interval to sync empty directories with. + # They are also synced during a full sync. 
+ emptyDirectorySyncInterval: 900 # Target data resource # @since: 0.7.0 diff --git a/src/main/java/org/radarbase/output/Application.kt b/src/main/java/org/radarbase/output/Application.kt index a363a80..2850007 100644 --- a/src/main/java/org/radarbase/output/Application.kt +++ b/src/main/java/org/radarbase/output/Application.kt @@ -28,8 +28,7 @@ import org.radarbase.output.config.CommandLineArgs import org.radarbase.output.config.RestructureConfig import org.radarbase.output.format.RecordConverterFactory import org.radarbase.output.path.RecordPathFactory -import org.radarbase.output.source.SourceStorage -import org.radarbase.output.source.SourceStorageFactory +import org.radarbase.output.source.* import org.radarbase.output.target.TargetStorage import org.radarbase.output.target.TargetStorageFactory import org.radarbase.output.util.Timer @@ -39,7 +38,9 @@ import org.radarbase.output.worker.RadarKafkaRestructure import org.slf4j.LoggerFactory import redis.clients.jedis.JedisPool import java.io.IOException +import java.nio.file.Path import java.text.NumberFormat +import java.time.Duration import java.time.LocalDateTime import java.time.format.DateTimeFormatter import java.util.concurrent.atomic.LongAdder @@ -78,9 +79,27 @@ class Application( override val workerSemaphore = Semaphore(config.worker.numThreads * 2) + override val storageIndexManagers: Map + private val jobs: List init { + val indexConfig = config.source.index + val (fullScan, emptyScan) = if (indexConfig == null) { + listOf(3600L, 900L) + } else { + listOf(indexConfig.fullSyncInterval, indexConfig.emptyDirectorySyncInterval) + }.map { Duration.ofSeconds(it) } + + storageIndexManagers = config.paths.inputs.associateWith { input -> + MutableStorageIndexManager( + InMemoryStorageIndex(), + sourceStorage, + fullScan, + emptyScan, + input, + ) + } val serviceMutex = Mutex() jobs = listOfNotNull( RadarKafkaRestructure.job(config, serviceMutex), diff --git 
a/src/main/java/org/radarbase/output/FileStoreFactory.kt b/src/main/java/org/radarbase/output/FileStoreFactory.kt index d3d1601..9448ac3 100644 --- a/src/main/java/org/radarbase/output/FileStoreFactory.kt +++ b/src/main/java/org/radarbase/output/FileStoreFactory.kt @@ -26,9 +26,11 @@ import org.radarbase.output.config.RestructureConfig import org.radarbase.output.format.RecordConverterFactory import org.radarbase.output.path.RecordPathFactory import org.radarbase.output.source.SourceStorage +import org.radarbase.output.source.StorageIndexManager import org.radarbase.output.target.TargetStorage import org.radarbase.output.worker.FileCacheStore import java.io.IOException +import java.nio.file.Path /** Factory for all factory classes and settings. */ interface FileStoreFactory { @@ -42,6 +44,7 @@ interface FileStoreFactory { val redisHolder: RedisHolder val offsetPersistenceFactory: OffsetPersistenceFactory val workerSemaphore: Semaphore + val storageIndexManagers: Map @Throws(IOException::class) fun newFileCacheStore(accountant: Accountant): FileCacheStore diff --git a/src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt b/src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt index 1e7a086..c8ab192 100644 --- a/src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt +++ b/src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt @@ -11,6 +11,8 @@ import org.radarbase.output.FileStoreFactory import org.radarbase.output.accounting.Accountant import org.radarbase.output.accounting.AccountantImpl import org.radarbase.output.config.RestructureConfig +import org.radarbase.output.source.StorageIndex +import org.radarbase.output.source.StorageNode import org.radarbase.output.util.ResourceContext.Companion.resourceContext import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended import org.radarbase.output.util.Timer @@ -43,11 +45,11 @@ class SourceDataCleaner( private val supervisor = SupervisorJob() 
@Throws(IOException::class, InterruptedException::class) - suspend fun process(directoryName: String) { + suspend fun process(storageIndex: StorageIndex, directoryName: String) { // Get files and directories val absolutePath = Paths.get(directoryName) - val paths = topicPaths(absolutePath) + val paths = topicPaths(storageIndex, absolutePath) logger.info("{} topics found", paths.size) @@ -56,7 +58,7 @@ class SourceDataCleaner( launch { try { val deleteCount = fileStoreFactory.workerSemaphore.withPermit { - mapTopic(p) + mapTopic(storageIndex, p) } if (deleteCount > 0) { logger.info("Removed {} files in topic {}", deleteCount, p.fileName) @@ -70,7 +72,7 @@ class SourceDataCleaner( } } - private suspend fun mapTopic(topicPath: Path): Long { + private suspend fun mapTopic(storageIndex: StorageIndex, topicPath: Path): Long { val topic = topicPath.fileName.toString() return try { lockManager.tryWithLock(topic) { @@ -84,7 +86,7 @@ class SourceDataCleaner( fileStoreFactory, ) } - deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong() + deleteOldFiles(storageIndex, accountant, extractionCheck, topic, topicPath).toLong() } } } @@ -95,6 +97,7 @@ class SourceDataCleaner( } private suspend fun deleteOldFiles( + storageIndex: StorageIndex, accountant: Accountant, extractionCheck: ExtractionCheck, topic: String, @@ -102,7 +105,7 @@ class SourceDataCleaner( ): Int { val offsets = accountant.offsets.copyForTopic(topic) - val paths = sourceStorage.listTopicFiles(topic, topicPath, maxFilesPerTopic) { f -> + val paths = sourceStorage.listTopicFiles(storageIndex, topic, topicPath, maxFilesPerTopic) { f -> f.lastModified.isBefore(deleteThreshold) && // ensure that there is a file with a larger offset also // processed, so the largest offset is never removed. 
@@ -115,6 +118,7 @@ class SourceDataCleaner( logger.info("Removing {}", file.path) Timer.time("cleaner.delete") { sourceStorage.delete(file.path) + storageIndex.remove(StorageNode.StorageFile(file.path, Instant.MIN)) } true } else { @@ -127,8 +131,8 @@ class SourceDataCleaner( } } - private suspend fun topicPaths(path: Path): List = - sourceStorage.listTopics(path, excludeTopics) + private suspend fun topicPaths(storageIndex: StorageIndex, path: Path): List = + sourceStorage.listTopics(storageIndex, path, excludeTopics) // different services start on different topics to decrease lock contention .shuffled() @@ -147,9 +151,10 @@ class SourceDataCleaner( private suspend fun runCleaner(factory: FileStoreFactory) { SourceDataCleaner(factory).useSuspended { cleaner -> - for (input in factory.config.paths.inputs) { + for ((input, indexManager) in factory.storageIndexManagers) { + indexManager.update() logger.info("Cleaning {}", input) - cleaner.process(input.toString()) + cleaner.process(indexManager.storageIndex, input.toString()) } logger.info("Cleaned up {} files", cleaner.deletedFileCount.format()) } diff --git a/src/main/java/org/radarbase/output/config/ResourceConfig.kt b/src/main/java/org/radarbase/output/config/ResourceConfig.kt index 6d8c911..a4d4276 100644 --- a/src/main/java/org/radarbase/output/config/ResourceConfig.kt +++ b/src/main/java/org/radarbase/output/config/ResourceConfig.kt @@ -11,6 +11,7 @@ data class ResourceConfig( val hdfs: HdfsConfig? = null, val local: LocalConfig? = null, val azure: AzureConfig? = null, + val index: StorageIndexConfig? 
= null, ) { @get:JsonIgnore val sourceType: ResourceType by lazy { @@ -33,3 +34,4 @@ data class ResourceConfig( ResourceType.AZURE -> copyOnChange(azure, { it?.withEnv(prefix) }) { copy(azure = it) } } } + diff --git a/src/main/java/org/radarbase/output/config/StorageIndexConfig.kt b/src/main/java/org/radarbase/output/config/StorageIndexConfig.kt new file mode 100644 index 0000000..05fda27 --- /dev/null +++ b/src/main/java/org/radarbase/output/config/StorageIndexConfig.kt @@ -0,0 +1,12 @@ +package org.radarbase.output.config + +data class StorageIndexConfig( + /** How often to fully sync the storage index, in seconds. */ + val fullSyncInterval: Long = 3600L, + /** + * How often to sync empty directories with the storage index, in seconds. + * If this is very large, empty directories will only be scanned during + * full sync. + */ + val emptyDirectorySyncInterval: Long = 900L, +) diff --git a/src/main/java/org/radarbase/output/source/AzureSourceStorage.kt b/src/main/java/org/radarbase/output/source/AzureSourceStorage.kt index 9e78c87..2ca026d 100644 --- a/src/main/java/org/radarbase/output/source/AzureSourceStorage.kt +++ b/src/main/java/org/radarbase/output/source/AzureSourceStorage.kt @@ -11,6 +11,7 @@ import org.radarbase.output.util.TemporaryDirectory import org.radarbase.output.util.withoutFirstSegment import java.nio.file.Path import java.nio.file.Paths +import java.time.Instant import kotlin.io.path.createTempFile import kotlin.io.path.deleteIfExists @@ -24,22 +25,28 @@ class AzureSourceStorage( private fun blobClient(path: Path) = blobContainerClient.getBlobClient(path.withoutFirstSegment()) - override suspend fun list(path: Path, maxKeys: Int?): List = + override suspend fun list(path: Path, startAfter: Path?, maxKeys: Int?): List = withContext(Dispatchers.IO) { var iterable: Iterable = blobContainerClient.listBlobsByHierarchy("$path/") + if (startAfter != null) { + iterable = iterable.filter { Paths.get(it.name) > startAfter } + } if (maxKeys != null) { 
iterable = iterable.take(maxKeys) } iterable.map { - SimpleFileStatus( - Paths.get(it.name), - it.isPrefix ?: false, - it.properties?.lastModified?.toInstant(), - ) + if (it.isPrefix == true) { + StorageNode.StorageDirectory(Paths.get(it.name)) + } else { + StorageNode.StorageFile( + Paths.get(it.name), + it.properties?.lastModified?.toInstant() ?: Instant.now(), + ) + } } } - override suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile { + override suspend fun createTopicFile(topic: String, status: StorageNode): TopicFile { var topicFile = super.createTopicFile(topic, status) if (readOffsetFromMetadata && topicFile.range.range.to == null) { diff --git a/src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt b/src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt new file mode 100644 index 0000000..3256efd --- /dev/null +++ b/src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt @@ -0,0 +1,10 @@ +package org.radarbase.output.source + +class DelegatingStorageIndex( + private val sourceStorage: SourceStorage, +) : StorageIndex { + override suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int?): List = + sourceStorage.list(dir.path, maxKeys = maxKeys) + + override suspend fun remove(file: StorageNode.StorageFile) = Unit +} diff --git a/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt b/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt new file mode 100644 index 0000000..b77d013 --- /dev/null +++ b/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt @@ -0,0 +1,121 @@ +package org.radarbase.output.source + +import org.radarbase.output.source.StorageIndex.Companion.ROOT +import java.nio.file.Path +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +class InMemoryStorageIndex : MutableStorageIndex { + private val fileIndex: ConcurrentMap> = ConcurrentHashMap() + private val rootSet = ConcurrentHashMap() + + 
init { + fileIndex[ROOT] = rootSet + } + + override suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int?): List { + val listing = if (dir === ROOT) { + rootSet + } else { + fileIndex[dir] ?: return listOf() + } + + return if (maxKeys != null) { + listing.values.take(maxKeys) + } else { + listing.values.toList() + } + } + + private fun add(node: StorageNode) { + var currentNode = node + var parent = currentNode.parent() + if (currentNode is StorageNode.StorageDirectory) { + fileIndex.computeIfAbsent(currentNode) { + mapOf() + } + } + while (parent != null) { + fileIndex.compute(parent) { _, map -> + if (map == null) { + mapOf(currentNode.path to currentNode) + } else { + val newMap = map.toMutableMap() + newMap[currentNode.path] = currentNode + newMap + } + } + currentNode = parent + parent = currentNode.parent() + } + rootSet[currentNode.path] = currentNode + } + + override suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List): Collection { + add(parent) + nodes.asSequence() + .filterIsInstance() + .forEach { node -> + fileIndex.computeIfAbsent(node) { + mapOf() + } + } + val newMap = fileIndex.compute(parent) { _, map -> + if (map == null) { + buildMap(nodes.size) { + nodes.forEach { put(it.path, it) } + } + } else { + buildMap(nodes.size + map.size) { + putAll(map) + nodes.forEach { put(it.path, it) } + } + } + } ?: mapOf() + + return newMap.values + } + + override suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List): Collection { + add(parent) + val newMap = fileIndex.compute(parent) { _, map -> + if (map == null) { + buildMap(nodes.size) { + nodes.forEach { put(it.path, it) } + } + } else { + buildMap(nodes.size) { + nodes.forEach { put(it.path, it) } + } + } + } ?: mapOf() + + nodes.asSequence() + .filterIsInstance() + .filter { it.path !in newMap } + .forEach { removeRecursive(it) } + + return newMap.values + } + + override suspend fun remove(file: StorageNode.StorageFile) { + val parent = file.parent() + + if 
(parent != null) { + fileIndex.computeIfPresent(parent) { _, map -> + (map - file.path).takeIf { it.isNotEmpty() } + } + } else { + rootSet.remove(file.path) + } + } + + private fun removeRecursive(node: StorageNode.StorageDirectory) { + val directoriesToRemove = ArrayDeque() + fileIndex.remove(node)?.values?.filterIsInstanceTo(directoriesToRemove) + while (directoriesToRemove.isNotEmpty()) { + val first = directoriesToRemove.removeFirst() + fileIndex.remove(first)?.values?.filterIsInstanceTo(directoriesToRemove) + } + } +} diff --git a/src/main/java/org/radarbase/output/source/MutableStorageIndex.kt b/src/main/java/org/radarbase/output/source/MutableStorageIndex.kt new file mode 100644 index 0000000..c367ae4 --- /dev/null +++ b/src/main/java/org/radarbase/output/source/MutableStorageIndex.kt @@ -0,0 +1,7 @@ +package org.radarbase.output.source + +interface MutableStorageIndex : StorageIndex { + suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List): Collection + + suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List): Collection +} diff --git a/src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt b/src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt new file mode 100644 index 0000000..abe7d21 --- /dev/null +++ b/src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt @@ -0,0 +1,93 @@ +package org.radarbase.output.source + +import org.radarbase.kotlin.coroutines.forkJoin +import org.slf4j.LoggerFactory +import java.nio.file.Path +import java.time.Duration +import java.time.Instant + +class MutableStorageIndexManager( + storageIndex: MutableStorageIndex, + private val sourceStorage: SourceStorage, + private val rescanDirectoryDuration: Duration, + private val rescanEmptyDuration: Duration, + root: Path, +) : StorageIndexManager { + private val root = StorageNode.StorageDirectory(root) + private val mutableStorageIndex = storageIndex + override val storageIndex: StorageIndex = 
storageIndex + + var nextSync = Instant.MIN + private set + + var nextEmptySync = Instant.MIN + private set + + override suspend fun update() { + if (nextSync < Instant.now()) { + sync() + } else { + val rescanEmpty = nextEmptySync < Instant.now() + if (rescanEmpty) { + logger.info("Updating source {} index (including empty directories)...", root) + nextEmptySync = Instant.now() + rescanEmptyDuration + } else { + logger.info("Updating source {} index (excluding empty directories)...", root) + } + val listOperations = updateLevel(root, rescanEmpty) + logger.debug("Updated source {} with {} list operations...", root, listOperations) + } + } + + private suspend fun updateLevel(node: StorageNode.StorageDirectory, rescanEmpty: Boolean): Long { + val list = storageIndex.list(node) + if (list.isEmpty()) { + return if (rescanEmpty) { + syncLevel(node) + } else { + 0L + } + } + val lastFile = list.asSequence() + .filterIsInstance() + .maxByOrNull { it.path } + + var listOperations = if (lastFile != null) { + mutableStorageIndex.addAll(node, sourceStorage.list(node.path, startAfter = lastFile.path)) + 1L + } else { + 0L + } + + listOperations += storageIndex.list(node) + .filterIsInstance() + .forkJoin { updateLevel(it, rescanEmpty) } + .sum() + + return listOperations + } + + override suspend fun sync() { + logger.info("Syncing source {} index...", root) + val listOperations = syncLevel(root) + logger.debug("Synced source {} index with {} list operations...", root, listOperations) + nextSync = Instant.now() + rescanDirectoryDuration + nextEmptySync = Instant.now() + rescanEmptyDuration + } + + private suspend fun syncLevel(node: StorageNode.StorageDirectory): Long { + mutableStorageIndex.sync(node, sourceStorage.list(node.path)) + var listOperations = 1L + + listOperations += storageIndex.list(node) + .filterIsInstance() + .forkJoin { syncLevel(it) } + .sum() + + return listOperations + } + + companion object { + private val logger = 
LoggerFactory.getLogger(MutableStorageIndexManager::class.java) + } +} diff --git a/src/main/java/org/radarbase/output/source/S3SourceStorage.kt b/src/main/java/org/radarbase/output/source/S3SourceStorage.kt index 8624ec1..08e0d58 100644 --- a/src/main/java/org/radarbase/output/source/S3SourceStorage.kt +++ b/src/main/java/org/radarbase/output/source/S3SourceStorage.kt @@ -16,6 +16,7 @@ import org.slf4j.LoggerFactory import java.io.FileNotFoundException import java.nio.file.Path import java.nio.file.Paths +import kotlin.io.path.pathString import kotlin.time.Duration.Companion.seconds class S3SourceStorage( @@ -28,14 +29,18 @@ class S3SourceStorage( override suspend fun list( path: Path, + startAfter: Path?, maxKeys: Int?, - ): List { + ): List { val listRequest = ListObjectsArgs.Builder().bucketBuild(bucket) { if (maxKeys != null) { maxKeys(maxKeys.coerceAtMost(1000)) } prefix("$path/") recursive(false) + if (startAfter != null) { + startAfter(startAfter.pathString) + } useUrlEncodingType(false) } var iterable = faultTolerant { s3Client.listObjects(listRequest) } @@ -45,15 +50,16 @@ class S3SourceStorage( return iterable .map { val item = it.get() - SimpleFileStatus( - Paths.get(item.objectName()), - item.isDir, - if (item.isDir) null else item.lastModified().toInstant(), - ) + val itemPath = Paths.get(item.objectName()) + if (item.isDir) { + StorageNode.StorageDirectory(itemPath) + } else { + StorageNode.StorageFile(itemPath, item.lastModified().toInstant()) + } } } - override suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile { + override suspend fun createTopicFile(topic: String, status: StorageNode): TopicFile { var topicFile = super.createTopicFile(topic, status) if (readEndOffset && topicFile.range.range.to == null) { diff --git a/src/main/java/org/radarbase/output/source/SourceStorage.kt b/src/main/java/org/radarbase/output/source/SourceStorage.kt index 45dda78..d07cf23 100644 --- 
a/src/main/java/org/radarbase/output/source/SourceStorage.kt +++ b/src/main/java/org/radarbase/output/source/SourceStorage.kt @@ -16,15 +16,16 @@ interface SourceStorage { /** List all files in the given directory. */ suspend fun list( path: Path, + startAfter: Path? = null, maxKeys: Int? = null, - ): List + ): List /** Delete given file. Will not delete any directories. */ suspend fun delete(path: Path) - suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile = TopicFile( + suspend fun createTopicFile(topic: String, status: StorageNode): TopicFile = TopicFile( topic = topic, path = status.path, - lastModified = status.lastModified ?: Instant.now(), + lastModified = if (status is StorageNode.StorageFile) status.lastModified else Instant.now(), ) /** @@ -32,11 +33,12 @@ interface SourceStorage { * The path must only contain records of a single topic, this is not verified. */ suspend fun listTopicFiles( + storageIndex: StorageIndex, topic: String, topicPath: Path, limit: Int, predicate: (TopicFile) -> Boolean, - ): List = avroFileTreeLister() + ): List = storageIndex.avroFileTreeLister(this) .list(TopicPath(topic, topicPath), limit, predicate) /** @@ -44,9 +46,10 @@ interface SourceStorage { * Exclude paths belonging to the set of given excluded topics. */ suspend fun listTopics( + storageIndex: StorageIndex, root: Path, exclude: Set, - ): List = avroTopicTreeLister() + ): List = storageIndex.avroTopicTreeLister() .listTo(LinkedHashSet(), root) .filter { it.fileName.toString() !in exclude } diff --git a/src/main/java/org/radarbase/output/source/StorageIndex.kt b/src/main/java/org/radarbase/output/source/StorageIndex.kt new file mode 100644 index 0000000..c26f8c8 --- /dev/null +++ b/src/main/java/org/radarbase/output/source/StorageIndex.kt @@ -0,0 +1,13 @@ +package org.radarbase.output.source + +import java.nio.file.Paths + +interface StorageIndex { + suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int? 
= null): List + + suspend fun remove(file: StorageNode.StorageFile) + + companion object { + val ROOT = StorageNode.StorageDirectory(Paths.get("/")) + } +} diff --git a/src/main/java/org/radarbase/output/source/StorageIndexManager.kt b/src/main/java/org/radarbase/output/source/StorageIndexManager.kt new file mode 100644 index 0000000..c0577ee --- /dev/null +++ b/src/main/java/org/radarbase/output/source/StorageIndexManager.kt @@ -0,0 +1,8 @@ +package org.radarbase.output.source + +interface StorageIndexManager { + val storageIndex: StorageIndex + + suspend fun update() + suspend fun sync() +} diff --git a/src/main/java/org/radarbase/output/source/StorageNode.kt b/src/main/java/org/radarbase/output/source/StorageNode.kt new file mode 100644 index 0000000..697ede7 --- /dev/null +++ b/src/main/java/org/radarbase/output/source/StorageNode.kt @@ -0,0 +1,24 @@ +package org.radarbase.output.source + +import java.nio.file.Path +import java.time.Instant + +sealed interface StorageNode { + val path: Path + + fun parent(): StorageDirectory? { + val parentPath = path.parent + return if (parentPath != null) { + StorageDirectory(parentPath) + } else { + null + } + } + + data class StorageDirectory(override val path: Path) : StorageNode + + data class StorageFile( + override val path: Path, + val lastModified: Instant, + ) : StorageNode +} diff --git a/src/main/java/org/radarbase/output/source/TopicFileList.kt b/src/main/java/org/radarbase/output/source/TopicFileList.kt index ee307ab..d8a9027 100644 --- a/src/main/java/org/radarbase/output/source/TopicFileList.kt +++ b/src/main/java/org/radarbase/output/source/TopicFileList.kt @@ -26,5 +26,3 @@ data class TopicFile( val size: Long? = range.range.size } - -data class SimpleFileStatus(val path: Path, val isDirectory: Boolean, val lastModified: Instant?) 
diff --git a/src/main/java/org/radarbase/output/util/AvroFileLister.kt b/src/main/java/org/radarbase/output/util/AvroFileLister.kt index ab07983..a4267aa 100644 --- a/src/main/java/org/radarbase/output/util/AvroFileLister.kt +++ b/src/main/java/org/radarbase/output/util/AvroFileLister.kt @@ -1,10 +1,14 @@ package org.radarbase.output.util +import kotlinx.coroutines.flow.toList import org.radarbase.output.source.SourceStorage +import org.radarbase.output.source.StorageIndex +import org.radarbase.output.source.StorageNode import org.radarbase.output.source.TopicFile class AvroFileLister( private val storage: SourceStorage, + private val storageIndex: StorageIndex, ) : TreeLister.LevelLister { override suspend fun listLevel( @@ -12,17 +16,19 @@ class AvroFileLister( descend: suspend (TopicPath) -> Unit, emit: suspend (TopicFile) -> Unit, ) { - storage.list(context.path).forEach { status -> - val filename = status.path.fileName.toString() - when { - status.isDirectory && filename != "+tmp" -> descend(context.copy(path = status.path)) - filename.endsWith(".avro") -> emit(storage.createTopicFile(context.topic, status)) - else -> {} + storageIndex.list(StorageNode.StorageDirectory(context.path)) + .toList() + .forEach { status -> + val filename = status.path.fileName.toString() + when { + status is StorageNode.StorageDirectory && filename != "+tmp" -> descend(context.copy(path = status.path)) + status is StorageNode.StorageFile && filename.endsWith(".avro") -> emit(storage.createTopicFile(context.topic, status)) + else -> {} + } } - } } companion object { - fun SourceStorage.avroFileTreeLister() = TreeLister(AvroFileLister(this)) + fun StorageIndex.avroFileTreeLister(sourceStorage: SourceStorage) = TreeLister(AvroFileLister(sourceStorage, this)) } } diff --git a/src/main/java/org/radarbase/output/util/AvroTopicLister.kt b/src/main/java/org/radarbase/output/util/AvroTopicLister.kt index 564a8b6..3e052a1 100644 --- 
a/src/main/java/org/radarbase/output/util/AvroTopicLister.kt +++ b/src/main/java/org/radarbase/output/util/AvroTopicLister.kt @@ -1,20 +1,23 @@ package org.radarbase.output.util -import org.radarbase.output.source.SourceStorage +import kotlinx.coroutines.flow.toList +import org.radarbase.output.source.StorageIndex +import org.radarbase.output.source.StorageNode import java.nio.file.Path class AvroTopicLister( - private val storage: SourceStorage, + private val storage: StorageIndex, ) : TreeLister.LevelLister { override suspend fun listLevel( context: Path, descend: suspend (Path) -> Unit, emit: suspend (Path) -> Unit, ) { - val fileStatuses = storage.list(context, maxKeys = 256) + val fileStatuses = storage.list(StorageNode.StorageDirectory(context), maxKeys = 256) + .toList() val avroFile = fileStatuses.find { file -> - !file.isDirectory && + file is StorageNode.StorageFile && file.path.fileName.toString().endsWith(".avro", true) } @@ -22,12 +25,12 @@ class AvroTopicLister( emit(avroFile.path.parent.parent) } else { fileStatuses - .filter { file -> file.isDirectory && file.path.fileName.toString() != "+tmp" } + .filter { file -> file is StorageNode.StorageDirectory && file.path.fileName.toString() != "+tmp" } .forEach { file -> descend(file.path) } } } companion object { - fun SourceStorage.avroTopicTreeLister() = TreeLister(AvroTopicLister(this)) + fun StorageIndex.avroTopicTreeLister() = TreeLister(AvroTopicLister(this)) } } diff --git a/src/main/java/org/radarbase/output/util/TreeLister.kt b/src/main/java/org/radarbase/output/util/TreeLister.kt index 3d03a31..dfa7b29 100644 --- a/src/main/java/org/radarbase/output/util/TreeLister.kt +++ b/src/main/java/org/radarbase/output/util/TreeLister.kt @@ -23,11 +23,9 @@ class TreeLister( val channel = Channel(capacity = limit) val producer = launch { - coroutineScope { - descend(context) { value -> - if (predicate == null || predicate(value)) { - channel.send(value) - } + descend(context) { value -> + if (predicate == 
null || predicate(value)) { + channel.send(value) } } channel.close() diff --git a/src/main/java/org/radarbase/output/worker/FileCache.kt b/src/main/java/org/radarbase/output/worker/FileCache.kt index 0cb8ee7..f46f2cd 100644 --- a/src/main/java/org/radarbase/output/worker/FileCache.kt +++ b/src/main/java/org/radarbase/output/worker/FileCache.kt @@ -17,8 +17,6 @@ package org.radarbase.output.worker import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.apache.avro.generic.GenericRecord import org.radarbase.output.FileStoreFactory @@ -109,13 +107,11 @@ class FileCache( converterFactory.converterFor(writer, record, fileIsNew, reader, excludeFields) } } catch (ex: IOException) { - coroutineScope { - launch(Dispatchers.IO) { - try { - writer.close() - } catch (exClose: IOException) { - logger.error("Failed to close writer for {}", path, ex) - } + withContext(Dispatchers.IO) { + try { + writer.close() + } catch (exClose: IOException) { + logger.error("Failed to close writer for {}", path, exClose) } } diff --git a/src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt b/src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt index 7978b75..4fec805 100644 --- a/src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt +++ b/src/main/java/org/radarbase/output/worker/RadarKafkaRestructure.kt @@ -28,6 +28,7 @@ import org.radarbase.output.accounting.Accountant import org.radarbase.output.accounting.AccountantImpl import org.radarbase.output.accounting.OffsetRangeSet import org.radarbase.output.config.RestructureConfig +import org.radarbase.output.source.StorageIndex import org.radarbase.output.source.TopicFileList import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended import org.radarbase.output.util.TimeUtil.durationSince @@ -77,13 +78,13 @@ class RadarKafkaRestructure( val processedRecordsCount = LongAdder() 
@Throws(IOException::class, InterruptedException::class) - suspend fun process(directoryName: String) { + suspend fun process(directoryName: String, storageIndex: StorageIndex) { // Get files and directories val absolutePath = Paths.get(directoryName) logger.info("Scanning topics...") - val paths = topicPaths(absolutePath) + val paths = topicPaths(storageIndex, absolutePath) logger.info("{} topics found", paths.size) @@ -92,7 +93,7 @@ class RadarKafkaRestructure( launch { try { val (fileCount, recordCount) = fileStoreFactory.workerSemaphore.withPermit { - mapTopic(p) + mapTopic(storageIndex, p) } processedFileCount.add(fileCount) processedRecordsCount.add(recordCount) @@ -104,7 +105,7 @@ class RadarKafkaRestructure( } } - private suspend fun mapTopic(topicPath: Path): ProcessingStatistics { + private suspend fun mapTopic(storageIndex: StorageIndex, topicPath: Path): ProcessingStatistics { val topic = topicPath.fileName.toString() return try { @@ -112,7 +113,7 @@ class RadarKafkaRestructure( coroutineScope { AccountantImpl(fileStoreFactory, topic).useSuspended { accountant -> accountant.initialize(this) - startWorker(topic, topicPath, accountant, accountant.offsets) + startWorker(storageIndex, topic, topicPath, accountant, accountant.offsets) } } } @@ -127,6 +128,7 @@ class RadarKafkaRestructure( } private suspend fun startWorker( + storageIndex: StorageIndex, topic: String, topicPath: Path, accountant: Accountant, @@ -140,7 +142,7 @@ class RadarKafkaRestructure( try { val topicPaths = TopicFileList( topic, - sourceStorage.listTopicFiles(topic, topicPath, maxFilesPerTopic) { f -> + sourceStorage.listTopicFiles(storageIndex, topic, topicPath, maxFilesPerTopic) { f -> !seenFiles.contains(f.range) && f.lastModified.durationSince() >= minimumFileAge }, @@ -161,8 +163,8 @@ class RadarKafkaRestructure( supervisor.cancel() } - private suspend fun topicPaths(root: Path): List = - sourceStorage.listTopics(root, excludeTopics) + private suspend fun topicPaths(storageIndex: 
StorageIndex, root: Path): List = + sourceStorage.listTopics(storageIndex, root, excludeTopics) // different services start on different topics to decrease lock contention .shuffled() @@ -182,7 +184,8 @@ class RadarKafkaRestructure( private suspend fun runRestructure(factory: FileStoreFactory) { RadarKafkaRestructure(factory).useSuspended { restructure -> - for (input in factory.config.paths.inputs) { + for ((input, index) in factory.storageIndexManagers) { + index.update() logger.info("In: {}", input) logger.info( "Out: bucket {} (default {}) - path {}", @@ -190,7 +193,7 @@ class RadarKafkaRestructure( factory.pathFactory.pathConfig.bucket?.defaultName, factory.pathFactory.pathConfig.path.format, ) - restructure.process(input.toString()) + restructure.process(input.toString(), index.storageIndex) } logger.info( diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties deleted file mode 100644 index 7ba2364..0000000 --- a/src/main/resources/log4j2.properties +++ /dev/null @@ -1,9 +0,0 @@ -status=error -dest=err -name=PropertiesConfig -appender.console.type=Console -appender.console.name=STDOUT -appender.console.layout.type=PatternLayout -appender.console.layout.pattern=%d{yyyy-MM-dd HH:mm:ss} %-5p - %m (%c{1}:%L)%n -rootLogger.level=info -rootLogger.appenderRef.stdout.ref=STDOUT diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml new file mode 100644 index 0000000..725abb3 --- /dev/null +++ b/src/main/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + From 97827764c9e991f898e38710aaf0d3ad1caa38b8 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Thu, 14 Sep 2023 17:24:18 +0200 Subject: [PATCH 2/4] Some small readability fixes --- .../output/source/InMemoryStorageIndex.kt | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt b/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt index 
b77d013..4669ffc 100644 --- a/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt +++ b/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt @@ -53,6 +53,11 @@ class InMemoryStorageIndex : MutableStorageIndex { override suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List): Collection { add(parent) + + if (nodes.isEmpty()) { + return fileIndex[parent]?.values ?: listOf() + } + nodes.asSequence() .filterIsInstance() .forEach { node -> @@ -78,17 +83,11 @@ class InMemoryStorageIndex : MutableStorageIndex { override suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List): Collection { add(parent) - val newMap = fileIndex.compute(parent) { _, map -> - if (map == null) { - buildMap(nodes.size) { - nodes.forEach { put(it.path, it) } - } - } else { - buildMap(nodes.size) { - nodes.forEach { put(it.path, it) } - } - } - } ?: mapOf() + val newMap = buildMap(nodes.size) { + nodes.forEach { put(it.path, it) } + } + + fileIndex[parent] = newMap nodes.asSequence() .filterIsInstance() From e38c34217a1f1bfde42df06aa65e62ed9b7a8ddc Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Thu, 14 Sep 2023 20:30:12 +0200 Subject: [PATCH 3/4] FIx test --- src/main/java/org/radarbase/output/config/ResourceConfig.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/radarbase/output/config/ResourceConfig.kt b/src/main/java/org/radarbase/output/config/ResourceConfig.kt index a4d4276..418f87d 100644 --- a/src/main/java/org/radarbase/output/config/ResourceConfig.kt +++ b/src/main/java/org/radarbase/output/config/ResourceConfig.kt @@ -34,4 +34,3 @@ data class ResourceConfig( ResourceType.AZURE -> copyOnChange(azure, { it?.withEnv(prefix) }) { copy(azure = it) } } } - From c0f5373cfab7c582cd8602cd0e3bea335dd46151 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Mon, 18 Sep 2023 13:57:40 +0200 Subject: [PATCH 4/4] Added kdoc and disabled wildcard imports --- .editorconfig | 3 - .../output/RestructureS3IntegrationTest.kt | 
22 +++- .../output/accounting/OffsetRangeRedisTest.kt | 6 +- .../java/org/radarbase/output/Application.kt | 19 +++- .../output/format/JsonAvroConverterFactory.kt | 6 +- .../output/format/RecordConverterFactory.kt | 6 +- .../output/path/MPPathFormatterPlugin.kt | 7 +- .../output/source/DelegatingStorageIndex.kt | 4 + .../output/source/InMemoryStorageIndex.kt | 43 ++++---- .../output/source/MutableStorageIndex.kt | 15 ++- .../source/MutableStorageIndexManager.kt | 93 ---------------- .../output/source/S3SourceStorage.kt | 5 +- .../radarbase/output/source/StorageIndex.kt | 16 +++ .../output/source/StorageIndexManager.kt | 100 +++++++++++++++++- .../radarbase/output/source/StorageNode.kt | 13 +++ .../output/target/LocalTargetStorage.kt | 9 +- .../output/target/S3TargetStorage.kt | 11 +- .../radarbase/output/util/PostponedWriter.kt | 6 +- .../org/radarbase/output/util/TimeUtil.kt | 10 +- .../org/radarbase/output/worker/FileCache.kt | 7 +- .../radarbase/output/OffsetRangeFileTest.kt | 6 +- .../radarbase/output/OffsetRangeSetTest.kt | 4 +- .../output/accounting/OffsetIntervalsTest.kt | 4 +- .../output/data/CompressionFactoryTest.kt | 6 +- .../output/data/CsvAvroConverterTest.kt | 4 +- .../output/data/FileCacheStoreTest.kt | 6 +- 26 files changed, 283 insertions(+), 148 deletions(-) delete mode 100644 src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt diff --git a/.editorconfig b/.editorconfig index 3108529..78b36ca 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,4 +1 @@ root = true - -[*.kt] -ktlint_standard_no-wildcard-imports = disabled diff --git a/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt b/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt index 0bf8ecd..60fcb7b 100644 --- a/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt +++ b/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt @@ -1,12 +1,28 @@ package org.radarbase.output 
-import io.minio.* +import io.minio.BucketExistsArgs +import io.minio.GetObjectArgs +import io.minio.ListObjectsArgs +import io.minio.MakeBucketArgs import io.minio.ObjectWriteArgs.MAX_PART_SIZE -import kotlinx.coroutines.* +import io.minio.PutObjectArgs +import io.minio.RemoveBucketArgs +import io.minio.RemoveObjectArgs +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test -import org.radarbase.output.config.* +import org.radarbase.output.config.PathConfig +import org.radarbase.output.config.PathFormatterConfig +import org.radarbase.output.config.ResourceConfig +import org.radarbase.output.config.RestructureConfig +import org.radarbase.output.config.S3Config +import org.radarbase.output.config.TopicConfig +import org.radarbase.output.config.WorkerConfig import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended import org.radarbase.output.util.Timer import org.radarbase.output.util.bucketBuild diff --git a/src/integrationTest/java/org/radarbase/output/accounting/OffsetRangeRedisTest.kt b/src/integrationTest/java/org/radarbase/output/accounting/OffsetRangeRedisTest.kt index 62c757a..5c26ded 100644 --- a/src/integrationTest/java/org/radarbase/output/accounting/OffsetRangeRedisTest.kt +++ b/src/integrationTest/java/org/radarbase/output/accounting/OffsetRangeRedisTest.kt @@ -2,7 +2,11 @@ package org.radarbase.output.accounting import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import 
org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.radarbase.output.accounting.OffsetRedisPersistence.Companion.redisOffsetReader diff --git a/src/main/java/org/radarbase/output/Application.kt b/src/main/java/org/radarbase/output/Application.kt index 2850007..b6baddb 100644 --- a/src/main/java/org/radarbase/output/Application.kt +++ b/src/main/java/org/radarbase/output/Application.kt @@ -18,17 +18,26 @@ package org.radarbase.output import com.beust.jcommander.JCommander import com.beust.jcommander.ParameterException -import kotlinx.coroutines.* +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Semaphore -import org.radarbase.output.accounting.* +import org.radarbase.output.accounting.Accountant +import org.radarbase.output.accounting.OffsetPersistenceFactory +import org.radarbase.output.accounting.OffsetRedisPersistence +import org.radarbase.output.accounting.RedisHolder +import org.radarbase.output.accounting.RedisRemoteLockManager +import org.radarbase.output.accounting.RemoteLockManager import org.radarbase.output.cleaner.SourceDataCleaner import org.radarbase.output.compression.Compression import org.radarbase.output.config.CommandLineArgs import org.radarbase.output.config.RestructureConfig import org.radarbase.output.format.RecordConverterFactory import org.radarbase.output.path.RecordPathFactory -import org.radarbase.output.source.* +import org.radarbase.output.source.InMemoryStorageIndex +import org.radarbase.output.source.SourceStorage +import org.radarbase.output.source.SourceStorageFactory +import org.radarbase.output.source.StorageIndexManager import org.radarbase.output.target.TargetStorage import org.radarbase.output.target.TargetStorageFactory import org.radarbase.output.util.Timer @@ -92,12 +101,12 @@ class Application( }.map { Duration.ofSeconds(it) } storageIndexManagers = 
config.paths.inputs.associateWith { input -> - MutableStorageIndexManager( + StorageIndexManager( InMemoryStorageIndex(), sourceStorage, + input, fullScan, emptyScan, - input, ) } val serviceMutex = Mutex() diff --git a/src/main/java/org/radarbase/output/format/JsonAvroConverterFactory.kt b/src/main/java/org/radarbase/output/format/JsonAvroConverterFactory.kt index d032f3e..02226b9 100644 --- a/src/main/java/org/radarbase/output/format/JsonAvroConverterFactory.kt +++ b/src/main/java/org/radarbase/output/format/JsonAvroConverterFactory.kt @@ -6,7 +6,11 @@ import org.radarbase.output.format.JsonAvroConverter.Companion.JSON_READER import org.radarbase.output.format.JsonAvroConverter.Companion.JSON_WRITER import org.radarbase.output.util.ResourceContext.Companion.resourceContext import org.radarbase.output.util.TimeUtil.getDate -import java.io.* +import java.io.BufferedReader +import java.io.IOException +import java.io.InputStream +import java.io.Reader +import java.io.Writer import java.nio.file.Path import kotlin.io.path.inputStream diff --git a/src/main/java/org/radarbase/output/format/RecordConverterFactory.kt b/src/main/java/org/radarbase/output/format/RecordConverterFactory.kt index 8843a0d..f0c70a9 100644 --- a/src/main/java/org/radarbase/output/format/RecordConverterFactory.kt +++ b/src/main/java/org/radarbase/output/format/RecordConverterFactory.kt @@ -21,7 +21,11 @@ import org.apache.avro.generic.GenericData import org.apache.avro.generic.GenericRecord import org.radarbase.output.compression.Compression import org.radarbase.output.util.ResourceContext.Companion.resourceContext -import java.io.* +import java.io.BufferedReader +import java.io.IOException +import java.io.InputStream +import java.io.Reader +import java.io.Writer import java.nio.file.Path import java.util.regex.Pattern import kotlin.collections.component1 diff --git a/src/main/java/org/radarbase/output/path/MPPathFormatterPlugin.kt b/src/main/java/org/radarbase/output/path/MPPathFormatterPlugin.kt 
index 4f4e702..883bc80 100644 --- a/src/main/java/org/radarbase/output/path/MPPathFormatterPlugin.kt +++ b/src/main/java/org/radarbase/output/path/MPPathFormatterPlugin.kt @@ -1,6 +1,11 @@ package org.radarbase.output.path -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import org.radarbase.kotlin.coroutines.CacheConfig import org.radarbase.kotlin.coroutines.CachedMap import org.radarbase.ktor.auth.ClientCredentialsConfig diff --git a/src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt b/src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt index 3256efd..a9754df 100644 --- a/src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt +++ b/src/main/java/org/radarbase/output/source/DelegatingStorageIndex.kt @@ -1,5 +1,9 @@ package org.radarbase.output.source +/** + * Delegate all calls directly to the underlying storage. This effectively means that no caching + * takes place. + */ class DelegatingStorageIndex( private val sourceStorage: SourceStorage, ) : StorageIndex { diff --git a/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt b/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt index 4669ffc..e167624 100644 --- a/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt +++ b/src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt @@ -5,6 +5,11 @@ import java.nio.file.Path import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap +/** + * Storage index that keeps the given file tree in memory. + * For very large file systems, this may + * cause a memory issue. 
+ */ class InMemoryStorageIndex : MutableStorageIndex { private val fileIndex: ConcurrentMap> = ConcurrentHashMap() private val rootSet = ConcurrentHashMap() @@ -27,35 +32,33 @@ class InMemoryStorageIndex : MutableStorageIndex { } } - private fun add(node: StorageNode) { - var currentNode = node - var parent = currentNode.parent() - if (currentNode is StorageNode.StorageDirectory) { - fileIndex.computeIfAbsent(currentNode) { - mapOf() - } - } - while (parent != null) { - fileIndex.compute(parent) { _, map -> + /** Adds a node and all its parents to the file hierarchy. */ + private fun add(dir: StorageNode.StorageDirectory) { + var currentDir = dir + var parentDir = currentDir.parent() + while (parentDir != null) { + fileIndex.compute(parentDir) { _, map -> if (map == null) { - mapOf(currentNode.path to currentNode) + mapOf(currentDir.path to currentDir) } else { - val newMap = map.toMutableMap() - newMap[currentNode.path] = currentNode - newMap + buildMap(map.size + 1) { + putAll(map) + put(currentDir.path, currentDir) + } } } - currentNode = parent - parent = currentNode.parent() + currentDir = parentDir + parentDir = currentDir.parent() } - rootSet[currentNode.path] = currentNode + rootSet[currentDir.path] = currentDir } override suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List): Collection { add(parent) if (nodes.isEmpty()) { - return fileIndex[parent]?.values ?: listOf() + return fileIndex[parent]?.values + ?: listOf() } nodes.asSequence() @@ -81,7 +84,7 @@ class InMemoryStorageIndex : MutableStorageIndex { return newMap.values } - override suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List): Collection { + override suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List) { add(parent) val newMap = buildMap(nodes.size) { nodes.forEach { put(it.path, it) } @@ -93,8 +96,6 @@ class InMemoryStorageIndex : MutableStorageIndex { .filterIsInstance() .filter { it.path !in newMap } .forEach { removeRecursive(it) } - - 
return newMap.values } override suspend fun remove(file: StorageNode.StorageFile) { diff --git a/src/main/java/org/radarbase/output/source/MutableStorageIndex.kt b/src/main/java/org/radarbase/output/source/MutableStorageIndex.kt index c367ae4..30f850a 100644 --- a/src/main/java/org/radarbase/output/source/MutableStorageIndex.kt +++ b/src/main/java/org/radarbase/output/source/MutableStorageIndex.kt @@ -1,7 +1,20 @@ package org.radarbase.output.source +/** Storage index that may be modified by the storage index manager. */ interface MutableStorageIndex : StorageIndex { + /** + * Add a list of storage nodes to the given directory. + * All values in [nodes] should have [parent] as parent node. No nodes will be removed from the + * current directory listing, but updated values (e.g. last modified values) will be overridden. + * + * @return the current file listing after adding new nodes. + */ suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List): Collection - suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List): Collection + /** + * Fully sync a storage node list with the index. + * All values in [nodes] should have [parent] as parent node. All nodes in the index + * corresponding to [parent] will be removed from that directory and replaced by the given list. 
+ */ + suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List) } diff --git a/src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt b/src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt deleted file mode 100644 index abe7d21..0000000 --- a/src/main/java/org/radarbase/output/source/MutableStorageIndexManager.kt +++ /dev/null @@ -1,93 +0,0 @@ -package org.radarbase.output.source - -import org.radarbase.kotlin.coroutines.forkJoin -import org.slf4j.LoggerFactory -import java.nio.file.Path -import java.time.Duration -import java.time.Instant - -class MutableStorageIndexManager( - storageIndex: MutableStorageIndex, - private val sourceStorage: SourceStorage, - private val rescanDirectoryDuration: Duration, - private val rescanEmptyDuration: Duration, - root: Path, -) : StorageIndexManager { - private val root = StorageNode.StorageDirectory(root) - private val mutableStorageIndex = storageIndex - override val storageIndex: StorageIndex = storageIndex - - var nextSync = Instant.MIN - private set - - var nextEmptySync = Instant.MIN - private set - - override suspend fun update() { - if (nextSync < Instant.now()) { - sync() - } else { - val rescanEmpty = nextEmptySync < Instant.now() - if (rescanEmpty) { - logger.info("Updating source {} index (including empty directories)...", root) - nextEmptySync = Instant.now() + rescanEmptyDuration - } else { - logger.info("Updating source {} index (excluding empty directories)...", root) - } - val listOperations = updateLevel(root, rescanEmpty) - logger.debug("Updated source {} with {} list operations...", root, listOperations) - } - } - - private suspend fun updateLevel(node: StorageNode.StorageDirectory, rescanEmpty: Boolean): Long { - val list = storageIndex.list(node) - if (list.isEmpty()) { - return if (rescanEmpty) { - syncLevel(node) - } else { - 0L - } - } - val lastFile = list.asSequence() - .filterIsInstance() - .maxByOrNull { it.path } - - var listOperations = if (lastFile != 
null) { - mutableStorageIndex.addAll(node, sourceStorage.list(node.path, startAfter = lastFile.path)) - 1L - } else { - 0L - } - - listOperations += storageIndex.list(node) - .filterIsInstance() - .forkJoin { updateLevel(it, rescanEmpty) } - .sum() - - return listOperations - } - - override suspend fun sync() { - logger.info("Syncing source {} index...", root) - val listOperations = syncLevel(root) - logger.debug("Synced source {} index with {} list operations...", root, listOperations) - nextSync = Instant.now() + rescanDirectoryDuration - nextEmptySync = Instant.now() + rescanEmptyDuration - } - - private suspend fun syncLevel(node: StorageNode.StorageDirectory): Long { - mutableStorageIndex.sync(node, sourceStorage.list(node.path)) - var listOperations = 1L - - listOperations += storageIndex.list(node) - .filterIsInstance() - .forkJoin { syncLevel(it) } - .sum() - - return listOperations - } - - companion object { - private val logger = LoggerFactory.getLogger(MutableStorageIndexManager::class.java) - } -} diff --git a/src/main/java/org/radarbase/output/source/S3SourceStorage.kt b/src/main/java/org/radarbase/output/source/S3SourceStorage.kt index 08e0d58..8a33444 100644 --- a/src/main/java/org/radarbase/output/source/S3SourceStorage.kt +++ b/src/main/java/org/radarbase/output/source/S3SourceStorage.kt @@ -1,6 +1,9 @@ package org.radarbase.output.source -import io.minio.* +import io.minio.GetObjectTagsArgs +import io.minio.ListObjectsArgs +import io.minio.MinioClient +import io.minio.RemoveObjectArgs import io.minio.errors.ErrorResponseException import io.minio.messages.Tags import kotlinx.coroutines.Dispatchers diff --git a/src/main/java/org/radarbase/output/source/StorageIndex.kt b/src/main/java/org/radarbase/output/source/StorageIndex.kt index c26f8c8..51dd258 100644 --- a/src/main/java/org/radarbase/output/source/StorageIndex.kt +++ b/src/main/java/org/radarbase/output/source/StorageIndex.kt @@ -2,12 +2,28 @@ package org.radarbase.output.source import 
java.nio.file.Paths +/** + * Index of files in a source storage. + * This index does not modify itself so it needs to be synced by a [StorageIndexManager]. + */ interface StorageIndex { + /** + * List given directory. + * If [maxKeys] is given, no more than that many entries will be returned. + */ suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int? = null): List + /** + * Remove a file from the index. + * This will typically be called if the file was removed by the current process. + */ suspend fun remove(file: StorageNode.StorageFile) companion object { + /** + * Root directory. All files that are in the index can be found by traversing the index + * starting at this root. + */ val ROOT = StorageNode.StorageDirectory(Paths.get("/")) } } diff --git a/src/main/java/org/radarbase/output/source/StorageIndexManager.kt b/src/main/java/org/radarbase/output/source/StorageIndexManager.kt index c0577ee..c9ed4ed 100644 --- a/src/main/java/org/radarbase/output/source/StorageIndexManager.kt +++ b/src/main/java/org/radarbase/output/source/StorageIndexManager.kt @@ -1,8 +1,100 @@ package org.radarbase.output.source -interface StorageIndexManager { - val storageIndex: StorageIndex +import org.radarbase.kotlin.coroutines.forkJoin +import org.slf4j.LoggerFactory +import java.nio.file.Path +import java.time.Duration +import java.time.Instant - suspend fun update() - suspend fun sync() +/** Manager to manage a storage index. */ +class StorageIndexManager( + /** Storage index to manage. */ + val storageIndex: StorageIndex, + /** Source storage to index. */ + private val sourceStorage: SourceStorage, + /** Root directory in source storage to start scanning. */ + root: Path, + /** How often to rescan the full directory structure. */ + private val rescanDirectoryDuration: Duration, + /** How often to rescan empty directories. 
*/ + private val rescanEmptyDuration: Duration, +) { + private val root = StorageNode.StorageDirectory(root) + + private var nextSync = Instant.MIN + + private var nextEmptySync = Instant.MIN + + /** Update the storage index, taking into account caching times. */ + suspend fun update() { + if (storageIndex !is MutableStorageIndex) return + if (nextSync < Instant.now()) { + sync() + } else { + val rescanEmpty = nextEmptySync < Instant.now() + if (rescanEmpty) { + logger.info("Updating source {} index (including empty directories)...", root) + nextEmptySync = Instant.now() + rescanEmptyDuration + } else { + logger.info("Updating source {} index (excluding empty directories)...", root) + } + val listOperations = storageIndex.updateLevel(root, rescanEmpty) + logger.debug("Updated source {} with {} list operations...", root, listOperations) + } + } + + private suspend fun MutableStorageIndex.updateLevel(node: StorageNode.StorageDirectory, rescanEmpty: Boolean): Long { + val list = list(node) + if (list.isEmpty()) { + return if (rescanEmpty) { + syncLevel(node) + } else { + 0L + } + } + val lastFile = list.asSequence() + .filterIsInstance() + .maxByOrNull { it.path } + + val currentOperations = if (lastFile != null) { + addAll(node, sourceStorage.list(node.path, startAfter = lastFile.path)) + 1L + } else { + 0L + } + + val listOperations = list(node) + .filterIsInstance() + .filterNot { it.path.fileName.toString() == "+tmp" } + .forkJoin { updateLevel(it, rescanEmpty) } + .sum() + + return currentOperations + listOperations + } + + /** Fully synchronize the storage index with the source storage. 
*/ + suspend fun sync() { + if (storageIndex !is MutableStorageIndex) return + logger.info("Syncing source {} index...", root) + val listOperations = storageIndex.syncLevel(root) + logger.debug("Synced source {} index with {} list operations...", root, listOperations) + nextSync = Instant.now() + rescanDirectoryDuration + nextEmptySync = Instant.now() + rescanEmptyDuration + } + + private suspend fun MutableStorageIndex.syncLevel(node: StorageNode.StorageDirectory): Long { + sync(node, sourceStorage.list(node.path)) + + val listOperations = list(node) + .filterIsInstance() + .filterNot { it.path.fileName.toString() == "+tmp" } + .forkJoin { syncLevel(it) } + .sum() + + return 1L + listOperations + } + + companion object { + private val logger = LoggerFactory.getLogger(StorageIndexManager::class.java) + } } diff --git a/src/main/java/org/radarbase/output/source/StorageNode.kt b/src/main/java/org/radarbase/output/source/StorageNode.kt index 697ede7..14fd1d1 100644 --- a/src/main/java/org/radarbase/output/source/StorageNode.kt +++ b/src/main/java/org/radarbase/output/source/StorageNode.kt @@ -3,9 +3,17 @@ package org.radarbase.output.source import java.nio.file.Path import java.time.Instant +/** + * A node in a file tree of the source or target storage. + */ sealed interface StorageNode { + /** Path that the node represents. */ val path: Path + /** + * Parent of the current node, or `null` if the current node is the storage root or topmost + * level of a relative path. + */ fun parent(): StorageDirectory? { val parentPath = path.parent return if (parentPath != null) { @@ -15,10 +23,15 @@ sealed interface StorageNode { } } + /** Storage node that represents a directory. */ data class StorageDirectory(override val path: Path) : StorageNode + /** + * Storage node that represents a file. + */ data class StorageFile( override val path: Path, + /** Time that the file was last modified.
*/ val lastModified: Instant, ) : StorageNode } diff --git a/src/main/java/org/radarbase/output/target/LocalTargetStorage.kt b/src/main/java/org/radarbase/output/target/LocalTargetStorage.kt index 5a9e692..be1aafb 100644 --- a/src/main/java/org/radarbase/output/target/LocalTargetStorage.kt +++ b/src/main/java/org/radarbase/output/target/LocalTargetStorage.kt @@ -27,7 +27,14 @@ import java.nio.file.Path import java.nio.file.StandardCopyOption.ATOMIC_MOVE import java.nio.file.StandardCopyOption.REPLACE_EXISTING import java.nio.file.attribute.PosixFilePermissions -import kotlin.io.path.* +import kotlin.io.path.createDirectories +import kotlin.io.path.deleteExisting +import kotlin.io.path.exists +import kotlin.io.path.fileSize +import kotlin.io.path.inputStream +import kotlin.io.path.moveTo +import kotlin.io.path.setAttribute +import kotlin.io.path.setPosixFilePermissions class LocalTargetStorage(private val config: LocalConfig) : TargetStorage { init { diff --git a/src/main/java/org/radarbase/output/target/S3TargetStorage.kt b/src/main/java/org/radarbase/output/target/S3TargetStorage.kt index 3556c6e..6eba54a 100644 --- a/src/main/java/org/radarbase/output/target/S3TargetStorage.kt +++ b/src/main/java/org/radarbase/output/target/S3TargetStorage.kt @@ -16,7 +16,16 @@ package org.radarbase.output.target -import io.minio.* +import io.minio.BucketArgs +import io.minio.BucketExistsArgs +import io.minio.CopyObjectArgs +import io.minio.CopySource +import io.minio.GetObjectArgs +import io.minio.MakeBucketArgs +import io.minio.MinioClient +import io.minio.RemoveObjectArgs +import io.minio.StatObjectArgs +import io.minio.UploadObjectArgs import org.radarbase.kotlin.coroutines.CacheConfig import org.radarbase.kotlin.coroutines.CachedValue import org.radarbase.output.config.S3Config diff --git a/src/main/java/org/radarbase/output/util/PostponedWriter.kt b/src/main/java/org/radarbase/output/util/PostponedWriter.kt index 8ebccbd..d1da23d 100644 --- 
a/src/main/java/org/radarbase/output/util/PostponedWriter.kt +++ b/src/main/java/org/radarbase/output/util/PostponedWriter.kt @@ -16,7 +16,11 @@ package org.radarbase.output.util -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import java.io.IOException diff --git a/src/main/java/org/radarbase/output/util/TimeUtil.kt b/src/main/java/org/radarbase/output/util/TimeUtil.kt index cd1252a..aef9dc5 100644 --- a/src/main/java/org/radarbase/output/util/TimeUtil.kt +++ b/src/main/java/org/radarbase/output/util/TimeUtil.kt @@ -5,7 +5,11 @@ import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.radarbase.output.path.RecordPathFactory.Companion.getFieldOrNull import java.math.RoundingMode -import java.time.* +import java.time.Duration +import java.time.Instant +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.ZoneOffset.UTC import java.time.format.DateTimeParseException import java.time.temporal.Temporal @@ -131,7 +135,7 @@ object TimeUtil { fun String.parseDate(): Instant? 
= try { LocalDate.parse(this) - .atStartOfDay(ZoneOffset.UTC) + .atStartOfDay(UTC) .toInstant() } catch (ex: DateTimeParseException) { null @@ -141,7 +145,7 @@ object TimeUtil { if (this[lastIndex] == 'Z') { Instant.parse(this) } else { - LocalDateTime.parse(this).toInstant(ZoneOffset.UTC) + LocalDateTime.parse(this).toInstant(UTC) } } catch (ex: DateTimeParseException) { null diff --git a/src/main/java/org/radarbase/output/worker/FileCache.kt b/src/main/java/org/radarbase/output/worker/FileCache.kt index f46f2cd..a7602e6 100644 --- a/src/main/java/org/radarbase/output/worker/FileCache.kt +++ b/src/main/java/org/radarbase/output/worker/FileCache.kt @@ -30,7 +30,12 @@ import org.radarbase.output.util.SuspendedCloseable import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended import org.radarbase.output.util.Timer.time import org.slf4j.LoggerFactory -import java.io.* +import java.io.ByteArrayInputStream +import java.io.FileNotFoundException +import java.io.IOException +import java.io.InputStream +import java.io.OutputStream +import java.io.Writer import java.nio.file.AtomicMoveNotSupportedException import java.nio.file.Path import java.nio.file.StandardCopyOption diff --git a/src/test/java/org/radarbase/output/OffsetRangeFileTest.kt b/src/test/java/org/radarbase/output/OffsetRangeFileTest.kt index cac51c8..bc51430 100644 --- a/src/test/java/org/radarbase/output/OffsetRangeFileTest.kt +++ b/src/test/java/org/radarbase/output/OffsetRangeFileTest.kt @@ -17,7 +17,11 @@ package org.radarbase.output import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir 
diff --git a/src/test/java/org/radarbase/output/OffsetRangeSetTest.kt b/src/test/java/org/radarbase/output/OffsetRangeSetTest.kt index f291f87..bc8bc4d 100644 --- a/src/test/java/org/radarbase/output/OffsetRangeSetTest.kt +++ b/src/test/java/org/radarbase/output/OffsetRangeSetTest.kt @@ -1,6 +1,8 @@ package org.radarbase.output -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.radarbase.output.accounting.OffsetRangeSet import org.radarbase.output.accounting.TopicPartition diff --git a/src/test/java/org/radarbase/output/accounting/OffsetIntervalsTest.kt b/src/test/java/org/radarbase/output/accounting/OffsetIntervalsTest.kt index bc8cf97..349d343 100644 --- a/src/test/java/org/radarbase/output/accounting/OffsetIntervalsTest.kt +++ b/src/test/java/org/radarbase/output/accounting/OffsetIntervalsTest.kt @@ -1,6 +1,8 @@ package org.radarbase.output.accounting -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import java.time.Instant diff --git a/src/test/java/org/radarbase/output/data/CompressionFactoryTest.kt b/src/test/java/org/radarbase/output/data/CompressionFactoryTest.kt index f95fff9..98c9933 100644 --- a/src/test/java/org/radarbase/output/data/CompressionFactoryTest.kt +++ b/src/test/java/org/radarbase/output/data/CompressionFactoryTest.kt @@ -4,7 +4,11 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertSame import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.radarbase.output.compression.* +import org.radarbase.output.compression.Compression +import org.radarbase.output.compression.CompressionFactory 
+import org.radarbase.output.compression.GzipCompression +import org.radarbase.output.compression.IdentityCompression +import org.radarbase.output.compression.ZipCompression import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.io.IOException diff --git a/src/test/java/org/radarbase/output/data/CsvAvroConverterTest.kt b/src/test/java/org/radarbase/output/data/CsvAvroConverterTest.kt index 05e05aa..429b8f6 100644 --- a/src/test/java/org/radarbase/output/data/CsvAvroConverterTest.kt +++ b/src/test/java/org/radarbase/output/data/CsvAvroConverterTest.kt @@ -26,7 +26,9 @@ import org.apache.avro.generic.GenericDatumReader import org.apache.avro.generic.GenericRecord import org.apache.avro.generic.GenericRecordBuilder import org.apache.avro.io.DecoderFactory -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Test import org.junit.jupiter.api.io.TempDir import org.radarbase.output.compression.GzipCompression diff --git a/src/test/java/org/radarbase/output/data/FileCacheStoreTest.kt b/src/test/java/org/radarbase/output/data/FileCacheStoreTest.kt index 4e0818b..dd2c64f 100644 --- a/src/test/java/org/radarbase/output/data/FileCacheStoreTest.kt +++ b/src/test/java/org/radarbase/output/data/FileCacheStoreTest.kt @@ -35,7 +35,11 @@ import org.radarbase.output.accounting.Accountant import org.radarbase.output.accounting.OffsetRangeSet import org.radarbase.output.accounting.TopicPartition import org.radarbase.output.accounting.TopicPartitionOffsetRange -import org.radarbase.output.config.* +import org.radarbase.output.config.HdfsConfig +import org.radarbase.output.config.PathConfig +import org.radarbase.output.config.ResourceConfig +import org.radarbase.output.config.RestructureConfig +import org.radarbase.output.config.WorkerConfig import 
org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended import org.radarbase.output.worker.FileCacheStore import java.io.IOException