Skip to content

Commit

Permalink
Add logback logger
Browse files Browse the repository at this point in the history
  • Loading branch information
ashkrit committed May 27, 2018
1 parent 45aa703 commit 970151f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# Blog
http://ashkrit.blogspot.com/2018/05/spark-microservices.html
http://ashkrit.blogspot.sg/2018/05/custom-logs-in-apache-spark.html

# How to Build

Expand Down
25 changes: 25 additions & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<appender name="METRICS" class="micro.logback.MetricsLogbackAppender"/>

<root level="info">
<appender-ref ref="STDOUT" />
</root>

<logger level="info" name="micro" additivity="true">
<appender-ref ref="METRICS" />
</logger>

<logger level="info" name="org.apache.spark.scheduler.DAGScheduler" additivity="true">
<appender-ref ref="METRICS" />
</logger>

</configuration>
30 changes: 30 additions & 0 deletions src/main/scala/micro/logback/MetricsLogbackAppender.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package micro.logback

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.BiFunction

import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.UnsynchronizedAppenderBase

class MetricsLogbackAppender extends UnsynchronizedAppenderBase[ILoggingEvent] {

override def append(e: ILoggingEvent) = {
//Send this message to elastic search or REST end point
messageCount.compute(Thread.currentThread().getName, mergeValue)
System.out.println(messageCount + " " + e)
}

val messageCount = new ConcurrentHashMap[String, AtomicInteger]()
val mergeValue = new BiFunction[String, AtomicInteger, AtomicInteger] {
def apply(key: String, currentValue: AtomicInteger) = {
val nextValue = currentValue match {
case null => new AtomicInteger(0)
case _ => currentValue
}
nextValue.incrementAndGet()
nextValue
}
}

}
11 changes: 7 additions & 4 deletions src/main/scala/micro/main/SparkFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,33 @@ import org.slf4j.LoggerFactory

object SparkFactory {

val _LOG = LoggerFactory.getLogger(this.getClass.getName)
val _LOG = LoggerFactory.getLogger(this.getClass.getName)

def newSparkSession(appName: String, master: String) = {
def newSparkSession(appName: String, master: String): SparkSession = {
val sparkConf = new SparkConf().setAppName(appName).setMaster(master)
val sparkSession = SparkSession.builder()
.appName(appName)
.config(sparkConf)
.getOrCreate()

_LOG.info("Context details {}", sparkSession)
sparkSession

}

def loadAsTable(context: SparkSession, fileName: String, table: String): SparkSession = {
import context.implicits._
val data = context.read.format("csv").option("header", "true").csv(fileName).as[Vehicle]
data.printSchema()
_LOG.info("Schema details {}", data.schema.treeString)
data.cache()
data.createOrReplaceTempView(table)
context
}


def filePath(filePath: String): String = {
val url = this.getClass.getResource("/logback.xml")
_LOG.info("Path {}" , url)
_LOG.info("Loading content from file {}", filePath)
val fileUrl = this.getClass.getResource(filePath)
Paths.get(fileUrl.toURI).toFile.getAbsolutePath
}
Expand Down

0 comments on commit 970151f

Please sign in to comment.