Graphite: Revised TcpSender #4202

Closed
wants to merge 15 commits into from

@vJoeyz vJoeyz commented Feb 1, 2022

Contents
This pull request contains the implementation of a revised TcpSender used for sending statistics to Graphite. This revised version implements NACK-based write back-pressure with suspending, providing more reliability and data conservation when sending statistics.

Rationale
In the current version of the TcpSender, when a response isn't received within the WritePeriod from Gatling's config (1 second by default), for example because of a slow database connection or resource problems, the request is rejected and the data is neither buffered nor resent. Furthermore, if this happens more than 5 times (the hardcoded maxRetries), writing to Graphite halts completely and is never resumed.

I have tested this implementation in my load tests and it fully resolves all issues I had with the current TcpSender when recording large amounts of data to a database.

Credits
This implementation was heavily inspired by Akka's sample implementation.
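
As a rough illustration of the buffering idea described under Contents, here is a minimal, Akka-free sketch. It is not the code from this pull request: `PendingWrites`, `enqueue`, `resendAll` and `maxStored` are hypothetical names, and payloads are plain strings rather than Akka's `ByteString`. Each write keeps a copy until its ack arrives, so everything still stored can be resent after a failed write.

```scala
// Hypothetical model of ack-based write buffering (not the PR's actual API).
final case class PendingWrites(
    offset: Int,             // sequence number of the oldest unacknowledged write
    storage: Vector[String], // unacknowledged payloads, oldest first
    maxStored: Int           // capacity; exceeding it is treated as a buffer overflow
) {
  // Sequence number to attach to the next write.
  def nextAck: Int = offset + storage.size

  // Keep a copy of the payload until it is acknowledged; Left on overflow.
  def enqueue(payload: String): Either[String, PendingWrites] =
    if (storage.size >= maxStored) Left(s"buffer overflow at $maxStored entries")
    else Right(copy(storage = storage :+ payload))

  // Drop the oldest payload once its ack arrives; acks are expected in order.
  def acknowledge(ack: Int): Either[String, PendingWrites] =
    if (storage.isEmpty) Left(s"storage was empty at ack $ack")
    else if (ack != offset) Left(s"received wrong ack $ack at $offset")
    else Right(copy(offset = offset + 1, storage = storage.tail))

  // After a failed write, every stored payload is resent with its original sequence number.
  def resendAll: Vector[(String, Int)] =
    storage.zipWithIndex.map { case (payload, i) => (payload, offset + i) }
}

object PendingWritesDemo {
  def main(args: Array[String]): Unit = {
    val result = for {
      a <- PendingWrites(0, Vector.empty, 2).enqueue("foo 1 1")
      b <- a.enqueue("bar 2 2")
      c <- b.acknowledge(0)
    } yield c
    println(result)
  }
}
```

Returning `Either` instead of throwing is a choice made for this sketch only; it keeps an out-of-order ack or an overflow visible to the caller as a value.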

vJoeyz and others added 6 commits December 15, 2021 12:43
…4195

Motivation:

In 3.7, we added some `Int => Expression[FiniteDuration]` implicits that in some cases take over the expected `Any => Expression[Any]`.

Modification:

Drop those implicits and add lots of overrides for during-like loops.

Result:

No more conflicts. Breaking binary change, can only be released in 3.8.
@vJoeyz vJoeyz changed the title from "Tcp writeback pressure" to "Graphite: Revised TcpSender" Feb 1, 2022
Member

@slandelle slandelle left a comment

Thanks for contributing!

Could you please:

Thanks!

case Event(GraphiteMetrics(bytes), data: ConnectedData) =>
  buffer(bytes, data) match {
    case Success(data) => stay() using data
    case Failure(_) => goto(BufferOverflow) using NoData
Member

case _


private def writeFirst(data: ConnectedData): Unit = {
  data.connection ! Write(data.storage(0), Ack(data.storageOffset))
}
Member

single instruction => useless {}

  for ((bytes, i) <- data.storage.zipWithIndex) {
    data.connection ! Write(bytes, Ack(data.storageOffset + i))
  }
}
Member

single instruction => useless {}


private def acknowledge(ack: Int, data: ConnectedData): ConnectedData = {
  require(ack == data.storageOffset, s"Received wrong ack $ack at ${data.storageOffset}")
  require(data.storage.nonEmpty, s"Storage was empty at ack $ack")
Member

What happens if this throws?

Author

the actor will be restarted
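
For context on this exchange: Scala's `require` throws an `IllegalArgumentException` when its condition is false, and under Akka classic's default supervisor strategy an actor that throws an `Exception` while processing a message is restarted. Assuming that default applies to TcpSender (not confirmed in this thread), a restart would normally also reset the actor's in-memory state. The sketch below only demonstrates the exception itself; `checkAck` is a hypothetical helper, not a method from this pull request.

```scala
import scala.util.{Failure, Success, Try}

object RequireSketch {
  // Hypothetical helper with the same kind of check as the reviewed snippet.
  def checkAck(ack: Int, expected: Int): Int = {
    require(ack == expected, s"Received wrong ack $ack at $expected")
    ack
  }

  def main(args: Array[String]): Unit =
    Try(checkAck(ack = 3, expected = 0)) match {
      // require signals a violated precondition with IllegalArgumentException
      case Failure(e: IllegalArgumentException) => println(s"thrown: ${e.getMessage}")
      case Failure(other)                       => println(s"unexpected: $other")
      case Success(value)                       => println(s"ok: $value")
    }
}
```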

@@ -47,7 +55,14 @@ private[graphite] class TcpSender(
 unstashAll()
 val connection = sender()
 connection ! Register(self)
-goto(Running) using ConnectedData(connection, failures)
+goto(Running) using ConnectedData(connection, failures, 0, Vector.empty[ByteString], 0L, false, 0)
Member

Vector.empty[ByteString] => Nil


 private[sender] sealed trait TcpSenderData
 private[sender] case object NoData extends TcpSenderData
 private[sender] final case class DisconnectedData(retry: Retry) extends TcpSenderData
-private[sender] final case class ConnectedData(connection: ActorRef, retry: Retry) extends TcpSenderData
+private[sender] final case class ConnectedData(connection: ActorRef, retry: Retry, storageOffset: Int, storage: Vector[ByteString], stored: Long, suspended: Boolean, nack: Int) extends TcpSenderData
Member

Why Vector?

data.connection ! Write(bytes, Ack(currentOffset(data)))
buffer(bytes, data) match {
case Success(data) => stay() using data
case Failure(_) => goto(BufferOverflow) using NoData
Member

case _ =>

logger.info(s"Sending metrics to Graphite server located at: $remote")
data.connection ! Write(bytes, Ack(currentOffset(data)))
buffer(bytes, data) match {
case Success(data) => stay() using data
Member

Rename so you're not shadowing the data reference.
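
The shadowing the reviewer points out can be shown in isolation. In this sketch, `State`, `buffer` and `step` are hypothetical stand-ins for the PR's types and methods; the point is only that binding the successful result to a new name (`updated`) keeps the outer `data` unambiguous, and that a `case _` fallback covers every failure.

```scala
import scala.util.{Failure, Success, Try}

object ShadowingSketch {
  // Hypothetical, simplified state; the real ConnectedData has more fields.
  final case class State(stored: Int)

  // Hypothetical stand-in for the PR's buffer(bytes, data).
  def buffer(size: Int, data: State): Try[State] =
    if (size < 0) Failure(new IllegalArgumentException("negative size"))
    else Success(data.copy(stored = data.stored + size))

  def step(size: Int, data: State): State =
    buffer(size, data) match {
      // `updated` is a fresh name, so `data` still refers to the previous state here
      case Success(updated) => updated
      // any failure: keep the previous state
      case _ => data
    }
}
```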

}
// Connection actor failed to send metric, log it as a failure
case Event(CommandFailed(Write(_, Ack(ack))), data: ConnectedData) =>
logger.info(s"Failed to write to Graphite server located at: $remote, buffering...")
Member

Shouldn't it be a warning?

goto(WaitingForConnection) using DisconnectedData(newFailures)
when(Buffering)(event => {
var toAck = 10
var peerClosed = false
Member

Those should be in ConnectedData.
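
One way to read this suggestion is that per-connection counters belong in the immutable FSM state data rather than in `var`s inside the handler. The sketch below assumes that reading; `BufferingState`, `Signal` and `next` are made-up names, and decrementing `toAck` on each ack is an assumption about how the counter is used.

```scala
object StateInDataSketch {
  // Hypothetical subset of the state data, carrying the two values as fields.
  final case class BufferingState(toAck: Int, peerClosed: Boolean)

  sealed trait Signal
  case object Acked extends Signal
  case object PeerClosed extends Signal

  // Pure transition: returns a new state instead of mutating captured vars.
  def next(state: BufferingState, signal: Signal): BufferingState = signal match {
    case Acked      => state.copy(toAck = math.max(0, state.toAck - 1))
    case PeerClosed => state.copy(peerClosed = true)
  }
}
```

With the values in the state data, each transition can be tested as a plain function, without starting an actor.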

@vJoeyz vJoeyz requested a review from slandelle February 4, 2022 14:51
@slandelle
Member

@vJoeyz
Author

vJoeyz commented Feb 4, 2022

> @vJoeyz Have you seen all the ^M chars in your last commits? e.g. https://github.com/gatling/gatling/pull/4202/files#diff-696b3ab9a3d6b03bfb45bb037cc20cb0ac1e81f9070f05116d63a99c5e796419R23

They appeared after running compile

@slandelle
Member

> They appeared after running compile

???
What's your OS? Windows?

@vJoeyz
Author

vJoeyz commented Feb 4, 2022

> > They appeared after running compile
>
> ??? What's your OS? Windows?

Yes.

@slandelle
Member

Do you use something like git config core.autocrlf?

@vJoeyz
Author

vJoeyz commented Feb 5, 2022

> Do you use something like git config core.autocrlf?

Not that I'm aware of, no. Anyways, I've removed them. :)

@slandelle
Member

> Anyways, I've removed them. :)

Yeah, but your code is not properly formatted. It would be if you were using compile to trigger scalafmt. As is, your PR won't pass our CI. We're going to investigate our settings for Windows but I'm not sure the issue is there.

Also, your commit history is broken: it contains more than just your own commits rebased on top of our main branch.

@slandelle
Member

@vJoeyz We'll release 3.7.5 next week. By then, do you think you'll be able to fix your commit history and your formatting?

@slandelle
Member

slandelle commented Feb 11, 2022

I tried to fix formatting and wartremover on your branch, but still, TcpSenderSpec fails:

[info] - should go to the Running state and send metrics if it could connect without issues *** FAILED ***
[info]   java.lang.AssertionError: assertion failed: expected Write(ByteString(102, 111, 111, 32, 49, 32, 49, 10),NoAck(null)), found Write(ByteString(102, 111, 111, 32, 49, 32, 49, 10),Ack(0))
[info]   at scala.Predef$.assert(Predef.scala:279)
[info]   at akka.testkit.TestKitBase.expectMsg_internal(TestKit.scala:462)
[info]   at akka.testkit.TestKitBase.expectMsg(TestKit.scala:438)
[info]   at akka.testkit.TestKitBase.expectMsg$(TestKit.scala:438)
[info]   at akka.testkit.TestKit.expectMsg(TestKit.scala:973)
[info]   at io.gatling.graphite.sender.TcpSenderSpec.$anonfun$new$3(TcpSenderSpec.scala:64)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
14:30:06.218 [pool-1-thread-1-ScalaTest-running-TcpSenderSpec] WARN io.gatling.graphite.sender.TcpSenderSpec$TcpSenderNoIo - Disconnected from Graphite server located at: 0.0.0.0/0.0.0.0:9999, retrying...
[info] - should retry to connected until the retry limit has been exceeded to finally stop *** FAILED ***
[info]   FlushingBuffer was not equal to Running (TcpSenderSpec.scala:80)

@biski

biski commented Sep 5, 2022

I've tested this fix and it works in my case. Before the fix, the integration with Graphite used to stop after ~24h. Please merge it to stable, thanks :)

@slandelle
Member

@biski There's no way to merge this work as is:

  • code formatting violations everywhere, even though one just has to run compile/Test:compile to format automatically
  • merge conflict

Contributions welcome

@vJoeyz
Author

vJoeyz commented Oct 12, 2022

> @biski There's no way to merge this work as is:
>
>   • code formatting violations everywhere, even though one just has to run compile/Test:compile to format automatically
>   • merge conflict
>
> Contributions welcome

Sorry for my late reply; for the past half year my health got in the way. I'll try to fix it up this week. :)

@slandelle
Member

@vJoeyz Oh, sorry to hear that! I hope you're doing well now.
No worries, take your time. I haven't closed the PR for inactivity because I believe it has value. I tried to rebase it myself but had limited time for it and didn't succeed.

@slandelle slandelle closed this Jan 3, 2023
@slandelle
Member

Closing, as this has been idle for too long and can't be merged.
