Skip to content

Commit

Permalink
Merge pull request #547 from RADAR-base/addStorageIndex
Browse files Browse the repository at this point in the history
Added a StorageIndex for the source storage to reduce LIST calls
  • Loading branch information
blootsvoets committed Sep 28, 2023
2 parents 0de47d2 + c0f5373 commit 05ec3b7
Show file tree
Hide file tree
Showing 40 changed files with 592 additions and 104 deletions.
3 changes: 0 additions & 3 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
root = true

[*.kt]
ktlint_standard_no-wildcard-imports = disabled
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,16 @@ source:
# only actually needed if source type is hdfs
azure:
# azure options
index:
# Interval, in seconds, at which the index is fully synchronized with the source storage
fullSyncInterval: 3600
# Interval, in seconds, at which empty directories are synchronized.
# They are also synchronized during a full sync.
emptyDirectorySyncInterval: 900
```

The index performs a scan of the source storage before any other operation. Subsequent list operations are served from the index only. This is especially relevant for S3 storage, where list operations incur costs.

The target is similar, and in addition supports the local file system (`local`).

```yaml
Expand Down
6 changes: 6 additions & 0 deletions restructure.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ source:
# only actually needed if source type is hdfs
hdfs:
nameNodes: [hdfs-namenode-1, hdfs-namenode-2]
index:
# Interval, in seconds, at which the index is fully synchronized with the storage
fullSyncInterval: 3600
# Interval, in seconds, at which empty directories are synchronized.
# They are also synchronized during a full sync.
emptyDirectorySyncInterval: 900

# Target data resource
# @since: 0.7.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
package org.radarbase.output

import io.minio.*
import io.minio.BucketExistsArgs
import io.minio.GetObjectArgs
import io.minio.ListObjectsArgs
import io.minio.MakeBucketArgs
import io.minio.ObjectWriteArgs.MAX_PART_SIZE
import kotlinx.coroutines.*
import io.minio.PutObjectArgs
import io.minio.RemoveBucketArgs
import io.minio.RemoveObjectArgs
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.radarbase.output.config.*
import org.radarbase.output.config.PathConfig
import org.radarbase.output.config.PathFormatterConfig
import org.radarbase.output.config.ResourceConfig
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.config.S3Config
import org.radarbase.output.config.TopicConfig
import org.radarbase.output.config.WorkerConfig
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
import org.radarbase.output.util.Timer
import org.radarbase.output.util.bucketBuild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package org.radarbase.output.accounting

import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.radarbase.output.accounting.OffsetRedisPersistence.Companion.redisOffsetReader
Expand Down
32 changes: 30 additions & 2 deletions src/main/java/org/radarbase/output/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,26 @@ package org.radarbase.output

import com.beust.jcommander.JCommander
import com.beust.jcommander.ParameterException
import kotlinx.coroutines.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import org.radarbase.output.accounting.*
import org.radarbase.output.accounting.Accountant
import org.radarbase.output.accounting.OffsetPersistenceFactory
import org.radarbase.output.accounting.OffsetRedisPersistence
import org.radarbase.output.accounting.RedisHolder
import org.radarbase.output.accounting.RedisRemoteLockManager
import org.radarbase.output.accounting.RemoteLockManager
import org.radarbase.output.cleaner.SourceDataCleaner
import org.radarbase.output.compression.Compression
import org.radarbase.output.config.CommandLineArgs
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.format.RecordConverterFactory
import org.radarbase.output.path.RecordPathFactory
import org.radarbase.output.source.InMemoryStorageIndex
import org.radarbase.output.source.SourceStorage
import org.radarbase.output.source.SourceStorageFactory
import org.radarbase.output.source.StorageIndexManager
import org.radarbase.output.target.TargetStorage
import org.radarbase.output.target.TargetStorageFactory
import org.radarbase.output.util.Timer
Expand All @@ -39,7 +47,9 @@ import org.radarbase.output.worker.RadarKafkaRestructure
import org.slf4j.LoggerFactory
import redis.clients.jedis.JedisPool
import java.io.IOException
import java.nio.file.Path
import java.text.NumberFormat
import java.time.Duration
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.atomic.LongAdder
Expand Down Expand Up @@ -78,9 +88,27 @@ class Application(

override val workerSemaphore = Semaphore(config.worker.numThreads * 2)

override val storageIndexManagers: Map<Path, StorageIndexManager>

private val jobs: List<Job>

init {
val indexConfig = config.source.index
val (fullScan, emptyScan) = if (indexConfig == null) {
listOf(3600L, 900L)
} else {
listOf(indexConfig.fullSyncInterval, indexConfig.emptyDirectorySyncInterval)
}.map { Duration.ofSeconds(it) }

storageIndexManagers = config.paths.inputs.associateWith { input ->
StorageIndexManager(
InMemoryStorageIndex(),
sourceStorage,
input,
fullScan,
emptyScan,
)
}
val serviceMutex = Mutex()
jobs = listOfNotNull(
RadarKafkaRestructure.job(config, serviceMutex),
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/radarbase/output/FileStoreFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.format.RecordConverterFactory
import org.radarbase.output.path.RecordPathFactory
import org.radarbase.output.source.SourceStorage
import org.radarbase.output.source.StorageIndexManager
import org.radarbase.output.target.TargetStorage
import org.radarbase.output.worker.FileCacheStore
import java.io.IOException
import java.nio.file.Path

/** Factory for all factory classes and settings. */
interface FileStoreFactory {
Expand All @@ -42,6 +44,7 @@ interface FileStoreFactory {
val redisHolder: RedisHolder
val offsetPersistenceFactory: OffsetPersistenceFactory
val workerSemaphore: Semaphore
val storageIndexManagers: Map<Path, StorageIndexManager>

@Throws(IOException::class)
fun newFileCacheStore(accountant: Accountant): FileCacheStore
Expand Down
25 changes: 15 additions & 10 deletions src/main/java/org/radarbase/output/cleaner/SourceDataCleaner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import org.radarbase.output.FileStoreFactory
import org.radarbase.output.accounting.Accountant
import org.radarbase.output.accounting.AccountantImpl
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.source.StorageIndex
import org.radarbase.output.source.StorageNode
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
import org.radarbase.output.util.Timer
Expand Down Expand Up @@ -43,11 +45,11 @@ class SourceDataCleaner(
private val supervisor = SupervisorJob()

@Throws(IOException::class, InterruptedException::class)
suspend fun process(directoryName: String) {
suspend fun process(storageIndex: StorageIndex, directoryName: String) {
// Get files and directories
val absolutePath = Paths.get(directoryName)

val paths = topicPaths(absolutePath)
val paths = topicPaths(storageIndex, absolutePath)

logger.info("{} topics found", paths.size)

Expand All @@ -56,7 +58,7 @@ class SourceDataCleaner(
launch {
try {
val deleteCount = fileStoreFactory.workerSemaphore.withPermit {
mapTopic(p)
mapTopic(storageIndex, p)
}
if (deleteCount > 0) {
logger.info("Removed {} files in topic {}", deleteCount, p.fileName)
Expand All @@ -70,7 +72,7 @@ class SourceDataCleaner(
}
}

private suspend fun mapTopic(topicPath: Path): Long {
private suspend fun mapTopic(storageIndex: StorageIndex, topicPath: Path): Long {
val topic = topicPath.fileName.toString()
return try {
lockManager.tryWithLock(topic) {
Expand All @@ -84,7 +86,7 @@ class SourceDataCleaner(
fileStoreFactory,
)
}
deleteOldFiles(accountant, extractionCheck, topic, topicPath).toLong()
deleteOldFiles(storageIndex, accountant, extractionCheck, topic, topicPath).toLong()
}
}
}
Expand All @@ -95,14 +97,15 @@ class SourceDataCleaner(
}

private suspend fun deleteOldFiles(
storageIndex: StorageIndex,
accountant: Accountant,
extractionCheck: ExtractionCheck,
topic: String,
topicPath: Path,
): Int {
val offsets = accountant.offsets.copyForTopic(topic)

val paths = sourceStorage.listTopicFiles(topic, topicPath, maxFilesPerTopic) { f ->
val paths = sourceStorage.listTopicFiles(storageIndex, topic, topicPath, maxFilesPerTopic) { f ->
f.lastModified.isBefore(deleteThreshold) &&
// ensure that there is a file with a larger offset also
// processed, so the largest offset is never removed.
Expand All @@ -115,6 +118,7 @@ class SourceDataCleaner(
logger.info("Removing {}", file.path)
Timer.time("cleaner.delete") {
sourceStorage.delete(file.path)
storageIndex.remove(StorageNode.StorageFile(file.path, Instant.MIN))
}
true
} else {
Expand All @@ -127,8 +131,8 @@ class SourceDataCleaner(
}
}

private suspend fun topicPaths(path: Path): List<Path> =
sourceStorage.listTopics(path, excludeTopics)
/**
 * Lists the topics below [path] by calling `sourceStorage.listTopics` with the given
 * [storageIndex] and the cleaner's `excludeTopics`, and returns the resulting paths in a
 * shuffled (random) order.
 *
 * @param storageIndex index that is handed to the source storage for the listing.
 * @param path directory whose topics are listed.
 * @return the topic paths reported by the source storage, in random order.
 */
private suspend fun topicPaths(storageIndex: StorageIndex, path: Path): List<Path> =
sourceStorage.listTopics(storageIndex, path, excludeTopics)
// different services start on different topics to decrease lock contention
.shuffled()

Expand All @@ -147,9 +151,10 @@ class SourceDataCleaner(

private suspend fun runCleaner(factory: FileStoreFactory) {
SourceDataCleaner(factory).useSuspended { cleaner ->
for (input in factory.config.paths.inputs) {
for ((input, indexManager) in factory.storageIndexManagers) {
indexManager.update()
logger.info("Cleaning {}", input)
cleaner.process(input.toString())
cleaner.process(indexManager.storageIndex, input.toString())
}
logger.info("Cleaned up {} files", cleaner.deletedFileCount.format())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ data class ResourceConfig(
val hdfs: HdfsConfig? = null,
val local: LocalConfig? = null,
val azure: AzureConfig? = null,
val index: StorageIndexConfig? = null,
) {
@get:JsonIgnore
val sourceType: ResourceType by lazy {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/radarbase/output/config/StorageIndexConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.radarbase.output.config

/**
 * Settings that control how often the storage index is refreshed.
 *
 * @property fullSyncInterval Number of seconds between two full synchronizations of the
 * storage index.
 * @property emptyDirectorySyncInterval Number of seconds between two synchronizations of
 * empty directories with the storage index. When this value is very large, empty
 * directories are only scanned as part of a full sync.
 */
data class StorageIndexConfig(
    val fullSyncInterval: Long = 3_600L,
    val emptyDirectorySyncInterval: Long = 900L,
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import org.radarbase.output.format.JsonAvroConverter.Companion.JSON_READER
import org.radarbase.output.format.JsonAvroConverter.Companion.JSON_WRITER
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
import org.radarbase.output.util.TimeUtil.getDate
import java.io.*
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStream
import java.io.Reader
import java.io.Writer
import java.nio.file.Path
import kotlin.io.path.inputStream

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.radarbase.output.compression.Compression
import org.radarbase.output.util.ResourceContext.Companion.resourceContext
import java.io.*
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStream
import java.io.Reader
import java.io.Writer
import java.nio.file.Path
import java.util.regex.Pattern
import kotlin.collections.component1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package org.radarbase.output.path

import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.radarbase.kotlin.coroutines.CacheConfig
import org.radarbase.kotlin.coroutines.CachedMap
import org.radarbase.ktor.auth.ClientCredentialsConfig
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/org/radarbase/output/source/AzureSourceStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.radarbase.output.util.TemporaryDirectory
import org.radarbase.output.util.withoutFirstSegment
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Instant
import kotlin.io.path.createTempFile
import kotlin.io.path.deleteIfExists

Expand All @@ -24,22 +25,28 @@ class AzureSourceStorage(

private fun blobClient(path: Path) = blobContainerClient.getBlobClient(path.withoutFirstSegment())

/**
 * Lists the direct children of [path] in the blob container as [StorageNode]s.
 *
 * The container is listed by hierarchy with the prefix `"$path/"`. Items reported as a
 * prefix become [StorageNode.StorageDirectory]; all other items become
 * [StorageNode.StorageFile].
 *
 * @param path directory (prefix) to list.
 * @param startAfter when not null, only items whose path compares strictly greater than
 * this path are returned. The filter is applied to the listing result on the client side.
 * @param maxKeys when not null, only the first [maxKeys] remaining items are returned.
 * @return the listed nodes, in the order the container reports them.
 */
override suspend fun list(path: Path, startAfter: Path?, maxKeys: Int?): List<StorageNode> =
    withContext(Dispatchers.IO) {
        var iterable: Iterable<BlobItem> = blobContainerClient.listBlobsByHierarchy("$path/")
        if (startAfter != null) {
            iterable = iterable.filter { Paths.get(it.name) > startAfter }
        }
        if (maxKeys != null) {
            iterable = iterable.take(maxKeys)
        }
        iterable.map {
            // isPrefix marks a hierarchy prefix, i.e. a directory; it is null or false for
            // actual blobs. The branches were previously swapped, turning every directory
            // into a file and every file into a directory.
            if (it.isPrefix == true) {
                StorageNode.StorageDirectory(Paths.get(it.name))
            } else {
                StorageNode.StorageFile(
                    Paths.get(it.name),
                    // Fall back to the current time when the blob carries no
                    // last-modified property.
                    it.properties?.lastModified?.toInstant() ?: Instant.now(),
                )
            }
        }
    }

override suspend fun createTopicFile(topic: String, status: SimpleFileStatus): TopicFile {
override suspend fun createTopicFile(topic: String, status: StorageNode): TopicFile {
var topicFile = super.createTopicFile(topic, status)

if (readOffsetFromMetadata && topicFile.range.range.to == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.radarbase.output.source

/**
 * A [StorageIndex] without any state of its own: every listing is forwarded straight to the
 * wrapped [SourceStorage], which effectively means that nothing is cached.
 */
class DelegatingStorageIndex(
    private val sourceStorage: SourceStorage,
) : StorageIndex {
    /**
     * Lists the contents of [dir] by querying the source storage directly, passing
     * [maxKeys] through unchanged.
     */
    override suspend fun list(dir: StorageNode.StorageDirectory, maxKeys: Int?): List<StorageNode> {
        val directoryPath = dir.path
        return sourceStorage.list(directoryPath, maxKeys = maxKeys)
    }

    /** Does nothing: this index holds no entries that could be removed. */
    override suspend fun remove(file: StorageNode.StorageFile) {
        // Intentionally empty.
    }
}

0 comments on commit 05ec3b7

Please sign in to comment.