Skip to content

Commit

Permalink
SPARK-24863: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arunmahadevan committed Aug 13, 2018
1 parent 7129c3f commit ffa20ba
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private object JsonUtils {
* Write TopicPartitions as json string
*/
def partitions(partitions: Iterable[TopicPartition]): String = {
val result = new HashMap[String, List[Int]]
val result = HashMap.empty[String, List[Int]]
partitions.foreach { tp =>
val parts: List[Int] = result.getOrElse(tp.topic, Nil)
result += tp.topic -> (tp.partition::parts)
Expand Down Expand Up @@ -85,11 +85,11 @@ private object JsonUtils {
* Write per-TopicPartition offsets as json string
*/
def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
val result = HashMap.empty[String, HashMap[Int, Long]]
val partitions = partitionOffsets.keySet.toSeq.sorted // sort for more determinism
partitions.foreach { tp =>
val off = partitionOffsets(tp)
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
parts += tp.partition -> off
result += tp.topic -> parts
}
Expand All @@ -99,13 +99,14 @@ private object JsonUtils {
/**
* Write per-topic partition lag as json string
*/
def partitionLags(latestOffsets: Map[TopicPartition, Long],
processedOffsets: Map[TopicPartition, Long]): String = {
val result = new HashMap[String, HashMap[Int, Long]]()
def partitionLags(
latestOffsets: Map[TopicPartition, Long],
processedOffsets: Map[TopicPartition, Long]): String = {
val result = HashMap.empty[String, HashMap[Int, Long]]
val partitions = latestOffsets.keySet.toSeq.sorted
partitions.foreach { tp =>
val lag = latestOffsets(tp) - processedOffsets.getOrElse(tp, 0L)
val parts = result.getOrElse(tp.topic, new HashMap[Int, Long])
val parts = result.getOrElse(tp.topic, HashMap.empty[Int, Long])
parts += tp.partition -> lag
result += tp.topic -> parts
}
Expand Down

0 comments on commit ffa20ba

Please sign in to comment.