Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
164 lines (136 sloc) 6.22 KB
package net.pierreandrews
import{ FilenameFilter, File }
import{ Props, ActorSystem }
import com.quantifind.sumac.validation.{ Positive, Required }
import com.quantifind.sumac.{ FieldArgs, ArgMain }
import com.typesafe.config.ConfigFactory
import net.pierreandrews.utils.LogSplitUtils
/**
 * # The Main Application
 * This will run on each single server; it will initialize the actor system on
 * the current server and setup the cluster properties.
 * See `LogSplitApp --help` and LogSplitAppArgs to see the valid and required command line arguments.
 * The default is to run on a localhost cluster, where we have a node at `localhost:2550`, `localhost:2551` and
 * `localhost:2552`. You should start at least three JVMs on these ports (using `--port 2550`, etc.). You
 * can run the nodes on separate machines; then you have to specify `--seeds` to point to the other machines, or update
 * `application.conf`.
 * User: pierre
 * Date: 11/28/14
 */
//Entry point for one node of the logsplit cluster: parses Sumac arguments,
//builds the Akka config for this node, starts the ActorSystem and the
//reader/sorter/writer actors.
//NOTE(review): this listing looks like a lossy copy — standalone closing braces,
//field annotations and URL strings appear to have been dropped. Only comments are
//changed here; code tokens are left exactly as found.
object LogSplitApp extends ArgMain[LogSplitAppArgs] {
//we assume that the log files are split in buckets and follow this naming pattern (see below)
//i.e. any file whose name ends in `.<digits>.log` is accepted as input
private final val validLogFile = ".[0-9]+.log$".r
//Sumac entry point: `args` has already been parsed and validated when we get here.
override def main(args: LogSplitAppArgs): Unit = {
//## setup the actorsystem and cluster connection
//bind the remoting port for this node; must agree with the seed list
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=${args.port}")
//if we have a set of seeds on the command line
//NOTE(review): the expression below looks truncated — the map receiver and the
//`getOrElse(config)` fallback are not visible. Presumably `args.seeds` is folded
//into an `akka.cluster.seed-nodes` override; confirm against the original file.
val configWithSeeds = { seedSeq =>
val seedStr = => s"""akka.tcp://ClusterSystem@$ip""").mkString(",")
//and finally load application.conf and the configuration stack
//as defined by HOCON (https://github.com/lightbend/config/blob/master/HOCON.md)
//## start the system
val system = ActorSystem(s"ClusterSystem", configWithSeeds)
//## start some actors
//- setup some file readers
startReaders(args, system)
//- setup the sorter manager for this node
val sorter = system.actorOf(Props(new SorterActor(args)), name = "sorter")
//- setup the writer manager for this node; it forwards completed parts to the sorter
system.actorOf(Props(new WriterActor(args, sorter)), name = "writer")
//## setup the reader workers
//Lists the node's input files, orders them, splits them into
//`args.numReaderWorkers` groups and starts one ReaderActor per group.
def startReaders(args: LogSplitAppArgs, system: ActorSystem): Unit = {
//`listFiles` might return `null`, wrap in `Option`.
//NOTE(review): this chain looks truncated — the `Option(...)` is never unwrapped
//to a `Seq` before `.sortBy`, and the sort key below never visibly converts
//`partID` to a number; confirm against the original file.
val files: Seq[File] = Option(args.input.listFiles(new FilenameFilter {
//NOTE(review): `return` here is a plain (local) return from `accept` but is
//redundant — the last expression is already the result. Style nit, left as-is.
override def accept(dir: File, name: String): Boolean = return validLogFile.findFirstIn(name).isDefined
.sortBy { file =>
//we assume that log files will have the form: filename.NUMBER.log
// and that filename.0.log has more recent logs than filename.1.log, etc.
// (this is a bit naive maybe)
val name = file.getName
val dotIdx = name.indexOf('.')
val secondDot = name.indexOf('.', dotIdx + 1)
//NUMBER between the first two dots — used as the sort key
val partID = name.substring(dotIdx + 1, secondDot)
// distribute the input files among the workers;
// zipWithIndex assigns each split an id
val readerSplit = LogSplitUtils.cut(files, args.numReaderWorkers).toSeq
readerSplit.zipWithIndex.foreach {
case (files, partIdx) =>
// for each input split, start a reader that will read the files in
// parallel
system.actorOf(Props(new ReaderActor(args, files, partIdx, readerSplit.size)), name = s"reader-$partIdx")
/**
 * # Command line argument definition, using [Sumac](https://github.com/quantifind/Sumac).
 * These settings are local to a node/server.
 */
//Per-node command-line arguments (Sumac FieldArgs: each `var` becomes a `--flag`).
//NOTE(review): `Required` and `Positive` are imported at the top of the file but no
//annotations appear on the fields below — they were presumably stripped from this
//copy (e.g. `@Required @Positive` on `serverID`); confirm against the original.
class LogSplitAppArgs extends FieldArgs {
//- The id of the server, this is required
// the ID has to be positive and is used to assign users to
// this server (partition)
var serverID: Int = -1
//- What port are we running on.
// It is important that this corresponds to the settings in application.conf
// or provided by `--seeds`
var port: Int = 0
//- Where are we reading the logs from
var input: File = _
//- Where are the files going to be output
// part files go here too
var output: File = _
//- How many servers are there.
// This is set to default to 3.
// It is important that this is accurate as it is used to assign users to partitions.
// Each node in the cluster should be configured with the same `numServers`
var numServers: Int = 3
//- How many writer workers do we want on this server
var numWriteWorkers: Int = 60
//- How many file handles should EACH writer worker keep cached. See the `WriterWorkerActor` for more details
var maxWriteOpen: Int = 20
//- How many reader workers do we want on this server?
var numReaderWorkers: Int = 60
//- We buffer log lines in memory,
// this is useful to unblock reads when a particular partition is slower than another one
// a higher value should increase the throughput but will require more memory.
// Each reader will load that amount of lines for each partition in memory, so the actual lines
// in memory will be potentially equal to `numReaderWorkers * numServers * maxReadBuffer`
var maxReadBuffer: Int = 10000
//- Once the log distribution is done, we are left with a number of part files coming from each server
// each part file is sorted but all the parts are not globally sorted. Once we are done collecting logs,
// we have to sort the lines again. How many parallel sorters should we use.
var numSortWorkers: Int = 60
//- Should we delete the partially sorted part files when we are done?
var deletePartFiles: Boolean = true
//- Settings for the cluster seeds. The defaults are set in `application.conf`. This has to be specified as a list of string IP with port:
// e.g. `--seeds,` (NOTE(review): the example addresses were stripped from this copy)
// at least one seed should be setup, the other nodes should be able to auto-discover each other from this.
var seeds: Option[Seq[String]] = None
//Cross-field validation run by Sumac after parsing; throws on bad values.
//NOTE(review): the closing braces of this block and of the class are not visible
//in this copy — presumably dropped by the scrape.
addValidation {
require(port >= 0, "port cannot be negative")
require(serverID >= 0, "serverID cannot be negative")