diff --git a/src/main/scala/com/comcast/xfinity/sirius/SiriusFactory.scala b/src/main/scala/com/comcast/xfinity/sirius/SiriusFactory.scala new file mode 100644 index 00000000..98640729 --- /dev/null +++ b/src/main/scala/com/comcast/xfinity/sirius/SiriusFactory.scala @@ -0,0 +1,183 @@ +/* + * Copyright 2012-2014 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.comcast.xfinity.sirius + +import java.io.File +import java.lang.management.ManagementFactory +import java.net.InetAddress +import java.util.{HashMap => JHashMap} + +import com.comcast.xfinity.sirius.admin.ObjectNameHelper +import com.comcast.xfinity.sirius.api.RequestHandler +import com.comcast.xfinity.sirius.api.{Sirius, SiriusConfiguration} +import com.comcast.xfinity.sirius.api.impl.SiriusImpl +import com.comcast.xfinity.sirius.info.SiriusInfo +import com.comcast.xfinity.sirius.writeaheadlog.CachedSiriusLog +import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import javax.management.ObjectName +import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore +import com.comcast.xfinity.sirius.uberstore.UberStore + +/** + * Factory for [[com.comcast.xfinity.sirius.api.Sirius]] instances + */ +object SiriusFactory { + + /** + * Sirius factory method, takes parameters to construct a + * SiriusImpl and the dependent ActorSystem and return the + * created instance. Calling shutdown on the produced SiriusImpl + * will also shutdown the dependent ActorSystem. + * + * @param requestHandler the RequestHandler containing callbacks for + * manipulating the system's state + * @param siriusConfig a SiriusConfiguration containing configuration + * info needed for this node. + * @see SiriusConfiguration for info on needed config. + * + * @return A Sirius instance constructed using the parameters + */ + def createInstance(requestHandler: RequestHandler, + siriusConfig: SiriusConfiguration): Sirius = { + + val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match { + case Some(dir) => dir + case None => + throw new IllegalArgumentException(SiriusConfiguration.LOG_LOCATION + + " must be set on config") + } + + val backendLog = { + siriusConfig.getProp(SiriusConfiguration.LOG_VERSION_ID, "") match { + case version if version == SegmentedUberStore.versionId => + SegmentedUberStore(uberStoreDir, siriusConfig) + case _ => UberStore(uberStoreDir) + } + } + + val log: SiriusLog = { + if (siriusConfig.getProp(SiriusConfiguration.LOG_USE_WRITE_CACHE, true)) { + val cacheSize = siriusConfig.getProp(SiriusConfiguration.LOG_WRITE_CACHE_SIZE, 10000) + CachedSiriusLog(backendLog, cacheSize) + } else { + backendLog + } + } + + createInstance(requestHandler, siriusConfig, log) + } + + /** + * USE ONLY FOR TESTING HOOK WHEN YOU NEED TO MOCK OUT A LOG. + * Real code should use the two argument factory method. + * + * @param requestHandler the RequestHandler containing callbacks for + * manipulating the system's state + * @param siriusConfig a SiriusConfiguration containing configuration + * info needed for this node. + * @see SiriusConfiguration for info on needed config. + * @param siriusLog the persistence layer to which events should be + * committed to and replayed from. + * + * @return A SiriusImpl constructed using the parameters (note: this + * differs from the 2-arg method, which returns a Sirius trait) + */ + private[sirius] def createInstance(requestHandler: RequestHandler, + siriusConfig: SiriusConfiguration, + siriusLog: SiriusLog): SiriusImpl = { + + val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system") + + implicit val actorSystem = ActorSystem(systemName, createActorSystemConfig(siriusConfig)) + + // inject an mbean server, without regard for the one that may have been there + val mbeanServer = ManagementFactory.getPlatformMBeanServer + siriusConfig.setProp(SiriusConfiguration.MBEAN_SERVER, mbeanServer) + + // here it is! the real deal creation + val impl = SiriusImpl(requestHandler, siriusLog, siriusConfig) + + // create a SiriusInfo MBean which will remain registered until we explicity shutdown sirius + val (siriusInfoObjectName, siriusInfo) = createSiriusInfoMBean(actorSystem, impl.supervisor) + mbeanServer.registerMBean(siriusInfo, siriusInfoObjectName) + + // need to shut down the actor system and unregister the mbeans when sirius is done + impl.onShutdown({ + actorSystem.shutdown() + actorSystem.awaitTermination() + mbeanServer.unregisterMBean(siriusInfoObjectName) + }) + + impl + } + + private def createSiriusInfoMBean(actorSystem: ActorSystem, siriusSup: ActorRef): (ObjectName, SiriusInfo) = { + val siriusInfo = new SiriusInfo(actorSystem, siriusSup) + val objectNameHelper = new ObjectNameHelper + val siriusInfoObjectName = objectNameHelper.getObjectName(siriusInfo, siriusSup, actorSystem) + (siriusInfoObjectName, siriusInfo) + } + + /** + * Creates configuration for the ActorSystem. The config precedence is as follows: + * 1) host/port config trump all + * 2) siriusConfig supplied external config next + * 3) sirius-akka-base.conf, packaged with sirius, loaded with ConfigFactory.load + */ + private def createActorSystemConfig(siriusConfig: SiriusConfiguration): Config = { + val hostPortConfig = createHostPortConfig(siriusConfig) + val externalConfig = createExternalConfig(siriusConfig) + + val baseAkkaConfig = ConfigFactory.load("sirius-akka-base.conf") + + hostPortConfig.withFallback(externalConfig).withFallback(baseAkkaConfig) + } + + private def createHostPortConfig(siriusConfig: SiriusConfiguration): Config = { + val configMap = new JHashMap[String, Any]() + + configMap.put("akka.remote.netty.tcp.hostname", + siriusConfig.getProp(SiriusConfiguration.HOST, InetAddress.getLocalHost.getHostName)) + configMap.put("akka.remote.netty.tcp.port", + siriusConfig.getProp(SiriusConfiguration.PORT, 2552)) + + // this is just so that the intellij shuts up + ConfigFactory.parseMap(configMap.asInstanceOf[JHashMap[String, _ <: AnyRef]]) + } + + /** + * If siriusConfig is such configured, will load up an external configuration + * for the Akka ActorSystem which is created. The filesystem is checked first, + * then the classpath, if neither exist, or siriusConfig is not configured as + * much, then an empty Config object is returned. + */ + private def createExternalConfig(siriusConfig: SiriusConfiguration): Config = + siriusConfig.getProp[String](SiriusConfiguration.AKKA_EXTERN_CONFIG) match { + case None => ConfigFactory.empty() + case Some(externConfig) => + val externConfigFile = new File(externConfig) + if (externConfigFile.exists()) { + ConfigFactory.parseFile(externConfigFile) + } else { + ConfigFactory.parseResources(externConfig) + } + } +} diff --git a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala index c0cf78c0..a5e38229 100644 --- a/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala +++ b/src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Comcast Cable Communications Management, LLC + * Copyright 2012-2014 Comcast Cable Communications Management, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,156 +15,33 @@ */ package com.comcast.xfinity.sirius.api.impl -import java.io.File -import java.lang.management.ManagementFactory -import java.net.InetAddress -import java.util.{HashMap => JHashMap} - -import com.comcast.xfinity.sirius.admin.ObjectNameHelper +import com.comcast.xfinity.sirius.{SiriusFactory => NewFactory} import com.comcast.xfinity.sirius.api.RequestHandler import com.comcast.xfinity.sirius.api.SiriusConfiguration -import com.comcast.xfinity.sirius.info.SiriusInfo -import com.comcast.xfinity.sirius.writeaheadlog.CachedSiriusLog import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory - -import akka.actor.ActorRef -import akka.actor.ActorSystem -import javax.management.ObjectName -import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore -import com.comcast.xfinity.sirius.uberstore.UberStore /** - * Provides the factory for [[com.comcast.xfinity.sirius.api.impl.SiriusImpl]] instances + * Compatibility wrapper for factory methods now in top-level sirius package. */ object SiriusFactory { /** - * SiriusImpl factory method, takes parameters to construct a SiriusImplementation and the dependent - * ActorSystem and return the created instance. Calling shutdown on the produced SiriusImpl will also - * shutdown the dependent ActorSystem. - * - * @param requestHandler the RequestHandler containing callbacks for manipulating the system's state - * @param siriusConfig a SiriusConfiguration containing configuration info needed for this node. - * @see SiriusConfiguration for info on needed config. - * - * @return A SiriusImpl constructed using the parameters + * Deprecated - see [[com.comcast.xfinity.sirius.SiriusFactory]] */ - def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): SiriusImpl = { - val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match { - case Some(dir) => dir - case None => - throw new IllegalArgumentException(SiriusConfiguration.LOG_LOCATION + " must be set on config") - } - - val backendLog = { - siriusConfig.getProp(SiriusConfiguration.LOG_VERSION_ID, "") match { - case version if version == SegmentedUberStore.versionId => SegmentedUberStore(uberStoreDir, siriusConfig) - case _ => UberStore(uberStoreDir) - } - } - - val log: SiriusLog = { - if (siriusConfig.getProp(SiriusConfiguration.LOG_USE_WRITE_CACHE, true)) { - val cacheSize = siriusConfig.getProp(SiriusConfiguration.LOG_WRITE_CACHE_SIZE, 10000) - CachedSiriusLog(backendLog, cacheSize) - } else { - backendLog - } - } - - createInstance(requestHandler, siriusConfig, log) + @deprecated("see top-level SiriusFactory object", "1.2.0") + def createInstance(requestHandler: RequestHandler, + siriusConfig: SiriusConfiguration): SiriusImpl = { + val res = NewFactory.createInstance(requestHandler, siriusConfig) + res.asInstanceOf[SiriusImpl] } /** - * USE ONLY FOR TESTING HOOK WHEN YOU NEED TO MOCK OUT A LOG. - * Real code should use the two argument factory method. - * - * @param requestHandler the RequestHandler containing callbacks for manipulating the system's state - * @param siriusConfig a SiriusConfiguration containing configuration info needed for this node. - * @see SiriusConfiguration for info on needed config. - * @param siriusLog the persistence layer to which events should be committed to and replayed from. - * - * @return A SiriusImpl constructed using the parameters + * Deprecated - see [[com.comcast.xfinity.sirius.SiriusFactory]] */ - private[sirius] def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration, - siriusLog: SiriusLog): SiriusImpl = { - - val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system") - - implicit val actorSystem = ActorSystem(systemName, createActorSystemConfig(siriusConfig)) - - // inject an mbean server, without regard for the one that may have been there - val mbeanServer = ManagementFactory.getPlatformMBeanServer - siriusConfig.setProp(SiriusConfiguration.MBEAN_SERVER, mbeanServer) - - // here it is! the real deal creation - val impl = SiriusImpl(requestHandler, siriusLog, siriusConfig) - - // create a SiriusInfo MBean which will remain registered until we explicity shutdown sirius - val (siriusInfoObjectName, siriusInfo) = createSiriusInfoMBean(actorSystem, impl.supervisor) - mbeanServer.registerMBean(siriusInfo, siriusInfoObjectName) - - // need to shut down the actor system and unregister the mbeans when sirius is done - impl.onShutdown({ - actorSystem.shutdown() - actorSystem.awaitTermination() - mbeanServer.unregisterMBean(siriusInfoObjectName) - }) - - impl - } - - private def createSiriusInfoMBean(actorSystem: ActorSystem, siriusSup: ActorRef): (ObjectName, SiriusInfo) = { - val siriusInfo = new SiriusInfo(actorSystem, siriusSup) - val objectNameHelper = new ObjectNameHelper - val siriusInfoObjectName = objectNameHelper.getObjectName(siriusInfo, siriusSup, actorSystem) - (siriusInfoObjectName, siriusInfo) + @deprecated("see top-level SiriusFactory object", "1.2.0") + private[sirius] def createInstance(requestHandler: RequestHandler, + siriusConfig: SiriusConfiguration, + siriusLog: SiriusLog): SiriusImpl = { + NewFactory.createInstance(requestHandler, siriusConfig, siriusLog) } - - /** - * Creates configuration for the ActorSystem. The config precedence is as follows: - * 1) host/port config trump all - * 2) siriusConfig supplied external config next - * 3) sirius-akka-base.conf, packaged with sirius, loaded with ConfigFactory.load - */ - private def createActorSystemConfig(siriusConfig: SiriusConfiguration): Config = { - val hostPortConfig = createHostPortConfig(siriusConfig) - val externalConfig = createExternalConfig(siriusConfig) - - val baseAkkaConfig = ConfigFactory.load("sirius-akka-base.conf") - - hostPortConfig.withFallback(externalConfig).withFallback(baseAkkaConfig) - } - - private def createHostPortConfig(siriusConfig: SiriusConfiguration): Config = { - val configMap = new JHashMap[String, Any]() - - configMap.put("akka.remote.netty.tcp.hostname", - siriusConfig.getProp(SiriusConfiguration.HOST, InetAddress.getLocalHost.getHostName)) - configMap.put("akka.remote.netty.tcp.port", - siriusConfig.getProp(SiriusConfiguration.PORT, 2552)) - - // this is just so that the intellij shuts up - ConfigFactory.parseMap(configMap.asInstanceOf[JHashMap[String, _ <: AnyRef]]) - } - - /** - * If siriusConfig is such configured, will load up an external configuration - * for the Akka ActorSystem which is created. The filesystem is checked first, - * then the classpath, if neither exist, or siriusConfig is not configured as - * much, then an empty Config object is returned. - */ - private def createExternalConfig(siriusConfig: SiriusConfiguration): Config = - siriusConfig.getProp[String](SiriusConfiguration.AKKA_EXTERN_CONFIG) match { - case None => ConfigFactory.empty() - case Some(externConfig) => - val externConfigFile = new File(externConfig) - if (externConfigFile.exists()) { - ConfigFactory.parseFile(externConfigFile) - } else { - ConfigFactory.parseResources(externConfig) - } - } } diff --git a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala index 76f729e9..25a6ec10 100644 --- a/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala +++ b/src/test/scala/com/comcast/xfinity/sirius/itest/FullSystemITest.scala @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Comcast Cable Communications Management, LLC + * Copyright 2012-2014 Comcast Cable Communications Management, LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import java.util.UUID import com.comcast.xfinity.sirius.uberstore.UberStore import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.CheckPaxosMembership import annotation.tailrec +import com.comcast.xfinity.sirius.SiriusFactory import com.comcast.xfinity.sirius.api.{SiriusResult, RequestHandler, SiriusConfiguration} import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.CheckClusterConfig import org.slf4j.LoggerFactory