Skip to content

Commit

Permalink
Merge pull request #276 from codingteam/feature/storage-refactor
Browse files Browse the repository at this point in the history
New storage API
  • Loading branch information
ForNeVeR committed Feb 18, 2015
2 parents 158a568 + f4e7c69 commit 97946d8
Show file tree
Hide file tree
Showing 23 changed files with 386 additions and 503 deletions.
11 changes: 6 additions & 5 deletions src/main/scala/ru/org/codingteam/horta/core/Core.scala
Expand Up @@ -4,7 +4,7 @@ import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import org.joda.time.DateTime
import ru.org.codingteam.horta.database.{DAO, PersistentStore}
import ru.org.codingteam.horta.database.{RepositoryFactory, PersistentStore}
import ru.org.codingteam.horta.messages._
import ru.org.codingteam.horta.plugins.HelperPlugin.HelperPlugin
import ru.org.codingteam.horta.plugins._
Expand Down Expand Up @@ -212,8 +212,7 @@ class Core extends Actor with ActorLogging {

object Core {

private def getCommands(pluginDefinitions: List[(ActorRef, PluginDefinition)]
): Map[String, List[(ActorRef, CommandDefinition)]] = {
private def getCommands(pluginDefinitions: List[(ActorRef, PluginDefinition)]): Map[String, List[(ActorRef, CommandDefinition)]] = {
val commands = for ((actor, pluginDefinition) <- pluginDefinitions) yield {
for (command <- pluginDefinition.commands) yield (command.name, actor, command)
}
Expand All @@ -228,8 +227,10 @@ object Core {
private def getCommandsDescription(pluginDefinitions: List[(ActorRef, PluginDefinition)]) =
pluginDefinitions.map(t => t._2.name -> t._2.commands.map(cd => cd.name -> cd.level)).toMap

private def getStorages(pluginDefinitions: List[(ActorRef, PluginDefinition)]): Map[String, DAO] = {
pluginDefinitions.map(_._2).filter(_.dao.isDefined).map(definition => (definition.name, definition.dao.get)).toMap
private def getStorages(pluginDefinitions: Seq[(ActorRef, PluginDefinition)]): Map[String, RepositoryFactory] = {
pluginDefinitions.toStream.flatMap { case (_, definition) =>
definition.repositoryFactory.map(factory => (definition.name, factory))
}.toMap
}

}
103 changes: 28 additions & 75 deletions src/main/scala/ru/org/codingteam/horta/database/PersistentStore.scala
Expand Up @@ -2,58 +2,20 @@ package ru.org.codingteam.horta.database

import javax.sql.DataSource

import akka.actor.{Actor, ActorLogging}
import akka.actor.{Actor, ActorLogging, ActorSelection}
import akka.pattern.ask
import akka.util.Timeout
import com.googlecode.flyway.core.Flyway
import org.h2.jdbcx.JdbcConnectionPool
import ru.org.codingteam.horta.configuration.Configuration
import scalikejdbc.{ConnectionPool, DB, DBSession, DataSourceConnectionPool}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag

case class StoreObject(plugin: String, id: Option[Any], obj: Any)

case class ReadObject(plugin: String, id: Any)

case class DeleteObject(plugin: String, id: Any)

trait DAO {

/**
* Schema name for current DAO.
* @return schema name.
*/
def schema: String

/**
* Store an object in the database.
* @param session session to access the database.
* @param id object id (if null then it should be generated).
* @param obj stored object.
* @return stored object id (or None if object was not stored).
*/
def store(implicit session: DBSession, id: Option[Any], obj: Any): Option[Any]

/**
* Read an object from the database.
* @param session session to access the database.
* @param id object id.
* @return stored object or None if object not found.
*/
def read(implicit session: DBSession, id: Any): Option[Any]

/**
* Delete an object from the database.
* @param session session to access the database.
* @param id object id.
* @return true if object was successfully deleted, false otherwise.
*/
def delete(implicit session: DBSession, id: Any): Boolean

}

class PersistentStore(storages: Map[String, DAO]) extends Actor with ActorLogging {
class PersistentStore(repositories: Map[String, RepositoryFactory]) extends Actor with ActorLogging {

val Url = Configuration.storageUrl
val User = Configuration.storageUser
Expand All @@ -70,45 +32,22 @@ class PersistentStore(storages: Map[String, DAO]) extends Actor with ActorLoggin
}

override def receive = {
case StoreObject(plugin, id, obj) =>
storages.get(plugin) match {
case Some(dao) =>
initializeDatabase(dao)
withTransaction { session =>
sender ! dao.store(session, id, obj)
}

case None =>
log.info(s"Cannot store object $obj for plugin $plugin")
}

case ReadObject(plugin, id) =>
storages.get(plugin) match {
case Some(dao) =>
initializeDatabase(dao)
withTransaction { session =>
sender ! dao.read(session, id)
}

case None =>
log.info(s"Cannot read object $id for plugin $plugin")
}

case DeleteObject(plugin, id) =>
storages.get(plugin) match {
case Some(dao) =>
initializeDatabase(dao)
case PersistentStore.Execute(plugin, action) =>
repositories.get(plugin) match {
case Some(factory) =>
initializeDatabase(factory)
withTransaction { session =>
sender ! dao.delete(session, id)
val repository = factory.create(session)
sender ! action(repository)
}

case None =>
log.info(s"Cannot delete object $id for plugin $plugin")
log.info(s"Cannot execute action $action for plugin $plugin: repository not found")
}
}

private def initializeDatabase(dao: DAO) {
val schema = dao.schema
private def initializeDatabase(factory: RepositoryFactory) {
val schema = factory.schema
if (!initializedSchemas.contains(schema)) {
initializeScript(schema)
initializedSchemas += schema
Expand All @@ -133,4 +72,18 @@ class PersistentStore(storages: Map[String, DAO]) extends Actor with ActorLoggin
flyway.repair()
flyway.migrate()
}

}

object PersistentStore {

private case class Execute(plugin: String, action: (Any) => Any)

def execute[Repository, T: ClassTag](plugin: String, store: ActorSelection)
(action: (Repository) => T)
(implicit timeout: Timeout): Future[T] = {
val message = Execute(plugin, (r) => action(r.asInstanceOf[Repository]))
(store ? message).mapTo[T]
}

}
@@ -0,0 +1,11 @@
package ru.org.codingteam.horta.database

import scalikejdbc.DBSession

/**
* A repository factory. Will be used to create the repository.
*
* @param schema schema name for current repository.
* @param create a repository creation function.
*/
case class RepositoryFactory(schema: String, create: (DBSession => Any))
14 changes: 4 additions & 10 deletions src/main/scala/ru/org/codingteam/horta/plugins/BasePlugin.scala
@@ -1,18 +1,17 @@
package ru.org.codingteam.horta.plugins

import akka.actor.{Actor, ActorLogging}
import ru.org.codingteam.horta.database.DAO

/**
* CommandPlugin class used as base for all command plugins.
* Common plugin functionality. Every plugin should be inherited from this class.
*/
abstract class BasePlugin extends Actor with ActorLogging {

def receive = {
case GetPluginDefinition => sender ! pluginDefinition
}

protected val core = context.actorSelection("/user/core")
protected val store = context.actorSelection("/user/core/store")

/**
* Plugin name.
Expand All @@ -32,15 +31,10 @@ abstract class BasePlugin extends Actor with ActorLogging {
*/
protected def commands: List[CommandDefinition] = List()

/**
* Plugin data access object. May be None if not present.
* @return data access object.
*/
protected def dao: Option[DAO] = None

/**
* A full plugin definition.
* @return plugin definition.
*/
private def pluginDefinition = PluginDefinition(name, notifications, commands, dao)
protected def pluginDefinition = PluginDefinition(name, notifications, commands, None)

}
@@ -0,0 +1,47 @@
package ru.org.codingteam.horta.plugins

import akka.util.Timeout
import ru.org.codingteam.horta.database.{RepositoryFactory, PersistentStore}
import scalikejdbc.DBSession

import scala.concurrent.Future
import scala.reflect.ClassTag

/**
* Trait for the plugins that can access the database.
*/
trait DataAccessingPlugin[Repository] extends BasePlugin {

/**
* A full plugin definition.
* @return plugin definition.
*/
override protected def pluginDefinition: PluginDefinition =
super.pluginDefinition.copy(repositoryFactory = Some(RepositoryFactory(schema, createRepository)))

/**
* Schema name for database storage.
*/
protected val schema: String

/**
* A function creating the repository for database access.
*/
protected val createRepository: (DBSession) => Repository

/**
* Execute repository action. Use with caution - do not mess with plugin data members within action; that is not
* context-safe.
*
* @param action an action.
* @tparam T type of action result.
* @return future that will be resolved after action execution.
*/
protected def withDatabase[T: ClassTag](action: (Repository) => T)
(implicit timeout: Timeout): Future[T] = {
PersistentStore.execute[Repository, T](name, store)(action)
}

private val store = context.actorSelection("/user/core/store")

}
@@ -1,6 +1,6 @@
package ru.org.codingteam.horta.plugins

import ru.org.codingteam.horta.database.DAO
import ru.org.codingteam.horta.database.RepositoryFactory

/**
* Description of events.
Expand All @@ -17,9 +17,10 @@ case class Notifications(messages: Boolean,
* @param name plugin name.
* @param notifications description of events plugin want to be notified of.
* @param commands a list of commands supported by the plugin.
* @param dao plugin data access object if present.
* @param repositoryFactory a factory of repositories used for data access. Data access is disabled if factory isn't
* defined.
*/
case class PluginDefinition(name: String,
notifications: Notifications,
commands: List[CommandDefinition],
dao: Option[DAO])
repositoryFactory: Option[RepositoryFactory])
106 changes: 0 additions & 106 deletions src/main/scala/ru/org/codingteam/horta/plugins/karma/KarmaDAO.scala

This file was deleted.

0 comments on commit 97946d8

Please sign in to comment.