Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Check in first pass at new NameServer API
Browse files Browse the repository at this point in the history
note that tests dont pass atm
  • Loading branch information
Ed Ceaser committed Mar 18, 2010
1 parent e767fc0 commit cc3026e
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 125 deletions.
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/gizzard/jobs/CopyMachine.scala
Expand Up @@ -7,7 +7,7 @@ import shards.{Busy, Shard, ShardTimeoutException}
import nameserver.{NameServer, NonExistentShard}


object CopyMachine {
/*object CopyMachine {
val MIN_COPY = 500
def pack(sourceShardId: Int, destinationShardId: Int, count: Int): Map[String, AnyVal] = {
Expand Down Expand Up @@ -66,4 +66,4 @@ abstract class CopyMachine[S <: Shard](attributes: Map[String, AnyVal]) extends
protected def copyPage(sourceShard: S, destinationShard: S, count: Int): Map[String, AnyVal]
protected def finished: Boolean
}
} */
@@ -0,0 +1,79 @@
package com.twitter.gizzard.nameserver

import java.util.TreeMap
import scala.collection.mutable
import shards._


class CachingNameServer(nameServer: ManagingNameServer, mappingFunction: Long => Long)
extends ForwardingNameServer with ManagingNameServer {
val children = List()
val shardInfo = new ShardInfo("com.twitter.gizzard.nameserver.CachingNameServer", "", "")
val weight = 1 // hardcode for now

@volatile protected var shardInfos = mutable.Map.empty[Int, ShardInfo]
@volatile private var familyTree: scala.collection.Map[Int, Seq[ChildInfo]] = null
@volatile private var forwardings: scala.collection.Map[List[Int], TreeMap[Long, ShardInfo]] = null

reload()

def getShardInfo(id: Int) = shardInfos(id)

def reload() {
val newShardInfos = mutable.Map.empty[Int, ShardInfo]
nameServer.listShards().foreach { shardInfo =>
newShardInfos += (shardInfo.shardId -> shardInfo)
}

val newFamilyTree = nameServer.listShardChildren()

val newForwardings = new mutable.HashMap[List[Int], TreeMap[Long, ShardInfo]]
nameServer.getForwardings().foreach { forwarding =>
val treeMap = newForwardings.getOrElseUpdate(forwarding.tableId, new TreeMap[Long, ShardInfo])
treeMap.put(forwarding.baseId, newShardInfos(forwarding.shardId))
}

shardInfos = newShardInfos
familyTree = newFamilyTree
forwardings = newForwardings
}

def findCurrentForwarding(tableId: List[Int], id: Long) = {
forwardings.get(tableId).flatMap { bySourceIds =>
val item = bySourceIds.floorEntry(mappingFunction(id))
if (item != null) {
Some(item.getValue)
} else {
None
}
} getOrElse {
throw new NonExistentShard
}
}

// delegation
def listShardChildren(parentId: Int) = nameServer.listShardChildren(parentId)
def createShard(shardInfo: ShardInfo) = nameServer.createShard(shardInfo)
def findShard(shardInfo: ShardInfo) = nameServer.findShard(shardInfo)
def getShard(shardId: Int) = nameServer.getShard(shardId)
def updateShard(shardInfo: ShardInfo) = nameServer.updateShard(shardInfo)
def deleteShard(shardId: Int) = nameServer.deleteShard(shardId)
def addChildShard(parentShardId: Int, childShardId: Int, weight: Int) = nameServer.addChildShard(parentShardId, childShardId, weight)
def removeChildShard(parentShardId: Int, childShardId: Int) = nameServer.removeChildShard(parentShardId, childShardId)
def replaceChildShard(oldChildShardId: Int, newChildShardId: Int) = nameServer.replaceChildShard(oldChildShardId, newChildShardId)
def markShardBusy(shardId: Int, busy: Busy.Value) = nameServer.markShardBusy(shardId, busy)
def setForwarding(forwarding: Forwarding) = nameServer.setForwarding(forwarding)
def replaceForwarding(oldShardId: Int, newShardId: Int) = nameServer.replaceForwarding(oldShardId, newShardId)
def getForwarding(tableId: List[Int], baseId: Long) = nameServer.getForwarding(tableId, baseId)
def getForwardingForShard(shardId: Int) = nameServer.getForwardingForShard(shardId)
def getForwardings() = nameServer.getForwardings()
def shardIdsForHostname(hostname: String, className: String) = nameServer.shardIdsForHostname(hostname, className)
def listShards() = nameServer.listShards()
def listShardChildren() = nameServer.listShardChildren()
def shardsForHostname(hostname: String, className: String) = nameServer.shardsForHostname(hostname, className)
def getBusyShards() = nameServer.getBusyShards()
def getParentShard(shardId: Int) = nameServer.getParentShard(shardId)
def getRootShard(shardId: Int) = nameServer.getRootShard(shardId)
def getChildShardsOfClass(parentShardId: Int, className: String) = nameServer.getChildShardsOfClass(parentShardId, className)
def rebuildSchema() = nameServer.rebuildSchema()
}
10 changes: 5 additions & 5 deletions src/main/scala/com/twitter/gizzard/nameserver/CopyManager.scala
Expand Up @@ -4,13 +4,13 @@ import jobs.{CopyMachine, JobScheduler}
import shards.Shard


trait CopyManager[S <: Shard] {
/* trait CopyManager[S <: Shard] {
/** Return a scheduler to be used for running copy/migrate jobs. */
def scheduler: JobScheduler
// def scheduler: JobScheduler
/** Create a new CopyMachine job for copying one shard to another. */
def newCopyJob(sourceShardId: Int, destinationShardId: Int): CopyMachine[S]
// def newCopyJob(sourceShardId: Int, destinationShardId: Int): CopyMachine[S]
/** Create a new CopyMachine job for migrating one shard to another. */
def newMigrateJob(migration: ShardMigration): CopyMachine[S]
}
//def newMigrateJob(migration: ShardMigration): CopyMachine[S]
//} */
@@ -0,0 +1,12 @@
package com.twitter.gizzard.nameserver

import shards._


trait ForwardingNameServer {
def findCurrentForwarding(tableId: List[Int], id: Long): ShardInfo
def getShardInfo(shardId: Int): ShardInfo
// def findShardById(shardId: Int, weight: Int): ShardInfo
// def findShardById(shardId: Int): ShardInfo = findShardById(shardId, 1)
}

@@ -0,0 +1,35 @@
package com.twitter.gizzard.nameserver

import shards._
import scala.collection.Map


trait ManagingNameServer extends Shard {
def createShard(shardInfo: ShardInfo): Int
def findShard(shardInfo: ShardInfo): Int
def getShard(shardId: Int): ShardInfo
def updateShard(shardInfo: ShardInfo)
def deleteShard(shardId: Int)
def addChildShard(parentShardId: Int, childShardId: Int, weight: Int)
def removeChildShard(parentShardId: Int, childShardId: Int)
def replaceChildShard(oldChildShardId: Int, newChildShardId: Int)
def markShardBusy(shardId: Int, busy: Busy.Value)
def setForwarding(forwarding: Forwarding)
def replaceForwarding(oldShardId: Int, newShardId: Int)
def getForwarding(tableId: List[Int], baseId: Long): ShardInfo
def getForwardingForShard(shardId: Int): Forwarding
def getForwardings(): List[Forwarding]
def shardIdsForHostname(hostname: String, className: String): List[Int]
def listShards(): Seq[ShardInfo]
def listShardChildren(): Map[Int, Seq[ChildInfo]]
def shardsForHostname(hostname: String, className: String): List[ShardInfo]
def getBusyShards(): Seq[ShardInfo]
def getParentShard(shardId: Int): ShardInfo
def getRootShard(shardId: Int): ShardInfo
def getChildShardsOfClass(parentShardId: Int, className: String): List[ShardInfo]
def rebuildSchema()
def listShardChildren(parentId: Int): Seq[ChildInfo]
}

// def reloadForwardings()
// def reload()
29 changes: 0 additions & 29 deletions src/main/scala/com/twitter/gizzard/nameserver/NameServer.scala
Expand Up @@ -2,35 +2,6 @@ package com.twitter.gizzard.nameserver

import shards._

trait NameServer[S <: Shard] extends Shard {
def createShard(shardInfo: ShardInfo): Int
def findShard(shardInfo: ShardInfo): Int
def getShard(shardId: Int): ShardInfo
def updateShard(shardInfo: ShardInfo)
def deleteShard(shardId: Int)
def addChildShard(parentShardId: Int, childShardId: Int, weight: Int)
def removeChildShard(parentShardId: Int, childShardId: Int)
def replaceChildShard(oldChildShardId: Int, newChildShardId: Int)
def listShardChildren(shardId: Int): Seq[ChildInfo]
def markShardBusy(shardId: Int, busy: Busy.Value)
def setForwarding(forwarding: Forwarding)
def replaceForwarding(oldShardId: Int, newShardId: Int)
def getForwarding(tableId: List[Int], baseId: Long): ShardInfo
def getForwardingForShard(shardId: Int): Forwarding
def getForwardings(): List[Forwarding]
def findCurrentForwarding(tableId: List[Int], id: Long): S
def reloadForwardings()
def shardIdsForHostname(hostname: String, className: String): List[Int]
def shardsForHostname(hostname: String, className: String): List[ShardInfo]
def getBusyShards(): Seq[ShardInfo]
def getParentShard(shardId: Int): ShardInfo
def getRootShard(shardId: Int): ShardInfo
def getChildShardsOfClass(parentShardId: Int, className: String): List[ShardInfo]
def reload()
def findShardById(shardId: Int, weight: Int): S
def findShardById(shardId: Int): S = findShardById(shardId, 1)
def rebuildSchema()
}

class NonExistentShard extends ShardException("Shard does not exist")
class InvalidShard extends ShardException("Shard has invalid attributes (such as hostname)")
Expand Up @@ -3,8 +3,7 @@ package com.twitter.gizzard.nameserver
import shards._


trait ReadWriteNameServer[S <: Shard] extends NameServer[S] with ReadWriteShard[NameServer[S]] {
def findCurrentForwarding(tableId: List[Int], id: Long) = readOperation(_.findCurrentForwarding(tableId, id))
trait ReadWriteNameServer extends ManagingNameServer with ReadWriteShard[ManagingNameServer] {
def findShard(shardInfo: ShardInfo) = readOperation(_.findShard(shardInfo))
def getBusyShards() = readOperation(_.getBusyShards())
def getChildShardsOfClass(parentShardId: Int, className: String) = readOperation(_.getChildShardsOfClass(parentShardId, className))
Expand All @@ -14,7 +13,9 @@ trait ReadWriteNameServer[S <: Shard] extends NameServer[S] with ReadWriteShard[
def getParentShard(shardId: Int) = readOperation(_.getParentShard(shardId))
def getRootShard(shardId: Int) = readOperation(_.getRootShard(shardId))
def getShard(shardId: Int) = readOperation(_.getShard(shardId))
def listShardChildren(shardId: Int) = readOperation(_.listShardChildren(shardId))
def listShardChildren(parentId: Int) = readOperation(_.listShardChildren(parentId))
def listShardChildren() = readOperation(_.listShardChildren())
def listShards() = readOperation(_.listShards())
def shardIdsForHostname(hostname: String, className: String): List[Int] = readOperation(_.shardIdsForHostname(hostname, className))
def shardsForHostname(hostname: String, className: String): List[ShardInfo] = readOperation(_.shardsForHostname(hostname, className))

Expand All @@ -29,7 +30,4 @@ trait ReadWriteNameServer[S <: Shard] extends NameServer[S] with ReadWriteShard[
def updateShard(shardInfo: ShardInfo) = writeOperation(_.updateShard(shardInfo))

def rebuildSchema() = readOperation(_.rebuildSchema())
def findShardById(shardId: Int, weight: Int) = readOperation(_.findShardById(shardId, weight))
def reload() = readOperation(_.reload())
def reloadForwardings() = readOperation(_.reloadForwardings())
}
Expand Up @@ -4,6 +4,6 @@ import shards._
import net.lag.logging.ThrottledLogger


class ReplicatingNameServer[S <: Shard](shardInfo: ShardInfo, weight: Int, replicas: Seq[Shard], log: ThrottledLogger[String], future: Future)
extends ReplicatingShard[NameServer[S]](shardInfo, weight, replicas, log, future, None)
with ReadWriteNameServer[S]
class ReplicatingNameServer(shardInfo: ShardInfo, weight: Int, replicas: Seq[Shard], log: ThrottledLogger[String], future: Future)
extends ReplicatingShard[ManagingNameServer](shardInfo, weight, replicas, log, future, None)
with ReadWriteNameServer
Expand Up @@ -4,7 +4,7 @@ import shards._


object ShardMigration {
def setupMigration[S <: Shard](sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo, nameServer: NameServer[S]): ShardMigration = {
def setupMigration(sourceShardInfo: ShardInfo, destinationShardInfo: ShardInfo, nameServer: ManagingNameServer): ShardMigration = {
val lastDot = sourceShardInfo.className.lastIndexOf('.')
val packageName = if (lastDot >= 0) sourceShardInfo.className.substring(0, lastDot + 1) else ""
val sourceShardId = nameServer.findShard(sourceShardInfo)
Expand All @@ -26,7 +26,7 @@ object ShardMigration {
new ShardMigration(sourceShardId, destinationShardId, replicatingShardId, writeOnlyShardId)
}

def finishMigration[S <: Shard](migration: ShardMigration, nameServer: NameServer[S]) {
def finishMigration(migration: ShardMigration, nameServer: ManagingNameServer) {
nameServer.removeChildShard(migration.writeOnlyShardId, migration.destinationShardId)
nameServer.replaceChildShard(migration.replicatingShardId, migration.destinationShardId)
nameServer.replaceForwarding(migration.replicatingShardId, migration.destinationShardId)
Expand Down

0 comments on commit cc3026e

Please sign in to comment.