diff --git a/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala b/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala index 35e5f73..f123362 100644 --- a/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala +++ b/flink/src/main/scala/com/metamx/tranquility/flink/BeamSink.scala @@ -18,12 +18,13 @@ */ package com.metamx.tranquility.flink -import com.metamx.common.scala.Logging +import com.metamx.common.scala.{Logger, Logging} import com.metamx.tranquility.tranquilizer.MessageDroppedException import com.metamx.tranquility.tranquilizer.Tranquilizer import com.twitter.util.Return import com.twitter.util.Throw import java.util.concurrent.atomic.AtomicReference + import org.apache.flink.api.common.accumulators.LongCounter import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.sink.RichSinkFunction @@ -66,9 +67,14 @@ class BeamSink[T](beamFactory: BeamFactory[T], reportDropsAsExceptions: Boolean } override def close() = { - sender.get.flush() - maybeThrow() - sender.get.stop() + try { + sender.get.flush() + maybeThrow() + sender.get.stop() + } catch { + case e : NoSuchElementException => log.warn(e, "No Tranquility object found while closing BeamSink") + case e : IllegalStateException => log.warn(e, "Exception while stopping Tranquility") + } } private def maybeThrow() {