Skip to content

Commit

Permalink
#131 Use a ServiceMessageHandler interface instead of manually parsi…
Browse files Browse the repository at this point in the history
…ng TeamCity Service Messages and avoid storing corresponding text lines in memory
  • Loading branch information
NikolayPianikov authored and NikolayPianikov committed Jun 29, 2018
1 parent 74cd78f commit 18abac4
Show file tree
Hide file tree
Showing 34 changed files with 1,184 additions and 188 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -1,5 +1,5 @@
plugins {
id "org.jetbrains.kotlin.jvm" version "1.1.51"
id "org.jetbrains.kotlin.jvm" version "1.2.50"
}

ext {
Expand Down
1 change: 1 addition & 0 deletions modules/plugin-dotnet-agent.iml
Expand Up @@ -17,5 +17,6 @@
<orderEntry type="library" name="commons-io" level="project" />
<orderEntry type="library" scope="TEST" name="jmock-2.5.1" level="project" />
<orderEntry type="library" scope="TEST" name="hamcrest-1.3" level="project" />
<orderEntry type="module" module-name="serviceMessages" scope="PROVIDED" />
</component>
</module>
4 changes: 3 additions & 1 deletion plugin-dotnet-agent/build.gradle
Expand Up @@ -21,7 +21,9 @@ dependencies {
provided "org.jetbrains.teamcity.internal:agent:${teamcityVersion}"
provided "org.jetbrains.teamcity:common-api:${teamcityVersion}"
testCompile 'org.testng:testng:6.8'
testCompile 'org.jmock:jmock:2.5.1'
testCompile 'org.jmock:jmock:2.5.1'
testCompile 'org.hamcrest:hamcrest-all:1.3'
testCompile 'org.jetbrains.kotlin:kotlin-reflect'
}

project.tasks.nugetRestore.doFirst {
Expand Down
@@ -0,0 +1,6 @@
package jetbrains.buildServer.agent.runner

import jetbrains.buildServer.messages.serviceMessages.ServiceMessage
import jetbrains.buildServer.rx.Observable

interface ServiceMessageSource: Observable<ServiceMessage> { }
@@ -0,0 +1,34 @@
package jetbrains.buildServer.agent.runner

import jetbrains.buildServer.messages.serviceMessages.ServiceMessage
import jetbrains.buildServer.messages.serviceMessages.ServiceMessageHandler
import jetbrains.buildServer.messages.serviceMessages.ServiceMessageTypes
import jetbrains.buildServer.messages.serviceMessages.ServiceMessagesRegister
import jetbrains.buildServer.rx.*

class ServiceMessageSourceImpl(
private val _serviceMessagesRegister: ServiceMessagesRegister)
: ServiceMessageSource, ServiceMessageHandler {
private val _subject: Subject<ServiceMessage> = subjectOf()
private val _sharedSource: Observable<ServiceMessage> = _subject
.track(
{ if(it) activate() },
{ if(!it) deactivate() })
.share()

override fun subscribe(observer: Observer<ServiceMessage>): Disposable =
_sharedSource.subscribe(observer)

override fun handle(serviceMessage: ServiceMessage) =
_subject.onNext(serviceMessage)

private fun activate() =
_serviceMessages.forEach { _serviceMessagesRegister.registerHandler(it, this) }

private fun deactivate() =
_serviceMessages.forEach { _serviceMessagesRegister.removeHandler(it) }

companion object {
private val _serviceMessages = sequenceOf(ServiceMessageTypes.TEST_FAILED)
}
}
Expand Up @@ -2,7 +2,8 @@ package jetbrains.buildServer.agent.runner

import jetbrains.buildServer.agent.TargetType

interface WorkflowComposer {
interface
WorkflowComposer {
val target: TargetType

fun compose(context: WorkflowContext, workflow: Workflow = Workflow()): Workflow
Expand Down
Expand Up @@ -10,6 +10,4 @@ interface WorkflowContext {
val status: WorkflowStatus

fun abort(buildFinishedStatus: BuildFinishedStatus)

fun registerOutputFilter(listener: WorkflowOutputFilter): Closeable
}

This file was deleted.

Expand Up @@ -7,13 +7,8 @@

package jetbrains.buildServer.agent.runner

import jetbrains.buildServer.BuildProblemData
import jetbrains.buildServer.RunBuildException
import jetbrains.buildServer.agent.ArgumentsService
import jetbrains.buildServer.agent.BuildFinishedStatus
import jetbrains.buildServer.agent.CommandLine
import jetbrains.buildServer.agent.CommandLineResult
import java.io.Closeable
import jetbrains.buildServer.agent.*
import java.io.File

class WorkflowSessionImpl(
Expand All @@ -23,13 +18,10 @@ class WorkflowSessionImpl(
private val _argumentsService: ArgumentsService)
: MultiCommandBuildSession, WorkflowContext {

private val _outputFilters = mutableListOf<WorkflowOutputFilter>()
private var _commandLinesIterator: Iterator<CommandLine>? = null
private var _lastResult: CommandLineResult? = null
private var _buildFinishedStatus: BuildFinishedStatus? = null

override fun sessionStarted() = Unit

override fun getNextCommand(): CommandExecution? {
val commandLinesIterator: Iterator<CommandLine> = _commandLinesIterator ?: _workflowComposer.compose(this).commandLines.iterator()
_commandLinesIterator = commandLinesIterator
Expand All @@ -48,29 +40,24 @@ class WorkflowSessionImpl(
}

val exitCode = ArrayList<Int>()
val standardOutput = ArrayList<String>()
val errorOutput = ArrayList<String>()
_lastResult = CommandLineResult(exitCode.asSequence(), standardOutput.asSequence(), errorOutput.asSequence())
_lastResult = CommandLineResult(exitCode.asSequence(), emptySequence(), emptySequence())

return CommandExecutionAdapter(
commandLinesIterator.next(),
_outputFilters,
exitCode,
standardOutput,
errorOutput,
_buildStepContext,
_loggerService,
_argumentsService)
}

override val status: WorkflowStatus
get() {
val cuStatus = _buildFinishedStatus
if (cuStatus == null) {
val curStatus = _buildFinishedStatus
if (curStatus == null) {
return WorkflowStatus.Running
}

when(cuStatus) {
when (curStatus) {
BuildFinishedStatus.FINISHED_SUCCESS, BuildFinishedStatus.FINISHED_WITH_PROBLEMS -> return WorkflowStatus.Completed
else -> return WorkflowStatus.Failed
}
Expand All @@ -80,22 +67,17 @@ class WorkflowSessionImpl(
_buildFinishedStatus = buildFinishedStatus
}

override fun sessionFinished(): BuildFinishedStatus? = _buildFinishedStatus ?: BuildFinishedStatus.FINISHED_SUCCESS
override fun sessionStarted() = Unit

override fun sessionFinished(): BuildFinishedStatus? =
_buildFinishedStatus ?: BuildFinishedStatus.FINISHED_SUCCESS

override val lastResult: CommandLineResult
get() = _lastResult ?: throw RunBuildException("There are no any results yet")

override fun registerOutputFilter(listener: WorkflowOutputFilter): Closeable {
_outputFilters.add(listener)
return Closeable { _outputFilters.remove(listener) }
}

private class CommandExecutionAdapter(
private val _commandLine: CommandLine,
private val _outputFilters: List<WorkflowOutputFilter>,
private val _exitCode: MutableCollection<Int>,
private val _standardOutput: MutableCollection<String>,
private val _errorOutput: MutableCollection<String>,
private val _buildStepContext: BuildStepContext,
private val _loggerService: LoggerService,
private val _argumentsService: ArgumentsService) : CommandExecution {
Expand All @@ -113,21 +95,9 @@ class WorkflowSessionImpl(
_commandLine,
_buildStepContext.runnerContext.buildParameters.environmentVariables)

override fun onStandardOutput(text: String) {
if (_outputFilters.filter { it.acceptStandardOutput(text) }.any()){
_standardOutput.add(text)
}

_loggerService.onStandardOutput(text)
}
override fun onStandardOutput(text: String)= _loggerService.onStandardOutput(text)

override fun onErrorOutput(text: String) {
if (_outputFilters.filter { it.acceptErrorOutput(text) }.any()){
_errorOutput.add(text)
}

_loggerService.onErrorOutput(text)
}
override fun onErrorOutput(text: String) = _loggerService.onErrorOutput(text)

override fun interruptRequested(): TerminationAction = TerminationAction.KILL_PROCESS_TREE

Expand Down
Expand Up @@ -6,85 +6,86 @@ import jetbrains.buildServer.agent.BuildFinishedStatus
import jetbrains.buildServer.agent.CommandLine
import jetbrains.buildServer.agent.TargetType
import jetbrains.buildServer.agent.runner.*
import jetbrains.buildServer.messages.serviceMessages.ServiceMessageTypes
import jetbrains.buildServer.rx.*
import java.io.Closeable
import java.util.*
import kotlin.coroutines.experimental.buildSequence

@Suppress("EXPERIMENTAL_FEATURE_WARNING")
class DotnetWorkflowComposer(
private val _pathsService: PathsService,
private val _loggerService: LoggerService,
private val _failedTestDetector: FailedTestDetector,
private val _argumentsService: ArgumentsService,
private val _defaultEnvironmentVariables: EnvironmentVariables,
private val _dotnetWorkflowAnalyzer: DotnetWorkflowAnalyzer,
private val _commandSet: CommandSet) : WorkflowComposer, WorkflowOutputFilter {
private val _commandSet: CommandSet,
private val _failedTestSource: FailedTestSource) : WorkflowComposer {

override val target: TargetType
get() = TargetType.Tool

override fun compose(context: WorkflowContext, workflow: Workflow): Workflow {
return Workflow(buildSequence {
context.registerOutputFilter(this@DotnetWorkflowComposer).use {
val analyzerContext = DotnetWorkflowAnalyzerContext()
for(command in _commandSet.commands) {
// Build the environment
val environmentTokens = mutableListOf<Closeable>()
for (environmentBuilder in command.environmentBuilders) {
environmentTokens.add(environmentBuilder.build(command))
}
val analyzerContext = DotnetWorkflowAnalyzerContext()
for(command in _commandSet.commands) {
val result = EnumSet.noneOf(CommandResult::class.java)
// Build the environment
val environmentTokens = mutableListOf<Closeable>()
for (environmentBuilder in command.environmentBuilders) {
environmentTokens.add(environmentBuilder.build(command))
}

try {
val executableFile = command.toolResolver.executableFile
val args = command.arguments.toList()
val commandHeader = _argumentsService.combine(sequenceOf(executableFile.name).plus(args.map { it.value }))
_loggerService.onStandardOutput(commandHeader)
val commandName = command.commandType.id.replace('-', ' ')
val blockName = if (commandName.isNotBlank()) {
commandName
} else {
args.firstOrNull()?.value ?: ""
}
_loggerService.onBlock(blockName).use {
yield(CommandLine(
TargetType.Tool,
executableFile,
_pathsService.getPath(PathType.WorkingDirectory),
args,
_defaultEnvironmentVariables.variables.toList()))
}
try {
val executableFile = command.toolResolver.executableFile
val args = command.arguments.toList()
val commandHeader = _argumentsService.combine(sequenceOf(executableFile.name).plus(args.map { it.value }))
_loggerService.onStandardOutput(commandHeader)
val commandName = command.commandType.id.replace('-', ' ')
val blockName = if (commandName.isNotBlank()) {
commandName
}
finally {
// Clean the environment
for (environmentToken in environmentTokens) {
try
{
environmentToken.close()
}
catch(ex: Exception) {
LOG.error("Error during cleaning environment.", ex)
}
}
else {
args.firstOrNull()?.value ?: ""
}

val result = context.lastResult
val commandResult = command.resultsAnalyzer.analyze(result)
_dotnetWorkflowAnalyzer.registerResult(analyzerContext, commandResult, result.exitCode)
if (commandResult.contains(CommandResult.Fail)) {
context.abort(BuildFinishedStatus.FINISHED_FAILED)
return@buildSequence
_loggerService.onBlock(blockName).use {
_failedTestSource
.subscribe({ result.add(CommandResult.FailedTests) })
.use {
yield(CommandLine(
TargetType.Tool,
executableFile,
_pathsService.getPath(PathType.WorkingDirectory),
args,
_defaultEnvironmentVariables.variables.toList()))
}
}
}
finally {
// Clean the environment
for (environmentToken in environmentTokens) {
try
{
environmentToken.close()
}
catch(ex: Exception) {
LOG.error("Error during cleaning environment.", ex)
}
}
}

_dotnetWorkflowAnalyzer.summarize(analyzerContext)
val exitCode = context.lastResult.exitCode
val commandResult = command.resultsAnalyzer.analyze(exitCode, result)
_dotnetWorkflowAnalyzer.registerResult(analyzerContext, commandResult, exitCode)
if (commandResult.contains(CommandResult.Fail)) {
context.abort(BuildFinishedStatus.FINISHED_FAILED)
return@buildSequence
}
}
})
}

override fun acceptStandardOutput(text: String): Boolean {
return _failedTestDetector.hasFailedTest(text)
}

override fun acceptErrorOutput(text: String): Boolean {
return false
_dotnetWorkflowAnalyzer.summarize(analyzerContext)
})
}

companion object {
Expand Down

This file was deleted.

@@ -0,0 +1,6 @@
package jetbrains.buildServer.dotnet

import jetbrains.buildServer.rx.Observable

interface FailedTestSource: Observable<Unit> {
}
@@ -0,0 +1,16 @@
package jetbrains.buildServer.dotnet

import jetbrains.buildServer.agent.runner.ServiceMessageSource
import jetbrains.buildServer.messages.serviceMessages.ServiceMessageTypes
import jetbrains.buildServer.rx.*

class FailedTestSourceImpl(
private val _serviceMessageSource: ServiceMessageSource)
: FailedTestSource {
override fun subscribe(observer: Observer<Unit>): Disposable =
_serviceMessageSource
.filter { ServiceMessageTypes.TEST_FAILED.equals(it.messageName, true) }
.first()
.map { Unit }
.subscribe(observer)
}
Expand Up @@ -4,5 +4,5 @@ import jetbrains.buildServer.agent.CommandLineResult
import java.util.*

interface ResultsAnalyzer {
fun analyze(result: CommandLineResult): EnumSet<CommandResult>
fun analyze(exitCode: Int, result: EnumSet<CommandResult>): EnumSet<CommandResult>
}

0 comments on commit 18abac4

Please sign in to comment.