diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/Sirius.scala b/src/main/scala/com/comcast/xfinity/sirius/api/Sirius.scala index a917314e..8edcc6b4 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/Sirius.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/Sirius.scala @@ -41,3 +41,8 @@ trait Sirius { */ def isOnline: Boolean } + +trait Sirius1Dot3 extends Sirius { + def shutdown(): Unit +} + diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/SiriusFactory.scala b/src/main/scala/com/comcast/xfinity/sirius/api/SiriusFactory.scala new file mode 100644 index 00000000..bfe19579 --- /dev/null +++ b/src/main/scala/com/comcast/xfinity/sirius/api/SiriusFactory.scala @@ -0,0 +1,209 @@ +/* + * Copyright 2012-2014 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.comcast.xfinity.sirius.api + +import java.io.File +import java.lang.management.ManagementFactory +import java.net.InetAddress +import java.util.{HashMap => JHashMap} +import javax.management.ObjectName + +import com.comcast.xfinity.sirius.admin.ObjectNameHelper +import com.comcast.xfinity.sirius.api.impl.SiriusImpl +import com.comcast.xfinity.sirius.info.SiriusInfo +import com.comcast.xfinity.sirius.uberstore.UberStore +import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore +import com.comcast.xfinity.sirius.util.AkkaExternalAddressResolver +import com.comcast.xfinity.sirius.writeaheadlog.{CachedSiriusLog, SiriusLog} +import com.typesafe.config.{Config, ConfigFactory} +import akka.actor.{ActorRef, ActorSystem} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ + +/** + * Provides the factory for [[com.comcast.xfinity.sirius.api.impl.SiriusImpl]] instances + */ +object SiriusFactory { + val traceLog = LoggerFactory.getLogger("SiriusFactory") + + /** + * SiriusImpl factory method, takes parameters to construct a SiriusImplementation and the dependent + * ActorSystem and return the created instance. Calling shutdown on the produced SiriusImpl will also + * shutdown the dependent ActorSystem. + * + * @param requestHandler the RequestHandler containing callbacks for manipulating the system's state + * @param siriusConfig a SiriusConfiguration containing configuration info needed for this node. + * @see SiriusConfiguration for info on needed config. + * + * @return A SiriusImpl constructed using the parameters + */ + def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): Sirius1Dot3 = { + val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match { + case Some(dir) => dir + case None => + throw new IllegalArgumentException(SiriusConfiguration.LOG_LOCATION + " must be set on config") + } + + val backendLog = { + siriusConfig.getProp(SiriusConfiguration.LOG_VERSION_ID, "") match { + case version if version == SegmentedUberStore.versionId => SegmentedUberStore(uberStoreDir, siriusConfig) + case _ => UberStore(uberStoreDir) + } + } + + val log: SiriusLog = { + if (siriusConfig.getProp(SiriusConfiguration.LOG_USE_WRITE_CACHE, true)) { + val cacheSize = siriusConfig.getProp(SiriusConfiguration.LOG_WRITE_CACHE_SIZE, 10000) + CachedSiriusLog(backendLog, cacheSize) + } else { + backendLog + } + } + + createInstance(requestHandler, siriusConfig, log) + } + + /** + * USE ONLY FOR TESTING HOOK WHEN YOU NEED TO MOCK OUT A LOG. + * Real code should use the two argument factory method. + * + * @param requestHandler the RequestHandler containing callbacks for manipulating the system's state + * @param siriusConfig a SiriusConfiguration containing configuration info needed for this node. + * @see SiriusConfiguration for info on needed config. + * @param siriusLog the persistence layer to which events should be committed to and replayed from. + * + * @return A SiriusImpl constructed using the parameters + */ + private[sirius] def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration, + siriusLog: SiriusLog): Sirius1Dot3 = { + + val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system") + + implicit val actorSystem = ActorSystem(systemName, createActorSystemConfig(siriusConfig)) + + // inject an mbean server, without regard for the one that may have been there + val mbeanServer = ManagementFactory.getPlatformMBeanServer + siriusConfig.setProp(SiriusConfiguration.MBEAN_SERVER, mbeanServer) + + // inject AkkaExternalAddressResolver + siriusConfig.setProp(SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER, AkkaExternalAddressResolver(actorSystem) (siriusConfig)) + + // here it is! the real deal creation + val impl = SiriusImpl(requestHandler, siriusLog, siriusConfig) + + // create a SiriusInfo MBean which will remain registered until we explicitly shutdown sirius + val (siriusInfoObjectName, siriusInfo) = createSiriusInfoMBean(actorSystem, impl.supervisor)(siriusConfig) + mbeanServer.registerMBean(siriusInfo, siriusInfoObjectName) + + // need to shut down the actor system and unregister the mbeans when sirius is done + impl.onShutdown({ + actorSystem.shutdown() + actorSystem.awaitTermination() + mbeanServer.unregisterMBean(siriusInfoObjectName) + }) + + impl + } + + private def createSiriusInfoMBean(actorSystem: ActorSystem, siriusSup: ActorRef) + (siriusConfig: SiriusConfiguration): (ObjectName, SiriusInfo) = { + val resolver = siriusConfig.getProp[AkkaExternalAddressResolver](SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER). + getOrElse(throw new IllegalStateException("SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER returned nothing")) + val siriusInfo = new SiriusInfo(actorSystem, siriusSup, resolver) + val objectNameHelper = new ObjectNameHelper + val siriusInfoObjectName = objectNameHelper.getObjectName(siriusInfo, siriusSup, actorSystem)(siriusConfig) + (siriusInfoObjectName, siriusInfo) + } + + /** + * Creates configuration for the ActorSystem. The config precedence is as follows: + * 1) host/port config trump all + * 2) siriusConfig supplied external config next + * 3) sirius-akka-base.conf, packaged with sirius, loaded with ConfigFactory.load + */ + private def createActorSystemConfig(siriusConfig: SiriusConfiguration): Config = { + val hostPortConfig = createHostPortConfig(siriusConfig) + val externalConfig = createExternalConfig(siriusConfig) + val baseAkkaConfig = ConfigFactory.load("sirius-akka-base.conf") + + hostPortConfig.withFallback(externalConfig).withFallback(baseAkkaConfig) + } + + private def createHostPortConfig(siriusConfig: SiriusConfiguration): Config = { + val configMap = new JHashMap[String, Any]() + val sslEnabled = siriusConfig.getProp(SiriusConfiguration.ENABLE_SSL, false) + val transportPrefix = if (sslEnabled) "akka.remote.netty.ssl" else "akka.remote.netty.tcp" + traceLog.info(s"AKKA using transport: $transportPrefix") + + configMap.put("akka.remote.enabled-transports", List(transportPrefix).asJava) + configMap.put(s"$transportPrefix.hostname", + siriusConfig.getProp(SiriusConfiguration.HOST, InetAddress.getLocalHost.getHostName)) + configMap.put(s"$transportPrefix.port", siriusConfig.getProp(SiriusConfiguration.PORT, 2552)) + + val maxMessageSize = siriusConfig.getProp(SiriusConfiguration.MAX_AKKA_MESSAGE_SIZE_KB, "1024") + val bufferSize = maxMessageSize * 2 + configMap.put(s"$transportPrefix.maximum-frame-size", s"${maxMessageSize}k") + configMap.put(s"$transportPrefix.send-buffer-size", s"${bufferSize}k") + configMap.put(s"$transportPrefix.receive-buffer-size", s"${bufferSize}k") + + if (sslEnabled) { + configMap.put(s"$transportPrefix.random-number-generator", + siriusConfig.getProp(SiriusConfiguration.SSL_RANDOM_NUMBER_GENERATOR, "")) + + configMap.put(s"$transportPrefix.security.key-store", + siriusConfig.getProp(SiriusConfiguration.KEY_STORE_LOCATION, + throw new IllegalArgumentException("No key-store provided"))) + + configMap.put(s"$transportPrefix.security.trust-store", + siriusConfig.getProp(SiriusConfiguration.TRUST_STORE_LOCATION, + throw new IllegalArgumentException("No trust-store provided"))) + + configMap.put(s"$transportPrefix.security.key-store-password", + siriusConfig.getProp(SiriusConfiguration.KEY_STORE_PASSWORD, + throw new IllegalArgumentException("No key-store-password value provided"))) + + configMap.put(s"$transportPrefix.security.key-password", + siriusConfig.getProp(SiriusConfiguration.KEY_PASSWORD, + throw new IllegalArgumentException("No key-password value provided"))) + + configMap.put(s"$transportPrefix.security.trust-store-password", + siriusConfig.getProp(SiriusConfiguration.TRUST_STORE_PASSWORD, + throw new IllegalArgumentException("No trust-store-password value provided"))) + } + + // this is just so that the intellij shuts up + ConfigFactory.parseMap(configMap.asInstanceOf[JHashMap[String, _ <: AnyRef]]) + } + + /** + * If siriusConfig is such configured, will load up an external configuration + * for the Akka ActorSystem which is created. The filesystem is checked first, + * then the classpath, if neither exist, or siriusConfig is not configured as + * much, then an empty Config object is returned. + */ + private def createExternalConfig(siriusConfig: SiriusConfiguration): Config = + siriusConfig.getProp[String](SiriusConfiguration.AKKA_EXTERN_CONFIG) match { + case None => ConfigFactory.empty() + case Some(externConfig) => + val externConfigFile = new File(externConfig) + if (externConfigFile.exists()) { + ConfigFactory.parseFile(externConfigFile).resolve() + } else { + ConfigFactory.parseResources(externConfig).resolve() + } + } +} diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala index dc75e543..ad72ec79 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala @@ -23,6 +23,7 @@ import java.util.{HashMap => JHashMap} import com.comcast.xfinity.sirius.admin.ObjectNameHelper import com.comcast.xfinity.sirius.api.RequestHandler import com.comcast.xfinity.sirius.api.SiriusConfiguration +import com.comcast.xfinity.sirius.api.{SiriusFactory => NewSiriusFactory} import com.comcast.xfinity.sirius.info.SiriusInfo import com.comcast.xfinity.sirius.writeaheadlog.CachedSiriusLog import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog @@ -39,11 +40,15 @@ import scala.collection.JavaConverters._ import org.slf4j.LoggerFactory /** - * Provides the factory for [[com.comcast.xfinity.sirius.api.impl.SiriusImpl]] instances + * Provides the factory for [[com.comcast.xfinity.sirius.api.impl.SiriusImpl]] instances. Please note that + * this is no longer the recommended way of creating a Sirius implementation; this factory ought to have + * lived in the top-level API package, and really ought to have returned a trait rather than a concrete. + * This implementation is preserved for backwards compatibility, although it delegates to a corrected + * [[com.comcast.xfinity.sirius.api.SiriusFactory]]. */ +@deprecated(message = "Please use com.comcast.xfinity.sirius.api.SiriusFactory instead", + since = "1.3.0") object SiriusFactory { - val traceLog = LoggerFactory.getLogger("SiriusFactory") - /** * SiriusImpl factory method, takes parameters to construct a SiriusImplementation and the dependent * ActorSystem and return the created instance. Calling shutdown on the produced SiriusImpl will also @@ -56,29 +61,7 @@ object SiriusFactory { * @return A SiriusImpl constructed using the parameters */ def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): SiriusImpl = { - val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match { - case Some(dir) => dir - case None => - throw new IllegalArgumentException(SiriusConfiguration.LOG_LOCATION + " must be set on config") - } - - val backendLog = { - siriusConfig.getProp(SiriusConfiguration.LOG_VERSION_ID, "") match { - case version if version == SegmentedUberStore.versionId => SegmentedUberStore(uberStoreDir, siriusConfig) - case _ => UberStore(uberStoreDir) - } - } - - val log: SiriusLog = { - if (siriusConfig.getProp(SiriusConfiguration.LOG_USE_WRITE_CACHE, true)) { - val cacheSize = siriusConfig.getProp(SiriusConfiguration.LOG_WRITE_CACHE_SIZE, 10000) - CachedSiriusLog(backendLog, cacheSize) - } else { - backendLog - } - } - - createInstance(requestHandler, siriusConfig, log) + NewSiriusFactory.createInstance(requestHandler, siriusConfig).asInstanceOf[SiriusImpl] } /** @@ -94,120 +77,7 @@ object SiriusFactory { */ private[sirius] def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration, siriusLog: SiriusLog): SiriusImpl = { - - val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system") - - implicit val actorSystem = ActorSystem(systemName, createActorSystemConfig(siriusConfig)) - - // inject an mbean server, without regard for the one that may have been there - val mbeanServer = ManagementFactory.getPlatformMBeanServer - siriusConfig.setProp(SiriusConfiguration.MBEAN_SERVER, mbeanServer) - - // inject AkkaExternalAddressResolver - siriusConfig.setProp(SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER, AkkaExternalAddressResolver(actorSystem) (siriusConfig)) - - // here it is! the real deal creation - val impl = SiriusImpl(requestHandler, siriusLog, siriusConfig) - - // create a SiriusInfo MBean which will remain registered until we explicitly shutdown sirius - val (siriusInfoObjectName, siriusInfo) = createSiriusInfoMBean(actorSystem, impl.supervisor)(siriusConfig) - mbeanServer.registerMBean(siriusInfo, siriusInfoObjectName) - - // need to shut down the actor system and unregister the mbeans when sirius is done - impl.onShutdown({ - actorSystem.shutdown() - actorSystem.awaitTermination() - mbeanServer.unregisterMBean(siriusInfoObjectName) - }) - - impl - } - - private def createSiriusInfoMBean(actorSystem: ActorSystem, siriusSup: ActorRef) - (siriusConfig: SiriusConfiguration): (ObjectName, SiriusInfo) = { - val resolver = siriusConfig.getProp[AkkaExternalAddressResolver](SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER). - getOrElse(throw new IllegalStateException("SiriusConfiguration.AKKA_EXTERNAL_ADDRESS_RESOLVER returned nothing")) - val siriusInfo = new SiriusInfo(actorSystem, siriusSup, resolver) - val objectNameHelper = new ObjectNameHelper - val siriusInfoObjectName = objectNameHelper.getObjectName(siriusInfo, siriusSup, actorSystem)(siriusConfig) - (siriusInfoObjectName, siriusInfo) - } - - /** - * Creates configuration for the ActorSystem. The config precedence is as follows: - * 1) host/port config trump all - * 2) siriusConfig supplied external config next - * 3) sirius-akka-base.conf, packaged with sirius, loaded with ConfigFactory.load - */ - private def createActorSystemConfig(siriusConfig: SiriusConfiguration): Config = { - val hostPortConfig = createHostPortConfig(siriusConfig) - val externalConfig = createExternalConfig(siriusConfig) - val baseAkkaConfig = ConfigFactory.load("sirius-akka-base.conf") - - hostPortConfig.withFallback(externalConfig).withFallback(baseAkkaConfig) - } - - private def createHostPortConfig(siriusConfig: SiriusConfiguration): Config = { - val configMap = new JHashMap[String, Any]() - val sslEnabled = siriusConfig.getProp(SiriusConfiguration.ENABLE_SSL, false) - val transportPrefix = if (sslEnabled) "akka.remote.netty.ssl" else "akka.remote.netty.tcp" - traceLog.info(s"AKKA using transport: $transportPrefix") - - configMap.put("akka.remote.enabled-transports", List(transportPrefix).asJava) - configMap.put(s"$transportPrefix.hostname", - siriusConfig.getProp(SiriusConfiguration.HOST, InetAddress.getLocalHost.getHostName)) - configMap.put(s"$transportPrefix.port", siriusConfig.getProp(SiriusConfiguration.PORT, 2552)) - - val maxMessageSize = siriusConfig.getProp(SiriusConfiguration.MAX_AKKA_MESSAGE_SIZE_KB, "1024") - val bufferSize = maxMessageSize * 2 - configMap.put(s"$transportPrefix.maximum-frame-size", s"${maxMessageSize}k") - configMap.put(s"$transportPrefix.send-buffer-size", s"${bufferSize}k") - configMap.put(s"$transportPrefix.receive-buffer-size", s"${bufferSize}k") - - if (sslEnabled) { - configMap.put(s"$transportPrefix.random-number-generator", - siriusConfig.getProp(SiriusConfiguration.SSL_RANDOM_NUMBER_GENERATOR, "")) - - configMap.put(s"$transportPrefix.security.key-store", - siriusConfig.getProp(SiriusConfiguration.KEY_STORE_LOCATION, - throw new IllegalArgumentException("No key-store provided"))) - - configMap.put(s"$transportPrefix.security.trust-store", - siriusConfig.getProp(SiriusConfiguration.TRUST_STORE_LOCATION, - throw new IllegalArgumentException("No trust-store provided"))) - - configMap.put(s"$transportPrefix.security.key-store-password", - siriusConfig.getProp(SiriusConfiguration.KEY_STORE_PASSWORD, - throw new IllegalArgumentException("No key-store-password value provided"))) - - configMap.put(s"$transportPrefix.security.key-password", - siriusConfig.getProp(SiriusConfiguration.KEY_PASSWORD, - throw new IllegalArgumentException("No key-password value provided"))) - - configMap.put(s"$transportPrefix.security.trust-store-password", - siriusConfig.getProp(SiriusConfiguration.TRUST_STORE_PASSWORD, - throw new IllegalArgumentException("No trust-store-password value provided"))) - } - - // this is just so that the intellij shuts up - ConfigFactory.parseMap(configMap.asInstanceOf[JHashMap[String, _ <: AnyRef]]) + NewSiriusFactory.createInstance(requestHandler, siriusConfig, siriusLog).asInstanceOf[SiriusImpl] } - /** - * If siriusConfig is such configured, will load up an external configuration - * for the Akka ActorSystem which is created. The filesystem is checked first, - * then the classpath, if neither exist, or siriusConfig is not configured as - * much, then an empty Config object is returned. - */ - private def createExternalConfig(siriusConfig: SiriusConfiguration): Config = - siriusConfig.getProp[String](SiriusConfiguration.AKKA_EXTERN_CONFIG) match { - case None => ConfigFactory.empty() - case Some(externConfig) => - val externConfigFile = new File(externConfig) - if (externConfigFile.exists()) { - ConfigFactory.parseFile(externConfigFile).resolve() - } else { - ConfigFactory.parseResources(externConfig).resolve() - } - } } diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala index 416c93bc..5a6be810 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusImpl.scala @@ -17,7 +17,7 @@ package com.comcast.xfinity.sirius.api.impl import compat.AkkaFutureAdapter import com.comcast.xfinity.sirius.api.RequestHandler -import com.comcast.xfinity.sirius.api.Sirius +import com.comcast.xfinity.sirius.api.Sirius1Dot3 import akka.pattern.ask import membership.MembershipActor._ import com.comcast.xfinity.sirius.api.SiriusResult @@ -62,7 +62,7 @@ object SiriusImpl { * @param actorSystem the actorSystem to use to create the Actors for Sirius */ class SiriusImpl(config: SiriusConfiguration, supProps: Props)(implicit val actorSystem: ActorSystem) - extends Sirius { + extends Sirius1Dot3 { val supName = config.getProp(SiriusConfiguration.SIRIUS_SUPERVISOR_NAME, "sirius") implicit val timeout: Timeout = diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala index afd84537..1054f18c 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala @@ -20,16 +20,15 @@ import scalax.file.Path import com.comcast.xfinity.sirius.{LatchedRequestHandler, TimedTest, NiceTest} import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog import java.io.File -import com.comcast.xfinity.sirius.api.impl._ import util.Random -import com.comcast.xfinity.sirius.api.impl.OrderedEvent +import com.comcast.xfinity.sirius.api.impl.{SiriusImpl, NonCommutativeSiriusRequest, OrderedEvent} import scala.Some import scala.Tuple2 import java.util.UUID import com.comcast.xfinity.sirius.uberstore.UberStore import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.CheckPaxosMembership import annotation.tailrec -import com.comcast.xfinity.sirius.api.{SiriusResult, RequestHandler, SiriusConfiguration} +import com.comcast.xfinity.sirius.api._ import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.CheckClusterConfig import org.slf4j.LoggerFactory import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore @@ -134,7 +133,7 @@ class FullSystemITest extends NiceTest with TimedTest { assert(waitForTrue(sirius.isOnline, 2000, 500), "Failed while waiting for sirius to boot") - (sirius, finalHandler, finalWal) + (sirius.asInstanceOf[SiriusImpl], finalHandler, finalWal) } var tempDir: File = _