update to new config style

ryanking committed Jun 21, 2011
1 parent 2afb616 commit 9334f84
Showing 6 changed files with 112 additions and 100 deletions.
28 changes: 0 additions & 28 deletions config/development.conf

This file was deleted.

22 changes: 22 additions & 0 deletions config/development.scala
@@ -0,0 +1,22 @@
+import com.twitter.service.snowflake.{SnowflakeConfig, ReporterConfig}
+
+new SnowflakeConfig {
+  val serverPort: Int = 7609
+  val datacenterId: Int = 0
+  val workerId: Int = 0
+  val adminPort: Int = 9990
+  val adminBacklog: Int = 100
+  val workerIdZkPath: String = "/snowflake-servers"
+  val zkHostlist: String = "localhost"
+  val skipSanityChecks: Boolean = false
+  val startupSleepMs: Int = 10000
+  val thriftServerThreads: Int = 2
+
+  val reporterConfig = new ReporterConfig {
+    val scribeCategory = "snowflake"
+    val scribeHost = "localhost"
+    val scribePort = 1463
+    val scribeSocketTimeout = 5000
+    val flushQueueLimit = 100000
+  }
+}
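Note that this config file is no longer a Configgy .conf parsed at runtime; it is Scala source that evaluates to a SnowflakeConfig instance. A minimal sketch of the loading step, assuming Ostrich's loader compiles the file with util-eval (the Eval usage and path below are illustrative, not part of this commit):

    import com.twitter.service.snowflake.SnowflakeConfig
    import com.twitter.util.Eval
    import java.io.File

    // Compile and instantiate the object the config file evaluates to.
    // Ostrich's RuntimeEnvironment.loadRuntimeConfig does the equivalent
    // of this, then calls config(runtime) to build the server.
    val eval = new Eval
    val config = eval[SnowflakeConfig](new File("config/development.scala"))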
5 changes: 3 additions & 2 deletions src/main/scala/com/twitter/service/snowflake/IdWorker.scala
@@ -12,11 +12,12 @@ import java.util.Random
  * we ever want to support multiple worker threads
  * per process
  */
-class IdWorker(workerId: Long, datacenterId: Long) extends Snowflake.Iface {
+class IdWorker(workerId: Long, datacenterId: Long, reporterConfig: ReporterConfig) extends Snowflake.Iface {
   private val log = Logger.get
   private val genCounter = Stats.getCounter("ids_generated")
   private val exceptionCounter = Stats.getCounter("exceptions")
-  private val reporter = new Reporter
+  private val reporter = new Reporter(reporterConfig.scribeCategory, reporterConfig.scribeHost,
+    reporterConfig.scribePort, reporterConfig.scribeSocketTimeout, reporterConfig.flushQueueLimit)
   private val rand = new Random
 
   val twepoch = 1288834974657L
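IdWorker now receives its Scribe reporter settings as an explicit ReporterConfig and forwards them to Reporter, instead of letting Reporter read global Configgy state. A hypothetical direct construction, with values mirroring development.scala above (normally SnowflakeServer.start does this wiring):

    import com.twitter.service.snowflake.{IdWorker, ReporterConfig}

    // Illustrative only: an anonymous ReporterConfig, as in development.scala.
    val reporterCfg = new ReporterConfig {
      val scribeCategory = "snowflake"
      val scribeHost = "localhost"
      val scribePort = 1463
      val scribeSocketTimeout = 5000
      val flushQueueLimit = 100000
    }
    val worker = new IdWorker(0L, 0L, reporterCfg)  // workerId, datacenterId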
17 changes: 6 additions & 11 deletions src/main/scala/com/twitter/service/snowflake/Reporter.scala
@@ -9,7 +9,6 @@ import java.net.ConnectException
 import java.net.Socket
 import java.util.ArrayList
 import java.util.concurrent.LinkedBlockingDeque
-import net.lag.configgy._
 import net.lag.logging.Logger
 import org.apache.commons.codec.binary.Base64;
 import org.apache.thrift.transport.TIOStreamTransport;
@@ -19,15 +18,11 @@ import org.apache.thrift.protocol.{TBinaryProtocol, TProtocolFactory}
 import org.apache.thrift.transport.{TTransportException, TFramedTransport, TSocket}
 import org.apache.thrift.{TBase, TException, TFieldIdEnum, TSerializer, TDeserializer}
 
-class Reporter {
+class Reporter(scribeCategory: String, scribeHost: String, scribePort: Int,
+    scribeSocketTimeout: Int, flushQueueLimit: Int) {
   private val log = Logger.get(getClass.getName)
-  private val config = Configgy.config.configMap("snowflake.reporter")
-  private val scribe_category = config("scribe_category")
-  private val scribe_host = config("scribe_host")
-  private val scribe_port = config("scribe_port").toInt
-  private val scribe_socket_timeout = config("scribe_socket_timeout").toInt
 
-  val queue = new LinkedBlockingDeque[TBase[_,_]](config("flush_queue_limit").toInt)
+  val queue = new LinkedBlockingDeque[TBase[_,_]](flushQueueLimit)
   private val structs = new ArrayList[TBase[_,_]](100)
   private val entries = new ArrayList[LogEntry](100)
   private var scribeClient: Option[Client] = None
@@ -45,7 +40,7 @@ class Reporter {
       queue.drainTo(structs, 100)
       if (structs.size > 0) {
         for (i <- 0 until structs.size) {
-          entries.add(i, new LogEntry(scribe_category, serialize(structs.get(i))))
+          entries.add(i, new LogEntry(scribeCategory, serialize(structs.get(i))))
         }
         scribeClient.get.Log(entries)
         log.trace("reported %d items to scribe. queue is %d".format(entries.size, queue.size))
@@ -79,8 +74,8 @@
   private def connect {
     while(scribeClient.isEmpty) {
       try {
-        log.debug("connection to scribe at %s:%d with timeout %d".format(scribe_host, scribe_port, scribe_socket_timeout))
-        var sock = new TSocket(scribe_host, scribe_port, scribe_socket_timeout)
+        log.debug("connection to scribe at %s:%d with timeout %d".format(scribeHost, scribePort, scribeSocketTimeout))
+        var sock = new TSocket(scribeHost, scribePort, scribeSocketTimeout)
         sock.open()
         var transport = new TFramedTransport(sock)
         var protocol = new TBinaryProtocol(transport, false, false)
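With Configgy gone, a Reporter can be constructed in isolation; the five constructor parameters are exactly the old snowflake.reporter.* keys in camelCase. Continuing the sketch above (reporterCfg as defined under IdWorker.scala):

    // Equivalent to what IdWorker now does internally with its ReporterConfig.
    val reporter = new Reporter(reporterCfg.scribeCategory, reporterCfg.scribeHost,
      reporterCfg.scribePort, reporterCfg.scribeSocketTimeout, reporterCfg.flushQueueLimit)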
95 changes: 53 additions & 42 deletions src/main/scala/com/twitter/service/snowflake/SnowflakeServer.scala
@@ -21,54 +21,63 @@ import com.twitter.ostrich.stats.Stats
 import com.twitter.ostrich.admin.config.ServerConfig
 import com.twitter.ostrich.admin.Service
 
+trait ReporterConfig {
+  val scribeCategory: String
+  val scribeHost: String
+  val scribePort: Int
+  val scribeSocketTimeout: Int
+  val flushQueueLimit: Int
+}
+
 trait SnowflakeConfig extends ServerConfig[SnowflakeServer] {
   val serverPort: Int
   val datacenterId: Int
   val workerId: Int
-  val adminHttpPort: Int
-  val adminHttpBacklog: Int
+  val adminPort: Int
+  val adminBacklog: Int
   val workerIdZkPath: String
   val zkHostlist: String
   val skipSanityChecks: Boolean
   val startupSleepMs: Int
   val thriftServerThreads: Int
 
+  val reporterConfig: ReporterConfig
+
   def apply(runtime: RuntimeEnvironment) = {
-    new SnowflakeServer(serverPort, datacenterId, workerId, adminHttpPort,
-      adminHttpBacklog, workerIdZkPath, zkHostlist, skipSanityChecks, startupSleepMs,
-      thriftServerThreads)
+    new SnowflakeServer(this)
   }
 }
 
 case class Peer(hostname: String, port: Int)
 
 object SnowflakeServer {
   def main(args: Array[String]) {
-    val runtime = new RuntimeEnvironment(this, args)
-    val server = runtime.loadRuntimeConfig[SnowflakeServer]()
-    try {
-      server.start
-    } catch {
-      case e: Exception =>
-        e.printStackTrace()
-        println(e, "Unexpected exception: %s", e.getMessage)
-        System.exit(0)
-    }
-  }
+    val runtime = new RuntimeEnvironment(this)
+    runtime.parseArgs(args.toList)
+
+    val server = runtime.loadRuntimeConfig[SnowflakeServer]()
+    try {
+      server.start
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        println(e, "Unexpected exception: %s", e.getMessage)
+        System.exit(0)
+    }
+  }
 }

-class SnowflakeServer(serverPort: Int, datacenterId: Int, workerId: Int, adminPort: Int,
-    adminBacklog: Int, workerIdZkPath: String, zkHostlist: String, skipSanityChecks: Boolean,
-    startupSleepMs: Int, thriftServerThreads: Int) extends Service {
+class SnowflakeServer(config: SnowflakeConfig) extends Service {
   private val log = Logger.get
   var server: TServer = null
   lazy val zkClient = {
-    log.info("Creating ZooKeeper client connected to %s", zkHostlist)
-    new ZooKeeperClient(zkHostlist)
+    log.info("Creating ZooKeeper client connected to %s", config.zkHostlist)
+    new ZooKeeperClient(config.zkHostlist)
   }
 
-  Stats.addGauge("datacenter_id") { datacenterId }
-  Stats.addGauge("worker_id") { workerId }
+  Stats.addGauge("datacenter_id") { config.datacenterId }
+  Stats.addGauge("worker_id") { config.workerId }
 
   def shutdown(): Unit = {
     if (server != null) {
       log.info("Shutting down.")
@@ -78,41 +87,42 @@ class SnowflakeServer(serverPort: Int, datacenterId: Int, workerId: Int, adminPo
   }
 
   def start {
-    if (!skipSanityChecks) {
+    if (!config.skipSanityChecks) {
       sanityCheckPeers()
     }
-    registerWorkerId(workerId)
-    val admin = new AdminService(adminPort, adminBacklog, new RuntimeEnvironment(getClass))
-
-    Thread.sleep(startupSleepMs)
+    registerWorkerId(config.workerId)
+    val admin = new AdminService(config.adminPort, config.adminBacklog, new RuntimeEnvironment(getClass))
+
+    Thread.sleep(config.startupSleepMs)
 
     try {
-      val worker = new IdWorker(workerId, datacenterId)
+      val worker = new IdWorker(config.workerId, config.datacenterId, config.reporterConfig)
 
       val processor = new Snowflake.Processor(worker)
-      val transport = new TNonblockingServerSocket(serverPort)
+      val transport = new TNonblockingServerSocket(config.serverPort)
       val serverOpts = new THsHaServer.Options
-      serverOpts.workerThreads = thriftServerThreads
+      serverOpts.workerThreads = config.thriftServerThreads
 
       val server = new THsHaServer(processor, transport, serverOpts)
 
-      log.info("Starting server on port %s with workerThreads=%s", serverPort, serverOpts.workerThreads)
+      log.info("Starting server on port %s with workerThreads=%s", config.serverPort, serverOpts.workerThreads)
       server.serve()
     } catch {
       case e: Exception => {
         log.error(e, "Unexpected exception while initializing server: %s", e.getMessage)
         throw e
       }
     }
 
  }
 
   def registerWorkerId(i: Int): Unit = {
     log.info("trying to claim workerId %d", i)
     var tries = 0
     while (true) {
       try {
-        zkClient.create("%s/%s".format(workerIdZkPath, i), (getHostname + ':' + serverPort).getBytes(), EPHEMERAL)
+        zkClient.create("%s/%s".format(config.workerIdZkPath, i),
+          (getHostname + ':' + config.serverPort).getBytes(), EPHEMERAL)
         return
       } catch {
         case e: NodeExistsException => {
@@ -132,17 +142,17 @@ class SnowflakeServer(serverPort: Int, datacenterId: Int, workerId: Int, adminPo
   def peers(): mutable.HashMap[Int, Peer] = {
     var peerMap = new mutable.HashMap[Int, Peer]
     try {
-      zkClient.get(workerIdZkPath)
+      zkClient.get(config.workerIdZkPath)
     } catch {
       case _ => {
-        log.info("%s missing, trying to create it", workerIdZkPath)
-        zkClient.create(workerIdZkPath, Array(), PERSISTENT)
+        log.info("%s missing, trying to create it", config.workerIdZkPath)
+        zkClient.create(config.workerIdZkPath, Array(), PERSISTENT)
       }
     }
 
-    val children = zkClient.getChildren(workerIdZkPath)
+    val children = zkClient.getChildren(config.workerIdZkPath)
     children.foreach { i =>
-      val peer = zkClient.get("%s/%s".format(workerIdZkPath, i))
+      val peer = zkClient.get("%s/%s".format(config.workerIdZkPath, i))
       val list = new String(peer).split(':')
       peerMap(i.toInt) = new Peer(new String(list(0)), list(1).toInt)
     }
@@ -154,7 +164,7 @@ class SnowflakeServer(serverPort: Int, datacenterId: Int, workerId: Int, adminPo
   def sanityCheckPeers() {
     var peerCount = 0
     val timestamps = peers().filter{ case (id: Int, peer: Peer) =>
-      !(peer.hostname == getHostname && peer.port == serverPort)
+      !(peer.hostname == getHostname && peer.port == config.serverPort)
     }.map { case (id: Int, peer: Peer) =>
       try {
         log.info("connecting to %s:%s".format(peer.hostname, peer.port))
@@ -166,16 +176,17 @@ class SnowflakeServer(serverPort: Int, datacenterId: Int, workerId: Int, adminPo
       }
 
       val reportedDatacenterId = c.get_datacenter_id()
-      if (reportedDatacenterId != datacenterId) {
-        log.error("Worker at %s:%s has datacenter_id %d, but ours is %d", peer.hostname, peer.port, reportedDatacenterId, datacenterId)
+      if (reportedDatacenterId != config.datacenterId) {
+        log.error("Worker at %s:%s has datacenter_id %d, but ours is %d",
+          peer.hostname, peer.port, reportedDatacenterId, config.datacenterId)
         throw new IllegalStateException("Datacenter id insanity.")
       }
 
       peerCount = peerCount + 1
       c.get_timestamp().toLong
     } catch {
       case e: TTransportException => {
-        log.error("Couldn't talk to peer %s at %s:%s", workerId, peer.hostname, peer.port)
+        log.error("Couldn't talk to peer %s at %s:%s", config.workerId, peer.hostname, peer.port)
         throw e
       }
     }
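Since the server now takes the whole config object, a single-node smoke test can wire everything up directly, bypassing config-file evaluation. A hypothetical sketch; the only deviations from development.scala are skipSanityChecks = true and startupSleepMs = 0:

    import com.twitter.service.snowflake.{SnowflakeConfig, ReporterConfig, SnowflakeServer}

    val cfg = new SnowflakeConfig {
      val serverPort = 7609
      val datacenterId = 0
      val workerId = 0
      val adminPort = 9990
      val adminBacklog = 100
      val workerIdZkPath = "/snowflake-servers"
      val zkHostlist = "localhost"
      val skipSanityChecks = true  // lone node: no peers to sanity-check
      val startupSleepMs = 0
      val thriftServerThreads = 2
      val reporterConfig = new ReporterConfig {
        val scribeCategory = "snowflake"
        val scribeHost = "localhost"
        val scribePort = 1463
        val scribeSocketTimeout = 5000
        val flushQueueLimit = 100000
      }
    }

    // Registers workerId 0 under /snowflake-servers in ZooKeeper, then serves Thrift.
    new SnowflakeServer(cfg).start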
