
Release 1.1.0 #75

Merged
merged 26 commits on Jun 10, 2020

Changes from all commits (26 commits)
6c4b0b3
Implemented data integrity check to complement delete functionality
blootsvoets Jun 4, 2020
ee511de
Fix tests
blootsvoets Jun 5, 2020
4b2a1b5
Merge branch 'dev' into data-integrity-check
blootsvoets Jun 9, 2020
073c26d
Merge remote-tracking branch 'origin/dev' into data-integrity-check
blootsvoets Jun 9, 2020
a3c872e
Updated documentation and enable batch size for cleaner
blootsvoets Jun 9, 2020
ee2bc48
bump release
nivemaham Jun 9, 2020
4445ef2
Merge pull request #72 from RADAR-base/release-1.0.1
nivemaham Jun 9, 2020
27c578c
Refactor as suggested in PR
blootsvoets Jun 9, 2020
15a9b55
calculate range.to when not present
nivemaham Jun 9, 2020
105b409
Merge pull request #73 from RADAR-base/fix-null-checking-end-of-range
blootsvoets Jun 9, 2020
cbcbacf
Added S3 file format to integration test
blootsvoets Jun 9, 2020
c83f803
Merge branch 'dev' into data-integrity-check
blootsvoets Jun 9, 2020
6392b7a
Small modifications after dev merge
blootsvoets Jun 9, 2020
c0891d8
define common topicPath finding on interface and override s3 specific…
nivemaham Jun 9, 2020
e99f7ce
Merge branch 'dev' into fix-finding-topic-paths-for-s3
nivemaham Jun 9, 2020
5cdf4d5
remove unused code
nivemaham Jun 9, 2020
c6da31a
add test to check multiple topic processing
nivemaham Jun 9, 2020
2bcefdc
Use recursive false
blootsvoets Jun 10, 2020
db39b19
Merge pull request #74 from RADAR-base/fix-finding-topic-paths-for-s3
nivemaham Jun 10, 2020
a4e46cd
Merge branch 'dev' into data-integrity-check
blootsvoets Jun 10, 2020
cf47cd3
Simplification of parsing long time
blootsvoets Jun 10, 2020
80fbcd5
PR comments
blootsvoets Jun 10, 2020
6e08882
Comment why to remove offsets if file is not extracted
blootsvoets Jun 10, 2020
86cb04d
Merge pull request #71 from RADAR-base/data-integrity-check
blootsvoets Jun 10, 2020
a51409b
Check data conversion in integration test
blootsvoets Jun 10, 2020
8e21a95
Bump version
blootsvoets Jun 10, 2020
22 changes: 19 additions & 3 deletions README.md
@@ -7,7 +7,7 @@ It supports data written by [RADAR HDFS sink connector](https://github.com/RADAR

## Upgrade instructions

When upgrading to version 1.0.0 from version 0.6.0 please follow the following instructions:
When upgrading to version 1.0.0 or later from version 0.6.0, please follow these instructions:

- This package now relies on Redis for locking and offset management. Please install Redis or use
the docker-compose.yml file to start it.
@@ -61,7 +61,7 @@ When upgrading to version 0.6.0 from version 0.5.x or earlier, please follow the

This package is available as docker image [`radarbase/radar-output-restructure`](https://hub.docker.com/r/radarbase/radar-output-restructure). The entrypoint of the image is the current application. So in all the commands listed in usage, replace `radar-output-restructure` with for example:
```shell
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-output-restructure:1.0.0-hdfs -n hdfs-namenode -o /output /myTopic
docker run --rm -t --network hadoop -v "$PWD/output:/output" radarbase/radar-output-restructure:1.1.0-hdfs -n hdfs-namenode -o /output /myTopic
```
if your docker cluster is running in the `hadoop` network and your output directory should be `./output`.

@@ -140,6 +140,22 @@ target:
groupId: 100 # write as regular group, use -1 to use current user (default).
```

### Cleaner

Source files can be removed automatically by a cleaner process, which checks whether a file has already been extracted and is older than a configured age. This feature is disabled by default. It can be configured in the `cleaner` configuration section:

```yaml
cleaner:
# Enable cleaning up old source files
enable: true
# Interval in seconds to clean data
interval: 1260 # 21 minutes
# Number of days after which a source file is considered old
age: 7
```

The cleaner can also be enabled with the `--cleaner` command-line flag. To run the cleaner as a separate process from output restructuring, start a process that has configuration property `worker: enable: false` or command-line argument `--no-restructure`.
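
As a sketch based on the options described above (not a verified configuration file), a standalone cleaner process that performs no restructuring could combine these settings:

```yaml
# Run only the cleaner in this process; restructuring is disabled.
worker:
  enable: false  # skip output restructuring in this process
cleaner:
  enable: true   # remove source files that have already been extracted
  interval: 1260 # run the cleaner every 21 minutes
  age: 7         # only consider files older than 7 days
```

Running the cleaner separately this way keeps deletion load off the restructuring workers.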

### Service

To run the output generator as a service that will regularly poll the HDFS directory, add the `--service` flag and optionally the `--interval` flag to adjust the polling interval or use the corresponding configuration file parameters.
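
Assuming the flags described above, a service invocation might look like the following sketch; the paths and the 300-second interval are illustrative only:

```shell
# Poll the input directory every 300 seconds as a long-running service
radar-output-restructure --service --interval 300 -o /output /myTopic
```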
@@ -155,7 +171,7 @@ This package requires at least Java JDK 8. Build the distribution with
and install the package into `/usr/local` with for example
```shell
sudo mkdir -p /usr/local
sudo tar -xzf build/distributions/radar-output-restructure-1.0.0.tar.gz -C /usr/local --strip-components=1
sudo tar -xzf build/distributions/radar-output-restructure-1.1.0.tar.gz -C /usr/local --strip-components=1
```

Now the `radar-output-restructure` command should be available.
3 changes: 2 additions & 1 deletion build.gradle
@@ -9,7 +9,7 @@ plugins {
}

group 'org.radarbase'
version '1.0.1'
version '1.1.0'
mainClassName = 'org.radarbase.output.Application'

sourceCompatibility = '1.8'
@@ -140,6 +140,7 @@ task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
testLogging {
showStandardStreams = true
setExceptionFormat("full")
}
shouldRunAfter test
19 changes: 17 additions & 2 deletions restructure.yml
@@ -64,15 +64,30 @@ format:
# Additional format properties
# properties: {}

# Worker settings
# Worker settings. Each worker thread has its own cache and topic, so the
# settings only apply to a single thread.
worker:
# Maximum number of files and converters to keep open while processing
# Enable processing files for extraction
enable: true
# Maximum number of files and converters to keep open while processing. Increasing this will
# decrease memory pressure but slow down processing.
cacheSize: 300
# Maximum number of offsets in cache. Increasing this will decrease memory
# pressure but slow down processing.
cacheOffsetsSize: 500000
# Number of threads to do processing on
numThreads: 2
# Maximum number of files to process in any given topic.
maxFilesPerTopic: null

cleaner:
# Enable cleaning up old source files
enable: true
# Interval in seconds to clean data
interval: 1260 # 21 minutes
# Number of days after which a source file is considered old
age: 7

# Path settings
paths:
# Input directories in source
@@ -9,6 +9,7 @@ import org.radarbase.output.config.ResourceConfig
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.config.S3Config
import org.radarbase.output.util.Timer
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Paths

class RestructureS3IntegrationTest {
@@ -36,9 +37,16 @@ class RestructureS3IntegrationTest {
sourceClient.makeBucket(sourceConfig.bucket)
}

val statusFileName = Paths.get("in/application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro")
javaClass.getResourceAsStream("/application_server_status/application_server_status+1+0000000018+0000000020.avro").use { statusFile ->
sourceClient.putObject(sourceConfig.bucket, statusFileName.toString(), statusFile, PutObjectOptions(-1, MAX_PART_SIZE))
val resourceFiles = listOf(
"application_server_status/partition=1/application_server_status+1+0000000018+0000000020.avro",
"application_server_status/partition=1/application_server_status+1+0000000021.avro",
"android_phone_acceleration/partition=0/android_phone_acceleration+0+0003018784.avro"
)
val targetFiles = resourceFiles.map { Paths.get("in/$it") }
resourceFiles.forEachIndexed { i, resourceFile ->
javaClass.getResourceAsStream("/$resourceFile").use { statusFile ->
sourceClient.putObject(sourceConfig.bucket, targetFiles[i].toString(), statusFile, PutObjectOptions(-1, MAX_PART_SIZE))
}
}

application.start()
@@ -50,17 +58,35 @@

application.redisPool.resource.use { redis ->
assertEquals(1L, redis.del("offsets/application_server_status.json"))
assertEquals(1L, redis.del("offsets/android_phone_acceleration.json"))
}

val outputFolder = "output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status"
val firstParticipantOutput = "output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status"
val secondParticipantOutput = "output/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration"
assertEquals(
listOf(
"$outputFolder/20200128_1300.csv",
"$outputFolder/20200128_1400.csv",
"$outputFolder/schema-application_server_status.json"),
"$firstParticipantOutput/20200128_1300.csv",
"$firstParticipantOutput/20200128_1400.csv",
"$firstParticipantOutput/schema-application_server_status.json",
"$secondParticipantOutput/20200528_1000.csv",
"$secondParticipantOutput/schema-android_phone_acceleration.json"),
files)

sourceClient.removeObject(sourceConfig.bucket, statusFileName.toString())
println(targetClient.getObject(targetConfig.bucket, "$firstParticipantOutput/20200128_1300.csv").readBytes().toString(UTF_8))

val csvContents = """
key.projectId,key.userId,key.sourceId,value.time,value.serverStatus,value.ipAddress
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,
STAGING_PROJECT,1543bc93-3c17-4381-89a5-c5d6272b827c,99caf236-bbe6-4eed-9c63-fba77349821d,1.58021982003E9,CONNECTED,

""".trimIndent()
assertEquals(csvContents, targetClient.getObject(targetConfig.bucket, "$firstParticipantOutput/20200128_1300.csv")
.readBytes()
.toString(UTF_8))

targetFiles.forEach {
sourceClient.removeObject(sourceConfig.bucket, it.toString())
}
sourceClient.removeBucket(sourceConfig.bucket)
files.forEach {
targetClient.removeObject(targetConfig.bucket, it)
@@ -26,41 +26,41 @@ internal class RedisRemoteLockManagerTest {

@Test
fun testExclusiveLock() {
lockManager1.acquireTopicLock("t").use { l1 ->
lockManager1.acquireLock("t").use { l1 ->
assertThat(l1, not(nullValue()))
lockManager2.acquireTopicLock("t").use { l2 ->
lockManager2.acquireLock("t").use { l2 ->
assertThat(l2, nullValue())
}
}
}

@Test
fun testGranularityLock() {
lockManager1.acquireTopicLock("t1").use { l1 ->
lockManager1.acquireLock("t1").use { l1 ->
assertThat(l1, not(nullValue()))
lockManager2.acquireTopicLock("t2").use { l2 ->
lockManager2.acquireLock("t2").use { l2 ->
assertThat(l2, not(nullValue()))
}
}
}

@Test
fun testNonOverlappingLock() {
lockManager1.acquireTopicLock("t").use { l1 ->
lockManager1.acquireLock("t").use { l1 ->
assertThat(l1, not(nullValue()))
}
lockManager2.acquireTopicLock("t").use { l2 ->
lockManager2.acquireLock("t").use { l2 ->
assertThat(l2, not(nullValue()))
}
}


@Test
fun testNonOverlappingLockSameManager() {
lockManager1.acquireTopicLock("t").use { l1 ->
lockManager1.acquireLock("t").use { l1 ->
assertThat(l1, not(nullValue()))
}
lockManager1.acquireTopicLock("t").use { l2 ->
lockManager1.acquireLock("t").use { l2 ->
assertThat(l2, not(nullValue()))
}
}
Binary file not shown.
Binary file not shown.
63 changes: 48 additions & 15 deletions src/main/java/org/radarbase/output/Application.kt
@@ -31,6 +31,7 @@ import org.radarbase.output.target.TargetStorageFactory
import org.radarbase.output.util.ProgressBar.Companion.format
import org.radarbase.output.util.Timer
import org.radarbase.output.worker.FileCacheStore
import org.radarbase.output.cleaner.SourceDataCleaner
import org.radarbase.output.worker.RadarKafkaRestructure
import org.slf4j.LoggerFactory
import redis.clients.jedis.JedisPool
@@ -88,7 +89,12 @@ class Application(
if (config.service.enable) {
runService()
} else {
runRestructure()
if (config.worker.enable) {
runRestructure()
}
if (config.cleaner.enable) {
runCleaner()
}
}
}

@@ -97,8 +103,15 @@
logger.info("Press Ctrl+C to exit...")
val executorService = Executors.newSingleThreadScheduledExecutor()

executorService.scheduleAtFixedRate(::runRestructure,
config.service.interval / 4, config.service.interval, TimeUnit.SECONDS)
if (config.worker.enable) {
executorService.scheduleAtFixedRate(::runRestructure,
config.service.interval / 4, config.service.interval, TimeUnit.SECONDS)
}

if (config.cleaner.enable) {
executorService.scheduleAtFixedRate(::runCleaner,
config.cleaner.interval / 4, config.cleaner.interval, TimeUnit.SECONDS)
}

try {
Thread.sleep(java.lang.Long.MAX_VALUE)
@@ -114,33 +127,53 @@
}
}

private fun runCleaner() {
val timeStart = Instant.now()

try {
val numberFormat = NumberFormat.getNumberInstance()
SourceDataCleaner(this).use { cleaner ->
for (input in config.paths.inputs) {
logger.info("Cleaning {}", input)
cleaner.process(input.toString())
}
logger.info("Cleaned up {} files in {}",
numberFormat.format(cleaner.deletedFileCount.sum()),
timeStart.durationSince().format())
}
} catch (e: InterruptedException) {
logger.error("Cleaning interrupted")
} catch (ex: Exception) {
logger.error("Failed to clean records", ex)
} finally {
if (Timer.isEnabled) {
logger.info("{}", Timer)
Timer.reset()
}
}
}

private fun runRestructure() {
val timeStart = Instant.now()
try {
val numberFormat = NumberFormat.getNumberInstance()

RadarKafkaRestructure(this).use { restructure ->
for (input in config.paths.inputs) {
logger.info("In: {}", input)
logger.info("Out: {}", pathFactory.root)
restructure.process(input.toString())
}

val numberFormat = NumberFormat.getNumberInstance()
logger.info("Processed {} files and {} records in {}",
numberFormat.format(restructure.processedFileCount),
numberFormat.format(restructure.processedRecordsCount),
numberFormat.format(restructure.processedFileCount.sum()),
numberFormat.format(restructure.processedRecordsCount.sum()),
timeStart.durationSince().format())
if (restructure.deletedFileCount.sum() > 0) {
logger.info("Deleted {} old files", numberFormat.format(restructure.deletedFileCount))
} else {
logger.info("No files were deleted")
}
}
} catch (ex: Exception) {
logger.error("Failed to process records", ex)
} catch (ex: IOException) {
logger.error("Processing failed", ex)
} catch (e: InterruptedException) {
logger.error("Processing interrupted")
} catch (ex: Exception) {
logger.error("Failed to process records", ex)
} finally {
// Print timings and reset the timings for the next iteration.
if (Timer.isEnabled) {
5 changes: 5 additions & 0 deletions src/main/java/org/radarbase/output/accounting/Accountant.kt
@@ -59,6 +59,11 @@ constructor(factory: FileStoreFactory, topic: String) : Flushable, Closeable {
} else null
}

open fun remove(range: TopicPartitionOffsetRange) = time("accounting.remove") {
offsetFile.offsets.remove(range)
offsetFile.triggerWrite()
}

open fun process(ledger: Ledger) = time("accounting.process") {
offsetFile.addAll(ledger.offsets)
offsetFile.triggerWrite()
@@ -14,8 +14,8 @@ class HdfsRemoteLockManager(
fileSystem.mkdirs(lockPath)
}

override fun acquireTopicLock(topic: String): RemoteLockManager.RemoteLock? {
val topicLockPath = Path(lockPath, "$topic.lock")
override fun acquireLock(name: String): RemoteLockManager.RemoteLock? {
val topicLockPath = Path(lockPath, "$name.lock")
return try {
HdfsRemoteLock(topicLockPath, fileSystem.create(topicLockPath, true, 1, 1, MINIMUM_BLOCK_SIZE))
} catch (ex: FileAlreadyExistsException) {