Skip to content
This repository
branch: master
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 103 lines (87 sloc) 3.208 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.remote

import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }

import akka.actor.Actor
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout

import java.util.concurrent.{ TimeUnit, CountDownLatch }
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ ExecutionContext, Future }

/**
 * Test mix-in that simulates network failures by running `ipfw` commands
 * as external processes.
 *
 * Every helper asserts that the `ipfw` process exits with code 0, so a failing
 * command surfaces as an assertion failure.
 *
 * NOTE(review): the `ipfw` invocations presumably need elevated privileges and a
 * host where `ipfw` is installed -- confirm against the test environment.
 */
trait NetworkFailureSpec extends DefaultTimeout { self: AkkaSpec =>
  import Actor._
  import scala.concurrent.duration.Duration

  // Brings the actor system's dispatcher into scope; presumably it is the
  // implicit ExecutionContext picked up by the Futures created below.
  import system.dispatcher

  /** Bandwidth limit passed to `ipfw pipe ... config bw`. */
  val BytesPerSecond = "60KByte/s"
  /** Delay passed to `ipfw pipe ... config ... delay`. */
  val DelayMillis = "350ms"
  /** Destination port range targeted by the deny/reset rules. */
  val PortRange = "1024-65535"

  /**
   * Installs a TCP reset rule, keeps it for `duration`, then restores the network.
   *
   * @param duration how long the rule stays active
   * @param dead     set to `true` if any step fails
   * @return a future that completes once the network has been restored (or the attempt failed)
   */
  def replyWithTcpResetFor(duration: Duration, dead: AtomicBoolean): Future[Unit] =
    injectFailureFor(duration, dead, "===>>> Reply with [TCP RST]")(enableTcpReset())

  /**
   * Throttles the network with [[BytesPerSecond]] and [[DelayMillis]] for `duration`,
   * then restores the network.
   *
   * @param duration how long the throttling stays active
   * @param dead     set to `true` if any step fails
   * @return a future that completes once the network has been restored (or the attempt failed)
   */
  def throttleNetworkFor(duration: Duration, dead: AtomicBoolean): Future[Unit] =
    injectFailureFor(
      duration,
      dead,
      s"===>>> Throttling network with [$BytesPerSecond, $DelayMillis]")(enableNetworkThrottling())

  /**
   * Installs a TCP deny rule, keeps it for `duration`, then restores the network.
   *
   * @param duration how long the rule stays active
   * @param dead     set to `true` if any step fails
   * @return a future that completes once the network has been restored (or the attempt failed)
   */
  def dropNetworkFor(duration: Duration, dead: AtomicBoolean): Future[Unit] =
    injectFailureFor(duration, dead, "===>>> Blocking network [TCP DENY]")(enableNetworkDrop())

  /** Blocks the calling thread for `duration`, announcing it on stdout first. */
  def sleepFor(duration: Duration): Unit = {
    println(s"===>>> Sleeping for [$duration]")
    Thread.sleep(duration.toMillis)
  }

  /** Resets the rules, then routes all IP traffic through two pipes limited by bandwidth and delay. */
  def enableNetworkThrottling(): Unit = {
    restoreIP()
    ipfw("add", "pipe", "1", "ip", "from", "any", "to", "any")
    ipfw("add", "pipe", "2", "ip", "from", "any", "to", "any")
    ipfw("pipe", "1", "config", "bw", BytesPerSecond, "delay", DelayMillis)
    ipfw("pipe", "2", "config", "bw", BytesPerSecond, "delay", DelayMillis)
  }

  /** Resets the rules, then adds a rule denying TCP traffic to [[PortRange]]. */
  def enableNetworkDrop(): Unit = {
    restoreIP()
    ipfw("add", "1", "deny", "tcp", "from", "any", "to", "any", PortRange)
  }

  /** Resets the rules, then adds a rule answering TCP traffic to [[PortRange]] with a reset. */
  def enableTcpReset(): Unit = {
    restoreIP()
    ipfw("add", "1", "reset", "tcp", "from", "any", "to", "any", PortRange)
  }

  /** Deletes both pipes and flushes all rules and pipes. */
  def restoreIP(): Unit = {
    println("===>>> Restoring network")
    ipfw("del", "pipe", "1")
    ipfw("del", "pipe", "2")
    ipfw("flush")
    ipfw("pipe", "flush")
  }

  /**
   * Shared implementation of the `...For` helpers: runs `enable`, announces the
   * failure mode, sleeps for `duration` and finally restores the network.
   *
   * The restore step runs in `finally` so that rules installed by `enable` are
   * removed even when a later step throws or the sleep is interrupted.
   */
  private def injectFailureFor(duration: Duration, dead: AtomicBoolean, announcement: String)(
    enable: => Unit): Future[Unit] = {

    def markDead(cause: Throwable): Unit = {
      dead.set(true)
      cause.printStackTrace()
    }

    Future {
      try {
        enable
        println(s"$announcement for [$duration]")
        Thread.sleep(duration.toMillis)
      } catch {
        // Deliberately broad: callers watch `dead`, so every failure (including an
        // interrupted sleep) has to be reported through it.
        case e: Throwable => markDead(e)
      } finally {
        try restoreIP() catch { case e: Throwable => markDead(e) }
      }
    }
  }

  /** Runs `ipfw` with the given arguments and asserts that it exits with code 0. */
  private def ipfw(args: String*): Unit = {
    val command = "ipfw" +: args
    val exitCode = new ProcessBuilder(command: _*).start().waitFor()
    assert(exitCode == 0, s"[${command.mkString(" ")}] exited with code [$exitCode]")
  }
}
Something went wrong with that request. Please try again.