Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move SiriusFactory to api package (per issue #11) #23

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
183 changes: 183 additions & 0 deletions src/main/scala/com/comcast/xfinity/sirius/SiriusFactory.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright 2012-2014 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.comcast.xfinity.sirius

import java.io.File
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.util.{HashMap => JHashMap}

import com.comcast.xfinity.sirius.admin.ObjectNameHelper
import com.comcast.xfinity.sirius.api.RequestHandler
import com.comcast.xfinity.sirius.api.{Sirius, SiriusConfiguration}
import com.comcast.xfinity.sirius.api.impl.SiriusImpl
import com.comcast.xfinity.sirius.info.SiriusInfo
import com.comcast.xfinity.sirius.writeaheadlog.CachedSiriusLog
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

import akka.actor.ActorRef
import akka.actor.ActorSystem
import javax.management.ObjectName
import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore
import com.comcast.xfinity.sirius.uberstore.UberStore

/**
* Factory for [[com.comcast.xfinity.sirius.api.Sirius]] instances
*/
object SiriusFactory {

/**
* Sirius factory method, takes parameters to construct a
* SiriusImpl and the dependent ActorSystem and return the
* created instance. Calling shutdown on the produced SiriusImpl
* will also shutdown the dependent ActorSystem.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need to revamp this a little. Sirius trait doesn't have shutdown(), for one, and we're return Sirius instead of SiriusImpl.

*
* @param requestHandler the RequestHandler containing callbacks for
* manipulating the system's state
* @param siriusConfig a SiriusConfiguration containing configuration
* info needed for this node.
* @see SiriusConfiguration for info on needed config.
*
* @return A Sirius instance constructed using the parameters
*/
def createInstance(requestHandler: RequestHandler,
siriusConfig: SiriusConfiguration): Sirius = {

val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match {
case Some(dir) => dir
case None =>
throw new IllegalArgumentException(SiriusConfiguration.LOG_LOCATION +
" must be set on config")
}

val backendLog = {
siriusConfig.getProp(SiriusConfiguration.LOG_VERSION_ID, "") match {
case version if version == SegmentedUberStore.versionId =>
SegmentedUberStore(uberStoreDir, siriusConfig)
case _ => UberStore(uberStoreDir)
}
}

val log: SiriusLog = {
if (siriusConfig.getProp(SiriusConfiguration.LOG_USE_WRITE_CACHE, true)) {
val cacheSize = siriusConfig.getProp(SiriusConfiguration.LOG_WRITE_CACHE_SIZE, 10000)
CachedSiriusLog(backendLog, cacheSize)
} else {
backendLog
}
}

createInstance(requestHandler, siriusConfig, log)
}

/**
* USE ONLY FOR TESTING HOOK WHEN YOU NEED TO MOCK OUT A LOG.
* Real code should use the two argument factory method.
*
* @param requestHandler the RequestHandler containing callbacks for
* manipulating the system's state
* @param siriusConfig a SiriusConfiguration containing configuration
* info needed for this node.
* @see SiriusConfiguration for info on needed config.
* @param siriusLog the persistence layer to which events should be
* committed to and replayed from.
*
* @return A SiriusImpl constructed using the parameters (note: this
* differs from the 2-arg method, which returns a Sirius trait)
*/
private[sirius] def createInstance(requestHandler: RequestHandler,
siriusConfig: SiriusConfiguration,
siriusLog: SiriusLog): SiriusImpl = {

val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system")

implicit val actorSystem = ActorSystem(systemName, createActorSystemConfig(siriusConfig))

// inject an mbean server, without regard for the one that may have been there
val mbeanServer = ManagementFactory.getPlatformMBeanServer
siriusConfig.setProp(SiriusConfiguration.MBEAN_SERVER, mbeanServer)

// here it is! the real deal creation
val impl = SiriusImpl(requestHandler, siriusLog, siriusConfig)

// create a SiriusInfo MBean which will remain registered until we explicity shutdown sirius
val (siriusInfoObjectName, siriusInfo) = createSiriusInfoMBean(actorSystem, impl.supervisor)
mbeanServer.registerMBean(siriusInfo, siriusInfoObjectName)

// need to shut down the actor system and unregister the mbeans when sirius is done
impl.onShutdown({
actorSystem.shutdown()
actorSystem.awaitTermination()
mbeanServer.unregisterMBean(siriusInfoObjectName)
})

impl
}

private def createSiriusInfoMBean(actorSystem: ActorSystem, siriusSup: ActorRef): (ObjectName, SiriusInfo) = {
val siriusInfo = new SiriusInfo(actorSystem, siriusSup)
val objectNameHelper = new ObjectNameHelper
val siriusInfoObjectName = objectNameHelper.getObjectName(siriusInfo, siriusSup, actorSystem)
(siriusInfoObjectName, siriusInfo)
}

/**
* Creates configuration for the ActorSystem. The config precedence is as follows:
* 1) host/port config trump all
* 2) siriusConfig supplied external config next
* 3) sirius-akka-base.conf, packaged with sirius, loaded with ConfigFactory.load
*/
private def createActorSystemConfig(siriusConfig: SiriusConfiguration): Config = {
val hostPortConfig = createHostPortConfig(siriusConfig)
val externalConfig = createExternalConfig(siriusConfig)

val baseAkkaConfig = ConfigFactory.load("sirius-akka-base.conf")

hostPortConfig.withFallback(externalConfig).withFallback(baseAkkaConfig)
}

private def createHostPortConfig(siriusConfig: SiriusConfiguration): Config = {
val configMap = new JHashMap[String, Any]()

configMap.put("akka.remote.netty.tcp.hostname",
siriusConfig.getProp(SiriusConfiguration.HOST, InetAddress.getLocalHost.getHostName))
configMap.put("akka.remote.netty.tcp.port",
siriusConfig.getProp(SiriusConfiguration.PORT, 2552))

// this is just so that the intellij shuts up
ConfigFactory.parseMap(configMap.asInstanceOf[JHashMap[String, _ <: AnyRef]])
}

/**
* If siriusConfig is such configured, will load up an external configuration
* for the Akka ActorSystem which is created. The filesystem is checked first,
* then the classpath, if neither exist, or siriusConfig is not configured as
* much, then an empty Config object is returned.
*/
private def createExternalConfig(siriusConfig: SiriusConfiguration): Config =
siriusConfig.getProp[String](SiriusConfiguration.AKKA_EXTERN_CONFIG) match {
case None => ConfigFactory.empty()
case Some(externConfig) =>
val externConfigFile = new File(externConfig)
if (externConfigFile.exists()) {
ConfigFactory.parseFile(externConfigFile)
} else {
ConfigFactory.parseResources(externConfig)
}
}
}
153 changes: 15 additions & 138 deletions src/main/scala/com/comcast/xfinity/sirius/api/impl/SiriusFactory.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Comcast Cable Communications Management, LLC
* Copyright 2012-2014 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,156 +15,33 @@
*/
package com.comcast.xfinity.sirius.api.impl

import java.io.File
import java.lang.management.ManagementFactory
import java.net.InetAddress
import java.util.{HashMap => JHashMap}

import com.comcast.xfinity.sirius.admin.ObjectNameHelper
import com.comcast.xfinity.sirius.{SiriusFactory => NewFactory}
import com.comcast.xfinity.sirius.api.RequestHandler
import com.comcast.xfinity.sirius.api.SiriusConfiguration
import com.comcast.xfinity.sirius.info.SiriusInfo
import com.comcast.xfinity.sirius.writeaheadlog.CachedSiriusLog
import com.comcast.xfinity.sirius.writeaheadlog.SiriusLog
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

import akka.actor.ActorRef
import akka.actor.ActorSystem
import javax.management.ObjectName
import com.comcast.xfinity.sirius.uberstore.segmented.SegmentedUberStore
import com.comcast.xfinity.sirius.uberstore.UberStore

/**
* Provides the factory for [[com.comcast.xfinity.sirius.api.impl.SiriusImpl]] instances
* Compatibility wrapper for factory methods now in top-level sirius package.
*/
object SiriusFactory {

/**
* SiriusImpl factory method, takes parameters to construct a SiriusImplementation and the dependent
* ActorSystem and return the created instance. Calling shutdown on the produced SiriusImpl will also
* shutdown the dependent ActorSystem.
*
* @param requestHandler the RequestHandler containing callbacks for manipulating the system's state
* @param siriusConfig a SiriusConfiguration containing configuration info needed for this node.
* @see SiriusConfiguration for info on needed config.
*
* @return A SiriusImpl constructed using the parameters
* Deprecated - see [[com.comcast.xfinity.sirius.SiriusFactory]]
*/
def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration): SiriusImpl = {
val uberStoreDir = siriusConfig.getProp[String](SiriusConfiguration.LOG_LOCATION) match {
case Some(dir) => dir
case None =>
throw new IllegalArgumentException(SiriusConfiguration.LOG_LOCATION + " must be set on config")
}

val backendLog = {
siriusConfig.getProp(SiriusConfiguration.LOG_VERSION_ID, "") match {
case version if version == SegmentedUberStore.versionId => SegmentedUberStore(uberStoreDir, siriusConfig)
case _ => UberStore(uberStoreDir)
}
}

val log: SiriusLog = {
if (siriusConfig.getProp(SiriusConfiguration.LOG_USE_WRITE_CACHE, true)) {
val cacheSize = siriusConfig.getProp(SiriusConfiguration.LOG_WRITE_CACHE_SIZE, 10000)
CachedSiriusLog(backendLog, cacheSize)
} else {
backendLog
}
}

createInstance(requestHandler, siriusConfig, log)
@deprecated("see top-level SiriusFactory object", "1.2.0")
def createInstance(requestHandler: RequestHandler,
siriusConfig: SiriusConfiguration): SiriusImpl = {
val res = NewFactory.createInstance(requestHandler, siriusConfig)
res.asInstanceOf[SiriusImpl]
}

/**
* USE ONLY FOR TESTING HOOK WHEN YOU NEED TO MOCK OUT A LOG.
* Real code should use the two argument factory method.
*
* @param requestHandler the RequestHandler containing callbacks for manipulating the system's state
* @param siriusConfig a SiriusConfiguration containing configuration info needed for this node.
* @see SiriusConfiguration for info on needed config.
* @param siriusLog the persistence layer to which events should be committed to and replayed from.
*
* @return A SiriusImpl constructed using the parameters
* Deprecated - see [[com.comcast.xfinity.sirius.SiriusFactory]]
*/
private[sirius] def createInstance(requestHandler: RequestHandler, siriusConfig: SiriusConfiguration,
siriusLog: SiriusLog): SiriusImpl = {

val systemName = siriusConfig.getProp(SiriusConfiguration.AKKA_SYSTEM_NAME, "sirius-system")

implicit val actorSystem = ActorSystem(systemName, createActorSystemConfig(siriusConfig))

// inject an mbean server, without regard for the one that may have been there
val mbeanServer = ManagementFactory.getPlatformMBeanServer
siriusConfig.setProp(SiriusConfiguration.MBEAN_SERVER, mbeanServer)

// here it is! the real deal creation
val impl = SiriusImpl(requestHandler, siriusLog, siriusConfig)

// create a SiriusInfo MBean which will remain registered until we explicity shutdown sirius
val (siriusInfoObjectName, siriusInfo) = createSiriusInfoMBean(actorSystem, impl.supervisor)
mbeanServer.registerMBean(siriusInfo, siriusInfoObjectName)

// need to shut down the actor system and unregister the mbeans when sirius is done
impl.onShutdown({
actorSystem.shutdown()
actorSystem.awaitTermination()
mbeanServer.unregisterMBean(siriusInfoObjectName)
})

impl
}

private def createSiriusInfoMBean(actorSystem: ActorSystem, siriusSup: ActorRef): (ObjectName, SiriusInfo) = {
val siriusInfo = new SiriusInfo(actorSystem, siriusSup)
val objectNameHelper = new ObjectNameHelper
val siriusInfoObjectName = objectNameHelper.getObjectName(siriusInfo, siriusSup, actorSystem)
(siriusInfoObjectName, siriusInfo)
@deprecated("see top-level SiriusFactory object", "1.2.0")
private[sirius] def createInstance(requestHandler: RequestHandler,
siriusConfig: SiriusConfiguration,
siriusLog: SiriusLog): SiriusImpl = {
NewFactory.createInstance(requestHandler, siriusConfig, siriusLog)
}

/**
* Creates configuration for the ActorSystem. The config precedence is as follows:
* 1) host/port config trump all
* 2) siriusConfig supplied external config next
* 3) sirius-akka-base.conf, packaged with sirius, loaded with ConfigFactory.load
*/
private def createActorSystemConfig(siriusConfig: SiriusConfiguration): Config = {
val hostPortConfig = createHostPortConfig(siriusConfig)
val externalConfig = createExternalConfig(siriusConfig)

val baseAkkaConfig = ConfigFactory.load("sirius-akka-base.conf")

hostPortConfig.withFallback(externalConfig).withFallback(baseAkkaConfig)
}

private def createHostPortConfig(siriusConfig: SiriusConfiguration): Config = {
val configMap = new JHashMap[String, Any]()

configMap.put("akka.remote.netty.tcp.hostname",
siriusConfig.getProp(SiriusConfiguration.HOST, InetAddress.getLocalHost.getHostName))
configMap.put("akka.remote.netty.tcp.port",
siriusConfig.getProp(SiriusConfiguration.PORT, 2552))

// this is just so that the intellij shuts up
ConfigFactory.parseMap(configMap.asInstanceOf[JHashMap[String, _ <: AnyRef]])
}

/**
* If siriusConfig is such configured, will load up an external configuration
* for the Akka ActorSystem which is created. The filesystem is checked first,
* then the classpath, if neither exist, or siriusConfig is not configured as
* much, then an empty Config object is returned.
*/
private def createExternalConfig(siriusConfig: SiriusConfiguration): Config =
siriusConfig.getProp[String](SiriusConfiguration.AKKA_EXTERN_CONFIG) match {
case None => ConfigFactory.empty()
case Some(externConfig) =>
val externConfigFile = new File(externConfig)
if (externConfigFile.exists()) {
ConfigFactory.parseFile(externConfigFile)
} else {
ConfigFactory.parseResources(externConfig)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Comcast Cable Communications Management, LLC
* Copyright 2012-2014 Comcast Cable Communications Management, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@ import java.util.UUID
import com.comcast.xfinity.sirius.uberstore.UberStore
import com.comcast.xfinity.sirius.api.impl.SiriusSupervisor.CheckPaxosMembership
import annotation.tailrec
import com.comcast.xfinity.sirius.SiriusFactory
import com.comcast.xfinity.sirius.api.{SiriusResult, RequestHandler, SiriusConfiguration}
import com.comcast.xfinity.sirius.api.impl.membership.MembershipActor.CheckClusterConfig
import org.slf4j.LoggerFactory
Expand Down