From e819a566612f2f6345825deecd18397a425b68fe Mon Sep 17 00:00:00 2001 From: Kartik Date: Fri, 5 Apr 2019 13:13:56 +0530 Subject: [PATCH 1/2] Tranquilizer isn't closed explicitly when BeamSink is closed --- flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala b/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala index 9dc9001..35e5f73 100644 --- a/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala +++ b/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala @@ -68,6 +68,7 @@ class BeamSink[T](beamFactory: BeamFactory[T], reportDropsAsExceptions: Boolean override def close() = { sender.get.flush() maybeThrow() + sender.get.stop() } private def maybeThrow() { From fbbc44529cccaec9b055bc68cdb614824d02223f Mon Sep 17 00:00:00 2001 From: Kartik Date: Wed, 10 Apr 2019 17:06:51 +0530 Subject: [PATCH 2/2] enable flink to close BeamSink without any error --- .../com/metamx/tranquility/flink/BeamSink.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala b/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala index 35e5f73..f123362 100644 --- a/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala +++ b/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala @@ -18,12 +18,13 @@ */ package com.metamx.tranquility.flink -import com.metamx.common.scala.Logging +import com.metamx.common.scala.{Logger, Logging} import com.metamx.tranquility.tranquilizer.MessageDroppedException import com.metamx.tranquility.tranquilizer.Tranquilizer import com.twitter.util.Return import com.twitter.util.Throw import java.util.concurrent.atomic.AtomicReference + import org.apache.flink.api.common.accumulators.LongCounter import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.RichSinkFunction @@ -66,9 +67,14 @@ class BeamSink[T](beamFactory: BeamFactory[T], reportDropsAsExceptions: Boolean } override def close() = { - sender.get.flush() - maybeThrow() - sender.get.stop() + try { + sender.get.flush() + maybeThrow() + sender.get.stop() + } catch { + case e : NoSuchElementException => log.warn(e, "No Tranquility object found while closing BeamSink") + case e : IllegalStateException => log.warn(e, "Exception while stopping Tranquility") + } } private def maybeThrow() {