Skip to content

Commit

Permalink
Added kdoc and disabled wildcard imports
Browse files Browse the repository at this point in the history
  • Loading branch information
blootsvoets committed Sep 18, 2023
1 parent e38c342 commit c0f5373
Show file tree
Hide file tree
Showing 26 changed files with 283 additions and 148 deletions.
3 changes: 0 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
root = true

[*.kt]
ktlint_standard_no-wildcard-imports = disabled
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
package org.radarbase.output

import io.minio.*
import io.minio.BucketExistsArgs
import io.minio.GetObjectArgs
import io.minio.ListObjectsArgs
import io.minio.MakeBucketArgs
import io.minio.ObjectWriteArgs.MAX_PART_SIZE
import kotlinx.coroutines.*
import io.minio.PutObjectArgs
import io.minio.RemoveBucketArgs
import io.minio.RemoveObjectArgs
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.radarbase.output.config.*
import org.radarbase.output.config.PathConfig
import org.radarbase.output.config.PathFormatterConfig
import org.radarbase.output.config.ResourceConfig
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.config.S3Config
import org.radarbase.output.config.TopicConfig
import org.radarbase.output.config.WorkerConfig
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
import org.radarbase.output.util.Timer
import org.radarbase.output.util.bucketBuild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package org.radarbase.output.accounting

import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.radarbase.output.accounting.OffsetRedisPersistence.Companion.redisOffsetReader
Expand Down
19 changes: 14 additions & 5 deletions src/main/java/org/radarbase/output/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,26 @@ package org.radarbase.output

import com.beust.jcommander.JCommander
import com.beust.jcommander.ParameterException
import kotlinx.coroutines.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import org.radarbase.output.accounting.*
import org.radarbase.output.accounting.Accountant
import org.radarbase.output.accounting.OffsetPersistenceFactory
import org.radarbase.output.accounting.OffsetRedisPersistence
import org.radarbase.output.accounting.RedisHolder
import org.radarbase.output.accounting.RedisRemoteLockManager
import org.radarbase.output.accounting.RemoteLockManager
import org.radarbase.output.cleaner.SourceDataCleaner
import org.radarbase.output.compression.Compression
import org.radarbase.output.config.CommandLineArgs
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.format.RecordConverterFactory
import org.radarbase.output.path.RecordPathFactory
import org.radarbase.output.source.*
import org.radarbase.output.source.InMemoryStorageIndex
import org.radarbase.output.source.SourceStorage
import org.radarbase.output.source.SourceStorageFactory
import org.radarbase.output.source.StorageIndexManager
import org.radarbase.output.target.TargetStorage
import org.radarbase.output.target.TargetStorageFactory
import org.radarbase.output.util.Timer
Expand Down Expand Up @@ -92,12 +101,12 @@ class Application(
}.map { Duration.ofSeconds(it) }

storageIndexManagers = config.paths.inputs.associateWith { input ->
MutableStorageIndexManager(
StorageIndexManager(
InMemoryStorageIndex(),
sourceStorage,
input,
fullScan,
emptyScan,
input,
)
}
val serviceMutex = Mutex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import org.radarbase.output.format.JsonAvroConverter.Companion.JSON_READER
import org.radarbase.output.format.JsonAvroConverter.Companion.JSON_WRITER
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
import org.radarbase.output.util.TimeUtil.getDate
import java.io.*
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStream
import java.io.Reader
import java.io.Writer
import java.nio.file.Path
import kotlin.io.path.inputStream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.radarbase.output.compression.Compression
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
import java.io.*
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStream
import java.io.Reader
import java.io.Writer
import java.nio.file.Path
import java.util.regex.Pattern
import kotlin.collections.component1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package org.radarbase.output.path

import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.radarbase.kotlin.coroutines.CacheConfig
import org.radarbase.kotlin.coroutines.CachedMap
import org.radarbase.ktor.auth.ClientCredentialsConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.radarbase.output.source

/**
* Delegate all calls directly to the underlying storage. This effectively means that no caching
* takes place.
*/
class DelegatingStorageIndex(
private val sourceStorage: SourceStorage,
) : StorageIndex {
Expand Down
43 changes: 22 additions & 21 deletions src/main/java/org/radarbase/output/source/InMemoryStorageIndex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

/**
* Storage index that keeps the given file tree in memory.
* For very large file systems, this may
* cause a memory issue.
*/
class InMemoryStorageIndex : MutableStorageIndex {
private val fileIndex: ConcurrentMap<StorageNode.StorageDirectory, Map<Path, StorageNode>> = ConcurrentHashMap()
private val rootSet = ConcurrentHashMap<Path, StorageNode>()
Expand All @@ -27,35 +32,33 @@ class InMemoryStorageIndex : MutableStorageIndex {
}
}

private fun add(node: StorageNode) {
var currentNode = node
var parent = currentNode.parent()
if (currentNode is StorageNode.StorageDirectory) {
fileIndex.computeIfAbsent(currentNode) {
mapOf()
}
}
while (parent != null) {
fileIndex.compute(parent) { _, map ->
/** Adds a node and all its parents to the file hierarchy. */
private fun add(dir: StorageNode.StorageDirectory) {
var currentDir = dir
var parentDir = currentDir.parent()
while (parentDir != null) {
fileIndex.compute(parentDir) { _, map ->
if (map == null) {
mapOf(currentNode.path to currentNode)
mapOf(currentDir.path to currentDir)
} else {
val newMap = map.toMutableMap()
newMap[currentNode.path] = currentNode
newMap
buildMap(map.size + 1) {
putAll(map)
put(currentDir.path, currentDir)
}
}
}
currentNode = parent
parent = currentNode.parent()
currentDir = parentDir
parentDir = currentDir.parent()
}
rootSet[currentNode.path] = currentNode
rootSet[currentDir.path] = currentDir
}

override suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>): Collection<StorageNode> {
add(parent)

if (nodes.isEmpty()) {
return fileIndex[parent]?.values ?: listOf()
return fileIndex[parent]?.values
?: listOf()
}

nodes.asSequence()
Expand All @@ -81,7 +84,7 @@ class InMemoryStorageIndex : MutableStorageIndex {
return newMap.values
}

override suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>): Collection<StorageNode> {
override suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>) {
add(parent)
val newMap = buildMap(nodes.size) {
nodes.forEach { put(it.path, it) }
Expand All @@ -93,8 +96,6 @@ class InMemoryStorageIndex : MutableStorageIndex {
.filterIsInstance<StorageNode.StorageDirectory>()
.filter { it.path !in newMap }
.forEach { removeRecursive(it) }

return newMap.values
}

override suspend fun remove(file: StorageNode.StorageFile) {
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/org/radarbase/output/source/MutableStorageIndex.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
package org.radarbase.output.source

/** Storage index that may be modified by the storage index manager. */
interface MutableStorageIndex : StorageIndex {
/**
* Add a list of storage nodes to the given directory.
* All values in [nodes] should have [parent] as parent node. No nodes will be removed from the
* current directory listing, but updated values (e.g. last modified values) will be overridden.
*
* @return the current file listing after adding new nodes.
*/
suspend fun addAll(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>): Collection<StorageNode>

suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>): Collection<StorageNode>
/**
* Fully sync a storage node list with the index.
* All values in [nodes] should have [parent] as parent node. All nodes in the index
* corresponding to [parent] will be removed from that directory and replaced by the given list.
*/
suspend fun sync(parent: StorageNode.StorageDirectory, nodes: List<StorageNode>)
}

This file was deleted.

5 changes: 4 additions & 1 deletion src/main/java/org/radarbase/output/source/S3SourceStorage.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package org.radarbase.output.source

import io.minio.*
import io.minio.GetObjectTagsArgs
import io.minio.ListObjectsArgs
import io.minio.MinioClient
import io.minio.RemoveObjectArgs
import io.minio.errors.ErrorResponseException
import io.minio.messages.Tags
import kotlinx.coroutines.Dispatchers
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/radarbase/output/source/StorageIndex.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,28 @@ package org.radarbase.output.source

import java.nio.file.Paths

/**
* Index of files in a source storage.
* This index does not modify itself so it needs to be synced by a [StorageIndexManager].
*/
interface StorageIndex {
/**
* List given directory.
* If [maxKeys] is given, no more than that many entries will be returned.
*/
suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int? = null): List<StorageNode>

/**
* Remove a file from the index.
* This will typically be called if the file was removed by the current process.
*/
suspend fun remove(file: StorageNode.StorageFile)

companion object {
/**
* Root directory. All files that are in the index can be found by traversing the index
* starting at this root.
*/
val ROOT = StorageNode.StorageDirectory(Paths.get("/"))
}
}
Loading

0 comments on commit c0f5373

Please sign in to comment.