diff --git a/build.sbt b/build.sbt index 3f4ca47d1..845d32bfc 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import scala.util.Try name := "scorex-core" lazy val commonSettings = Seq( - scalaVersion := "2.12.10", + scalaVersion := "2.12.12", resolvers += Resolver.sonatypeRepo("public"), resolvers += "Maven Central Server" at "https://repo1.maven.org/maven2", resolvers += "Typesafe Server" at "https://repo.typesafe.com/typesafe/releases", @@ -72,9 +72,9 @@ version in ThisBuild := { git.gitUncommittedChanges in ThisBuild := true -val circeVersion = "0.9.0" -val akkaVersion = "2.5.24" -val akkaHttpVersion = "10.1.9" +val circeVersion = "0.13.0" +val akkaVersion = "2.6.10" +val akkaHttpVersion = "10.2.1" val networkDependencies = Seq( "com.typesafe.akka" %% "akka-actor" % akkaVersion, diff --git a/examples/build.sbt b/examples/build.sbt index 7c1f79f12..895f5ffed 100644 --- a/examples/build.sbt +++ b/examples/build.sbt @@ -5,7 +5,7 @@ libraryDependencies ++= Seq( "org.scalatest" %% "scalatest" % "3.0.1" % "test", "org.scalacheck" %% "scalacheck" % "1.13.+" % "test", "org.scorexfoundation" %% "iodb" % "0.3.2", - "com.typesafe.akka" %% "akka-testkit" % "2.4.17" % "test" + "com.typesafe.akka" %% "akka-testkit" % "2.6.10" % "test" ) mainClass in assembly := Some("examples.hybrid.HybridApp") diff --git a/src/main/scala/scorex/core/app/Application.scala b/src/main/scala/scorex/core/app/Application.scala index f2524d722..b2734ab32 100644 --- a/src/main/scala/scorex/core/app/Application.scala +++ b/src/main/scala/scorex/core/app/Application.scala @@ -5,7 +5,6 @@ import java.net.InetSocketAddress import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.Http import akka.http.scaladsl.server.{ExceptionHandler, RejectionHandler, Route} -import akka.stream.ActorMaterializer import scorex.core.api.http.{ApiErrorHandler, ApiRejectionHandler, ApiRoute, CompositeHttpService} import scorex.core.network._ import scorex.core.network.message._ @@ -101,10 +100,9 @@ trait Application extends ScorexLogging { log.debug(s"Max memory available: ${Runtime.getRuntime.maxMemory}") log.debug(s"RPC is allowed at ${settings.restApi.bindAddress.toString}") - implicit val materializer: ActorMaterializer = ActorMaterializer() val bindAddress = settings.restApi.bindAddress - Http().bindAndHandle(combinedRoute, bindAddress.getAddress.getHostAddress, bindAddress.getPort) + Http().newServerAt(bindAddress.getAddress.getHostAddress, bindAddress.getPort).bindFlow(combinedRoute) //on unexpected shutdown Runtime.getRuntime.addShutdownHook(new Thread() { diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index 9ff0786bf..d50023cd7 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -39,17 +39,17 @@ class NetworkController(settings: NetworkSettings, import akka.actor.SupervisorStrategy._ override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy( - maxNrOfRetries = NetworkController.ChildActorHandlingRetriesNr, - withinTimeRange = 1.minute) { - case _: ActorKilledException => Stop - case _: DeathPactException => Stop - case e: ActorInitializationException => - log.warn(s"Stopping child actor failed with: $e") - Stop - case e: Exception => - log.warn(s"Restarting child actor failed with: $e") - Restart - } + maxNrOfRetries = NetworkController.ChildActorHandlingRetriesNr, + withinTimeRange = 1.minute) { + case _: ActorKilledException => Stop + case _: DeathPactException => Stop + case e: ActorInitializationException => + log.warn(s"Stopping child actor failed with: $e") + Stop + case e: Exception => + log.warn(s"Restarting child actor failed with: $e") + Restart + } private implicit val system: ActorSystem = context.system @@ -66,7 +66,7 @@ class NetworkController(settings: NetworkSettings, * Storing timestamp of a last message got via p2p network. * Used to check whether connectivity is lost. */ - private var lastIncomingMessageTime : TimeProvider.Time = 0L + private var lastIncomingMessageTime: TimeProvider.Time = 0L //check own declared address for validity validateDeclaredAddress() @@ -174,7 +174,7 @@ class NetworkController(settings: NetworkSettings, // If a message received from p2p within connection timeout, // connectivity is not lost thus we're removing the peer - if(networkTime() - lastIncomingMessageTime < settings.connectionTimeout.toMillis) { + if (networkTime() - lastIncomingMessageTime < settings.connectionTimeout.toMillis) { peerManagerRef ! RemovePeer(c.remoteAddress) } @@ -219,8 +219,8 @@ class NetworkController(settings: NetworkSettings, * Schedule a periodic connection to a random known peer */ private def scheduleConnectionToPeer(): Unit = { - context.system.scheduler.schedule(5.seconds, 5.seconds) { - if (connections.size < settings.maxConnections) { + context.system.scheduler.scheduleWithFixedDelay(5.seconds, 5.seconds) { + () => if (connections.size < settings.maxConnections) { log.debug(s"Looking for a new random connection") val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq) randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt => @@ -234,16 +234,18 @@ class NetworkController(settings: NetworkSettings, * Schedule a periodic dropping of connections which seem to be inactive */ private def scheduleDroppingDeadConnections(): Unit = { - context.system.scheduler.schedule(60.seconds, 60.seconds) { - // Drop connections with peers if they seem to be inactive - val now = networkTime() - connections.values.foreach { cp => - val lastSeen = cp.lastMessage - val timeout = settings.inactiveConnectionDeadline.toMillis - val delta = now - lastSeen - if (delta > timeout) { - log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago") - cp.handlerRef ! CloseConnection + context.system.scheduler.scheduleWithFixedDelay(60.seconds, 60.seconds) { + () => { + // Drop connections with peers if they seem to be inactive + val now = networkTime() + connections.values.foreach { cp => + val lastSeen = cp.lastMessage + val timeout = settings.inactiveConnectionDeadline.toMillis + val delta = now - lastSeen + if (delta > timeout) { + log.info(s"Dropping connection with ${cp.peerInfo}, last seen ${delta / 1000.0} seconds ago") + cp.handlerRef ! CloseConnection + } } } } @@ -278,7 +280,7 @@ class NetworkController(settings: NetworkSettings, * Creates a PeerConnectionHandler for the established connection * * @param connectionId - connection detailed info - * @param connection - connection ActorRef + * @param connection - connection ActorRef */ private def createPeerConnectionHandler(connectionId: ConnectionId, connection: ActorRef): Unit = { @@ -495,6 +497,7 @@ object NetworkController { * Get p2p network status */ case object GetPeersStatus + } } diff --git a/src/main/scala/scorex/core/network/PeerSynchronizer.scala b/src/main/scala/scorex/core/network/PeerSynchronizer.scala index 53237010c..0ebd92cda 100644 --- a/src/main/scala/scorex/core/network/PeerSynchronizer.scala +++ b/src/main/scala/scorex/core/network/PeerSynchronizer.scala @@ -40,7 +40,7 @@ class PeerSynchronizer(val networkControllerRef: ActorRef, val msg = Message[Unit](GetPeersSpec, Right(Unit), None) val stn = SendToNetwork(msg, SendToRandom) - context.system.scheduler.schedule(2.seconds, settings.getPeersInterval)(networkControllerRef ! stn) + context.system.scheduler.scheduleWithFixedDelay(2.seconds, settings.getPeersInterval, networkControllerRef, stn) } override def receive: Receive = { diff --git a/src/main/scala/scorex/core/network/SyncTracker.scala b/src/main/scala/scorex/core/network/SyncTracker.scala index ccc751e7d..5cce57941 100644 --- a/src/main/scala/scorex/core/network/SyncTracker.scala +++ b/src/main/scala/scorex/core/network/SyncTracker.scala @@ -38,7 +38,9 @@ class SyncTracker(nvsRef: ActorRef, schedule foreach { _.cancel() } - schedule = Some(context.system.scheduler.schedule(2.seconds, minInterval())(nvsRef ! SendLocalSyncInfo)) + + val syncTask = context.system.scheduler.scheduleWithFixedDelay(2.seconds, minInterval(),nvsRef, SendLocalSyncInfo) + schedule = Some(syncTask) } def maxInterval(): FiniteDuration = diff --git a/testkit/build.sbt b/testkit/build.sbt index e7f3e9ff4..0f52260d2 100644 --- a/testkit/build.sbt +++ b/testkit/build.sbt @@ -4,7 +4,7 @@ libraryDependencies ++= Seq( "org.scalactic" %% "scalactic" % "3.0.1", "org.scalatest" %% "scalatest" % "3.0.1", "org.scalacheck" %% "scalacheck" % "1.13.+", - "com.typesafe.akka" %% "akka-testkit" % "2.4.+" + "com.typesafe.akka" %% "akka-testkit" % "2.6.10" ) fork in Test := true