HttpRequestLogger supports content logging: cs-content and sc-content fields
Michael Lagutko committed Aug 4, 2011
1 parent 186e709 commit 071ee78
Showing 9 changed files with 96 additions and 55 deletions.
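
The change teaches the W3 Extended Log pipeline two new field identifiers, cs-content and sc-content, which log the request and response bodies respectively, Base64-encoded after aggregation. An illustrative fields directive using them (it extends the example already in the HttpRequestLogger doc comment and is not taken verbatim from the repository):

#Fields: time cs-method cs-uri cs-content sc-content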
4 changes: 3 additions & 1 deletion src/main/scala/blueeyes/concurrent/Future.scala
@@ -348,12 +348,14 @@ class Future[T]{
def zip[A](f2: Future[A]): Future[(T, A)] = {
val f1 = this

val zipped = new Future[(T, A)] {
class ZippedFuture extends Future[(T, A)] {
override def cancel(why: Option[Throwable]): Boolean = {
f1.cancel(why) || f2.cancel(why)
}
}

val zipped = new ZippedFuture()

def deliverZip = {
if (f1.isDelivered && f2.isDelivered && !zipped.isDone) {
try {
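
The zip change only gives the previously anonymous future a name (ZippedFuture); cancellation still propagates to both sides and behaviour is otherwise unchanged. A minimal usage sketch with hypothetical values, using Future.sync as seen elsewhere in this diff:

// Cancelling `combined` cancels both underlying futures.
val combined: Future[(Int, String)] = Future.sync(42).zip(Future.sync("ok"))
combined.map { case (n, s) => s + ":" + n }   // delivered once both sides are delivered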
9 changes: 2 additions & 7 deletions src/main/scala/blueeyes/core/data/AggregatedByteChunk.scala
@@ -3,15 +3,10 @@ package blueeyes.core.data
import blueeyes.util.metrics.DataSize
import blueeyes.concurrent.Future
import java.io.ByteArrayOutputStream

import scalaz.Scalaz._

object AggregatedByteChunk {
def apply(chunk: ByteChunk, chunkSizeInBytes: Option[DataSize]): Future[ByteChunk] = {

val aggregated = new Future[ByteChunk]()
aggregateContent(chunk, new ByteArrayOutputStream(), aggregated, chunkSizeInBytes)
aggregated
}
def apply(chunk: ByteChunk, chunkSizeInBytes: Option[DataSize] = None): Future[ByteChunk] = new Future[ByteChunk] ->- {aggregateContent(chunk, new ByteArrayOutputStream(), _, chunkSizeInBytes)}

private def aggregateContent(chunk: ByteChunk, buffer: ByteArrayOutputStream, result: Future[ByteChunk], chunkSizeInBytes: Option[DataSize]) {
def done = {
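
With chunkSizeInBytes now defaulting to None, callers can aggregate a whole chunk stream with a bare apply; the scalaz ->- combinator simply threads the freshly created future through aggregateContent before returning it. A short sketch, assuming an in-scope chunk: ByteChunk and that ByteChunk exposes its bytes as data (as the logger code below relies on):

// Aggregate every chunk into one, then decode the bytes.
val whole: Future[ByteChunk] = AggregatedByteChunk(chunk)                 // no size limit
val asText: Future[String]   = whole.map(c => new String(c.data, "UTF-8"))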
34 changes: 22 additions & 12 deletions src/main/scala/blueeyes/core/service/HttpRequestLogger.scala
@@ -1,27 +1,29 @@
package blueeyes.core.service

import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.joda.time.format.DateTimeFormat
import org.joda.time.format.DateTimeFormatter
import java.net.InetAddress

import blueeyes.core.http.{HttpRequest, HttpResponse, HttpHeaders}
import blueeyes.util.Clock
import java.net.InetAddress
import blueeyes.concurrent.Future

/** A request logger is a function from (request/future of response) to future
import org.joda.time.format.DateTimeFormat
import org.apache.commons.codec.binary.Base64
import blueeyes.core.data.{AggregatedByteChunk, Bijection, ByteChunk}

/** A request logger is a function from (request/future of response) to future
* of log line. Request loggers do not have side effects.
*/
trait HttpRequestLogger[T, S] extends ((HttpRequest[T], Future[HttpResponse[S]]) => Future[String]) { self =>
/** Combines this logger with the specified logger to produce another logger.
* If necessary, the items are separated by a single space character.
*/
def :+ (logger: HttpRequestLogger[T, S]): HttpRequestLogger[T, S] = new HttpRequestLogger[T, S] {
def :+ (logger: HttpRequestLogger[T, S]): HttpRequestLogger[T, S] = new CompositeHttpRequestLogger(logger)

private[HttpRequestLogger] class CompositeHttpRequestLogger(logger: HttpRequestLogger[T, S]) extends HttpRequestLogger[T, S]{
def apply(request: HttpRequest[T], response: Future[HttpResponse[S]]): Future[String] = {
self(request, response).zip(logger(request, response)).map { t =>
val (prefix, suffix) = t

val infix = if (suffix.length == 0) "" else (if (prefix.endsWith(" ")) "" else " ")

prefix + infix + suffix
@@ -41,28 +43,31 @@ object HttpRequestLogger{
private val TimeFormatter = DateTimeFormat.forPattern("HH:mm:ss.S")

private val IpIdentifierValue = try {
InetAddress.getLocalHost().getHostAddress()
InetAddress.getLocalHost.getHostAddress
}
catch {
case error: Throwable => "127.0.0.1"
}

private val DnsNameIdentifierValue = try {
InetAddress.getLocalHost().getHostName()
InetAddress.getLocalHost.getHostName
}
catch {
case error: Throwable => "localhost"
}

private def lift[T, S](f: (HttpRequest[T], Future[HttpResponse[S]]) => Future[String]): HttpRequestLogger[T, S] = new HttpRequestLogger[T, S] {
private def lift[T, S](f: (HttpRequest[T], Future[HttpResponse[S]]) => Future[String]): HttpRequestLogger[T, S] = new HttpRequestLoggerImpl[T, S] (f)

private[HttpRequestLogger] class HttpRequestLoggerImpl[T, S](f: (HttpRequest[T], Future[HttpResponse[S]]) => Future[String]) extends HttpRequestLogger[T, S]{
def apply(request: HttpRequest[T], response: Future[HttpResponse[S]]): Future[String] = f(request, response)
}

/** Creates a logger from a W3 Extended Log fields directive. e.g.:
*
* #Fields: time cs-method cs-uri
*/
def apply[T, S](fieldsDirective: FieldsDirective)(implicit clock: Clock): HttpRequestLogger[T, S] = {
def apply[T, S](fieldsDirective: FieldsDirective)(implicit clock: Clock, requestBijection: Bijection[T, ByteChunk], responseBijection: Bijection[S, ByteChunk]): HttpRequestLogger[T, S] = {
def encodeBase64(chunk: Option[ByteChunk]) = chunk.map(AggregatedByteChunk(_).map(aggregated => new String(Base64.encodeBase64(aggregated.data), "UTF-8"))).getOrElse(Future.sync(""))
def apply0(identifiers: List[FieldIdentifier]): HttpRequestLogger[T, S] = identifiers match {
case Nil =>
lift((rq, rs) => Future.sync(""))
@@ -115,6 +120,11 @@ object HttpRequestLogger{
case ServerPrefix => Future.sync(DnsNameIdentifierValue)
case _ => Future.sync("")
}
case ContentIdentifier(prefix) => prefix match {
case ClientToServerPrefix => encodeBase64(request.content.map(requestBijection(_)))
case ServerToClientPrefix => response flatMap { response => encodeBase64(response.content.map(responseBijection(_))) }
case _ => Future.sync("")
}
case StatusIdentifier(prefix) => prefix match {
case ServerToClientPrefix => response map { response => response.status.code.name }
case _ => Future.sync("")
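
With the two extra implicit bijections in scope, a directive that names cs-content or sc-content makes the logger pull the corresponding body through its bijection, aggregate it via AggregatedByteChunk, and Base64-encode it into the log line. A hedged sketch, assuming T = S = ByteChunk, an identity Bijection[ByteChunk, ByteChunk], an implicit Clock, and an already-parsed fieldsDirective (all names below are illustrative):

// Build the logger and produce one space-separated W3 log line per request/response pair.
val logger = HttpRequestLogger[ByteChunk, ByteChunk](fieldsDirective)
val logLine: Future[String] = logger(request, futureResponse)
logLine foreach println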
src/main/scala/blueeyes/core/service/HttpServiceDescriptorFactoryCombinators.scala
@@ -4,15 +4,15 @@ import blueeyes.health.HealthMonitor
import net.lag.logging.Logger
import blueeyes.core.http.{HttpRequest, HttpResponse}
import blueeyes.json.JsonAST._
import blueeyes.core.data.Bijection
import blueeyes.json.{JPathField, JPath, JPathImplicits}
import blueeyes.parsers.W3ExtendedLogAST.FieldsDirective
import net.lag.configgy.{Config, ConfigMap, Configgy}
import net.lag.configgy.{Config, Configgy}
import blueeyes.parsers.W3ExtendedLog
import blueeyes.concurrent._
import blueeyes.util._
import blueeyes.util.logging._
import java.util.Calendar
import blueeyes.core.data._

trait HttpServiceDescriptorFactoryCombinators extends HttpRequestHandlerCombinators with RestPathPatternImplicits with FutureImplicits with blueeyes.json.Implicits{
// private[this] object TransformerCombinators
@@ -83,7 +83,7 @@ trait HttpServiceDescriptorFactoryCombinators extends HttpRequestHandlerCombinat
* }
* }}}
*/
def requestLogging[T, S](f: => HttpServiceDescriptorFactory[T, S]): HttpServiceDescriptorFactory[T, S] = {
def requestLogging[T, S](f: => HttpServiceDescriptorFactory[T, S])(implicit contentBijection: Bijection[T, ByteChunk]): HttpServiceDescriptorFactory[T, S] = {
import RollPolicies._
(context: HttpServiceContext) => {
val underlying = f(context)
@@ -189,15 +189,15 @@ trait HttpServiceDescriptorFactoryCombinat
}
}

private[service] class HttpRequestLoggerHandler[T](fieldsDirective: FieldsDirective, log: W3ExtendedLogger, underlying: HttpRequestHandler[T]) extends HttpRequestHandler[T] with ClockSystem{
private[service] class HttpRequestLoggerHandler[T](fieldsDirective: FieldsDirective, log: W3ExtendedLogger, underlying: HttpRequestHandler[T])(implicit contentBijection: Bijection[T, ByteChunk]) extends HttpRequestHandler[T] with ClockSystem{
private val requestLogger = HttpRequestLogger[T, T](fieldsDirective)
def isDefinedAt(request: HttpRequest[T]) = underlying.isDefinedAt(request)

def apply(request: HttpRequest[T]) = {

val response = underlying(request)

val logRecord = requestLogger(request, response)

logRecord foreach { log(_)}

response
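
requestLogging and HttpRequestLoggerHandler now take an implicit Bijection[T, ByteChunk] so the handler can hand request and response bodies to the content-aware logger; the wrapped response future is returned untouched, and logging is purely a side effect. A sketch, assuming fieldsDirective, a W3ExtendedLogger named w3Log, an underlying HttpRequestHandler[ByteChunk], and an implicit Bijection[ByteChunk, ByteChunk] are already in scope (the class is private[service], so this only compiles inside that package):

// Wrap a handler; each request/response pair is logged asynchronously.
val handler  = new HttpRequestLoggerHandler(fieldsDirective, w3Log, underlying)
val response = handler(request)   // same future the underlying handler produced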
30 changes: 26 additions & 4 deletions src/main/scala/blueeyes/demo/BlueEyesDemo.scala
@@ -5,22 +5,27 @@ import blueeyes.BlueEyesServer
import blueeyes.BlueEyesServiceBuilder
import blueeyes.concurrent.Future
import blueeyes.concurrent.Future._
import blueeyes.core.data.{ByteChunk, BijectionsChunkJson}
import blueeyes.core.http.HttpHeaders
import blueeyes.core.http.HttpStatusCodes._
import blueeyes.core.http.combinators.HttpRequestCombinators
import blueeyes.core.http.MimeTypes._
import blueeyes.json.JsonAST._
import blueeyes.persistence.mongo.MongoImplicits._
import blueeyes.persistence.mongo.{ConfigurableMongo, MongoFilterAll, Mongo, MongoFilter}
import blueeyes.core.service.ServerHealthMonitorService
import blueeyes.core.http.{HttpStatus, HttpRequest, HttpResponse}
import blueeyes.core.http.{HttpStatusCodes, HttpStatus, HttpRequest, HttpResponse}
import blueeyes.core.data.FileSource._
import blueeyes.core.data.{FileSource, ByteChunk, BijectionsChunkJson}
import java.io.File

object BlueEyesDemo extends BlueEyesServer with BlueEyesDemoService with ServerHealthMonitorService{
override def main(args: Array[String]) = super.main(Array("--configFile", "/etc/default/blueeyes.conf"))
}

trait BlueEyesDemoService extends BlueEyesServiceBuilder with HttpRequestCombinators with BijectionsChunkJson with ConfigurableMongo{
val contactListService = service("contactlist", "1.0.0") {
requestLogging{

healthMonitor { monitor => context =>
startup {
val mongoConfig = context.config.configMap("mongo")
@@ -71,14 +76,31 @@ trait BlueEyesDemoService extends BlueEyesServiceBuilder with HttpRequestCombina
}
}
}
}~
path("/file/read"){
compress{
produce(image / jpeg){
get { request: HttpRequest[ByteChunk] =>
val response = HttpResponse[ByteChunk](status = HttpStatus(HttpStatusCodes.OK), content = FileSource(new File("/Users/mlagutko/Downloads/victoria-parkside-resort.jpg")), headers = HttpHeaders.Empty + ("Content-Encoding", "gzip"))
Future.sync[HttpResponse[ByteChunk]](response)
}
}
}
}
}~
path("/ping") {
produce(text/plain) {
get { request: HttpRequest[ByteChunk] =>
Future.sync(HttpResponse[JValue](content = Some(JBool(true))))
}
}
}
} ->
}->
shutdown { demoConfig: BlueEyesDemoConfig =>
().future
}
}
}
}}

private def searchContacts(filterJObject: Option[JObject], config: BlueEyesDemoConfig): Future[List[JString]] = {
createFilter(filterJObject) map { filter =>
6 changes: 5 additions & 1 deletion src/main/scala/blueeyes/parsers/W3ExtendedLogGrammar.scala
@@ -125,6 +125,9 @@ object W3ExtendedLogAST {
case class UriQueryIdentifier (prefix: Prefix) extends PredefinedIdentifierPrefix{
def identifier = "uri-query"
}
case class ContentIdentifier (prefix: Prefix) extends PredefinedIdentifierPrefix{
def identifier = "content"
}

case class LogEntry(fields: List[Field])

@@ -222,7 +225,8 @@ trait W3ExtendedLogGrammar extends JavaTokenParsers {
("method" ^^^ MethodIdentifier(prefix)) |
("uri-stem" ^^^ UriStemIdentifier(prefix)) |
("uri-query" ^^^ UriQueryIdentifier(prefix)) |
("uri" ^^^ UriIdentifier(prefix))
("uri" ^^^ UriIdentifier(prefix)) |
("content" ^^^ ContentIdentifier(prefix))
}
}

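
The grammar now recognises "content" after a cs- or sc- prefix and produces a ContentIdentifier node, which the logger above pattern-matches to decide whether to dump the request or the response body. A small sketch of the resulting AST values (prefix names taken from the logger code):

// "cs-content" and "sc-content" parse to, respectively:
val requestBody  = ContentIdentifier(ClientToServerPrefix)   // identifier == "content"
val responseBody = ContentIdentifier(ServerToClientPrefix)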
41 changes: 20 additions & 21 deletions src/main/scala/blueeyes/util/logging/W3ExtendedLogger.scala
@@ -2,7 +2,7 @@ package blueeyes.util.logging

import blueeyes.concurrent.Future
import blueeyes.parsers.W3ExtendedLogAST.FieldsDirective
import blueeyes.persistence.cache.{ExpirationPolicy, CacheSettings, Stage}
import blueeyes.persistence.cache.{ExpirationPolicy, Stage}
import blueeyes.util.RichThrowableImplicits

import java.io.{FileOutputStream, OutputStreamWriter, File, Writer}
@@ -38,22 +38,22 @@ object W3ExtendedLogger{

class W3ExtendedLogger(baseFileName: String, policy: Policy, fieldsDirective: FieldsDirective, writeDelaySeconds: Int){
private val fileHandler = new FileHandler(baseFileName, policy, fieldsDirective)
private val logStage = Stage[String, String](ExpirationPolicy(None, Some(writeDelaySeconds), SECONDS), 100, write)
private val logStage = Stage[String, String](ExpirationPolicy(None, Some(writeDelaySeconds), SECONDS), 1000, write)

implicit val LogLineSemigroup = new Semigroup[String] {
def append(l1: String, l2: => String): String = l1 + "\n" + l2
}

def apply(logEntry: String) = logStage += ("log", logEntry)
def apply(logEntry: String) { logStage += ("log", logEntry) }

def close: Future[Unit] = {
val flushFuture = logStage.flushAll
flushFuture.map((v: Unit) => fileHandler.close)
val flushFuture = logStage.flushAll()
flushFuture.map((v: Unit) => fileHandler.close())
}

def fileName: Option[String] = fileHandler.fileName

private def write(key: String, record: String) = fileHandler.publish(record)
private def write(key: String, record: String) {fileHandler.publish(record)}
}

class FileHandler(baseFileName: String, policy: Policy, fieldsDirective: FieldsDirective) extends RichThrowableImplicits with NameFormat with Roll{
@@ -64,38 +64,37 @@ class FileHandler(baseFileName: String, policy: Policy, fieldsDirective: FieldsD
private var _openTime: Long = System.currentTimeMillis
private var _nextRollTime: Long = 0

roll
roll()

def fileName = _fileName

def flush = _stream.foreach(_.flush())
def flush() {_stream.foreach(_.flush())}

def close = {
flush
def close() {
flush()
try {
_stream.foreach(_.close())
_stream = None
} catch { case _ => () }
}

def publish(record: String) = {
rollIfNecessary
def publish(record: String) {
rollIfNecessary()
writeRecord(record)
}

private def writeRecord(record: String){
_stream foreach { streamValue =>
try {
streamValue.write(record + "\n")
streamValue.flush
streamValue.flush()
} catch {
case e => System.err.println(e.fullStackTrace)
}
}
}

private def rollIfNecessary = {

private def rollIfNecessary() {
lock.readLock.lock()
val shouldBeRolled = System.currentTimeMillis > _nextRollTime
lock.readLock.unlock()
@@ -104,7 +103,7 @@ class FileHandler(baseFileName: String, policy: Policy, fieldsDirective: FieldsD
lock.writeLock.lock()
try {
if (System.currentTimeMillis > _nextRollTime) {
roll
roll()
}
}
finally {
@@ -113,7 +112,7 @@ class FileHandler(baseFileName: String, policy: Policy, fieldsDirective: FieldsD
}
}

private def openLog = {
private def openLog() {
_stream = _fileName map { fileNameValue =>
val dir = new File(fileNameValue).getParentFile
if ((dir ne null) && !dir.exists) dir.mkdirs
@@ -125,23 +124,23 @@ class FileHandler(baseFileName: String, policy: Policy, fieldsDirective: FieldsD
stream.write("#Version: 1.0\n")
stream.write("#Date: %s\n".format(new SimpleDateFormat("dd-MMM-yyyy HH:MM:SS").format(new Date())))
stream.write(fieldsDirective.toString+ "\n")
stream.flush
stream.flush()
}
stream
}
}


private def roll = {
close
private def roll() {
close()

val newFileName = timedName(baseFileName, policy, _openTime)

_fileName foreach {fileNameValue => new File(fileNameValue).renameTo(new File(newFileName))}

_fileName = Some(newFileName)

openLog
openLog()
}
}

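
W3ExtendedLogger buffers lines in a Stage (capacity now 1000) and the stage flushes them to the rolling file after writeDelaySeconds. A minimal usage sketch; the file path below is illustrative, and rollPolicy: Policy plus fieldsDirective are assumed to be supplied by the caller:

// Write a line, then flush everything and close the file handler.
val w3Log = new W3ExtendedLogger("/var/log/myservice-w3.log", rollPolicy, fieldsDirective, writeDelaySeconds = 10)
w3Log("12:00:00.0 GET /contacts")                        // buffered by the stage
w3Log.close foreach { _ => println("flushed to " + w3Log.fileName.getOrElse("<none>")) }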
