Skip to content
This repository has been archived by the owner on Mar 11, 2022. It is now read-only.

Commit

Permalink
Merge pull request #24 from PagerDuty/add-scalafmt
Browse files Browse the repository at this point in the history
Add scalafmt
  • Loading branch information
David van Geest committed Jul 11, 2017
2 parents c037f29 + 72f13e5 commit 29d08c0
Show file tree
Hide file tree
Showing 84 changed files with 1,492 additions and 1,414 deletions.
4 changes: 4 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,4 @@
verticalMultilineAtDefinitionSite = true
maxColumn = 120
align.openParenCallSite = false
danglingParentheses = true
66 changes: 36 additions & 30 deletions build.sbt
Expand Up @@ -3,8 +3,7 @@ lazy val bintraySettings = Seq(
bintrayRepository := "oss-maven",
licenses += ("BSD New", url("https://opensource.org/licenses/BSD-3-Clause")),
publishMavenStyle := true,
pomExtra := (
<url>https://github.com/PagerDuty/scheduler</url>
pomExtra := (<url>https://github.com/PagerDuty/scheduler</url>
<scm>
<url>git@github.com:PagerDuty/scheduler.git</url>
<connection>scm:git:git@github.com:PagerDuty/scheduler.git</connection>
Expand Down Expand Up @@ -57,14 +56,14 @@ lazy val sharedSettings = Seq(
)
) ++ bintraySettings

lazy val common = (project in file("common")).
settings(sharedSettings: _*).
settings(
lazy val common = (project in file("common"))
.settings(sharedSettings: _*)
.settings(
name := "scheduler-common",
libraryDependencies ++= Seq(
"com.pagerduty" %% "eris-core" % "2.0.4" exclude("org.slf4j", "slf4j-log4j12"),
"com.pagerduty" %% "eris-core" % "2.0.4" exclude ("org.slf4j", "slf4j-log4j12"),
"com.pagerduty" %% "metrics-api" % "1.3.0",
"org.json4s" %% "json4s-jackson" % "3.3.0",
"org.json4s" %% "json4s-jackson" % "3.3.0",
"org.slf4j" % "slf4j-api" % "1.7.13",
"org.slf4j" % "jul-to-slf4j" % "1.7.13",
"org.apache.kafka" % "kafka-clients" % "0.10.1.1",
Expand All @@ -74,10 +73,10 @@ lazy val common = (project in file("common")).
)
)

lazy val scalaApi = (project in file("scala-api")).
dependsOn(common).
settings(sharedSettings: _*).
settings(
lazy val scalaApi = (project in file("scala-api"))
.dependsOn(common)
.settings(sharedSettings: _*)
.settings(
name := "scheduler-scala-api",
libraryDependencies ++= Seq(
"com.pagerduty" %% "metrics-api" % "1.3.0",
Expand All @@ -87,13 +86,14 @@ lazy val scalaApi = (project in file("scala-api")).
)
)

lazy val scheduler = (project in file("scheduler")).
dependsOn(common % "it,test->test;compile->compile").
dependsOn(scalaApi % "it").
configs(IntegrationTest).
settings(sharedSettings: _*).
settings(Defaults.itSettings: _*).
settings(
lazy val scheduler = (project in file("scheduler"))
.dependsOn(common % "it,test->test;compile->compile")
.dependsOn(scalaApi % "it")
.configs(IntegrationTest)
.settings(inConfig(IntegrationTest)(scalafmtSettings))
.settings(sharedSettings: _*)
.settings(Defaults.itSettings: _*)
.settings(
name := "scheduler",
unmanagedSourceDirectories in IntegrationTest +=
baseDirectory.value / "src/test/scala/com/pagerduty/scheduler/specutil",
Expand All @@ -105,7 +105,7 @@ lazy val scheduler = (project in file("scheduler")).
"com.pagerduty" %% "eris-dao" % "2.0.0",
"com.pagerduty" %% "eris-dao" % "2.0.0" % "it" classifier "tests",
"com.pagerduty" %% "kafka-consumer" % kafkaConsumerVersion,
"com.pagerduty" %% "kafka-consumer-test-support" % kafkaConsumerVersion exclude("org.slf4j", "slf4j-simple"),
"com.pagerduty" %% "kafka-consumer-test-support" % kafkaConsumerVersion exclude ("org.slf4j", "slf4j-simple"),
"com.typesafe.akka" %% "akka-actor" % "2.3.14",
"com.typesafe.akka" %% "akka-slf4j" % "2.3.14",
"com.typesafe.akka" %% "akka-testkit" % "2.3.14" % "it,test",
Expand All @@ -114,15 +114,17 @@ lazy val scheduler = (project in file("scheduler")).
"org.scalatest" %% "scalatest" % "2.2.6" % "it,test",
"ch.qos.logback" % "logback-classic" % "1.1.3" % "it,test"
)
})
}
)

lazy val httpAdmin = (project in file("http-admin")).
dependsOn(common % "test->test;compile->compile").
dependsOn(scheduler % "it->it;test->test;compile->compile").
configs(IntegrationTest).
settings(sharedSettings: _*).
settings(Defaults.itSettings: _*).
settings(
lazy val httpAdmin = (project in file("http-admin"))
.dependsOn(common % "test->test;compile->compile")
.dependsOn(scheduler % "it->it;test->test;compile->compile")
.configs(IntegrationTest)
.settings(inConfig(IntegrationTest)(scalafmtSettings))
.settings(sharedSettings: _*)
.settings(Defaults.itSettings: _*)
.settings(
name := "scheduler-http-admin",
libraryDependencies ++= {
val scalatraVersion = "2.4.0"
Expand All @@ -140,6 +142,10 @@ lazy val httpAdmin = (project in file("http-admin")).
}
)

lazy val root = (project in file(".")).settings(
publish := { }
).aggregate(common, scalaApi, scheduler, httpAdmin)
lazy val root = (project in file("."))
.settings(
publish := {}
)
.aggregate(common, scalaApi, scheduler, httpAdmin)

scalafmtOnCompile in ThisBuild := true
60 changes: 33 additions & 27 deletions common/src/main/scala/com/pagerduty/scheduler/LoggingSupport.scala
Expand Up @@ -3,31 +3,33 @@ package com.pagerduty.scheduler
import com.pagerduty.metrics.Metrics
import com.pagerduty.scheduler.model.Task
import org.slf4j.Logger
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

/**
* Some logging code shared between scheduler and scheduler-scala-api
*/
* Some logging code shared between scheduler and scheduler-scala-api
*/
object LoggingSupport {

/**
* Report results on a future
* @param metrics Metrics instance to write metrics to
* @param log Logger instance to write logging to
* @param name Name for the future
* @param logString Description of what the future does
* @param future The future itself
* @param additionalTags Any additional datadog tags to write out
* @tparam T The return type of the future
*/
* Report results on a future
* @param metrics Metrics instance to write metrics to
* @param log Logger instance to write logging to
* @param name Name for the future
* @param logString Description of what the future does
* @param future The future itself
* @param additionalTags Any additional datadog tags to write out
* @tparam T The return type of the future
*/
def reportFutureResults[T](
metrics: Metrics,
log: Logger,
name: String,
logString: Option[String],
future: Future[T],
additionalTags: Seq[(String, String)] = Seq.empty[(String, String)]
)(implicit ec: ExecutionContext): Unit = {
metrics: Metrics,
log: Logger,
name: String,
logString: Option[String],
future: Future[T],
additionalTags: Seq[(String, String)] = Seq.empty[(String, String)]
)(implicit ec: ExecutionContext
): Unit = {

val startTime = System.currentTimeMillis()
def timeTaken() = (System.currentTimeMillis() - startTime).toInt
Expand All @@ -38,23 +40,27 @@ object LoggingSupport {
logString.foreach(str => log.info(s"Succeeded: ${str}"))
val tags = additionalTags :+ ("result" -> "success")
metrics.histogram(
name, timeTaken(), tags: _*
name,
timeTaken(),
tags: _*
)
case Failure(e) =>
logString.foreach(str => log.error(s"Failed: ${str}.", e))
val tags = additionalTags :+ ("result" -> "failure") :+ ("exception" -> e.getClass.getSimpleName)
metrics.histogram(
name, timeTaken(), tags: _*
name,
timeTaken(),
tags: _*
)
}
}

/**
* Setup additional tags based on the Task
* @param task The Task to generate tags from
* @param taskDataTagNames Task data keys to use for tags
* @return Tags pulled out of task.taskData appearing in taskDataDatNames.
*/
* Setup additional tags based on the Task
* @param task The Task to generate tags from
* @param taskDataTagNames Task data keys to use for tags
* @return Tags pulled out of task.taskData appearing in taskDataDatNames.
*/
def additionalTags(task: Task, taskDataTagNames: Set[String]): Seq[(String, String)] =
task.taskData.filterKeys(taskDataTagNames.contains).toSeq
}
17 changes: 9 additions & 8 deletions common/src/main/scala/com/pagerduty/scheduler/Partitioner.scala
@@ -1,22 +1,23 @@
package com.pagerduty.scheduler

import com.pagerduty.scheduler.model.Task._
import org.apache.kafka.common.utils.{ Utils => KafkaUtils }
import org.apache.kafka.common.utils.{Utils => KafkaUtils}

/**
* Created by cees on 2016/06/08.
*/
* Created by cees on 2016/06/08.
*/
object Partitioner {

/**
* This method calculates a Kafka partitionId for the given bytes. It is a direct copy of the
* partitioning logic found in org.apache.kafka.clients.producer.internals.DefaultPartitioner.
*/
* This method calculates a Kafka partitionId for the given bytes. It is a direct copy of the
* partitioning logic found in org.apache.kafka.clients.producer.internals.DefaultPartitioner.
*/
def partitionId(bytes: Array[Byte], numPartitions: Int): PartitionId = {
toPositive(KafkaUtils.murmur2(bytes)) % numPartitions
}

/**
* Again, this is a direct copy of the function in org.apache.kafka.clients.producer.internals.DefaultPartitioner
*/
* Again, this is a direct copy of the function in org.apache.kafka.clients.producer.internals.DefaultPartitioner
*/
private def toPositive(number: Int): Int = number & 0x7fffffff
}
59 changes: 29 additions & 30 deletions common/src/main/scala/com/pagerduty/scheduler/model/Task.scala
Expand Up @@ -6,43 +6,42 @@ import java.util.UUID
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization.{ read, write }
import org.json4s.jackson.Serialization.{read, write}

/**
* A task for scheduling.
*
* @param orderingId The key of a logical ordering with which to associate the task. Ordering of
* tasks is defined only for tasks with the same orderingId, and only when this
* method is called in the same order as the scheduled times. That is, task1 is
* only guaranteed to execute before task2 if they have the same orderingId, task1
* has a scheduledTime before the scheduledTime of task2, and scheduleTask is
* called for task1 before it is called for task2.
*
* As a result of this ordering guarantee note that if a task with a given
* orderingId is blocked, this will not allow other tasks with the same orderingId to
* execute.
*
* DO NOT put commas in this string! It will break parsing the key from a string.
* @param scheduledTime The time at which to run the task.
* @param uniquenessKey The (orderingId, scheduledTime, uniquenessKey) tuple is the ID for a task.
* When a task is scheduled with an existing ID, it will overwrite the
* existing task. If the original task has been completed, the new task will
* not be re-run. This key can be used to schedule tasks with the same
* orderingId and scheduledTime.
*
* DO NOT put commas in this string!
* @param taskData Application-defined task data. This likely includes a task identifier,
* and possibly task state.
* @param version Serialization version number so we can make changes in the future.
*
*/
* A task for scheduling.
*
* @param orderingId The key of a logical ordering with which to associate the task. Ordering of
* tasks is defined only for tasks with the same orderingId, and only when this
* method is called in the same order as the scheduled times. That is, task1 is
* only guaranteed to execute before task2 if they have the same orderingId, task1
* has a scheduledTime before the scheduledTime of task2, and scheduleTask is
* called for task1 before it is called for task2.
*
* As a result of this ordering guarantee note that if a task with a given
* orderingId is blocked, this will not allow other tasks with the same orderingId to
* execute.
*
* DO NOT put commas in this string! It will break parsing the key from a string.
* @param scheduledTime The time at which to run the task.
* @param uniquenessKey The (orderingId, scheduledTime, uniquenessKey) tuple is the ID for a task.
* When a task is scheduled with an existing ID, it will overwrite the
* existing task. If the original task has been completed, the new task will
* not be re-run. This key can be used to schedule tasks with the same
* orderingId and scheduledTime.
*
* DO NOT put commas in this string!
* @param taskData Application-defined task data. This likely includes a task identifier,
* and possibly task state.
* @param version Serialization version number so we can make changes in the future.
*
*/
final case class Task(
orderingId: Task.OrderingId,
scheduledTime: Instant,
uniquenessKey: Task.UniquenessKey,
taskData: Task.TaskData,
version: Int = 1
) {
version: Int = 1) {
def taskKey: TaskKey = TaskKey(scheduledTime, orderingId, uniquenessKey)

def taskDataString: String = {
Expand Down
45 changes: 23 additions & 22 deletions common/src/main/scala/com/pagerduty/scheduler/model/TaskKey.scala
Expand Up @@ -3,15 +3,12 @@ package com.pagerduty.scheduler.model
import com.pagerduty.scheduler.Partitioner
import com.pagerduty.scheduler.model.Task.PartitionId
import java.time.format.DateTimeFormatter
import java.time.{ Instant, ZoneOffset }
import java.time.{Instant, ZoneOffset}
import org.json4s.CustomSerializer
import org.json4s.JsonAST.JString

case class TaskKey(
scheduledTime: Instant,
orderingId: Task.OrderingId,
uniquenessKey: Task.UniquenessKey
) extends Ordered[TaskKey] {
case class TaskKey(scheduledTime: Instant, orderingId: Task.OrderingId, uniquenessKey: Task.UniquenessKey)
extends Ordered[TaskKey] {
def asTuple: (Instant, Task.OrderingId, Task.UniquenessKey) = TaskKey.unapply(this).get

def compare(that: TaskKey): Int = {
Expand All @@ -30,9 +27,9 @@ case class TaskKey(
}

/**
* This method calculates a Kafka partitionId for the task key. It is a direct copy of the
* partitioning logic found in org.apache.kafka.clients.producer.internals.DefaultPartitioner.
*/
* This method calculates a Kafka partitionId for the task key. It is a direct copy of the
* partitioning logic found in org.apache.kafka.clients.producer.internals.DefaultPartitioner.
*/
def partitionId(numPartitions: Int): PartitionId = {
val partitionKeyBytes = orderingId.getBytes("UTF8")

Expand All @@ -46,10 +43,10 @@ object TaskKey {
DateTimeFormatter.ofPattern(ScheduledTimeFormat).withZone(ZoneOffset.UTC)

def apply(
formattedScheduledTime: String,
orderingId: Task.OrderingId,
uniquenessKey: Task.UniquenessKey
): TaskKey = {
formattedScheduledTime: String,
orderingId: Task.OrderingId,
uniquenessKey: Task.UniquenessKey
): TaskKey = {
TaskKey(Instant.parse(formattedScheduledTime), orderingId, uniquenessKey)
}

Expand All @@ -60,18 +57,22 @@ object TaskKey {
}

def lowerBound(
scheduledTime: Instant,
orderingId: Option[Task.OrderingId] = None,
uniquenessKey: Option[Task.UniquenessKey] = None
): TaskKey = {
scheduledTime: Instant,
orderingId: Option[Task.OrderingId] = None,
uniquenessKey: Option[Task.UniquenessKey] = None
): TaskKey = {
val oId = orderingId.getOrElse("")
val uKey = uniquenessKey.getOrElse("")
TaskKey(scheduledTime, oId, uKey)
}
}

class TaskKeyTimeSerializer extends CustomSerializer[Instant](format => ({
case JString(s) => Instant.parse(s)
}, {
case t: Instant => JString(TaskKey.TimeFormat.format(t))
}))
class TaskKeyTimeSerializer
extends CustomSerializer[Instant](
format =>
({
case JString(s) => Instant.parse(s)
}, {
case t: Instant => JString(TaskKey.TimeFormat.format(t))
})
)

0 comments on commit 29d08c0

Please sign in to comment.