Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,14 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory:

val logs = container.logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid)

// Adding the userId field to every written record, so any background process can properly correlate.
val userIdField = Map("namespaceId" -> user.authkey.uuid.toJson)

val additionalMetadata = Map(
"activationId" -> activation.activationId.asString.toJson,
"action" -> action.fullyQualifiedName(false).asString.toJson,
"userId" -> user.authkey.uuid.toJson)
"action" -> action.fullyQualifiedName(false).asString.toJson) ++ userIdField

val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField)

// Manually construct JSON fields to omit parsing the whole structure
val metadata = ByteString("," + fieldsString(additionalMetadata))
Expand All @@ -113,7 +117,7 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory:
// the closing "}", adding the fields and finally add "}\n" to the end again.
.map(_.dropRight(1) ++ metadata ++ eventEnd)
// As the last element of the stream, print the activation record.
.concat(Source.single(ByteString(activation.toJson.compactPrint + "\n")))
.concat(Source.single(ByteString(augmentedActivation.toJson.compactPrint + "\n")))
.to(writeToFile)

val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import akka.util.ByteString
import common.{StreamLogging, WskActorSystem}
import org.scalatest.Matchers
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.common.TransactionId
import whisk.core.containerpool.logging.{DockerToActivationFileLogStore, LogLine}
import whisk.core.entity._
Expand All @@ -41,14 +42,21 @@ class DockerToActivationFileLogStoreTests

override def createStore() = new TestLogStoreTo(Sink.ignore)

def toLoggedEvent(line: LogLine, userId: UUID, activationId: ActivationId, actionName: FullyQualifiedEntityName) = {
def toLoggedEvent(line: LogLine,
userId: UUID,
activationId: ActivationId,
actionName: FullyQualifiedEntityName): String = {
val event = line.toJson.compactPrint
val concatenated =
s""","activationId":"${activationId.asString}","action":"${actionName.asString}","userId":"${userId.asString}""""
s""","activationId":"${activationId.asString}","action":"${actionName.asString}","namespaceId":"${userId.asString}""""

event.dropRight(1) ++ concatenated ++ "}\n"
}

def toLoggedActivation(activation: WhiskActivation): String = {
JsObject(activation.toJson.fields ++ Map("namespaceId" -> user.authkey.uuid.asString.toJson)).compactPrint + "\n"
}

behavior of "DockerCouchDbFileLogStore"

it should "read logs returned by the container,in mem and enrich + write them to the provided sink" in {
Expand All @@ -70,7 +78,7 @@ class DockerToActivationFileLogStoreTests
}

// Last message should be the full activation
testActor.expectMsg(activation.toJson.compactPrint + "\n")
testActor.expectMsg(toLoggedActivation(activation))
}

class TestLogStoreTo(override val writeToFile: Sink[ByteString, _])
Expand Down