From ef7525c776a898ca52f4b8edf580337b793f35b5 Mon Sep 17 00:00:00 2001 From: huafengw Date: Fri, 14 Oct 2016 12:41:43 +0800 Subject: [PATCH 1/4] [GEARPUMP-224] merge gearpump-daemon to gearpump-core --- .../gearpump/cluster/DaemonMessage.scala | 1 - .../cluster/embedded/EmbeddedCluster.scala | 0 .../apache/gearpump/cluster/main/Local.scala | 11 +++-- .../apache/gearpump/cluster/main/Master.scala | 18 ++++---- .../apache/gearpump/cluster/main/Worker.scala | 13 +++--- .../gearpump/cluster/master/AppManager.scala | 11 +++-- .../cluster/master/InMemoryKVService.scala | 8 ++-- .../gearpump/cluster/master/Master.scala | 0 .../cluster/scheduler/PriorityScheduler.scala | 8 ++-- .../cluster/scheduler/Scheduler.scala | 10 ++--- .../DefaultExecutorProcessLauncher.scala | 3 +- .../gearpump/cluster/worker/Worker.scala | 24 +++++------ .../apache/gearpump/cluster/MiniCluster.scala | 7 ++-- .../cluster/appmaster}/AppManagerSpec.scala | 11 +++-- .../appmaster}/InMemoryKVServiceSpec.scala | 10 ++--- .../gearpump/cluster/main/MainSpec.scala | 18 ++++---- .../cluster/main/MasterWatcherSpec.scala | 7 ++-- .../scheduler/PrioritySchedulerSpec.scala | 10 ++--- .../gearpump/cluster/worker/WorkerSpec.scala | 9 ++-- project/Build.scala | 41 +++++++------------ project/BuildExample.scala | 8 ++-- project/Pack.scala | 14 +++---- 22 files changed, 103 insertions(+), 139 deletions(-) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala (99%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala (100%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/main/Local.scala (99%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/main/Master.scala (97%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala (99%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala (99%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala (98%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/master/Master.scala (100%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala (99%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala (99%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala (99%) rename {daemon => core}/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala (98%) rename {daemon => core}/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala (99%) rename {daemon/src/test/scala/org/apache/gearpump/cluster/master => core/src/test/scala/org/apache/gearpump/cluster/appmaster}/AppManagerSpec.scala (98%) rename {daemon/src/test/scala/org/apache/gearpump/cluster/master => core/src/test/scala/org/apache/gearpump/cluster/appmaster}/InMemoryKVServiceSpec.scala (95%) rename {daemon => core}/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala (99%) rename {daemon => core}/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala (99%) rename {daemon => core}/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala (99%) rename {daemon => core}/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala (99%) diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala similarity index 99% rename from daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala rename to core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala index 9e55be68f..1e94132b6 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala @@ -18,7 +18,6 @@ package org.apache.gearpump.cluster import akka.actor.ActorRef - import org.apache.gearpump.cluster.master.Master.MasterInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.cluster.worker.WorkerId diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala similarity index 100% rename from daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala rename to core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala similarity index 99% rename from daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala rename to core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala index db71b7b10..db2cd8afe 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala @@ -18,20 +18,19 @@ package org.apache.gearpump.cluster.main -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigValueFactory -import org.slf4j.Logger - import org.apache.gearpump.cluster.ClusterConfig import org.apache.gearpump.cluster.master.{Master => MasterActor} import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.LogUtil.ProcessType import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration object Local extends AkkaApp with ArgumentsParser { override def akkaConfig: Config = ClusterConfig.master() diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala similarity index 97% rename from daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala rename to core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala index f1b9bdff2..f758720a1 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala @@ -19,25 +19,25 @@ package org.apache.gearpump.cluster.main import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.duration._ import akka.actor._ import akka.cluster.ClusterEvent._ +import akka.cluster.{MemberStatus, Member, Cluster} import akka.cluster.ddata.DistributedData -import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings} -import akka.cluster.{Cluster, Member, MemberStatus} +import akka.cluster.singleton.{ClusterSingletonProxySettings, ClusterSingletonProxy, ClusterSingletonManagerSettings, ClusterSingletonManager} import com.typesafe.config.ConfigValueFactory -import org.slf4j.Logger - import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode} import org.apache.gearpump.cluster.master.Master.MasterListUpdated +import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode} import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.LogUtil.ProcessType import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ object Master extends AkkaApp with ArgumentsParser { diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala similarity index 99% rename from daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala rename to core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala index 58a9dec74..3d8d823ed 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala @@ -18,20 +18,19 @@ package org.apache.gearpump.cluster.main -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - import akka.actor.{ActorSystem, Props} -import org.slf4j.Logger - import org.apache.gearpump.cluster.ClusterConfig import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} import org.apache.gearpump.util.LogUtil.ProcessType import org.apache.gearpump.util.{AkkaApp, LogUtil} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration /** Tool to start a worker daemon process */ object Worker extends AkkaApp with ArgumentsParser { diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala similarity index 99% rename from daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala rename to core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index 9a3a1199f..0ae7365f3 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -18,14 +18,8 @@ package org.apache.gearpump.cluster.master -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.{Failure, Success} - import akka.actor._ import akka.pattern.ask -import org.slf4j.Logger - import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} import org.apache.gearpump.cluster.AppMasterToWorker._ import org.apache.gearpump.cluster.ClientToMaster._ @@ -38,6 +32,11 @@ import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVR import org.apache.gearpump.cluster.master.Master._ import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _} +import org.slf4j.Logger + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} /** * AppManager is dedicated child of Master to manager all applications. diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala similarity index 98% rename from daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala rename to core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala index 3e54214ce..fd19bade3 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala @@ -19,16 +19,16 @@ package org.apache.gearpump.cluster.master import java.util.concurrent.TimeUnit -import scala.concurrent.TimeoutException -import scala.concurrent.duration.Duration import akka.actor._ import akka.cluster.Cluster +import akka.cluster.ddata.{LWWMap, LWWMapKey, DistributedData} import akka.cluster.ddata.Replicator._ -import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey} +import org.apache.gearpump.util.LogUtil import org.slf4j.Logger -import org.apache.gearpump.util.LogUtil +import scala.concurrent.TimeoutException +import scala.concurrent.duration.Duration /** * A replicated simple in-memory KV service. The replications are stored on all masters. diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala similarity index 100% rename from daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala rename to core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala similarity index 99% rename from daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala rename to core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala index 1429694bb..623e3ff28 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala @@ -18,16 +18,14 @@ package org.apache.gearpump.cluster.scheduler -import org.apache.gearpump.cluster.worker.WorkerId - -import scala.collection.mutable - import akka.actor.ActorRef - import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated import org.apache.gearpump.cluster.scheduler.Relaxation._ import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest +import org.apache.gearpump.cluster.worker.WorkerId + +import scala.collection.mutable /** Assign resource to application based on the priority of the application */ class PriorityScheduler extends Scheduler { diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala similarity index 99% rename from daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala rename to core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala index 7187c1ada..ec9f1ba25 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala @@ -17,19 +17,17 @@ */ package org.apache.gearpump.cluster.scheduler -import org.apache.gearpump.cluster.worker.WorkerId - -import scala.collection.mutable - import akka.actor.{Actor, ActorRef} -import org.slf4j.Logger - import org.apache.gearpump.TimeStamp import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered} import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate import org.apache.gearpump.cluster.master.Master.WorkerTerminated import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.util.LogUtil +import org.slf4j.Logger + +import scala.collection.mutable /** * Scheduler schedule resource for different applications. diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala similarity index 99% rename from daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala rename to core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala index b4e6f9eef..3d5b0af03 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala @@ -20,10 +20,9 @@ package org.apache.gearpump.cluster.worker import java.io.File import com.typesafe.config.Config -import org.slf4j.Logger - import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.util.{LogUtil, RichProcess, Util} +import org.slf4j.Logger /** Launcher to start an executor process */ class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher { diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala similarity index 98% rename from daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala rename to core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala index 1b52e5d8a..447b03405 100644 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala @@ -22,34 +22,33 @@ import java.io.File import java.lang.management.ManagementFactory import java.net.URL import java.util.concurrent.{Executors, TimeUnit} -import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success, Try} import akka.actor.SupervisorStrategy.Stop import akka.actor._ -import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory} -import org.slf4j.Logger - +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} import org.apache.gearpump.cluster.AppMasterToWorker._ import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig} import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig} -import org.apache.gearpump.cluster.MasterToWorker._ +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceSucceed, UpdateResourceFailed, WorkerRegistered} import org.apache.gearpump.cluster.WorkerToAppMaster._ -import org.apache.gearpump.cluster.WorkerToMaster._ +import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} import org.apache.gearpump.cluster.master.Master.MasterInfo import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig} -import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} +import org.apache.gearpump.jarstore.JarStoreClient import org.apache.gearpump.metrics.Metrics.ReportMetrics import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} import org.apache.gearpump.util.ActorSystemBooter.Daemon import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig import org.apache.gearpump.util.{TimeOutScheduler, _} +import org.slf4j.Logger + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} /** * Worker is used to track the resource on single machine, it is like @@ -519,8 +518,7 @@ private[cluster] object Worker { } // The folders are under ${GEARPUMP_HOME} - val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" + - File.separator + "yarn") + val daemonPathPattern = List("lib" + File.separator + "yarn") override def receive: Receive = { case ShutdownExecutor(appId, executorId, reason: String) => diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala similarity index 99% rename from daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala rename to core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala index a6b75cb3e..0a2224502 100644 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/MiniCluster.scala @@ -17,20 +17,19 @@ */ package org.apache.gearpump.cluster -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} - import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.pattern.ask import akka.testkit.TestActorRef import com.typesafe.config.ConfigValueFactory - import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList import org.apache.gearpump.cluster.master.Master import org.apache.gearpump.cluster.worker.Worker import org.apache.gearpump.util.Constants +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} + class MiniCluster { private val mockMasterIP = "127.0.0.1" diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala similarity index 98% rename from daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala rename to core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala index 58e35939f..bc38553ca 100644 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala @@ -16,24 +16,23 @@ * limitations under the License. */ -package org.apache.gearpump.cluster.master - -import scala.util.Success +package org.apache.gearpump.cluster.appmaster import akka.actor.{Actor, ActorRef, Props} import akka.testkit.TestProbe import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} +import org.apache.gearpump.cluster.master.AppManager import org.apache.gearpump.cluster.master.AppManager._ import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} import org.apache.gearpump.cluster.{TestUtil, _} import org.apache.gearpump.util.LogUtil +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import scala.util.Success class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { var kvService: TestProbe = null diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala similarity index 95% rename from daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala rename to core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala index 325a4846c..d3e739fc4 100644 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/InMemoryKVServiceSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/InMemoryKVServiceSpec.scala @@ -16,17 +16,17 @@ * limitations under the License. */ -package org.apache.gearpump.cluster.master - -import scala.concurrent.duration._ +package org.apache.gearpump.cluster.appmaster import akka.actor.Props import akka.testkit.TestProbe import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - +import org.apache.gearpump.cluster.master.InMemoryKVService import org.apache.gearpump.cluster.master.InMemoryKVService._ import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +import scala.concurrent.duration._ class InMemoryKVServiceSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala similarity index 99% rename from daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala rename to core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala index 90fdd3961..216697646 100644 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala @@ -21,23 +21,21 @@ package org.apache.gearpump.cluster.main import java.util.Properties import akka.testkit.TestProbe -import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered -import org.apache.gearpump.cluster.master.MasterProxy -import org.apache.gearpump.transport.HostPort - -import scala.concurrent.Future -import scala.util.{Success, Try} - -import com.typesafe.config.{ConfigFactory, Config} -import org.scalatest._ - +import com.typesafe.config.{Config, ConfigFactory} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _} import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult} +import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker +import org.apache.gearpump.cluster.master.MasterProxy import org.apache.gearpump.cluster.{MasterHarness, TestUtil} +import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{Constants, LogUtil, Util} +import org.scalatest._ + +import scala.concurrent.Future +import scala.util.{Success, Try} class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala similarity index 99% rename from daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala rename to core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala index e1ba8f677..b48fc2ace 100644 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala @@ -17,15 +17,14 @@ */ package org.apache.gearpump.cluster.main -import scala.concurrent.Await -import scala.concurrent.duration._ - import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe import com.typesafe.config.Config +import org.apache.gearpump.cluster.TestUtil import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.cluster.TestUtil +import scala.concurrent.Await +import scala.concurrent.duration._ class MasterWatcherSpec extends FlatSpec with Matchers { def config: Config = TestUtil.MASTER_CONFIG diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala similarity index 99% rename from daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala rename to core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala index e82dff3fe..8a3d7d1bd 100644 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala @@ -17,14 +17,8 @@ */ package org.apache.gearpump.cluster.scheduler -import org.apache.gearpump.cluster.worker.WorkerId - -import scala.concurrent.duration._ - import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit, TestProbe} -import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} - import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} @@ -33,6 +27,10 @@ import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate import org.apache.gearpump.cluster.master.Master.MasterInfo import org.apache.gearpump.cluster.scheduler.Priority.{HIGH, LOW, NORMAL} import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import org.apache.gearpump.cluster.worker.WorkerId +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.concurrent.duration._ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll{ diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala similarity index 99% rename from daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala rename to core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala index bf25057b2..e0233f822 100644 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/worker/WorkerSpec.scala @@ -17,14 +17,9 @@ */ package org.apache.gearpump.cluster.worker -import scala.concurrent.Await -import scala.concurrent.duration._ - import akka.actor.{ActorSystem, PoisonPill, Props} import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory} -import org.scalatest._ - import org.apache.gearpump.cluster.AppMasterToWorker.{ChangeExecutorResource, LaunchExecutor, ShutdownExecutor} import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} import org.apache.gearpump.cluster.WorkerToAppMaster.{ExecutorLaunchRejected, ShutdownExecutorFailed, ShutdownExecutorSucceed} @@ -33,6 +28,10 @@ import org.apache.gearpump.cluster.master.Master.MasterInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.cluster.{ExecutorJVMConfig, MasterHarness, TestUtil} import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} +import org.scalatest._ + +import scala.concurrent.Await +import scala.concurrent.duration._ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness { override def config: Config = TestUtil.DEFAULT_CONFIG diff --git a/project/Build.scala b/project/Build.scala index 4552a644a..40b57436f 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -140,15 +140,6 @@ object Build extends sbt.Build { publishArtifact in Test := false ) - val daemonDependencies = Seq( - libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-cluster" % akkaVersion, - "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, - "commons-logging" % "commons-logging" % commonsLoggingVersion, - "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion - ) - ) - val coreDependencies = Seq( libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % slf4jVersion, @@ -171,6 +162,10 @@ object Build extends sbt.Build { "com.typesafe.akka" %% "akka-remote" % akkaVersion exclude("io.netty", "netty"), + "com.typesafe.akka" %% "akka-cluster" % akkaVersion, + "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion, + "commons-logging" % "commons-logging" % commonsLoggingVersion, + "com.typesafe.akka" %% "akka-distributed-data-experimental" % akkaVersion, "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-agent" % akkaVersion, "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, @@ -256,7 +251,7 @@ object Build extends sbt.Build { id = "gearpump", base = file("."), settings = commonSettings ++ noPublish ++ gearpumpUnidocSetting) - .aggregate(shaded, core, daemon, streaming, services, external_kafka, external_monoid, + .aggregate(shaded, core, streaming, services, external_kafka, external_monoid, external_serializer, examples, storm, yarn, external_hbase, gearpumpHadoop, packProject, external_hadoopfs, integration_test).settings(Defaults.itSettings: _*) .disablePlugins(sbtassembly.AssemblyPlugin) @@ -271,20 +266,13 @@ object Build extends sbt.Build { getShadedDepXML(organization.value, shaded_guava.id, version.value), getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node) } - )) - .disablePlugins(sbtassembly.AssemblyPlugin) + )).disablePlugins(sbtassembly.AssemblyPlugin) - lazy val daemon = Project( - id = "gearpump-daemon", - base = file("daemon"), - settings = commonSettings ++ daemonDependencies) - .dependsOn(core % "test->test; compile->compile", cgroup % "test->test; compile->compile") - .disablePlugins(sbtassembly.AssemblyPlugin) lazy val cgroup = Project( id = "gearpump-experimental-cgroup", base = file("experiments/cgroup"), - settings = commonSettings ++ noPublish ++ daemonDependencies) + settings = commonSettings ++ noPublish) .dependsOn (core % "test->test; compile->compile") .disablePlugins(sbtassembly.AssemblyPlugin) @@ -301,7 +289,7 @@ object Build extends sbt.Build { getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node) } )) - .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test") + .dependsOn(core % "test->test; compile->compile", shaded_gs_collections) .disablePlugins(sbtassembly.AssemblyPlugin) lazy val external_kafka = Project( @@ -412,19 +400,18 @@ object Build extends sbt.Build { ), mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test") )) - .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + .dependsOn(streaming % "test->test; provided") lazy val redis = Project( id = "gearpump-experiments-redis", base = file("experiments/redis"), - settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + settings = commonSettings ++ noPublish ++ Seq( libraryDependencies ++= Seq( "redis.clients" % "jedis" % "2.9.0" - ), - mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test") - )) - .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + ) + ) + ).dependsOn(streaming % "test->test; provided") lazy val storm = Project( id = "gearpump-experiments-storm", @@ -489,7 +476,7 @@ object Build extends sbt.Build { "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % hadoopVersion % "provided" ) )) - .dependsOn(services % "test->test;compile->compile", daemon % "provided", + .dependsOn(services % "test->test;compile->compile", core % "provided", gearpumpHadoop).disablePlugins(sbtassembly.AssemblyPlugin) lazy val external_hbase = Project( diff --git a/project/BuildExample.scala b/project/BuildExample.scala index 75fc9be43..fadc1ec33 100644 --- a/project/BuildExample.scala +++ b/project/BuildExample.scala @@ -42,7 +42,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + ) dependsOn(streaming % "test->test; provided") lazy val wordcount = Project( id = "gearpump-examples-wordcount", @@ -55,7 +55,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + ) dependsOn(streaming % "test->test; provided") lazy val sol = Project( id = "gearpump-examples-sol", @@ -113,7 +113,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn (daemon % "test->test; provided") + ) dependsOn (core % "test->test; provided") lazy val distributeservice = Project( id = "gearpump-examples-distributeservice", @@ -133,7 +133,7 @@ object BuildExample extends sbt.Build { target in assembly := baseDirectory.value.getParentFile / "target" / CrossVersion.binaryScalaVersion(scalaVersion.value) ) - ) dependsOn (daemon % "test->test; provided") + ) dependsOn (core % "test->test; provided") lazy val fsio = Project( id = "gearpump-examples-fsio", diff --git a/project/Pack.scala b/project/Pack.scala index 13e53de5c..54b1d4313 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -24,7 +24,6 @@ import xerial.sbt.Pack._ object Pack extends sbt.Build { val daemonClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", // This is for DFSJarStore "${PROG_HOME}/lib/yarn/*" ) @@ -37,14 +36,12 @@ object Pack extends sbt.Build { val serviceClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", "${PROG_HOME}/lib/services/*", "${PROG_HOME}/dashboard" ) val yarnClassPath = Seq( "${PROG_HOME}/conf", - "${PROG_HOME}/lib/daemon/*", "${PROG_HOME}/lib/services/*", "${PROG_HOME}/lib/yarn/*", "${PROG_HOME}/conf/yarnconf", @@ -112,11 +109,10 @@ object Pack extends sbt.Build { "-Dgearpump.home=${PROG_HOME}") ), packLibDir := Map( - "lib" -> new ProjectsToPack(core.id, streaming.id), - "lib/daemon" -> new ProjectsToPack(daemon.id, cgroup.id).exclude(core.id, streaming.id), + "lib" -> new ProjectsToPack(core.id, cgroup.id, streaming.id), "lib/yarn" -> new ProjectsToPack(gearpumpHadoop.id, yarn.id). - exclude(services.id, daemon.id, core.id), - "lib/services" -> new ProjectsToPack(services.id).exclude(daemon.id), + exclude(services.id, core.id), + "lib/services" -> new ProjectsToPack(services.id).exclude(core.id), "lib/storm" -> new ProjectsToPack(storm.id).exclude(streaming.id) ), packExclude := Seq(thisProjectRef.value.project), @@ -139,7 +135,7 @@ object Pack extends sbt.Build { "gear" -> applicationClassPath, "local" -> daemonClassPath, "master" -> daemonClassPath, - "worker" -> daemonClassPath, + "worker" -> applicationClassPath, "services" -> serviceClassPath, "yarnclient" -> yarnClassPath, "storm" -> stormClassPath @@ -149,6 +145,6 @@ object Pack extends sbt.Build { packArchiveExcludes := Seq("integrationtest") ) - ).dependsOn(core, streaming, services, yarn, storm). + ).dependsOn(core, streaming, services, yarn, storm, cgroup). disablePlugins(sbtassembly.AssemblyPlugin) } From 73716a714e4170328dc5eeedf27670b15250921f Mon Sep 17 00:00:00 2001 From: huafengw Date: Fri, 14 Oct 2016 12:43:10 +0800 Subject: [PATCH 2/4] minor --- .../apache/gearpump/redis/RedisMessage.scala | 148 ++++++++++++------ .../org/apache/gearpump/redis/RedisSink.scala | 27 ++-- 2 files changed, 114 insertions(+), 61 deletions(-) diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala index 84dec70be..2988f5b9e 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala @@ -20,7 +20,6 @@ package org.apache.gearpump.redis import java.nio.charset.Charset object RedisMessage { - private def toBytes(strings: List[String]): List[Array[Byte]] = strings.map(string => string.getBytes(Charset.forName("UTF8"))) @@ -48,11 +47,10 @@ object RedisMessage { * @param latitude * @param member */ - case class GEOADD(key: Array[Byte], longitude: Double, - latitude: Double, member: Array[Byte]) { - def this(key: String, longitude: Double, - latitude: Double, member: String) = + case class GEOADD(key: Array[Byte], longitude: Double, latitude: Double, member: Array[Byte]) { + def this(key: String, longitude: Double, latitude: Double, member: String) = { this(toBytes(key), longitude, latitude, toBytes(member)) + } } } @@ -66,7 +64,9 @@ object RedisMessage { * @param field */ case class HDEL(key: Array[Byte], field: Array[Byte]) { - def this(key: String, field: String) = this(toBytes(key), toBytes(field)) + def this(key: String, field: String) = { + this(toBytes(key), toBytes(field)) + } } /** @@ -77,8 +77,9 @@ object RedisMessage { * @param increment */ case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) { - def this(key: String, field: String, increment: Long) = + def this(key: String, field: String, increment: Long) = { this(toBytes(key), toBytes(field), increment) + } } /** @@ -89,8 +90,9 @@ object RedisMessage { * @param increment */ case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) { - def this(key: String, field: String, increment: Float) = + def this(key: String, field: String, increment: Float) = { this(toBytes(key), toBytes(field), increment) + } } @@ -102,8 +104,9 @@ object RedisMessage { * @param value */ case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { - def this(key: String, field: String, value: String) = + def this(key: String, field: String, value: String) = { this(toBytes(key), toBytes(field), toBytes(value)) + } } /** @@ -114,8 +117,9 @@ object RedisMessage { * @param value */ case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { - def this(key: String, field: String, value: String) = + def this(key: String, field: String, value: String) = { this(toBytes(key), toBytes(field), toBytes(value)) + } } } @@ -142,8 +146,9 @@ object RedisMessage { * @param value */ case class LPUSH(key: Array[Byte], value: Array[Byte]) { - - def this(key: String, value: String) = this(key, toBytes(value)) + def this(key: String, value: String) = { + this(key, toBytes(value)) + } } /** @@ -153,7 +158,9 @@ object RedisMessage { * @param value */ case class LPUSHX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -164,7 +171,9 @@ object RedisMessage { * @param value */ case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) { - def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value)) + def this(key: String, index: Long, value: String) = { + this(toBytes(key), index, toBytes(value)) + } } /** @@ -174,8 +183,9 @@ object RedisMessage { * @param value */ case class RPUSH(key: Array[Byte], value: Array[Byte]) { - - def this(key: String, value: String) = this(key, toBytes(value)) + def this(key: String, value: String) = { + this(key, toBytes(value)) + } } /** @@ -185,7 +195,9 @@ object RedisMessage { * @param value */ case class RPUSHX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } } @@ -198,8 +210,9 @@ object RedisMessage { * @param message */ case class DEL(message: Array[Byte]) { - - def this(message: String) = this(toBytes(message)) + def this(message: String) = { + this(toBytes(message)) + } } /** @@ -208,7 +221,9 @@ object RedisMessage { * @param key */ case class EXPIRE(key: Array[Byte], seconds: Int) { - def this(key: String, seconds: Int) = this(toBytes(key), seconds) + def this(key: String, seconds: Int) = { + this(toBytes(key), seconds) + } } /** @@ -218,7 +233,9 @@ object RedisMessage { * @param timestamp */ case class EXPIREAT(key: Array[Byte], timestamp: Long) { - def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + def this(key: String, timestamp: Long) = { + this(toBytes(key), timestamp) + } } /** @@ -230,9 +247,11 @@ object RedisMessage { * @param database * @param timeout */ - case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) { - def this(host: String, port: Int, key: String, database: Int, timeout: Int) = + case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], + database: Int, timeout: Int) { + def this(host: String, port: Int, key: String, database: Int, timeout: Int) = { this(toBytes(host), port, toBytes(key), database, timeout) + } } /** @@ -242,7 +261,9 @@ object RedisMessage { * @param db */ case class MOVE(key: Array[Byte], db: Int) { - def this(key: String, db: Int) = this(toBytes(key), db) + def this(key: String, db: Int) = { + this(toBytes(key), db) + } } /** @@ -251,7 +272,9 @@ object RedisMessage { * @param key */ case class PERSIST(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -261,7 +284,9 @@ object RedisMessage { * @param milliseconds */ case class PEXPIRE(key: Array[Byte], milliseconds: Long) { - def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + def this(key: String, milliseconds: Long) = { + this(toBytes(key), milliseconds) + } } /** @@ -271,7 +296,9 @@ object RedisMessage { * @param timestamp */ case class PEXPIREAT(key: Array[Byte], timestamp: Long) { - def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + def this(key: String, milliseconds: Long) = { + this(toBytes(key), milliseconds) + } } /** @@ -281,7 +308,9 @@ object RedisMessage { * @param newKey */ case class RENAME(key: Array[Byte], newKey: Array[Byte]) { - def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + def this(key: String, newKey: String) = { + this(toBytes(key), toBytes(newKey)) + } } /** @@ -291,7 +320,9 @@ object RedisMessage { * @param newKey */ case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) { - def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + def this(key: String, newKey: String) = { + this(toBytes(key), toBytes(newKey)) + } } } @@ -306,8 +337,9 @@ object RedisMessage { * @param members */ case class SADD(key: Array[Byte], members: Array[Byte]) { - - def this(key: String, members: String) = this(key, toBytes(members)) + def this(key: String, members: String) = { + this(key, toBytes(members)) + } } @@ -319,8 +351,9 @@ object RedisMessage { * @param member */ case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) { - def this(source: String, destination: String, member: String) = + def this(source: String, destination: String, member: String) = { this(toBytes(source), toBytes(destination), toBytes(member)) + } } @@ -331,8 +364,9 @@ object RedisMessage { * @param member */ case class SREM(key: Array[Byte], member: Array[Byte]) { - - def this(key: String, member: String) = this(key, toBytes(member)) + def this(key: String, member: String) = { + this(key, toBytes(member)) + } } } @@ -346,7 +380,9 @@ object RedisMessage { * @param value */ case class APPEND(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -355,7 +391,9 @@ object RedisMessage { * @param key */ case class DECR(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -365,7 +403,9 @@ object RedisMessage { * @param decrement */ case class DECRBY(key: Array[Byte], decrement: Int) { - def this(key: String, decrement: Int) = this(toBytes(key), decrement) + def this(key: String, decrement: Int) = { + this(toBytes(key), decrement) + } } /** @@ -374,7 +414,9 @@ object RedisMessage { * @param key */ case class INCR(key: Array[Byte]) { - def this(key: String) = this(toBytes(key)) + def this(key: String) = { + this(toBytes(key)) + } } /** @@ -384,7 +426,9 @@ object RedisMessage { * @param increment */ case class INCRBY(key: Array[Byte], increment: Int) { - def this(key: String, increment: Int) = this(toBytes(key), increment) + def this(key: String, increment: Int) = { + this(toBytes(key), increment) + } } /** @@ -394,7 +438,9 @@ object RedisMessage { * @param increment */ case class INCRBYFLOAT(key: Array[Byte], increment: Double) { - def this(key: String, increment: Number) = this(toBytes(key), increment) + def this(key: String, increment: Number) = { + this(toBytes(key), increment) + } } @@ -405,7 +451,9 @@ object RedisMessage { * @param value */ case class SET(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -416,7 +464,9 @@ object RedisMessage { * @param value */ case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) { - def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value)) + def this(key: String, offset: Long, value: String) = { + this(toBytes(key), offset, toBytes(value)) + } } /** @@ -427,7 +477,9 @@ object RedisMessage { * @param value */ case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) { - def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value)) + def this(key: String, seconds: Int, value: String) = { + this(toBytes(key), seconds, toBytes(value)) + } } /** @@ -437,7 +489,9 @@ object RedisMessage { * @param value */ case class SETNX(key: Array[Byte], value: Array[Byte]) { - def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + def this(key: String, value: String) = { + this(toBytes(key), toBytes(value)) + } } /** @@ -448,9 +502,9 @@ object RedisMessage { * @param value */ case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) { - def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value)) + def this(key: String, offset: Int, value: String) = { + this(toBytes(key), offset, toBytes(value)) + } } - } - } diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala index 3f7594907..36a9fe3ab 100644 --- a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -32,20 +32,20 @@ import redis.clients.jedis.Jedis import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT} /** - * Save message in Redis Instance - * - * @param host - * @param port - * @param timeout - * @param database - * @param password - */ + * Save message in Redis Instance + * + * @param host + * @param port + * @param timeout + * @param database + * @param password + */ class RedisSink( - host: String = DEFAULT_HOST, - port: Int = DEFAULT_PORT, - timeout: Int = DEFAULT_TIMEOUT, - database: Int = DEFAULT_DATABASE, - password: String = "") extends DataSink { + host: String = DEFAULT_HOST, + port: Int = DEFAULT_PORT, + timeout: Int = DEFAULT_TIMEOUT, + database: Int = DEFAULT_DATABASE, + password: String = "") extends DataSink { private val LOG = LogUtil.getLogger(getClass) @transient private lazy val client = new Jedis(host, port, timeout) @@ -59,7 +59,6 @@ class RedisSink( } override def write(message: Message): Unit = { - message.msg match { // GEO case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member) From 9c16b0dc7a521bb76053d8472337bfa13bd2c265 Mon Sep 17 00:00:00 2001 From: huafengw Date: Fri, 14 Oct 2016 12:48:31 +0800 Subject: [PATCH 3/4] minor --- .../org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala index bc38553ca..049f36d44 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import org.apache.gearpump.cluster.master.AppManager +import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, AppManager} import org.apache.gearpump.cluster.master.AppManager._ import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} import org.apache.gearpump.cluster.{TestUtil, _} @@ -165,7 +165,6 @@ class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with } class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory { - override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef]): Props = { Props(new DummyAppMasterLauncher(test, appId)) From 5f32c55283a80f3dd3bbbd82df8115ea1018fe0a Mon Sep 17 00:00:00 2001 From: huafengw Date: Fri, 14 Oct 2016 14:37:54 +0800 Subject: [PATCH 4/4] minor --- .../org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala index 049f36d44..f9b0762e4 100644 --- a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala @@ -172,8 +172,8 @@ class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFa } class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor { - test.ref ! LauncherStarted(appId) + override def receive: Receive = { case any: Any => test.ref forward any }